using Ayay.SerilogLogs;
using Serilog;
namespace SHH.CameraSdk;
///
/// 帧处理管道(后台处理核心)
/// 功能:接收帧处理任务,在后台单线程执行二次处理(如打水印、裁剪),并分发至目标订阅者
/// 核心特性:
/// 1. 有界通道+DropWrite模式:生产端永不阻塞,管道满时丢弃新任务,避免内存积压
/// 2. 单线程处理:CPU占用恒定,避免多线程竞争导致的性能抖动
/// 3. 引用计数管理:确保帧数据安全转移与释放,防止内存泄漏
///
public class ProcessingPipeline
{
#region --- 私有资源与状态 (Private Resources & States) ---
private ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);
/// 任务队列(有界通道):存储待处理的帧任务
private readonly Channel _queue;
/// 取消令牌源:用于终止后台处理循环
private readonly CancellationTokenSource _cts = new();
#endregion
#region --- 构造与初始化 (Constructor & Initialization) ---
///
/// 初始化帧处理管道
///
/// 管道最大容量:超过该值时,新任务将被丢弃(DropWrite模式)
public ProcessingPipeline(int capacity)
{
// 创建有界通道,配置核心特性
_queue = Channel.CreateBounded(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropWrite, // 管道满时丢弃新写入的任务
SingleReader = true, // 单线程读取,保证处理顺序与CPU稳定性
SingleWriter = false // 支持多线程写入(如多相机同时提交任务)
});
// 启动后台处理循环(长期运行任务,标记为 LongRunning 提升调度优先级)
Task.Factory.StartNew(ProcessLoopAsync, TaskCreationOptions.LongRunning);
}
#endregion
#region --- 任务提交 (Task Submission) ---
///
/// 尝试提交帧处理任务到管道
/// 核心逻辑:非阻塞提交,失败时回滚帧引用计数,避免内存泄漏
///
/// 待处理的帧任务(包含帧数据、决策、追踪上下文)
/// 提交成功返回 true,管道满导致提交失败返回 false
public bool TrySubmit(ProcessingTask task)
{
// 1. 帧引用计数+1:将帧所有权从生产端转移到管道后台线程
task.Frame.AddRef();
try
{
// 2. 非阻塞写入管道:成功则任务进入队列等待处理
if (_queue.Writer.TryWrite(task))
{
return true;
}
// 3. 写入失败(管道满):回滚引用计数,释放帧内存
task.Frame.Dispose();
return false;
}
catch
{
// 异常场景下同样回滚引用计数,确保资源释放
task.Frame.Dispose();
return false;
}
}
#endregion
#region --- 后台处理循环 (Background Processing Loop) ---
///
/// 后台处理循环:持续读取队列任务,执行二次处理与分发
///
private async Task ProcessLoopAsync()
{
try
{
// 异步遍历队列:收到取消信号时退出循环
await foreach (var task in _queue.Reader.ReadAllAsync(_cts.Token))
{
// 使用 using 语句:处理完成后自动调用 Frame.Dispose(),引用计数-1
using (task.Frame)
{
// 执行具体的帧处理逻辑
ExecuteProcessing(task);
}
}
}
catch (OperationCanceledException)
{
// 收到取消信号,正常退出循环,无需处理
}
}
#endregion
#region --- 帧处理执行 (Frame Processing Execution) ---
///
/// 执行帧二次处理与分发
/// 功能:对帧进行自定义加工(如打水印、格式转换),并通过分发器发送至目标订阅者
///
/// 待处理的帧任务
private void ExecuteProcessing(ProcessingTask task)
{
try
{
// --- 二次处理车间:可添加自定义加工逻辑(10ms-50ms 耗时操作安全) ---
// 示例:给帧添加序列号水印(按需启用)
// string watermarkText = $"SEQ:{task.Decision.Sequence}";
// Cv2.PutText(
// img: task.Frame.InternalMat,
// text: watermarkText,
// org: new Point(10, 50),
// fontFace: HersheyFonts.HersheySimplex,
// fontScale: 1,
// color: Scalar.Red,
// thickness: 2
// );
// --- 帧分发:将处理后的帧交给全局分发器,按决策分发至目标订阅者 ---
GlobalStreamDispatcher.Dispatch(task);
}
catch (Exception ex)
{
// 捕获处理过程中的异常,避免影响后续任务执行
_sysLog.Error($"[Pipeline] 帧处理失败 (DeviceId: {task.DeviceId}, Seq: {task.Decision.Sequence}): {ex.Message}");
}
finally
{
// 归档追踪日志:将帧处理上下文存入全局遥测,支持后续排查与分析
GlobalTelemetry.RecordLog(task.Decision.Sequence, task.Context);
}
}
#endregion
}