using OpenCvSharp; namespace SHH.CameraSdk; /// /// [标准动作环境] 图像预处理集群 /// 职责:透明拦截,计算缩放图挂载到 TargetMat 注入 SmartFrame,然后提交给全局中心 /// 特性: /// 1. 设备分片:基于 DeviceId 哈希路由,保证单设备帧顺序严格一致 /// 2. 零内存拷贝:直接操作 SmartFrame 引用,仅在生成新 TargetMat 时申请内存 /// 3. 闭环流转:处理完成后自动投递到 GlobalProcessingCenter /// /// /// 职责:透明拦截,计算缩放图挂载到 TargetMat,然后提交给全局中心 /// public class ImageScaleCluster : IFrameProcessor { private readonly List _workers = new(); private readonly int _workerCount; public ImageScaleCluster(int workerCount = 4) { _workerCount = workerCount; for (int i = 0; i < workerCount; i++) { _workers.Add(new ProcessingWorker(i)); } Console.WriteLine($"[ScaleCluster] 缩放服务已就绪 (Worker: {_workerCount})"); } public void Enqueue(long deviceId, SmartFrame frame, FrameDecision decision) { // 1. 增加引用计数:跨线程持有 frame.AddRef(); // 2. 哈希分片路由:保证保序 int index = (int)(Math.Abs(deviceId) % _workerCount); _workers[index].Post(deviceId, frame, decision); } public void Dispose() => _workers.ForEach(w => w.Dispose()); } internal class ProcessingWorker : IDisposable { private readonly BlockingCollection<(long DeviceId, SmartFrame Frame, FrameDecision Decision)> _queue = new(100); private readonly Task _thread; private readonly CancellationTokenSource _cts = new(); public ProcessingWorker(int id) { _thread = Task.Factory.StartNew(ProcessLoop, TaskCreationOptions.LongRunning); } public void Post(long did, SmartFrame frame, FrameDecision decision) { if (!_queue.TryAdd((did, frame, decision))) { // 背压丢弃 frame.Dispose(); } } private void ProcessLoop() { foreach (var item in _queue.GetConsumingEnumerable(_cts.Token)) { using (var frame = item.Frame) { try { // ------------------------------------------------- // 核心动作:缩放逻辑 // ------------------------------------------------- int targetW = 704; int targetH = 576; // 仅当原图大于目标时才缩放 if (frame.InternalMat.Width > targetW) { Mat targetMat = new Mat(); Cv2.Resize(frame.InternalMat, targetMat, new Size(targetW, targetH), 0, 0, InterpolationFlags.Linear); // [关键] 挂载到 SmartFrame 的衍生属性中 // 标记为 Shrink (缩小) frame.AttachTarget(targetMat, FrameScaleType.Shrink); } // ------------------------------------------------- // 交付下一站:GlobalProcessingCenter // 消费端对此无感知,它收到的是同一个 frame 对象 // ------------------------------------------------- GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision); } catch (Exception ex) { Console.WriteLine($"[ScaleWorker] 异常: {ex.Message}"); // 即使处理失败,也要尝试把原图发出去,保证画面不断 GlobalProcessingCenter.Submit(item.DeviceId, frame, item.Decision); } } } } public void Dispose() { _cts.Cancel(); _queue.CompleteAdding(); while (_queue.TryTake(out var item)) item.Frame.Dispose(); _queue.Dispose(); _cts.Dispose(); } }