354 lines
14 KiB
C#
354 lines
14 KiB
C#
using Ayay.SerilogLogs;
|
||
using Serilog;
|
||
|
||
namespace SHH.CameraSdk
|
||
{
|
||
#region --- 架构基类:帧处理集群 (Frame Processor Cluster) ---
|
||
|
||
/// <summary>
/// [Architecture base class] Base class of a frame-processing cluster.
/// Responsibilities visible in this class:
/// 1. Owns a fixed list of <typeparamref name="TWorker"/> instances created through <see cref="CreateWorker"/>.
/// 2. Routes every frame to the worker at index |deviceId| % workerCount, so a given device id
///    always maps to the same worker.
/// 3. Holds the next pipeline stage and forwards frames to it, or to GlobalProcessingCenter
///    when no next stage has been set.
/// 4. Adds a frame reference on <see cref="Enqueue"/> and releases it again if posting to the worker throws.
/// </summary>
/// <typeparam name="TWorker">Concrete worker type.</typeparam>
public abstract class BaseFrameProcessor<TWorker> : IFrameProcessor
    where TWorker : BaseWorker
{
    #region --- Fields ---

    private readonly ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);

    /// <summary> Workers that receive the routed frames. </summary>
    protected readonly List<TWorker> _workers = new List<TWorker>();

    /// <summary> Degree of parallelism (number of workers); also the modulus used for routing. </summary>
    protected readonly int _workerCount;

    /// <summary> Next stage of the pipeline, or null when this is the last stage. </summary>
    private IFrameProcessor? _nextStep;

    /// <summary> Configuration manager passed to the constructor; only stored by this base class. </summary>
    protected readonly ProcessingConfigManager _configManager;

    #endregion

    #region --- Constructor ---

    /// <summary>
    /// Initializes the cluster and creates <paramref name="workerCount"/> workers.
    /// </summary>
    /// <param name="workerCount">Number of workers (degree of parallelism); must be at least 1.</param>
    /// <param name="serviceName">Service name; used only in the start-up log message.</param>
    /// <param name="configManager">Configuration manager; stored as-is, not validated here.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCount"/> is less than 1.</exception>
    protected BaseFrameProcessor(int workerCount, string serviceName, ProcessingConfigManager configManager)
    {
        // Reject an invalid degree of parallelism before anything is created.
        if (workerCount < 1)
        {
            _sysLog.Error("[Core] 帧处理集群初始化失败, 线程数必须 > 0.");
            throw new ArgumentOutOfRangeException(nameof(workerCount), "帧处理集群初始化失败, 线程数必须 > 0.");
        }

        // Assigned before CreateWorker runs so that overrides can already read these fields.
        _configManager = configManager;
        _workerCount = workerCount;

        try
        {
            // Factory method: the subclass decides which concrete worker to build.
            for (int i = 0; i < workerCount; i++)
            {
                _workers.Add(CreateWorker(i));
            }
        }
        catch
        {
            // Each BaseWorker starts its processing task in its constructor; if a later
            // CreateWorker call throws, the workers built so far must be shut down or
            // they would keep running with nobody left to dispose them.
            DisposeWorkers();
            throw;
        }

        _sysLog.Information($"[Core] 帧处理集群初始化成功, {serviceName} 并行数 {workerCount}. 注: 不能大于CPU核心数.");
    }

    #endregion

    #region --- Pipeline chain ---

    /// <inheritdoc />
    public void SetNext(IFrameProcessor next)
    {
        _nextStep = next;
    }

    /// <summary>
    /// Hands a frame to the next pipeline stage, or submits it to GlobalProcessingCenter
    /// when no next stage is set.
    /// </summary>
    /// <param name="deviceId">Device id.</param>
    /// <param name="frame">Frame to forward.</param>
    /// <param name="decision">Frame decision forwarded together with the frame.</param>
    internal void PassToNext(long deviceId, SmartFrame frame, FrameDecision decision)
    {
        var next = _nextStep;
        if (next != null)
        {
            // Continue along the pipeline.
            next.Enqueue(deviceId, frame, decision);
        }
        else
        {
            // End of the pipeline: hand over to the global processing center.
            GlobalProcessingCenter.Submit(deviceId, frame, decision);
        }
    }

    #endregion

    #region --- Enqueue and routing ---

    /// <inheritdoc />
    public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision)
    {
        // Take a reference for the asynchronous stage so the frame is not released
        // by the caller while it waits in a worker queue.
        frame.AddRef();

        try
        {
            // Route by |deviceId| % _workerCount. The remainder is taken first and its sign
            // removed afterwards: Math.Abs(long.MinValue) throws OverflowException, while the
            // remainder always lies strictly between -_workerCount and _workerCount.
            // All other ids map to the same index as Math.Abs(deviceId) % _workerCount.
            long remainder = deviceId % _workerCount;
            int workerIndex = (int)(remainder < 0 ? -remainder : remainder);
            var targetWorker = _workers[workerIndex];

            // Post releases the reference itself when the worker queue is full.
            targetWorker.Post(deviceId, frame, decision);
        }
        catch (Exception ex)
        {
            // Roll back the reference taken above (e.g. the worker list was already cleared
            // or the worker was disposed); otherwise it would never be released.
            _sysLog.Error(ex, $"[Core] 帧入队失败,回滚引用计数: DeviceId={deviceId}");
            frame.Dispose();
            throw; // let the caller know
        }
    }

    #endregion

    #region --- Abstract factory ---

    /// <summary>
    /// Creates one concrete worker. Called from the base constructor, once per index
    /// from 0 to workerCount - 1, i.e. before the derived constructor body has run.
    /// </summary>
    /// <param name="workerId">Zero-based index of the worker being created.</param>
    /// <returns>The worker instance.</returns>
    protected abstract TWorker CreateWorker(int workerId);

    #endregion

    #region --- Disposal ---

    /// <summary>
    /// Disposes all workers, then disposes the next pipeline stage (the chain is released recursively).
    /// </summary>
    public virtual void Dispose()
    {
        DisposeWorkers();

        // Detach before disposing so a repeated call does not dispose the next stage twice.
        var next = _nextStep;
        _nextStep = null;
        next?.Dispose();
    }

    /// <summary>
    /// Disposes every worker and clears the list. A failure in one worker is logged
    /// and does not prevent the remaining workers from being disposed.
    /// </summary>
    private void DisposeWorkers()
    {
        foreach (var worker in _workers)
        {
            try
            {
                worker.Dispose();
            }
            catch (Exception ex)
            {
                _sysLog.Error(ex, "[Core] 释放 Worker 时发生异常, 已忽略并继续释放其余资源.");
            }
        }

        _workers.Clear();
    }

    #endregion
}
|
||
|
||
#endregion
|
||
|
||
#region --- 架构基类:处理线程 (Worker Thread) ---
|
||
|
||
/// <summary>
/// [Architecture base class] Base class of a frame-processing worker.
/// Behavior visible in this class:
/// 1. Keeps a bounded task queue; <see cref="Post"/> drops the frame when the queue is full.
/// 2. Runs one long-running task that consumes the queue until cancellation.
/// 3. Disposes every dequeued frame after it has been handled, whether handling succeeded or not.
/// 4. Logs exceptions thrown while handling a frame and continues with the next one.
/// </summary>
public abstract class BaseWorker : IDisposable
{
    #region --- Private members ---

    /// <summary> Maximum number of queued tasks; further frames are dropped by <see cref="Post"/>. </summary>
    private const int QueueCapacity = 100;

    private readonly ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);

    /// <summary> Bounded per-worker task queue. </summary>
    private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _taskQueue =
        new BlockingCollection<(long, SmartFrame, FrameDecision)>(QueueCapacity);

    /// <summary> The long-running task executing <see cref="ProcessLoop"/>. </summary>
    private readonly Task _processingThread;

    /// <summary> Cancellation source used to stop the processing loop. </summary>
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    /// <summary> 0 until <see cref="Dispose"/> has run once, 1 afterwards. </summary>
    private int _disposed;

    #endregion

    #region --- Constructor ---

    /// <summary>
    /// Starts the processing loop.
    /// </summary>
    protected BaseWorker()
    {
        // LongRunning is a hint that the work is long-lived (the default scheduler gives it a
        // dedicated thread instead of a pool thread); it does not change thread priority.
        // The scheduler is passed explicitly so the loop never runs on an ambient custom
        // TaskScheduler.Current.
        _processingThread = Task.Factory.StartNew(
            ProcessLoop,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    #endregion

    #region --- Posting (back-pressure) ---

    /// <summary>
    /// Posts a frame to this worker's queue without blocking.
    /// When the queue is full the frame is disposed (its reference released) and dropped.
    /// Exceptions thrown by the queue itself (for example after <see cref="Dispose"/>) propagate
    /// to the caller and the frame is NOT disposed here in that case.
    /// </summary>
    /// <param name="deviceId">Device id.</param>
    /// <param name="frame">Frame to process.</param>
    /// <param name="decision">Frame decision passed along with the frame.</param>
    public void Post(long deviceId, SmartFrame frame, FrameDecision decision)
    {
        // TryAdd is non-blocking: it returns false immediately when the queue is full.
        if (!_taskQueue.TryAdd((deviceId, frame, decision)))
        {
            // Back-pressure: drop this frame and release its reference.
            frame.Dispose();
            _gRpcLog.Debug($"[gRpc] 任务队列已满,BaseWorker 丢弃设备 {deviceId} 的帧.");
        }
    }

    #endregion

    #region --- Processing loop ---

    /// <summary>
    /// Consumes queued tasks until the cancellation token is signalled.
    /// Frames whose InternalMat is null, disposed or empty are skipped; every dequeued
    /// frame is disposed once its iteration ends.
    /// </summary>
    private void ProcessLoop()
    {
        try
        {
            foreach (var taskItem in _taskQueue.GetConsumingEnumerable(_cts.Token))
            {
                // Releases the queue's reference at the end of the iteration, including on 'continue'.
                using (var frame = taskItem.Frame)
                {
                    try
                    {
                        // Defense 1: the frame is still referenced but its Mat is gone.
                        // The original author notes that calling PerformAction on such a
                        // frame crashes with an access violation.
                        if (frame.InternalMat == null || frame.InternalMat.IsDisposed)
                        {
                            _gRpcLog.Warning($"[BaseWorker] 拦截到已销毁的帧: DeviceId={taskItem.DeviceId}, 丢弃.");
                            continue;
                        }

                        // Defense 2: empty image. The original author notes that the Dahua SDK
                        // occasionally emits 0x0 / empty data and that resizing it crashes.
                        if (frame.InternalMat.Empty())
                        {
                            _gRpcLog.Warning($"[BaseWorker] 拦截到空帧(Empty): DeviceId={taskItem.DeviceId}, 丢弃.");
                            continue;
                        }

                        // Only frames that passed both checks reach the actual algorithm.
                        PerformAction(taskItem.DeviceId, frame, taskItem.Decision);

                        // Report completion.
                        NotifyFinished(taskItem.DeviceId, frame, taskItem.Decision);
                    }
                    catch (ObjectDisposedException)
                    {
                        // The frame was released while being processed; keep the loop alive.
                        _gRpcLog.Warning($"[BaseWorker] 帧在处理过程中被释放: DeviceId={taskItem.DeviceId}");
                    }
                    catch (Exception ex)
                    {
                        // A failed frame is logged and not forwarded to NotifyFinished.
                        _gRpcLog.Error(ex, $"[BaseWorker] 帧处理逻辑异常: DeviceId={taskItem.DeviceId}");
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            _gRpcLog.Information($"[gRpc] BaseWorker 处理循环已正常终止.");
        }
        catch (Exception ex)
        {
            // Reached only for failures outside the per-frame try block (queue/enumeration errors).
            _gRpcLog.Fatal(ex, $"[gRpc] BaseWorker 处理循环异常崩溃!");
        }
    }

    #endregion

    #region --- Abstract methods (implemented by subclasses) ---

    /// <summary>
    /// Implemented by subclasses: the actual per-frame work.
    /// </summary>
    /// <param name="deviceId">Device id.</param>
    /// <param name="frame">Frame to process.</param>
    /// <param name="decision">Frame decision passed along with the frame.</param>
    protected abstract void PerformAction(long deviceId, SmartFrame frame, FrameDecision decision);

    /// <summary>
    /// Implemented by subclasses: called after <see cref="PerformAction"/> returned without throwing.
    /// </summary>
    /// <param name="deviceId">Device id.</param>
    /// <param name="frame">Processed frame.</param>
    /// <param name="decision">Frame decision passed along with the frame.</param>
    protected abstract void NotifyFinished(long deviceId, SmartFrame frame, FrameDecision decision);

    #endregion

    #region --- Disposal ---

    /// <summary>
    /// Stops the processing loop, releases the frames still queued and disposes the queue and the
    /// cancellation source. Safe to call more than once; only the first call does any work.
    /// </summary>
    public virtual void Dispose()
    {
        // Idempotence: a second call would otherwise hit the already disposed _cts.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        // 1. Signal cancellation to end the processing loop.
        _cts.Cancel();

        // 2. Mark the queue as complete so no further items are accepted.
        _taskQueue.CompleteAdding();

        // 3. Wait up to one second for the loop to exit.
        bool loopExited;
        try
        {
            loopExited = _processingThread.Wait(1000);
        }
        catch
        {
            // Wait throws only for a task that has already finished (faulted or cancelled).
            loopExited = true;
        }

        // 4. Release the frames that were never processed.
        DrainQueue();

        // 5. Dispose the queue and the token source, but never underneath a loop that is still
        //    running: it would fail on its next dequeue and be reported as a fatal crash.
        if (loopExited)
        {
            ReleaseCoreResources();
        }
        else
        {
            _gRpcLog.Warning("[gRpc] BaseWorker 处理线程未在 1 秒内退出, 延迟释放队列资源.");
            _processingThread.ContinueWith(
                _ =>
                {
                    try
                    {
                        DrainQueue();
                        ReleaseCoreResources();
                    }
                    catch (Exception ex)
                    {
                        _gRpcLog.Error(ex, "[gRpc] BaseWorker 延迟释放资源时发生异常.");
                    }
                },
                TaskScheduler.Default);
        }
    }

    /// <summary> Takes every remaining item out of the queue and disposes its frame. </summary>
    private void DrainQueue()
    {
        while (_taskQueue.TryTake(out var remainingItem))
        {
            remainingItem.Frame.Dispose();
        }
    }

    /// <summary> Disposes the queue and the cancellation source. </summary>
    private void ReleaseCoreResources()
    {
        _taskQueue.Dispose();
        _cts.Dispose();
    }

    #endregion
}
|
||
|
||
#endregion
|
||
} |