海康摄像头取流示例初始签入
This commit is contained in:
64
SHH.CameraSdk/Core/Pipeline/GlobalProcessingCenter.cs
Normal file
64
SHH.CameraSdk/Core/Pipeline/GlobalProcessingCenter.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using SHH.CameraSdk;
|
||||
|
||||
/// <summary>
|
||||
/// 全局帧处理中心(静态类)
|
||||
/// 功能:接收驱动层的帧数据与决策,封装为处理任务并投递至处理管道,是驱动层与处理管道的桥梁
|
||||
/// 核心修复:初始化全链路追踪上下文并绑定到任务,避免空引用异常;管道满时记录丢弃日志并释放帧资源
|
||||
/// </summary>
|
||||
public static class GlobalProcessingCenter
|
||||
{
|
||||
#region --- 私有资源 (Private Resources) ---
|
||||
|
||||
/// <summary> 全局帧处理管道实例 </summary>
|
||||
/// <remarks> 固定容量 50,采用 DropWrite 模式,生产端永不阻塞 </remarks>
|
||||
private static readonly ProcessingPipeline _pipeline = new ProcessingPipeline(capacity: 50);
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 核心任务提交 (Core Task Submission) ---
|
||||
|
||||
/// <summary>
|
||||
/// 提交帧数据与决策到处理中心
|
||||
/// 功能:封装帧为处理任务,初始化追踪上下文,投递至管道;投递失败时记录丢弃日志并释放资源
|
||||
/// </summary>
|
||||
/// <param name="deviceId">产生帧的设备唯一标识</param>
|
||||
/// <param name="frame">待处理的智能帧数据</param>
|
||||
/// <param name="decision">帧处理决策(包含是否保留、分发目标等信息)</param>
|
||||
public static void Submit(long deviceId, SmartFrame frame, FrameDecision decision)
|
||||
{
|
||||
// 1. 初始化全链路追踪上下文:绑定决策信息,记录帧进入处理中心的初始日志
|
||||
var context = new FrameContext
|
||||
{
|
||||
FrameSequence = decision.Sequence,
|
||||
Timestamp = decision.Timestamp,
|
||||
IsCaptured = true,
|
||||
TargetAppIds = decision.TargetAppIds // 记录帧的分发目标列表
|
||||
};
|
||||
// 添加初始日志:标记帧由驱动层提交至处理中心
|
||||
context.AddLog("Driver: Submitted to Global Center");
|
||||
|
||||
// 2. 封装为处理任务:关联设备ID、帧数据、决策、追踪上下文
|
||||
var task = new ProcessingTask
|
||||
{
|
||||
DeviceId = deviceId,
|
||||
Frame = frame,
|
||||
Decision = decision,
|
||||
Context = context // 绑定上下文,修复空引用问题
|
||||
};
|
||||
|
||||
// 3. 尝试投递任务到处理管道
|
||||
if (!_pipeline.TrySubmit(task))
|
||||
{
|
||||
// 投递失败:管道已满,记录丢弃原因并更新上下文状态
|
||||
context.DropReason = "GlobalPipelineFull";
|
||||
context.IsCaptured = false;
|
||||
// 归档丢弃日志到全局遥测,用于问题排查
|
||||
GlobalTelemetry.RecordLog(decision.Sequence, context);
|
||||
|
||||
// 释放帧资源:避免内存泄漏
|
||||
frame.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
157
SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs
Normal file
157
SHH.CameraSdk/Core/Pipeline/GlobalStreamDispatcher.cs
Normal file
@@ -0,0 +1,157 @@
|
||||
namespace SHH.CameraSdk;
|
||||
|
||||
/// <summary>
|
||||
/// 全局流分发器(静态类 | 线程安全)
|
||||
/// 核心职责:
|
||||
/// <para>1. 接收处理完成的帧任务,基于 AppId 路由策略实现帧的精准定向分发</para>
|
||||
/// <para>2. 隔离 UI 预览、AI 分析等不同消费场景,支撑多模块并行消费</para>
|
||||
/// 设计特性:
|
||||
/// <para>✅ 线程安全:基于 ConcurrentDictionary 实现并发订阅/取消订阅</para>
|
||||
/// <para>✅ 精准路由:按 TargetAppIds 点对点投递,避免广播风暴</para>
|
||||
/// <para>✅ 异常隔离:单个订阅者异常不影响其他模块消费</para>
|
||||
/// </summary>
|
||||
public static class GlobalStreamDispatcher
|
||||
{
|
||||
#region --- 1. 预设订阅通道 (Predefined Subscription Channels) ---
|
||||
|
||||
/// <summary>
|
||||
/// UI 预览订阅通道:供 UI 模块订阅帧数据,用于实时画面显示
|
||||
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
|
||||
/// 特性:低延迟优先,支持画面渲染相关的轻量级处理
|
||||
/// </summary>
|
||||
public static event Action<long, SmartFrame>? OnPreviewFrame;
|
||||
|
||||
/// <summary>
|
||||
/// AI 分析订阅通道:供 AI 模块订阅帧数据,用于行为识别/人脸检测/车牌识别等
|
||||
/// 回调参数:(设备唯一标识, 处理后的智能帧数据)
|
||||
/// 特性:高吞吐优先,支持复杂算法处理,延迟容忍度较高
|
||||
/// </summary>
|
||||
public static event Action<long, SmartFrame>? OnAnalysisFrame;
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 2. 动态路由表 (Dynamic Routing Table) ---
|
||||
|
||||
/// <summary>
|
||||
/// 动态订阅路由表:Key = 业务 AppId,Value = 帧处理多播委托
|
||||
/// 实现:ConcurrentDictionary 保证高并发场景下的读写安全
|
||||
/// 用途:支持自定义业务模块的精准订阅,扩展帧消费能力
|
||||
/// </summary>
|
||||
private static readonly ConcurrentDictionary<string, Action<long, SmartFrame>> _routingTable = new();
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 3. 订阅管理接口 (Subscription Management API) ---
|
||||
|
||||
/// <summary>
|
||||
/// 精准订阅:为指定 AppId 注册帧处理回调
|
||||
/// 线程安全:支持多线程并发调用,委托自动合并(多播)
|
||||
/// </summary>
|
||||
/// <param name="appId">业务唯一标识(需与 FrameController.Register 中的 AppId 一致)</param>
|
||||
/// <param name="handler">帧处理回调函数</param>
|
||||
/// <exception cref="ArgumentNullException">appId 或 handler 为空时抛出</exception>
|
||||
public static void Subscribe(string appId, Action<long, SmartFrame> handler)
|
||||
{
|
||||
// 入参合法性校验
|
||||
if (string.IsNullOrWhiteSpace(appId))
|
||||
throw new ArgumentNullException(nameof(appId), "AppId 不能为空");
|
||||
if (handler == null)
|
||||
throw new ArgumentNullException(nameof(handler), "帧处理回调不能为空");
|
||||
|
||||
// 线程安全添加/更新委托:新订阅追加,重复订阅合并
|
||||
_routingTable.AddOrUpdate(
|
||||
key: appId,
|
||||
addValue: handler,
|
||||
updateValueFactory: (_, existingHandler) => existingHandler + handler
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 取消订阅:移除指定 AppId 的帧处理回调
|
||||
/// 线程安全:支持多线程并发调用,无订阅时静默处理
|
||||
/// </summary>
|
||||
/// <param name="appId">业务唯一标识</param>
|
||||
/// <param name="handler">需要移除的帧处理回调</param>
|
||||
public static void Unsubscribe(string appId, Action<long, SmartFrame> handler)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(appId) || handler == null)
|
||||
return;
|
||||
|
||||
// 尝试获取当前委托并移除目标回调
|
||||
if (_routingTable.TryGetValue(appId, out var currentHandler))
|
||||
{
|
||||
var updatedHandler = currentHandler - handler;
|
||||
if (updatedHandler == null)
|
||||
{
|
||||
// 委托为空时移除路由项,避免内存泄漏
|
||||
_routingTable.TryRemove(appId, out _);
|
||||
}
|
||||
else
|
||||
{
|
||||
// 委托非空时更新路由表
|
||||
_routingTable.TryUpdate(appId, updatedHandler, currentHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 4. 核心分发逻辑 (Core Dispatch Logic) ---
|
||||
|
||||
/// <summary>
|
||||
/// 帧任务分发入口:基于任务的 TargetAppIds 实现精准点对点投递
|
||||
/// 核心优化:摒弃广播模式,仅投递到指定订阅者,降低系统资源消耗
|
||||
/// </summary>
|
||||
/// <param name="task">处理完成的帧任务(包含目标 AppId 列表、帧数据、上下文)</param>
|
||||
/// <exception cref="ArgumentNullException">task 为空时抛出</exception>
|
||||
public static void Dispatch(ProcessingTask task)
|
||||
{
|
||||
// 入参合法性校验
|
||||
if (task == null)
|
||||
throw new ArgumentNullException(nameof(task), "帧任务不能为空");
|
||||
|
||||
var deviceId = task.DeviceId;
|
||||
var frame = task.Frame;
|
||||
var targetAppIds = task.Decision.TargetAppIds;
|
||||
var sequence = task.Decision.Sequence;
|
||||
|
||||
// 记录分发日志
|
||||
task.Context.AddLog($"开始分发帧任务 [Seq:{sequence}],目标 AppId 列表:[{string.Join(", ", targetAppIds)}]");
|
||||
|
||||
// 遍历目标 AppId 列表,执行精准投递
|
||||
foreach (var appId in targetAppIds)
|
||||
{
|
||||
// 1. 优先匹配动态路由表中的自定义订阅者
|
||||
if (_routingTable.TryGetValue(appId, out var customHandler))
|
||||
{
|
||||
try
|
||||
{
|
||||
customHandler.Invoke(deviceId, frame);
|
||||
task.Context.AddLog($"帧任务 [Seq:{sequence}] 成功投递到自定义 AppId: {appId}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 单个订阅者异常隔离,不影响其他分发流程
|
||||
task.Context.AddLog($"帧任务 [Seq:{sequence}] 投递到 AppId:{appId} 失败:{ex.Message}");
|
||||
Console.WriteLine($"[DispatchError] AppId={appId}, DeviceId={deviceId}, Error={ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 匹配预设的全局通道(兼容旧版订阅逻辑)
|
||||
switch (appId.ToUpperInvariant())
|
||||
{
|
||||
case "UI_PREVIEW":
|
||||
OnPreviewFrame?.Invoke(deviceId, frame);
|
||||
break;
|
||||
case "AI_ANALYSIS":
|
||||
OnAnalysisFrame?.Invoke(deviceId, frame);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// 分发完成后记录遥测数据
|
||||
GlobalTelemetry.RecordLog(sequence, task.Context);
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
148
SHH.CameraSdk/Core/Pipeline/ProcessingPipeline.cs
Normal file
148
SHH.CameraSdk/Core/Pipeline/ProcessingPipeline.cs
Normal file
@@ -0,0 +1,148 @@
|
||||
namespace SHH.CameraSdk;
|
||||
|
||||
/// <summary>
|
||||
/// 帧处理管道(后台处理核心)
|
||||
/// 功能:接收帧处理任务,在后台单线程执行二次处理(如打水印、裁剪),并分发至目标订阅者
|
||||
/// 核心特性:
|
||||
/// <para>1. 有界通道+DropWrite模式:生产端永不阻塞,管道满时丢弃新任务,避免内存积压</para>
|
||||
/// <para>2. 单线程处理:CPU占用恒定,避免多线程竞争导致的性能抖动</para>
|
||||
/// <para>3. 引用计数管理:确保帧数据安全转移与释放,防止内存泄漏</para>
|
||||
/// </summary>
|
||||
public class ProcessingPipeline
|
||||
{
|
||||
#region --- 私有资源与状态 (Private Resources & States) ---
|
||||
|
||||
/// <summary> 任务队列(有界通道):存储待处理的帧任务 </summary>
|
||||
private readonly Channel<ProcessingTask> _queue;
|
||||
|
||||
/// <summary> 取消令牌源:用于终止后台处理循环 </summary>
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 构造与初始化 (Constructor & Initialization) ---
|
||||
|
||||
/// <summary>
|
||||
/// 初始化帧处理管道
|
||||
/// </summary>
|
||||
/// <param name="capacity">管道最大容量:超过该值时,新任务将被丢弃(DropWrite模式)</param>
|
||||
public ProcessingPipeline(int capacity)
|
||||
{
|
||||
// 创建有界通道,配置核心特性
|
||||
_queue = Channel.CreateBounded<ProcessingTask>(new BoundedChannelOptions(capacity)
|
||||
{
|
||||
FullMode = BoundedChannelFullMode.DropWrite, // 管道满时丢弃新写入的任务
|
||||
SingleReader = true, // 单线程读取,保证处理顺序与CPU稳定性
|
||||
SingleWriter = false // 支持多线程写入(如多相机同时提交任务)
|
||||
});
|
||||
|
||||
// 启动后台处理循环(长期运行任务,标记为 LongRunning 提升调度优先级)
|
||||
Task.Factory.StartNew(ProcessLoopAsync, TaskCreationOptions.LongRunning);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 任务提交 (Task Submission) ---
|
||||
|
||||
/// <summary>
|
||||
/// 尝试提交帧处理任务到管道
|
||||
/// 核心逻辑:非阻塞提交,失败时回滚帧引用计数,避免内存泄漏
|
||||
/// </summary>
|
||||
/// <param name="task">待处理的帧任务(包含帧数据、决策、追踪上下文)</param>
|
||||
/// <returns>提交成功返回 true,管道满导致提交失败返回 false</returns>
|
||||
public bool TrySubmit(ProcessingTask task)
|
||||
{
|
||||
// 1. 帧引用计数+1:将帧所有权从生产端转移到管道后台线程
|
||||
task.Frame.AddRef();
|
||||
|
||||
try
|
||||
{
|
||||
// 2. 非阻塞写入管道:成功则任务进入队列等待处理
|
||||
if (_queue.Writer.TryWrite(task))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// 3. 写入失败(管道满):回滚引用计数,释放帧内存
|
||||
task.Frame.Dispose();
|
||||
return false;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// 异常场景下同样回滚引用计数,确保资源释放
|
||||
task.Frame.Dispose();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 后台处理循环 (Background Processing Loop) ---
|
||||
|
||||
/// <summary>
|
||||
/// 后台处理循环:持续读取队列任务,执行二次处理与分发
|
||||
/// </summary>
|
||||
private async Task ProcessLoopAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
// 异步遍历队列:收到取消信号时退出循环
|
||||
await foreach (var task in _queue.Reader.ReadAllAsync(_cts.Token))
|
||||
{
|
||||
// 使用 using 语句:处理完成后自动调用 Frame.Dispose(),引用计数-1
|
||||
using (task.Frame)
|
||||
{
|
||||
// 执行具体的帧处理逻辑
|
||||
ExecuteProcessing(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// 收到取消信号,正常退出循环,无需处理
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 帧处理执行 (Frame Processing Execution) ---
|
||||
|
||||
/// <summary>
|
||||
/// 执行帧二次处理与分发
|
||||
/// 功能:对帧进行自定义加工(如打水印、格式转换),并通过分发器发送至目标订阅者
|
||||
/// </summary>
|
||||
/// <param name="task">待处理的帧任务</param>
|
||||
private void ExecuteProcessing(ProcessingTask task)
|
||||
{
|
||||
try
|
||||
{
|
||||
// --- 二次处理车间:可添加自定义加工逻辑(10ms-50ms 耗时操作安全) ---
|
||||
// 示例:给帧添加序列号水印(按需启用)
|
||||
// string watermarkText = $"SEQ:{task.Decision.Sequence}";
|
||||
// Cv2.PutText(
|
||||
// img: task.Frame.InternalMat,
|
||||
// text: watermarkText,
|
||||
// org: new Point(10, 50),
|
||||
// fontFace: HersheyFonts.HersheySimplex,
|
||||
// fontScale: 1,
|
||||
// color: Scalar.Red,
|
||||
// thickness: 2
|
||||
// );
|
||||
|
||||
// --- 帧分发:将处理后的帧交给全局分发器,按决策分发至目标订阅者 ---
|
||||
GlobalStreamDispatcher.Dispatch(task);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 捕获处理过程中的异常,避免影响后续任务执行
|
||||
Console.WriteLine($"[PipelineError] 帧处理失败 (DeviceId: {task.DeviceId}, Seq: {task.Decision.Sequence}): {ex.Message}");
|
||||
}
|
||||
finally
|
||||
{
|
||||
// 归档追踪日志:将帧处理上下文存入全局遥测,支持后续排查与分析
|
||||
GlobalTelemetry.RecordLog(task.Decision.Sequence, task.Context);
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
40
SHH.CameraSdk/Core/Pipeline/ProcessingTask.cs
Normal file
40
SHH.CameraSdk/Core/Pipeline/ProcessingTask.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
namespace SHH.CameraSdk;
|
||||
|
||||
/// <summary>
|
||||
/// 帧处理任务模型
|
||||
/// 功能:封装单帧数据的处理任务信息,包含帧数据、分发决策、全链路追踪上下文,是帧处理管道的核心数据载体
|
||||
/// 用途:在全局处理中心与分发器之间传递,串联帧的二次处理、分发与追踪流程
|
||||
/// </summary>
|
||||
public class ProcessingTask
|
||||
{
|
||||
#region --- 任务核心标识 (Task Core Identification) ---
|
||||
|
||||
/// <summary> 设备唯一标识(关联产生该帧的相机设备ID) </summary>
|
||||
public long DeviceId { get; set; }
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 帧核心数据 (Frame Core Data) ---
|
||||
|
||||
/// <summary> 待处理的智能帧数据(包含原始图像数据与引用计数管理) </summary>
|
||||
/// <remarks> 非空约束:任务必须关联有效帧数据,不可为 null </remarks>
|
||||
public SmartFrame Frame { get; set; } = null!;
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 帧分发决策 (Frame Distribution Decision) ---
|
||||
|
||||
/// <summary> 帧处理决策信息(包含是否保留帧、分发目标AppId列表等) </summary>
|
||||
/// <remarks> 非空约束:任务必须携带决策信息,指导后续分发逻辑 </remarks>
|
||||
public FrameDecision Decision { get; set; } = null!;
|
||||
|
||||
#endregion
|
||||
|
||||
#region --- 全链路追踪上下文 (Full-Link Tracing Context) ---
|
||||
|
||||
/// <summary> 帧全链路追踪上下文(用于记录帧处理过程中的日志、耗时、状态等信息) </summary>
|
||||
/// <remarks> 非空约束:支持通过 AddLog 方法补充追踪日志,支撑问题排查 </remarks>
|
||||
public FrameContext Context { get; set; } = null!;
|
||||
|
||||
#endregion
|
||||
}
|
||||
Reference in New Issue
Block a user