namespace SHH.CameraSdk; /// /// 全局流分发器(静态类 | 线程安全) /// 核心职责: /// 1. 接收处理完成的帧任务,基于 AppId 路由策略实现帧的精准定向分发 /// 2. 隔离 UI 预览、AI 分析等不同消费场景,支撑多模块并行消费 /// 设计特性: /// ✅ 线程安全:基于 ConcurrentDictionary 实现并发订阅/取消订阅 /// ✅ 精准路由:按 TargetAppIds 点对点投递,避免广播风暴 /// ✅ 异常隔离:单个订阅者异常不影响其他模块消费 /// public static class GlobalStreamDispatcher { #region --- 1. 预设订阅通道 (Predefined Subscription Channels) --- /// /// UI 预览订阅通道:供 UI 模块订阅帧数据,用于实时画面显示 /// 回调参数:(设备唯一标识, 处理后的智能帧数据) /// 特性:低延迟优先,支持画面渲染相关的轻量级处理 /// public static event Action? OnPreviewFrame; /// /// AI 分析订阅通道:供 AI 模块订阅帧数据,用于行为识别/人脸检测/车牌识别等 /// 回调参数:(设备唯一标识, 处理后的智能帧数据) /// 特性:高吞吐优先,支持复杂算法处理,延迟容忍度较高 /// public static event Action? OnAnalysisFrame; #endregion // ================================================================= // 1. 新增:真正的全局广播总线 (上帝模式) // 任何订阅了这个事件的人,都能收到【所有设备】的每一帧 // ================================================================= public static event Action OnGlobalFrame; /// /// 统一入口:驱动层调用此方法分发图像 /// public static void Dispatch(long deviceId, SmartFrame frame) { // A. 优先触发全局广播 (给 ZeroMQ 用) try { // ?.Invoke 是线程安全的,如果设备被删除了,驱动层不调用 Dispatch,这里自然就不会触发 // 如果新设备增加了,驱动层开始调用 Dispatch,这里自动就会触发 OnGlobalFrame?.Invoke(deviceId, frame); } catch (Exception ex) { Console.WriteLine($"[GlobalBus Error] 广播异常: {ex.Message}"); } // B. 执行你原有的定向分发逻辑 (给处理链用) // DispatchToTargets(deviceId, frame); } #region --- 2. 动态路由表 (Dynamic Routing Table) --- /// /// 动态订阅路由表:Key = 业务 AppId,Value = 帧处理多播委托 /// 实现:ConcurrentDictionary 保证高并发场景下的读写安全 /// 用途:支持自定义业务模块的精准订阅,扩展帧消费能力 /// private static readonly ConcurrentDictionary> _routingTable = new(); // [新增] 旁路订阅支持 // 用于 NetworkService 这种需要针对单个设备进行订阅/取消订阅的场景 private static readonly ConcurrentDictionary>> _deviceSpecificTable = new(); #endregion #region --- 3. 订阅管理接口 (Subscription Management API) --- /// /// 精准订阅:为指定 AppId 注册帧处理回调 /// 线程安全:支持多线程并发调用,委托自动合并(多播) /// /// 业务唯一标识(需与 FrameController.Register 中的 AppId 一致) /// 帧处理回调函数 /// appId 或 handler 为空时抛出 public static void Subscribe(string appId, Action handler) { // 入参合法性校验 if (string.IsNullOrWhiteSpace(appId)) throw new ArgumentNullException(nameof(appId), "AppId 不能为空"); if (handler == null) throw new ArgumentNullException(nameof(handler), "帧处理回调不能为空"); // 线程安全添加/更新委托:新订阅追加,重复订阅合并 _routingTable.AddOrUpdate( key: appId, addValue: handler, updateValueFactory: (_, existingHandler) => existingHandler + handler ); } ///// ///// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧 ///// 优势:内部自动过滤 DeviceId,回调函数无需再写 if 判断 ///// ///// 需求标识 ///// 只接收此设备的帧 ///// 处理回调(注意:此处签名不含 deviceId,因为已隐式确定) //public static void Subscribe(string appId, long specificDeviceId, Action handler) //{ // // 创建一个“过滤器”闭包 // Action wrapper = (id, frame) => // { // // 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调 // if (id == specificDeviceId) // { // handler(frame); // } // }; // // 将过滤器注册到基础路由表中 // Subscribe(appId, wrapper); //} /// /// [重写] 精准订阅:仅监听指定设备的特定 AppId 帧 /// 修改说明:不再使用闭包 + 多播委托,而是存入二级字典,以便能精准取消 /// public static void Subscribe(string appId, long specificDeviceId, Action handler) { if (string.IsNullOrWhiteSpace(appId) || handler == null) return; // 1. 获取或创建该 AppId 的设备映射表 var deviceMap = _deviceSpecificTable.GetOrAdd(appId, _ => new ConcurrentDictionary>()); // 2. 添加或更新该设备的订阅 // 注意:这里使用多播委托 (+),支持同一个 App 同一个 Device 有多个处理逻辑(虽然很少见) deviceMap.AddOrUpdate(specificDeviceId, handler, (_, existing) => existing + handler); } /// /// [新增] 精准取消订阅:移除指定 AppId 下指定设备的订阅 /// NetworkService 必须调用此方法来防止内存泄漏 /// public static void Unsubscribe(string appId, long specificDeviceId) { if (string.IsNullOrWhiteSpace(appId)) return; // 1. 查找该 AppId 是否有记录 if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap)) { // 2. 移除该设备的订阅委托 if (deviceMap.TryRemove(specificDeviceId, out _)) { // 可选:如果该 AppId 下没设备了,是否清理外层字典?(为了性能通常不清理,或者定期清理) // Console.WriteLine($"[Dispatcher] {appId} 已停止订阅设备 {specificDeviceId}"); } } } /// /// 取消订阅:移除指定 AppId 的帧处理回调 /// 线程安全:支持多线程并发调用,无订阅时静默处理 /// /// 业务唯一标识 /// 需要移除的帧处理回调 public static void Unsubscribe(string appId, Action handler) { if (string.IsNullOrWhiteSpace(appId) || handler == null) return; // 尝试获取当前委托并移除目标回调 if (_routingTable.TryGetValue(appId, out var currentHandler)) { var updatedHandler = currentHandler - handler; if (updatedHandler == null) { // 委托为空时移除路由项,避免内存泄漏 _routingTable.TryRemove(appId, out _); } else { // 委托非空时更新路由表 _routingTable.TryUpdate(appId, updatedHandler, currentHandler); } } } #endregion #region --- 4. 核心分发逻辑 (Core Dispatch Logic) --- /// /// 帧任务分发入口:基于任务的 TargetAppIds 实现精准点对点投递 /// 核心优化:摒弃广播模式,仅投递到指定订阅者,降低系统资源消耗 /// /// 处理完成的帧任务(包含目标 AppId 列表、帧数据、上下文) /// task 为空时抛出 public static void Dispatch(ProcessingTask task) { // 入参合法性校验 if (task == null) throw new ArgumentNullException(nameof(task), "帧任务不能为空"); var deviceId = task.DeviceId; var frame = task.Frame; var targetAppIds = task.Decision.TargetAppIds; var sequence = task.Decision.Sequence; // 记录分发日志 task.Context.AddLog($"开始分发帧任务 [Seq:{sequence}],目标 AppId 列表:[{string.Join(", ", targetAppIds)}]"); // 遍历目标 AppId 列表,执行精准投递 foreach (var appId in targetAppIds) { // 1. 优先匹配动态路由表中的自定义订阅者 if (_routingTable.TryGetValue(appId, out var customHandler)) { try { customHandler.Invoke(deviceId, frame); task.Context.AddLog($"帧任务 [Seq:{sequence}] 成功投递到自定义 AppId: {appId}"); } catch (Exception ex) { // 单个订阅者异常隔离,不影响其他分发流程 task.Context.AddLog($"帧任务 [Seq:{sequence}] 投递到 AppId:{appId} 失败:{ex.Message}"); Console.WriteLine($"[DispatchError] AppId={appId}, DeviceId={deviceId}, Error={ex.Message}"); } } // ========================================================= // B. [新增逻辑] 匹配设备级 AppId 订阅 (如 NetworkService) // ========================================================= if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap)) { // 查找当前设备是否有订阅者 if (deviceMap.TryGetValue(deviceId, out var deviceHandler)) { try { deviceHandler.Invoke(frame); task.Context.AddLog($"帧任务 设备级 [Seq:{sequence}] 投递到 AppId:{appId}"); } catch (Exception ex) { Console.WriteLine($"[DispatchError] DeviceSpecific AppId={appId}, Dev={deviceId}: {ex.Message}"); } } } // 2. 匹配预设的全局通道(兼容旧版订阅逻辑) switch (appId.ToUpperInvariant()) { case "UI_PREVIEW": OnPreviewFrame?.Invoke(deviceId, frame); break; case "AI_ANALYSIS": OnAnalysisFrame?.Invoke(deviceId, frame); break; } } // ========================================================================= // 2. [旁路通道] 扫描设备级订阅表 (NetworkService, 录像服务 等) // 这是外部服务“被动”监听的目标,不在 targetAppIds 白名单里也要发 // ========================================================================= if (!_deviceSpecificTable.IsEmpty) { // 遍历所有注册了旁路监听的 AppId (例如 "NetService") foreach (var kvp in _deviceSpecificTable) { string sidecarAppId = kvp.Key; var deviceMap = kvp.Value; // 优化:如果这个 AppId 已经在上面的 targetAppIds 里处理过了,就跳过,防止重复发送 // (例如:如果设备未来真的把 NetService 加入了白名单,这里就不重复发了) if (targetAppIds.Contains(sidecarAppId)) continue; // 检查这个 AppId 下,是否有人订阅了当前这台设备 if (deviceMap.TryGetValue(deviceId, out var handler)) { try { handler.Invoke(frame); // task.Context.AddLog($"帧任务 [Seq:{sequence}] 旁路投递到: {sidecarAppId}"); } catch (Exception ex) { Console.WriteLine($"[SidecarDispatchError] App={sidecarAppId}, Dev={deviceId}: {ex.Message}"); } } } } // ========================================================================= // 3. [上帝通道] 全局广播 // ========================================================================= OnGlobalFrame?.Invoke(deviceId, frame); // 分发完成后记录遥测数据 GlobalTelemetry.RecordLog(sequence, task.Context); } #endregion #region Unsubscribe /// /// [新增重载] 强制取消订阅:直接移除指定 AppId 的整个路由项 /// 用途:当业务模块(如播放窗口)销毁时,彻底切断该 AppId 的数据流 /// /// 业务唯一标识 public static void Unsubscribe(string appId) { if (string.IsNullOrWhiteSpace(appId)) return; // 直接从字典中移除 Key,这将丢弃该 Key 下挂载的所有委托链 // TryRemove 是原子的、线程安全的 if (_routingTable.TryRemove(appId, out _)) { Console.WriteLine($"[Dispatcher] 已强制移除 AppId [{appId}] 的所有订阅路由"); } } #endregion }