namespace SHH.CameraSdk { #region --- 架构基类:帧处理集群 (Frame Processor Cluster) --- /// /// [架构基类] 帧处理集群基类 /// 核心职责: /// 1. 管理 Worker 线程池,实现多线程并行处理 /// 2. 基于 DeviceId 哈希分片路由,**保证单设备帧序绝对一致** /// 3. 维护流水线责任链,支持帧数据的链式传递 /// 4. 管控帧引用计数,防止内存泄漏 /// /// 具体的工作者线程类型 public abstract class BaseFrameProcessor : IFrameProcessor where TWorker : BaseWorker { #region --- 受保护成员 --- /// Worker 线程池,负责具体的帧处理任务 protected readonly List _workers = new List(); /// 线程池并行度(Worker 数量) protected readonly int _workerCount; /// 流水线的下一个处理环节 private IFrameProcessor? _nextStep; protected readonly ProcessingConfigManager _configManager; // 基类持有 #endregion #region --- 构造函数 --- /// /// 初始化帧处理集群 /// /// Worker 线程数量(并行度) /// 服务名称(用于日志标识) protected BaseFrameProcessor(int workerCount, string serviceName, ProcessingConfigManager configManager) { // 校验并行度参数,避免无效配置 if (workerCount < 1) throw new ArgumentOutOfRangeException(nameof(workerCount), "Worker数量必须大于0"); _configManager = configManager; // 先赋值配置管理器 _workerCount = workerCount; // 通过抽象工厂模式创建 Worker 实例 for (int i = 0; i < workerCount; i++) { _workers.Add(CreateWorker(i)); } Console.WriteLine($"[{serviceName}] 服务已初始化 (并行度: {workerCount})"); } #endregion #region --- 流水线责任链实现 --- /// public void SetNext(IFrameProcessor next) { _nextStep = next; } /// /// [中转核心] 将处理完成的帧传递到下一个流水线环节 /// /// 设备ID /// 处理完成的智能帧 /// 帧处理决策指令 internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision) { if (_nextStep != null) { // 转发给下一个处理器,延续流水线 _nextStep.Enqueue(deviceId, frame, decision); } else { // 流水线终点:提交给全局处理中心进行分发 GlobalProcessingCenter.Submit(deviceId, frame, decision); } } #endregion #region --- 帧入队与路由 --- /// public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision) { // 关键操作1:增加帧引用计数 // 因为帧进入异步处理环节,必须保证不会被外部提前释放 frame.AddRef(); // 关键操作2:哈希分片路由 // 基于 DeviceId 取模,确保同一设备的帧始终分配给同一个 Worker // 核心价值:保证单设备的帧处理顺序与原始流顺序一致 int workerIndex = (int)(Math.Abs(deviceId) % _workerCount); var targetWorker = _workers[workerIndex]; // 将帧投递到目标 Worker 的任务队列 targetWorker.Post(deviceId, frame, decision); } #endregion #region --- 抽象工厂方法 --- /// /// 抽象工厂:创建具体的 Worker 实例 /// 由子类实现,按需创建不同功能的 Worker /// /// Worker 唯一标识 /// 具体的 Worker 实例 protected abstract TWorker CreateWorker(int workerId); #endregion #region --- 资源释放 --- /// /// 资源释放 /// public virtual void Dispose() { // 释放所有 Worker 资源 foreach (var worker in _workers) { worker.Dispose(); } _workers.Clear(); // 释放下一个流水线节点(责任链递归释放) _nextStep?.Dispose(); _nextStep = null; } #endregion } #endregion #region --- 架构基类:处理线程 (Worker Thread) --- /// /// [架构基类] 帧处理工作者线程基类 /// 核心职责: /// 1. 维护线程内任务队列,实现背压控制 /// 2. 管理长驻线程生命周期,避免线程频繁创建销毁 /// 3. 保证帧引用计数的安全闭环,杜绝内存泄漏 /// 4. 提供统一的异常处理机制,确保流水线不中断 /// public abstract class BaseWorker : IDisposable { #region --- 私有成员 --- /// 线程内任务队列,容量限制100,防止内存溢出 private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue = new BlockingCollection<(long, SmartFrame, FrameDecision)>(100); /// 长驻处理线程 private readonly Task _processingThread; /// 线程取消令牌源,用于优雅终止 private readonly CancellationTokenSource _cts = new CancellationTokenSource(); #endregion #region --- 构造函数 --- protected BaseWorker() { // 启动长驻后台线程,设置 LongRunning 提升调度优先级 _processingThread = Task.Factory.StartNew( action: ProcessLoop, creationOptions: TaskCreationOptions.LongRunning); } #endregion #region --- 任务入队 (背压处理) --- /// /// 将帧处理任务投递到当前 Worker 的队列 /// 实现背压控制:队列满时丢弃帧并释放引用,避免阻塞上游 /// /// 设备ID /// 待处理的智能帧 /// 帧处理决策指令 public void Post(long deviceId, SmartFrame frame, FrameDecision decision) { // TryAdd 非阻塞:队列满时直接返回 false if (!_taskQueue.TryAdd((deviceId, frame, decision))) { // 背压处理:丢弃当前帧,释放引用计数 frame.Dispose(); Console.WriteLine($"[Worker] 任务队列已满,丢弃设备 {deviceId} 的帧 (引用计数已释放)"); } } #endregion #region --- 核心处理循环 --- /// /// Worker 线程的核心处理循环 /// 持续消费队列任务,直到收到取消信号 /// private void ProcessLoop() { try { // GetConsumingEnumerable:阻塞式消费队列,支持取消令牌 foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token)) { // 关键操作:使用 using 语句自动释放帧引用 // 无论处理成功/失败,都会保证 Dispose 被调用,形成引用计数闭环 using (var frame = taskItem.Frame) { try { // 调用子类实现的具体图像处理算法 PerformAction(taskItem.DeviceId, frame, taskItem.Decision); // 通知父集群:当前帧处理完成,准备传递到下一个环节 NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision); } catch (Exception ex) { Console.WriteLine($"[Worker] 帧处理异常: {ex.Message}"); // 异常保底策略:即使处理失败,也透传帧到下一个环节,保证流水线不中断 NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision); } } } } catch (OperationCanceledException) { // 正常取消:线程退出,无需报错 Console.WriteLine("[Worker] 处理循环已正常终止"); } catch (Exception ex) { Console.WriteLine($"[Worker] 处理循环异常终止: {ex.Message}"); } } #endregion #region --- 抽象方法 (子类实现具体逻辑) --- /// /// 子类实现:具体的图像处理算法逻辑 /// 如:缩放、灰度转换、AI推理等 /// /// 设备ID /// 待处理的智能帧 /// 帧处理决策指令 protected abstract void PerformAction(long deviceId, SmartFrame frame, FrameDecision decision); /// /// 子类实现:通知父集群处理完成 /// 通常调用父类的 PassToNext 方法传递帧 /// /// 设备ID /// 处理完成的智能帧 /// 帧处理决策指令 protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision); #endregion #region --- 资源释放 --- /// /// 资源释放 /// public virtual void Dispose() { // 1. 发送取消信号,终止处理循环 _cts.Cancel(); // 2. 标记队列完成添加,唤醒阻塞的消费线程 _taskQueue.CompleteAdding(); // 3. 等待处理线程退出(最多等待1秒) try { _processingThread.Wait(1000); } catch { /* 忽略等待异常 */ } // 4. 清理队列中未处理的任务,释放帧引用 while (_taskQueue.TryTake(out var remainingItem)) { remainingItem.Frame.Dispose(); } // 5. 释放核心组件资源 _taskQueue.Dispose(); _cts.Dispose(); } #endregion } #endregion }