using MessagePack; using NetMQ; using NetMQ.Sockets; using Newtonsoft.Json; using SHH.Contracts; // ★★★ 必须引用契约库 ★★★ using System.Diagnostics; namespace SHH.CameraDashboard; public class StreamReceiverService : IDisposable { // 单例模式 public static StreamReceiverService Instance { get; } = new StreamReceiverService(); // ★★★ 核心变更:使用强类型契约载体 ★★★ public event Action? OnPayloadReceived; private SubscriberSocket? _subSocket; private Task? _receiveTask; private CancellationTokenSource? _cts; public int ListenPort { get; private set; } // 运行状态检查 public bool IsRunning => _receiveTask != null && !_receiveTask.IsCompleted; private StreamReceiverService() { } public void Start(int port = 6000) { if (IsRunning) return; ListenPort = port; _cts = new CancellationTokenSource(); try { _subSocket = new SubscriberSocket(); // 设置高水位,防止 UI 卡顿时内存溢出 _subSocket.Options.ReceiveHighWatermark = 1000; _subSocket.Bind($"tcp://*:{ListenPort}"); _subSocket.Subscribe(""); // 订阅所有内容(这是 Dealer-Router/Pub-Sub 的基础) Console.WriteLine($"[Dashboard] 视频流接收服务启动: tcp://*:{ListenPort}"); } catch (Exception ex) { // 明确抛出异常,让 App.xaml.cs 知道启动失败了 _subSocket?.Dispose(); _subSocket = null; throw new Exception($"端口 {port} 绑定失败,可能被占用。", ex); } _receiveTask = Task.Run(ReceiveLoop, _cts.Token); } private void ReceiveLoop() { var token = _cts?.Token ?? CancellationToken.None; while (!token.IsCancellationRequested) { try { if (_subSocket == null) break; // ========================================================= // 核心解析逻辑:适配 Service 端的 4 帧复合协议 // ========================================================= NetMQMessage msg = new NetMQMessage(); // 1. 非阻塞接收多帧消息 if (!_subSocket.TryReceiveMultipartMessage(TimeSpan.FromMilliseconds(500), ref msg)) continue; // 2. 协议完整性检查 if (msg.FrameCount < 4) continue; // 3. 协议头校验 (Frame 0) if (msg[0].ConvertToString() != "SHH_V1") continue; //// 4. 反序列化元数据 (Frame 1) //string json = msg[1].ConvertToString(); //var payload = JsonConvert.DeserializeObject(json); // 4. 反序列化元数据 (Frame 1) // 直接获取二进制数据,不需要转 String (省去了 UTF8 解码开销) byte[] metaBytes = msg[1].ToByteArray(); // 极速反序列化 var payload = MessagePackSerializer.Deserialize(metaBytes); if (payload == null) continue; // 5. 填充二进制图像数据 (Frame 2 & 3) // 注意:NetMQ 的 msg 数据是非托管内存,转为 byte[] 实现了拷贝,安全供 UI 使用 if (payload.HasOriginalImage) payload.OriginalImageBytes = msg[2].ToByteArray(); if (payload.HasTargetImage) payload.TargetImageBytes = msg[3].ToByteArray(); // 6. 触发事件 OnPayloadReceived?.Invoke(payload); } catch (Exception ex) { Debug.WriteLine($"[Receiver Error] {ex.Message}"); } } Console.WriteLine("[Dashboard] 接收循环已停止"); } public void Stop() { _cts?.Cancel(); try { _subSocket?.Dispose(); } catch { } _subSocket = null; _receiveTask = null; } public void Dispose() => Stop(); }