Files
Ayay/SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs

330 lines
14 KiB
C#
Raw Permalink Normal View History

2026-01-16 14:30:42 +08:00
using Ayay.SerilogLogs;
using Serilog;
namespace SHH.CameraSdk;
/// <summary>
/// 全局流分发器(静态类 | 线程安全)
/// 核心职责:
/// <para>1. 接收处理完成的帧任务,基于 AppId 路由策略实现帧的精准定向分发</para>
/// <para>2. 隔离 UI 预览、AI 分析等不同消费场景,支撑多模块并行消费</para>
/// 设计特性:
/// <para>✅ 线程安全:基于 ConcurrentDictionary 实现并发订阅/取消订阅</para>
/// <para>✅ 精准路由:按 TargetAppIds 点对点投递,避免广播风暴</para>
/// <para>✅ 异常隔离:单个订阅者异常不影响其他模块消费</para>
/// </summary>
public static class GlobalStreamDispatcher
{
#region --- 1. (Predefined Subscription Channels) ---
2026-01-16 14:30:42 +08:00
private static ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);
/// <summary>
/// UI 预览订阅通道:供 UI 模块订阅帧数据,用于实时画面显示
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
/// 特性:低延迟优先,支持画面渲染相关的轻量级处理
/// </summary>
public static event Action<long, SmartFrame>? OnPreviewFrame;
/// <summary>
/// AI 分析订阅通道:供 AI 模块订阅帧数据,用于行为识别/人脸检测/车牌识别等
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
/// 特性:高吞吐优先,支持复杂算法处理,延迟容忍度较高
/// </summary>
public static event Action<long, SmartFrame>? OnAnalysisFrame;
#endregion
2025-12-29 08:09:14 +08:00
// =================================================================
// 1. 新增:真正的全局广播总线 (上帝模式)
// 任何订阅了这个事件的人,都能收到【所有设备】的每一帧
// =================================================================
public static event Action<long, SmartFrame> OnGlobalFrame;
/// <summary>
/// 统一入口:驱动层调用此方法分发图像
/// </summary>
public static void Dispatch(long deviceId, SmartFrame frame)
{
// A. 优先触发全局广播 (给 ZeroMQ 用)
try
{
// ?.Invoke 是线程安全的,如果设备被删除了,驱动层不调用 Dispatch这里自然就不会触发
// 如果新设备增加了,驱动层开始调用 Dispatch这里自动就会触发
OnGlobalFrame?.Invoke(deviceId, frame);
}
catch (Exception ex)
{
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[GlobalBus] 帧广播分发异常: {ex.Message}");
2025-12-29 08:09:14 +08:00
}
// B. 执行你原有的定向分发逻辑 (给处理链用)
// DispatchToTargets(deviceId, frame);
}
#region --- 2. (Dynamic Routing Table) ---
/// <summary>
/// 动态订阅路由表Key = 业务 AppIdValue = 帧处理多播委托
/// 实现ConcurrentDictionary 保证高并发场景下的读写安全
/// 用途:支持自定义业务模块的精准订阅,扩展帧消费能力
/// </summary>
private static readonly ConcurrentDictionary<string, Action<long, SmartFrame>> _routingTable = new();
// [新增] 旁路订阅支持
// 用于 NetworkService 这种需要针对单个设备进行订阅/取消订阅的场景
private static readonly ConcurrentDictionary<string, ConcurrentDictionary<long, Action<SmartFrame>>> _deviceSpecificTable = new();
#endregion
#region --- 3. (Subscription Management API) ---
/// <summary>
/// 精准订阅:为指定 AppId 注册帧处理回调
/// 线程安全:支持多线程并发调用,委托自动合并(多播)
/// </summary>
/// <param name="appId">业务唯一标识(需与 FrameController.Register 中的 AppId 一致)</param>
/// <param name="handler">帧处理回调函数</param>
/// <exception cref="ArgumentNullException">appId 或 handler 为空时抛出</exception>
public static void Subscribe(string appId, Action<long, SmartFrame> handler)
{
// 入参合法性校验
if (string.IsNullOrWhiteSpace(appId))
throw new ArgumentNullException(nameof(appId), "AppId 不能为空");
if (handler == null)
throw new ArgumentNullException(nameof(handler), "帧处理回调不能为空");
// 线程安全添加/更新委托:新订阅追加,重复订阅合并
_routingTable.AddOrUpdate(
key: appId,
addValue: handler,
updateValueFactory: (_, existingHandler) => existingHandler + handler
);
}
///// <summary>
///// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧
///// 优势:内部自动过滤 DeviceId回调函数无需再写 if 判断
///// </summary>
///// <param name="appId">需求标识</param>
///// <param name="specificDeviceId">只接收此设备的帧</param>
///// <param name="handler">处理回调(注意:此处签名不含 deviceId因为已隐式确定</param>
//public static void Subscribe(string appId, long specificDeviceId, Action<SmartFrame> handler)
//{
// // 创建一个“过滤器”闭包
// Action<long, SmartFrame> wrapper = (id, frame) =>
// {
// // 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调
// if (id == specificDeviceId)
// {
// handler(frame);
// }
// };
// // 将过滤器注册到基础路由表中
// Subscribe(appId, wrapper);
//}
/// <summary>
/// [重写] 精准订阅:仅监听指定设备的特定 AppId 帧
/// 修改说明:不再使用闭包 + 多播委托,而是存入二级字典,以便能精准取消
/// </summary>
public static void Subscribe(string appId, long specificDeviceId, Action<SmartFrame> handler)
{
if (string.IsNullOrWhiteSpace(appId) || handler == null) return;
// 1. 获取或创建该 AppId 的设备映射表
var deviceMap = _deviceSpecificTable.GetOrAdd(appId, _ => new ConcurrentDictionary<long, Action<SmartFrame>>());
// 2. 添加或更新该设备的订阅
// 注意:这里使用多播委托 (+),支持同一个 App 同一个 Device 有多个处理逻辑(虽然很少见)
deviceMap.AddOrUpdate(specificDeviceId, handler, (_, existing) => existing + handler);
}
/// <summary>
/// [新增] 精准取消订阅:移除指定 AppId 下指定设备的订阅
/// NetworkService 必须调用此方法来防止内存泄漏
/// </summary>
public static void Unsubscribe(string appId, long specificDeviceId)
{
if (string.IsNullOrWhiteSpace(appId)) return;
// 1. 查找该 AppId 是否有记录
if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap))
{
// 2. 移除该设备的订阅委托
if (deviceMap.TryRemove(specificDeviceId, out _))
{
// 可选:如果该 AppId 下没设备了,是否清理外层字典?(为了性能通常不清理,或者定期清理)
2026-01-16 14:30:42 +08:00
_sysLog.Information($"[Dispatcher] {appId} 已停止订阅设备 ID:{specificDeviceId }");
}
}
}
/// <summary>
/// 取消订阅:移除指定 AppId 的帧处理回调
/// 线程安全:支持多线程并发调用,无订阅时静默处理
/// </summary>
/// <param name="appId">业务唯一标识</param>
/// <param name="handler">需要移除的帧处理回调</param>
public static void Unsubscribe(string appId, Action<long, SmartFrame> handler)
{
if (string.IsNullOrWhiteSpace(appId) || handler == null)
return;
// 尝试获取当前委托并移除目标回调
if (_routingTable.TryGetValue(appId, out var currentHandler))
{
var updatedHandler = currentHandler - handler;
if (updatedHandler == null)
{
// 委托为空时移除路由项,避免内存泄漏
_routingTable.TryRemove(appId, out _);
}
else
{
// 委托非空时更新路由表
_routingTable.TryUpdate(appId, updatedHandler, currentHandler);
}
}
}
#endregion
#region --- 4. (Core Dispatch Logic) ---
/// <summary>
/// 帧任务分发入口:基于任务的 TargetAppIds 实现精准点对点投递
/// 核心优化:摒弃广播模式,仅投递到指定订阅者,降低系统资源消耗
/// </summary>
/// <param name="task">处理完成的帧任务(包含目标 AppId 列表、帧数据、上下文)</param>
/// <exception cref="ArgumentNullException">task 为空时抛出</exception>
public static void Dispatch(ProcessingTask task)
{
// 入参合法性校验
if (task == null)
throw new ArgumentNullException(nameof(task), "帧任务不能为空");
var deviceId = task.DeviceId;
var frame = task.Frame;
var targetAppIds = task.Decision.TargetAppIds;
var sequence = task.Decision.Sequence;
// 记录分发日志
task.Context.AddLog($"开始分发帧任务 [Seq:{sequence}],目标 AppId 列表:[{string.Join(", ", targetAppIds)}]");
// 遍历目标 AppId 列表,执行精准投递
foreach (var appId in targetAppIds)
{
// 1. 优先匹配动态路由表中的自定义订阅者
if (_routingTable.TryGetValue(appId, out var customHandler))
{
try
{
customHandler.Invoke(deviceId, frame);
task.Context.AddLog($"帧任务 [Seq:{sequence}] 成功投递到自定义 AppId: {appId}");
}
catch (Exception ex)
{
// 单个订阅者异常隔离,不影响其他分发流程
task.Context.AddLog($"帧任务 [Seq:{sequence}] 投递到 AppId:{appId} 失败:{ex.Message}");
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[Dispatch] AppId={appId}, DeviceId={deviceId}, Error={ex.Message}");
}
}
// =========================================================
// B. [新增逻辑] 匹配设备级 AppId 订阅 (如 NetworkService)
// =========================================================
if (_deviceSpecificTable.TryGetValue(appId, out var deviceMap))
{
// 查找当前设备是否有订阅者
if (deviceMap.TryGetValue(deviceId, out var deviceHandler))
{
try
{
deviceHandler.Invoke(frame);
task.Context.AddLog($"帧任务 设备级 [Seq:{sequence}] 投递到 AppId:{appId}");
}
catch (Exception ex)
{
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[Dispatch] DeviceSpecific AppId={appId}, Dev={deviceId}: {ex.Message}");
}
}
}
// 2. 匹配预设的全局通道(兼容旧版订阅逻辑)
switch (appId.ToUpperInvariant())
{
case "UI_PREVIEW":
OnPreviewFrame?.Invoke(deviceId, frame);
break;
case "AI_ANALYSIS":
OnAnalysisFrame?.Invoke(deviceId, frame);
break;
}
}
// =========================================================================
// 2. [旁路通道] 扫描设备级订阅表 (NetworkService, 录像服务 等)
// 这是外部服务“被动”监听的目标,不在 targetAppIds 白名单里也要发
// =========================================================================
if (!_deviceSpecificTable.IsEmpty)
{
// 遍历所有注册了旁路监听的 AppId (例如 "NetService")
foreach (var kvp in _deviceSpecificTable)
{
string sidecarAppId = kvp.Key;
var deviceMap = kvp.Value;
// 优化:如果这个 AppId 已经在上面的 targetAppIds 里处理过了,就跳过,防止重复发送
// (例如:如果设备未来真的把 NetService 加入了白名单,这里就不重复发了)
if (targetAppIds.Contains(sidecarAppId)) continue;
// 检查这个 AppId 下,是否有人订阅了当前这台设备
if (deviceMap.TryGetValue(deviceId, out var handler))
{
try
{
handler.Invoke(frame);
// task.Context.AddLog($"帧任务 [Seq:{sequence}] 旁路投递到: {sidecarAppId}");
}
catch (Exception ex)
{
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[Dispatch] App={sidecarAppId}, Dev={deviceId}: {ex.Message}");
}
}
}
}
// =========================================================================
// 3. [上帝通道] 全局广播
// =========================================================================
OnGlobalFrame?.Invoke(deviceId, frame);
// 分发完成后记录遥测数据
GlobalTelemetry.RecordLog(sequence, task.Context);
}
#endregion
#region Unsubscribe
/// <summary>
/// [新增重载] 强制取消订阅:直接移除指定 AppId 的整个路由项
/// 用途:当业务模块(如播放窗口)销毁时,彻底切断该 AppId 的数据流
/// </summary>
/// <param name="appId">业务唯一标识</param>
public static void Unsubscribe(string appId)
{
if (string.IsNullOrWhiteSpace(appId)) return;
// 直接从字典中移除 Key这将丢弃该 Key 下挂载的所有委托链
// TryRemove 是原子的、线程安全的
if (_routingTable.TryRemove(appId, out _))
{
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[Dispatcher] 已强制移除 AppId [{appId}] 的所有订阅路由.");
}
}
#endregion
}