Files
Ayay/SHH.CameraSdk/Core/Pipeline/ProcessingPipeline.cs

153 lines
5.7 KiB
C#
Raw Normal View History

2026-01-16 14:30:42 +08:00
using Ayay.SerilogLogs;
using Serilog;
namespace SHH.CameraSdk;
/// <summary>
/// 帧处理管道(后台处理核心)
/// 功能:接收帧处理任务,在后台单线程执行二次处理(如打水印、裁剪),并分发至目标订阅者
/// 核心特性:
/// <para>1. 有界通道+DropWrite模式生产端永不阻塞管道满时丢弃新任务避免内存积压</para>
/// <para>2. 单线程处理CPU占用恒定避免多线程竞争导致的性能抖动</para>
/// <para>3. 引用计数管理:确保帧数据安全转移与释放,防止内存泄漏</para>
/// </summary>
public class ProcessingPipeline
{
#region --- (Private Resources & States) ---
2026-01-16 15:17:23 +08:00
private ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);
2026-01-16 14:30:42 +08:00
/// <summary> 任务队列(有界通道):存储待处理的帧任务 </summary>
private readonly Channel<ProcessingTask> _queue;
/// <summary> 取消令牌源:用于终止后台处理循环 </summary>
private readonly CancellationTokenSource _cts = new();
#endregion
#region --- (Constructor & Initialization) ---
/// <summary>
/// 初始化帧处理管道
/// </summary>
/// <param name="capacity">管道最大容量超过该值时新任务将被丢弃DropWrite模式</param>
public ProcessingPipeline(int capacity)
{
// 创建有界通道,配置核心特性
_queue = Channel.CreateBounded<ProcessingTask>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropWrite, // 管道满时丢弃新写入的任务
SingleReader = true, // 单线程读取保证处理顺序与CPU稳定性
SingleWriter = false // 支持多线程写入(如多相机同时提交任务)
});
// 启动后台处理循环(长期运行任务,标记为 LongRunning 提升调度优先级)
Task.Factory.StartNew(ProcessLoopAsync, TaskCreationOptions.LongRunning);
}
#endregion
#region --- (Task Submission) ---
/// <summary>
/// 尝试提交帧处理任务到管道
/// 核心逻辑:非阻塞提交,失败时回滚帧引用计数,避免内存泄漏
/// </summary>
/// <param name="task">待处理的帧任务(包含帧数据、决策、追踪上下文)</param>
/// <returns>提交成功返回 true管道满导致提交失败返回 false</returns>
public bool TrySubmit(ProcessingTask task)
{
// 1. 帧引用计数+1将帧所有权从生产端转移到管道后台线程
task.Frame.AddRef();
try
{
// 2. 非阻塞写入管道:成功则任务进入队列等待处理
if (_queue.Writer.TryWrite(task))
{
return true;
}
// 3. 写入失败(管道满):回滚引用计数,释放帧内存
task.Frame.Dispose();
return false;
}
catch
{
// 异常场景下同样回滚引用计数,确保资源释放
task.Frame.Dispose();
return false;
}
}
#endregion
#region --- (Background Processing Loop) ---
/// <summary>
/// 后台处理循环:持续读取队列任务,执行二次处理与分发
/// </summary>
private async Task ProcessLoopAsync()
{
try
{
// 异步遍历队列:收到取消信号时退出循环
await foreach (var task in _queue.Reader.ReadAllAsync(_cts.Token))
{
// 使用 using 语句:处理完成后自动调用 Frame.Dispose(),引用计数-1
using (task.Frame)
{
// 执行具体的帧处理逻辑
ExecuteProcessing(task);
}
}
}
catch (OperationCanceledException)
{
// 收到取消信号,正常退出循环,无需处理
}
}
#endregion
#region --- (Frame Processing Execution) ---
/// <summary>
/// 执行帧二次处理与分发
/// 功能:对帧进行自定义加工(如打水印、格式转换),并通过分发器发送至目标订阅者
/// </summary>
/// <param name="task">待处理的帧任务</param>
private void ExecuteProcessing(ProcessingTask task)
{
try
{
// --- 二次处理车间可添加自定义加工逻辑10ms-50ms 耗时操作安全) ---
// 示例:给帧添加序列号水印(按需启用)
// string watermarkText = $"SEQ:{task.Decision.Sequence}";
// Cv2.PutText(
// img: task.Frame.InternalMat,
// text: watermarkText,
// org: new Point(10, 50),
// fontFace: HersheyFonts.HersheySimplex,
// fontScale: 1,
// color: Scalar.Red,
// thickness: 2
// );
// --- 帧分发:将处理后的帧交给全局分发器,按决策分发至目标订阅者 ---
GlobalStreamDispatcher.Dispatch(task);
}
catch (Exception ex)
{
// 捕获处理过程中的异常,避免影响后续任务执行
2026-01-16 14:30:42 +08:00
_sysLog.Error($"[Pipeline] 帧处理失败 (DeviceId: {task.DeviceId}, Seq: {task.Decision.Sequence}): {ex.Message}");
}
finally
{
// 归档追踪日志:将帧处理上下文存入全局遥测,支持后续排查与分析
GlobalTelemetry.RecordLog(task.Decision.Sequence, task.Context);
}
}
#endregion
}