180 lines
7.4 KiB
C#
180 lines
7.4 KiB
C#
namespace SHH.CameraSdk;
|
||
|
||
/// <summary>
|
||
/// 全局流分发器(静态类 | 线程安全)
|
||
/// 核心职责:
|
||
/// <para>1. 接收处理完成的帧任务,基于 AppId 路由策略实现帧的精准定向分发</para>
|
||
/// <para>2. 隔离 UI 预览、AI 分析等不同消费场景,支撑多模块并行消费</para>
|
||
/// 设计特性:
|
||
/// <para>✅ 线程安全:基于 ConcurrentDictionary 实现并发订阅/取消订阅</para>
|
||
/// <para>✅ 精准路由:按 TargetAppIds 点对点投递,避免广播风暴</para>
|
||
/// <para>✅ 异常隔离:单个订阅者异常不影响其他模块消费</para>
|
||
/// </summary>
|
||
public static class GlobalStreamDispatcher
|
||
{
|
||
#region --- 1. 预设订阅通道 (Predefined Subscription Channels) ---
|
||
|
||
/// <summary>
|
||
/// UI 预览订阅通道:供 UI 模块订阅帧数据,用于实时画面显示
|
||
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
|
||
/// 特性:低延迟优先,支持画面渲染相关的轻量级处理
|
||
/// </summary>
|
||
public static event Action<long, SmartFrame>? OnPreviewFrame;
|
||
|
||
/// <summary>
|
||
/// AI 分析订阅通道:供 AI 模块订阅帧数据,用于行为识别/人脸检测/车牌识别等
|
||
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
|
||
/// 特性:高吞吐优先,支持复杂算法处理,延迟容忍度较高
|
||
/// </summary>
|
||
public static event Action<long, SmartFrame>? OnAnalysisFrame;
|
||
|
||
#endregion
|
||
|
||
#region --- 2. 动态路由表 (Dynamic Routing Table) ---
|
||
|
||
/// <summary>
|
||
/// 动态订阅路由表:Key = 业务 AppId,Value = 帧处理多播委托
|
||
/// 实现:ConcurrentDictionary 保证高并发场景下的读写安全
|
||
/// 用途:支持自定义业务模块的精准订阅,扩展帧消费能力
|
||
/// </summary>
|
||
private static readonly ConcurrentDictionary<string, Action<long, SmartFrame>> _routingTable = new();
|
||
|
||
#endregion
|
||
|
||
#region --- 3. 订阅管理接口 (Subscription Management API) ---
|
||
|
||
/// <summary>
|
||
/// 精准订阅:为指定 AppId 注册帧处理回调
|
||
/// 线程安全:支持多线程并发调用,委托自动合并(多播)
|
||
/// </summary>
|
||
/// <param name="appId">业务唯一标识(需与 FrameController.Register 中的 AppId 一致)</param>
|
||
/// <param name="handler">帧处理回调函数</param>
|
||
/// <exception cref="ArgumentNullException">appId 或 handler 为空时抛出</exception>
|
||
public static void Subscribe(string appId, Action<long, SmartFrame> handler)
|
||
{
|
||
// 入参合法性校验
|
||
if (string.IsNullOrWhiteSpace(appId))
|
||
throw new ArgumentNullException(nameof(appId), "AppId 不能为空");
|
||
if (handler == null)
|
||
throw new ArgumentNullException(nameof(handler), "帧处理回调不能为空");
|
||
|
||
// 线程安全添加/更新委托:新订阅追加,重复订阅合并
|
||
_routingTable.AddOrUpdate(
|
||
key: appId,
|
||
addValue: handler,
|
||
updateValueFactory: (_, existingHandler) => existingHandler + handler
|
||
);
|
||
}
|
||
|
||
/// <summary>
|
||
/// [新增] 精准订阅:仅监听指定设备的特定 AppId 帧
|
||
/// 优势:内部自动过滤 DeviceId,回调函数无需再写 if 判断
|
||
/// </summary>
|
||
/// <param name="appId">需求标识</param>
|
||
/// <param name="specificDeviceId">只接收此设备的帧</param>
|
||
/// <param name="handler">处理回调(注意:此处签名不含 deviceId,因为已隐式确定)</param>
|
||
public static void Subscribe(string appId, long specificDeviceId, Action<SmartFrame> handler)
|
||
{
|
||
// 创建一个“过滤器”闭包
|
||
Action<long, SmartFrame> wrapper = (id, frame) =>
|
||
{
|
||
// 只有当来源 ID 与订阅 ID 一致时,才触发用户的业务回调
|
||
if (id == specificDeviceId)
|
||
{
|
||
handler(frame);
|
||
}
|
||
};
|
||
|
||
// 将过滤器注册到基础路由表中
|
||
Subscribe(appId, wrapper);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 取消订阅:移除指定 AppId 的帧处理回调
|
||
/// 线程安全:支持多线程并发调用,无订阅时静默处理
|
||
/// </summary>
|
||
/// <param name="appId">业务唯一标识</param>
|
||
/// <param name="handler">需要移除的帧处理回调</param>
|
||
public static void Unsubscribe(string appId, Action<long, SmartFrame> handler)
|
||
{
|
||
if (string.IsNullOrWhiteSpace(appId) || handler == null)
|
||
return;
|
||
|
||
// 尝试获取当前委托并移除目标回调
|
||
if (_routingTable.TryGetValue(appId, out var currentHandler))
|
||
{
|
||
var updatedHandler = currentHandler - handler;
|
||
if (updatedHandler == null)
|
||
{
|
||
// 委托为空时移除路由项,避免内存泄漏
|
||
_routingTable.TryRemove(appId, out _);
|
||
}
|
||
else
|
||
{
|
||
// 委托非空时更新路由表
|
||
_routingTable.TryUpdate(appId, updatedHandler, currentHandler);
|
||
}
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region --- 4. 核心分发逻辑 (Core Dispatch Logic) ---
|
||
|
||
/// <summary>
|
||
/// 帧任务分发入口:基于任务的 TargetAppIds 实现精准点对点投递
|
||
/// 核心优化:摒弃广播模式,仅投递到指定订阅者,降低系统资源消耗
|
||
/// </summary>
|
||
/// <param name="task">处理完成的帧任务(包含目标 AppId 列表、帧数据、上下文)</param>
|
||
/// <exception cref="ArgumentNullException">task 为空时抛出</exception>
|
||
public static void Dispatch(ProcessingTask task)
|
||
{
|
||
// 入参合法性校验
|
||
if (task == null)
|
||
throw new ArgumentNullException(nameof(task), "帧任务不能为空");
|
||
|
||
var deviceId = task.DeviceId;
|
||
var frame = task.Frame;
|
||
var targetAppIds = task.Decision.TargetAppIds;
|
||
var sequence = task.Decision.Sequence;
|
||
|
||
// 记录分发日志
|
||
task.Context.AddLog($"开始分发帧任务 [Seq:{sequence}],目标 AppId 列表:[{string.Join(", ", targetAppIds)}]");
|
||
|
||
// 遍历目标 AppId 列表,执行精准投递
|
||
foreach (var appId in targetAppIds)
|
||
{
|
||
// 1. 优先匹配动态路由表中的自定义订阅者
|
||
if (_routingTable.TryGetValue(appId, out var customHandler))
|
||
{
|
||
try
|
||
{
|
||
customHandler.Invoke(deviceId, frame);
|
||
task.Context.AddLog($"帧任务 [Seq:{sequence}] 成功投递到自定义 AppId: {appId}");
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
// 单个订阅者异常隔离,不影响其他分发流程
|
||
task.Context.AddLog($"帧任务 [Seq:{sequence}] 投递到 AppId:{appId} 失败:{ex.Message}");
|
||
Console.WriteLine($"[DispatchError] AppId={appId}, DeviceId={deviceId}, Error={ex.Message}");
|
||
}
|
||
}
|
||
|
||
// 2. 匹配预设的全局通道(兼容旧版订阅逻辑)
|
||
switch (appId.ToUpperInvariant())
|
||
{
|
||
case "UI_PREVIEW":
|
||
OnPreviewFrame?.Invoke(deviceId, frame);
|
||
break;
|
||
case "AI_ANALYSIS":
|
||
OnAnalysisFrame?.Invoke(deviceId, frame);
|
||
break;
|
||
}
|
||
}
|
||
|
||
// 分发完成后记录遥测数据
|
||
GlobalTelemetry.RecordLog(sequence, task.Context);
|
||
}
|
||
|
||
#endregion
|
||
} |