Files
Ayay/SHH.CameraSdk/Core/Pipeline/ProcessingPipeline.cs
2026-01-16 14:30:42 +08:00

153 lines
5.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Ayay.SerilogLogs;
using Serilog;
namespace SHH.CameraSdk;
/// <summary>
/// 帧处理管道(后台处理核心)
/// 功能:接收帧处理任务,在后台单线程执行二次处理(如打水印、裁剪),并分发至目标订阅者
/// 核心特性:
/// <para>1. 有界通道+DropWrite模式生产端永不阻塞管道满时丢弃新任务避免内存积压</para>
/// <para>2. 单线程处理CPU占用恒定避免多线程竞争导致的性能抖动</para>
/// <para>3. 引用计数管理:确保帧数据安全转移与释放,防止内存泄漏</para>
/// </summary>
public class ProcessingPipeline
{
#region --- (Private Resources & States) ---
private static ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);
/// <summary> 任务队列(有界通道):存储待处理的帧任务 </summary>
private readonly Channel<ProcessingTask> _queue;
/// <summary> 取消令牌源:用于终止后台处理循环 </summary>
private readonly CancellationTokenSource _cts = new();
#endregion
#region --- (Constructor & Initialization) ---
/// <summary>
/// 初始化帧处理管道
/// </summary>
/// <param name="capacity">管道最大容量超过该值时新任务将被丢弃DropWrite模式</param>
public ProcessingPipeline(int capacity)
{
// 创建有界通道,配置核心特性
_queue = Channel.CreateBounded<ProcessingTask>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropWrite, // 管道满时丢弃新写入的任务
SingleReader = true, // 单线程读取保证处理顺序与CPU稳定性
SingleWriter = false // 支持多线程写入(如多相机同时提交任务)
});
// 启动后台处理循环(长期运行任务,标记为 LongRunning 提升调度优先级)
Task.Factory.StartNew(ProcessLoopAsync, TaskCreationOptions.LongRunning);
}
#endregion
#region --- (Task Submission) ---
/// <summary>
/// 尝试提交帧处理任务到管道
/// 核心逻辑:非阻塞提交,失败时回滚帧引用计数,避免内存泄漏
/// </summary>
/// <param name="task">待处理的帧任务(包含帧数据、决策、追踪上下文)</param>
/// <returns>提交成功返回 true管道满导致提交失败返回 false</returns>
public bool TrySubmit(ProcessingTask task)
{
// 1. 帧引用计数+1将帧所有权从生产端转移到管道后台线程
task.Frame.AddRef();
try
{
// 2. 非阻塞写入管道:成功则任务进入队列等待处理
if (_queue.Writer.TryWrite(task))
{
return true;
}
// 3. 写入失败(管道满):回滚引用计数,释放帧内存
task.Frame.Dispose();
return false;
}
catch
{
// 异常场景下同样回滚引用计数,确保资源释放
task.Frame.Dispose();
return false;
}
}
#endregion
#region --- (Background Processing Loop) ---
/// <summary>
/// 后台处理循环:持续读取队列任务,执行二次处理与分发
/// </summary>
private async Task ProcessLoopAsync()
{
try
{
// 异步遍历队列:收到取消信号时退出循环
await foreach (var task in _queue.Reader.ReadAllAsync(_cts.Token))
{
// 使用 using 语句:处理完成后自动调用 Frame.Dispose(),引用计数-1
using (task.Frame)
{
// 执行具体的帧处理逻辑
ExecuteProcessing(task);
}
}
}
catch (OperationCanceledException)
{
// 收到取消信号,正常退出循环,无需处理
}
}
#endregion
#region --- (Frame Processing Execution) ---
/// <summary>
/// 执行帧二次处理与分发
/// 功能:对帧进行自定义加工(如打水印、格式转换),并通过分发器发送至目标订阅者
/// </summary>
/// <param name="task">待处理的帧任务</param>
private void ExecuteProcessing(ProcessingTask task)
{
try
{
// --- 二次处理车间可添加自定义加工逻辑10ms-50ms 耗时操作安全) ---
// 示例:给帧添加序列号水印(按需启用)
// string watermarkText = $"SEQ:{task.Decision.Sequence}";
// Cv2.PutText(
// img: task.Frame.InternalMat,
// text: watermarkText,
// org: new Point(10, 50),
// fontFace: HersheyFonts.HersheySimplex,
// fontScale: 1,
// color: Scalar.Red,
// thickness: 2
// );
// --- 帧分发:将处理后的帧交给全局分发器,按决策分发至目标订阅者 ---
GlobalStreamDispatcher.Dispatch(task);
}
catch (Exception ex)
{
// 捕获处理过程中的异常,避免影响后续任务执行
_sysLog.Error($"[Pipeline] 帧处理失败 (DeviceId: {task.DeviceId}, Seq: {task.Decision.Sequence}): {ex.Message}");
}
finally
{
// 归档追踪日志:将帧处理上下文存入全局遥测,支持后续排查与分析
GlobalTelemetry.RecordLog(task.Decision.Sequence, task.Context);
}
}
#endregion
}