Files
Ayay/SHH.CameraSdk/Core/Services/BaseFrameProcessor.cs
2025-12-27 14:16:50 +08:00

306 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
namespace SHH.CameraSdk
{
#region --- (Frame Processor Cluster) ---
/// <summary>
/// [架构基类] 帧处理集群基类
/// 核心职责:
/// 1. 管理 Worker 线程池,实现多线程并行处理
/// 2. 基于 DeviceId 哈希分片路由,**保证单设备帧序绝对一致**
/// 3. 维护流水线责任链,支持帧数据的链式传递
/// 4. 管控帧引用计数,防止内存泄漏
/// </summary>
/// <typeparam name="TWorker">具体的工作者线程类型</typeparam>
public abstract class BaseFrameProcessor<TWorker> : IFrameProcessor
where TWorker : BaseWorker
{
#region --- ---
/// <summary> Worker 线程池,负责具体的帧处理任务 </summary>
protected readonly List<TWorker> _workers = new List<TWorker>();
/// <summary> 线程池并行度Worker 数量) </summary>
protected readonly int _workerCount;
/// <summary> 流水线的下一个处理环节 </summary>
private IFrameProcessor? _nextStep;
protected readonly ProcessingConfigManager _configManager; // 基类持有
#endregion
#region --- ---
/// <summary>
/// 初始化帧处理集群
/// </summary>
/// <param name="workerCount">Worker 线程数量(并行度)</param>
/// <param name="serviceName">服务名称(用于日志标识)</param>
protected BaseFrameProcessor(int workerCount, string serviceName, ProcessingConfigManager configManager)
{
// 校验并行度参数,避免无效配置
if (workerCount < 1)
throw new ArgumentOutOfRangeException(nameof(workerCount), "Worker数量必须大于0");
_configManager = configManager; // 先赋值配置管理器
_workerCount = workerCount;
// 通过抽象工厂模式创建 Worker 实例
for (int i = 0; i < workerCount; i++)
{
_workers.Add(CreateWorker(i));
}
Console.WriteLine($"[{serviceName}] 服务已初始化 (并行度: {workerCount})");
}
#endregion
#region --- 线 ---
/// <inheritdoc />
public void SetNext(IFrameProcessor next)
{
_nextStep = next;
}
/// <summary>
/// [中转核心] 将处理完成的帧传递到下一个流水线环节
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">处理完成的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision)
{
if (_nextStep != null)
{
// 转发给下一个处理器,延续流水线
_nextStep.Enqueue(deviceId, frame, decision);
}
else
{
// 流水线终点:提交给全局处理中心进行分发
GlobalProcessingCenter.Submit(deviceId, frame, decision);
}
}
#endregion
#region --- ---
/// <inheritdoc />
public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
{
// 关键操作1增加帧引用计数
// 因为帧进入异步处理环节,必须保证不会被外部提前释放
frame.AddRef();
// 关键操作2哈希分片路由
// 基于 DeviceId 取模,确保同一设备的帧始终分配给同一个 Worker
// 核心价值:保证单设备的帧处理顺序与原始流顺序一致
int workerIndex = (int)(Math.Abs(deviceId) % _workerCount);
var targetWorker = _workers[workerIndex];
// 将帧投递到目标 Worker 的任务队列
targetWorker.Post(deviceId, frame, decision);
}
#endregion
#region --- ---
/// <summary>
/// 抽象工厂:创建具体的 Worker 实例
/// 由子类实现,按需创建不同功能的 Worker
/// </summary>
/// <param name="workerId">Worker 唯一标识</param>
/// <returns>具体的 Worker 实例</returns>
protected abstract TWorker CreateWorker(int workerId);
#endregion
#region --- ---
/// <summary>
/// 资源释放
/// </summary>
public virtual void Dispose()
{
// 释放所有 Worker 资源
foreach (var worker in _workers)
{
worker.Dispose();
}
_workers.Clear();
// 释放下一个流水线节点(责任链递归释放)
_nextStep?.Dispose();
_nextStep = null;
}
#endregion
}
#endregion
#region --- 线 (Worker Thread) ---
/// <summary>
/// [架构基类] 帧处理工作者线程基类
/// 核心职责:
/// 1. 维护线程内任务队列,实现背压控制
/// 2. 管理长驻线程生命周期,避免线程频繁创建销毁
/// 3. 保证帧引用计数的安全闭环,杜绝内存泄漏
/// 4. 提供统一的异常处理机制,确保流水线不中断
/// </summary>
public abstract class BaseWorker : IDisposable
{
#region --- ---
/// <summary> 线程内任务队列容量限制100防止内存溢出 </summary>
private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue = new BlockingCollection<(long, SmartFrame, FrameDecision)>(100);
/// <summary> 长驻处理线程 </summary>
private readonly Task _processingThread;
/// <summary> 线程取消令牌源,用于优雅终止 </summary>
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
#endregion
#region --- ---
protected BaseWorker()
{
// 启动长驻后台线程,设置 LongRunning 提升调度优先级
_processingThread = Task.Factory.StartNew(
action: ProcessLoop,
creationOptions: TaskCreationOptions.LongRunning);
}
#endregion
#region --- () ---
/// <summary>
/// 将帧处理任务投递到当前 Worker 的队列
/// 实现背压控制:队列满时丢弃帧并释放引用,避免阻塞上游
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">待处理的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
public void Post(long deviceId, SmartFrame frame, FrameDecision decision)
{
// TryAdd 非阻塞:队列满时直接返回 false
if (!_taskQueue.TryAdd((deviceId, frame, decision)))
{
// 背压处理:丢弃当前帧,释放引用计数
frame.Dispose();
Console.WriteLine($"[Worker] 任务队列已满,丢弃设备 {deviceId} 的帧 (引用计数已释放)");
}
}
#endregion
#region --- ---
/// <summary>
/// Worker 线程的核心处理循环
/// 持续消费队列任务,直到收到取消信号
/// </summary>
private void ProcessLoop()
{
try
{
// GetConsumingEnumerable阻塞式消费队列支持取消令牌
foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token))
{
// 关键操作:使用 using 语句自动释放帧引用
// 无论处理成功/失败,都会保证 Dispose 被调用,形成引用计数闭环
using (var frame = taskItem.Frame)
{
try
{
// 调用子类实现的具体图像处理算法
PerformAction(taskItem.DeviceId, frame, taskItem.Decision);
// 通知父集群:当前帧处理完成,准备传递到下一个环节
NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
}
catch (Exception ex)
{
Console.WriteLine($"[Worker] 帧处理异常: {ex.Message}");
// 异常保底策略:即使处理失败,也透传帧到下一个环节,保证流水线不中断
NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
}
}
}
}
catch (OperationCanceledException)
{
// 正常取消:线程退出,无需报错
Console.WriteLine("[Worker] 处理循环已正常终止");
}
catch (Exception ex)
{
Console.WriteLine($"[Worker] 处理循环异常终止: {ex.Message}");
}
}
#endregion
#region --- () ---
/// <summary>
/// 子类实现:具体的图像处理算法逻辑
/// 如缩放、灰度转换、AI推理等
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">待处理的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
protected abstract void PerformAction(long deviceId, SmartFrame frame, FrameDecision decision);
/// <summary>
/// 子类实现:通知父集群处理完成
/// 通常调用父类的 PassToNext 方法传递帧
/// </summary>
/// <param name="deviceId">设备ID</param>
/// <param name="frame">处理完成的智能帧</param>
/// <param name="decision">帧处理决策指令</param>
protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision);
#endregion
#region --- ---
/// <summary>
/// 资源释放
/// </summary>
public virtual void Dispose()
{
// 1. 发送取消信号,终止处理循环
_cts.Cancel();
// 2. 标记队列完成添加,唤醒阻塞的消费线程
_taskQueue.CompleteAdding();
// 3. 等待处理线程退出最多等待1秒
try { _processingThread.Wait(1000); }
catch { /* 忽略等待异常 */ }
// 4. 清理队列中未处理的任务,释放帧引用
while (_taskQueue.TryTake(out var remainingItem))
{
remainingItem.Frame.Dispose();
}
// 5. 释放核心组件资源
_taskQueue.Dispose();
_cts.Dispose();
}
#endregion
}
#endregion
}