diff --git a/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs b/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs
new file mode 100644
index 0000000..1fc15e5
--- /dev/null
+++ b/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs
@@ -0,0 +1,302 @@
+namespace SHH.CameraSdk
+{
+ #region --- 架构基类:帧处理集群 (Frame Processor Cluster) ---
+
+ ///
+ /// [架构基类] 帧处理集群基类
+ /// 核心职责:
+ /// 1. 管理 Worker 线程池,实现多线程并行处理
+ /// 2. 基于 DeviceId 哈希分片路由,**保证单设备帧序绝对一致**
+ /// 3. 维护流水线责任链,支持帧数据的链式传递
+ /// 4. 管控帧引用计数,防止内存泄漏
+ ///
+ /// 具体的工作者线程类型
+ public abstract class BaseFrameProcessor : IFrameProcessor
+ where TWorker : BaseWorker
+ {
+ #region --- 受保护成员 ---
+ /// Worker 线程池,负责具体的帧处理任务
+ protected readonly List _workers = new List();
+
+ /// 线程池并行度(Worker 数量)
+ protected readonly int _workerCount;
+
+ /// 流水线的下一个处理环节
+ private IFrameProcessor? _nextStep;
+
+ #endregion
+
+ #region --- 构造函数 ---
+
+ ///
+ /// 初始化帧处理集群
+ ///
+ /// Worker 线程数量(并行度)
+ /// 服务名称(用于日志标识)
+ protected BaseFrameProcessor(int workerCount, string serviceName)
+ {
+ // 校验并行度参数,避免无效配置
+ if (workerCount < 1)
+ throw new ArgumentOutOfRangeException(nameof(workerCount), "Worker数量必须大于0");
+
+ _workerCount = workerCount;
+
+ // 通过抽象工厂模式创建 Worker 实例
+ for (int i = 0; i < workerCount; i++)
+ {
+ _workers.Add(CreateWorker(i));
+ }
+
+ Console.WriteLine($"[{serviceName}] 服务已初始化 (并行度: {workerCount})");
+ }
+
+ #endregion
+
+ #region --- 流水线责任链实现 ---
+
+ ///
+ public void SetNext(IFrameProcessor next)
+ {
+ _nextStep = next;
+ }
+
+ ///
+ /// [中转核心] 将处理完成的帧传递到下一个流水线环节
+ ///
+ /// 设备ID
+ /// 处理完成的智能帧
+ /// 帧处理决策指令
+ internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision)
+ {
+ if (_nextStep != null)
+ {
+ // 转发给下一个处理器,延续流水线
+ _nextStep.Enqueue(deviceId, frame, decision);
+ }
+ else
+ {
+ // 流水线终点:提交给全局处理中心进行分发
+ GlobalProcessingCenter.Submit(deviceId, frame, decision);
+ }
+ }
+
+ #endregion
+
+ #region --- 帧入队与路由 ---
+
+ ///
+ public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
+ {
+ // 关键操作1:增加帧引用计数
+ // 因为帧进入异步处理环节,必须保证不会被外部提前释放
+ frame.AddRef();
+
+ // 关键操作2:哈希分片路由
+ // 基于 DeviceId 取模,确保同一设备的帧始终分配给同一个 Worker
+ // 核心价值:保证单设备的帧处理顺序与原始流顺序一致
+ int workerIndex = (int)(Math.Abs(deviceId) % _workerCount);
+ var targetWorker = _workers[workerIndex];
+
+ // 将帧投递到目标 Worker 的任务队列
+ targetWorker.Post(deviceId, frame, decision);
+ }
+
+ #endregion
+
+ #region --- 抽象工厂方法 ---
+
+ ///
+ /// 抽象工厂:创建具体的 Worker 实例
+ /// 由子类实现,按需创建不同功能的 Worker
+ ///
+ /// Worker 唯一标识
+ /// 具体的 Worker 实例
+ protected abstract TWorker CreateWorker(int workerId);
+
+ #endregion
+
+ #region --- 资源释放 ---
+
+ ///
+ /// 资源释放
+ ///
+ public virtual void Dispose()
+ {
+ // 释放所有 Worker 资源
+ foreach (var worker in _workers)
+ {
+ worker.Dispose();
+ }
+ _workers.Clear();
+
+ // 释放下一个流水线节点(责任链递归释放)
+ _nextStep?.Dispose();
+ _nextStep = null;
+ }
+
+ #endregion
+ }
+
+ #endregion
+
+ #region --- 架构基类:处理线程 (Worker Thread) ---
+
+ ///
+ /// [架构基类] 帧处理工作者线程基类
+ /// 核心职责:
+ /// 1. 维护线程内任务队列,实现背压控制
+ /// 2. 管理长驻线程生命周期,避免线程频繁创建销毁
+ /// 3. 保证帧引用计数的安全闭环,杜绝内存泄漏
+ /// 4. 提供统一的异常处理机制,确保流水线不中断
+ ///
+ public abstract class BaseWorker : IDisposable
+ {
+ #region --- 私有成员 ---
+
+ /// 线程内任务队列,容量限制100,防止内存溢出
+ private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue = new BlockingCollection<(long, SmartFrame, FrameDecision)>(100);
+
+ /// 长驻处理线程
+ private readonly Task _processingThread;
+
+ /// 线程取消令牌源,用于优雅终止
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
+ #endregion
+
+ #region --- 构造函数 ---
+
+ protected BaseWorker()
+ {
+ // 启动长驻后台线程,设置 LongRunning 提升调度优先级
+ _processingThread = Task.Factory.StartNew(
+ action: ProcessLoop,
+ creationOptions: TaskCreationOptions.LongRunning);
+ }
+
+ #endregion
+
+ #region --- 任务入队 (背压处理) ---
+
+ ///
+ /// 将帧处理任务投递到当前 Worker 的队列
+ /// 实现背压控制:队列满时丢弃帧并释放引用,避免阻塞上游
+ ///
+ /// 设备ID
+ /// 待处理的智能帧
+ /// 帧处理决策指令
+ public void Post(long deviceId, SmartFrame frame, FrameDecision decision)
+ {
+ // TryAdd 非阻塞:队列满时直接返回 false
+ if (!_taskQueue.TryAdd((deviceId, frame, decision)))
+ {
+ // 背压处理:丢弃当前帧,释放引用计数
+ frame.Dispose();
+ Console.WriteLine($"[Worker] 任务队列已满,丢弃设备 {deviceId} 的帧 (引用计数已释放)");
+ }
+ }
+
+ #endregion
+
+ #region --- 核心处理循环 ---
+
+ ///
+ /// Worker 线程的核心处理循环
+ /// 持续消费队列任务,直到收到取消信号
+ ///
+ private void ProcessLoop()
+ {
+ try
+ {
+ // GetConsumingEnumerable:阻塞式消费队列,支持取消令牌
+ foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token))
+ {
+ // 关键操作:使用 using 语句自动释放帧引用
+ // 无论处理成功/失败,都会保证 Dispose 被调用,形成引用计数闭环
+ using (var frame = taskItem.Frame)
+ {
+ try
+ {
+ // 调用子类实现的具体图像处理算法
+ PerformAction(frame, taskItem.Decision);
+
+ // 通知父集群:当前帧处理完成,准备传递到下一个环节
+ NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"[Worker] 帧处理异常: {ex.Message}");
+
+ // 异常保底策略:即使处理失败,也透传帧到下一个环节,保证流水线不中断
+ NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
+ }
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // 正常取消:线程退出,无需报错
+ Console.WriteLine("[Worker] 处理循环已正常终止");
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"[Worker] 处理循环异常终止: {ex.Message}");
+ }
+ }
+
+ #endregion
+
+ #region --- 抽象方法 (子类实现具体逻辑) ---
+
+ ///
+ /// 子类实现:具体的图像处理算法逻辑
+ /// 如:缩放、灰度转换、AI推理等
+ ///
+ /// 待处理的智能帧
+ /// 帧处理决策指令
+ protected abstract void PerformAction(SmartFrame frame, FrameDecision decision);
+
+ ///
+ /// 子类实现:通知父集群处理完成
+ /// 通常调用父类的 PassToNext 方法传递帧
+ ///
+ /// 设备ID
+ /// 处理完成的智能帧
+ /// 帧处理决策指令
+ protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision);
+
+ #endregion
+
+ #region --- 资源释放 ---
+
+ ///
+ /// 资源释放
+ ///
+ public virtual void Dispose()
+ {
+ // 1. 发送取消信号,终止处理循环
+ _cts.Cancel();
+
+ // 2. 标记队列完成添加,唤醒阻塞的消费线程
+ _taskQueue.CompleteAdding();
+
+ // 3. 等待处理线程退出(最多等待1秒)
+ try { _processingThread.Wait(1000); }
+ catch { /* 忽略等待异常 */ }
+
+ // 4. 清理队列中未处理的任务,释放帧引用
+ while (_taskQueue.TryTake(out var remainingItem))
+ {
+ remainingItem.Frame.Dispose();
+ }
+
+ // 5. 释放核心组件资源
+ _taskQueue.Dispose();
+ _cts.Dispose();
+ }
+
+ #endregion
+ }
+
+ #endregion
+}
\ No newline at end of file
diff --git a/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs b/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs
new file mode 100644
index 0000000..f7abbd2
--- /dev/null
+++ b/SHH.CameraSdk/Core/Services/ImageEnhanceCluster.cs
@@ -0,0 +1,40 @@
+using OpenCvSharp;
+
+namespace SHH.CameraSdk.Core.Services
+{
+ ///
+ /// [图像增亮服务]
+ /// 实现:对流水线中的 TargetMat 执行像素级亮度提升
+ ///
+ public class ImageEnhanceCluster : BaseFrameProcessor
+ {
+ public ImageEnhanceCluster(int count) : base(count, "EnhanceCluster") { }
+
+ protected override EnhanceWorker CreateWorker(int id) => new EnhanceWorker(this);
+ }
+
+ public class EnhanceWorker : BaseWorker
+ {
+ private readonly ImageEnhanceCluster _parent;
+ public EnhanceWorker(ImageEnhanceCluster parent) => _parent = parent;
+
+ protected override void PerformAction(SmartFrame frame, FrameDecision decision)
+ {
+ // 业务逻辑:只处理已经过缩放的 TargetMat
+ if (frame.TargetMat != null && !frame.TargetMat.IsDisposed)
+ {
+ Mat brightMat = new Mat();
+ // 亮度线性提升:原像素 * 1.0 + 30 偏移量
+ frame.TargetMat.ConvertTo(brightMat, -1, 1.0, 30);
+
+ // 替换掉原来的 TargetMat(旧的会在 AttachTarget 内部被自动 Dispose)
+ frame.AttachTarget(brightMat, frame.ScaleType);
+ }
+ }
+
+ protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec)
+ {
+ _parent.PassToNext(did, frame, dec);
+ }
+ }
+}
\ No newline at end of file
diff --git a/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs b/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs
index a9897bc..3cfc36e 100644
--- a/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs
+++ b/SHH.CameraSdk/Core/Services/ImageScaleCluster.cs
@@ -1,113 +1,41 @@
using OpenCvSharp;
-
-namespace SHH.CameraSdk;
-
-///
-/// [标准动作环境] 图像预处理集群
-/// 职责:透明拦截,计算缩放图挂载到 TargetMat 注入 SmartFrame,然后提交给全局中心
-/// 特性:
-/// 1. 设备分片:基于 DeviceId 哈希路由,保证单设备帧顺序严格一致
-/// 2. 零内存拷贝:直接操作 SmartFrame 引用,仅在生成新 TargetMat 时申请内存
-/// 3. 闭环流转:处理完成后自动投递到 GlobalProcessingCenter
-///
-///
-/// 职责:透明拦截,计算缩放图挂载到 TargetMat,然后提交给全局中心
-///
-public class ImageScaleCluster : IFrameProcessor
+namespace SHH.CameraSdk
{
- private readonly List _workers = new();
- private readonly int _workerCount;
-
- public ImageScaleCluster(int workerCount = 4)
+ ///
+ /// [图像缩放服务]
+ /// 实现:基于基类,专注于将原图缩放并挂载到 TargetMat
+ ///
+ public class ImageScaleCluster : BaseFrameProcessor
{
- _workerCount = workerCount;
- for (int i = 0; i < workerCount; i++)
+ public ImageScaleCluster(int count) : base(count, "ScaleCluster") { }
+
+ protected override ScaleWorker CreateWorker(int id) => new ScaleWorker(this);
+ }
+
+ public class ScaleWorker : BaseWorker
+ {
+ private readonly ImageScaleCluster _parent;
+ public ScaleWorker(ImageScaleCluster parent) => _parent = parent;
+
+ protected override void PerformAction(SmartFrame frame, FrameDecision decision)
{
- _workers.Add(new ProcessingWorker(i));
- }
- Console.WriteLine($"[ScaleCluster] 缩放服务已就绪 (Worker: {_workerCount})");
- }
+ int targetW = 704;
+ int targetH = 576;
- public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
- {
- // 1. 增加引用计数:跨线程持有
- frame.AddRef();
-
- // 2. 哈希分片路由:保证保序
- int index = (int)(Math.Abs(deviceId) % _workerCount);
- _workers[index].Post(deviceId, frame, decision);
- }
-
- public void Dispose() => _workers.ForEach(w => w.Dispose());
-}
-
-internal class ProcessingWorker : IDisposable
-{
- private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _queue = new(100);
- private readonly Task _thread;
- private readonly CancellationTokenSource _cts = new();
-
- public ProcessingWorker(int id)
- {
- _thread = Task.Factory.StartNew(ProcessLoop, TaskCreationOptions.LongRunning);
- }
-
- public void Post(long did, SmartFrame frame, FrameDecision decision)
- {
- if (!_queue.TryAdd((did, frame, decision)))
- {
- // 背压丢弃
- frame.Dispose();
- }
- }
-
- private void ProcessLoop()
- {
- foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
- {
- using (var frame = item.Frame)
+ // 算法逻辑:若尺寸符合要求则执行 Resize
+ if (frame.InternalMat.Width > targetW)
{
- try
- {
- // -------------------------------------------------
- // 核心动作:缩放逻辑
- // -------------------------------------------------
- int targetW = 704;
- int targetH = 576;
+ Mat targetMat = new Mat();
+ Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear);
- // 仅当原图大于目标时才缩放
- if (frame.InternalMat.Width > targetW)
- {
- Mat targetMat = new Mat();
- Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear);
-
- // [关键] 挂载到 SmartFrame 的衍生属性中
- // 标记为 Shrink (缩小)
- frame.AttachTarget(targetMat, FrameScaleType.Shrink);
- }
-
- // -------------------------------------------------
- // 交付下一站:GlobalProcessingCenter
- // 消费端对此无感知,它收到的是同一个 frame 对象
- // -------------------------------------------------
- GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision);
- }
- catch (Exception ex)
- {
- Console.WriteLine($"[ScaleWorker] 异常: {ex.Message}");
- // 即使处理失败,也要尝试把原图发出去,保证画面不断
- GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision);
- }
+ // 挂载到衍生属性
+ frame.AttachTarget(targetMat, FrameScaleType.Shrink);
}
}
- }
- public void Dispose()
- {
- _cts.Cancel();
- _queue.CompleteAdding();
- while (_queue.TryTake(out var item)) item.Frame.Dispose();
- _queue.Dispose();
- _cts.Dispose();
+ protected override void NotifyFinished(long did, SmartFrame frame, FrameDecision dec)
+ {
+ _parent.PassToNext(did, frame, dec);
+ }
}
}
\ No newline at end of file
diff --git a/SHH.CameraSdk/Program.cs b/SHH.CameraSdk/Program.cs
index 2d87ef5..e8145c7 100644
--- a/SHH.CameraSdk/Program.cs
+++ b/SHH.CameraSdk/Program.cs
@@ -108,7 +108,11 @@ public class Program
var builder = WebApplication.CreateBuilder();
// 注册缩放集群服务 (建议 Worker 数 = CPU 核心数,这里设为 4)
- var scaleService = new ImageScaleCluster(4);
+ var scaleService = new ImageScaleCluster(4); // 环节一:缩放
+ var enhanceService = new ImageEnhanceCluster(4); // 环节二:增亮
+
+ // 逻辑:缩放 -> 增亮 -> (自动到终点)
+ scaleService.SetNext(enhanceService);
// 2. [核心] 将缩放服务“挂载”到全局路由上
// 从此刻起,所有驱动层的帧都会先流经 scaleService
@@ -116,6 +120,7 @@ public class Program
// 3. 注册到 DI 容器 (以便 Controller 或其他服务可以管理它,例如动态调整并行度)
builder.Services.AddSingleton(scaleService);
+ builder.Services.AddSingleton(enhanceService);
// 1. 配置 CORS
builder.Services.AddCors(options =>