using MessagePack;
using NetMQ;
using NetMQ.Sockets;
using SHH.CameraDashboard.Services.Processors;
using SHH.Contracts;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace SHH.CameraDashboard
{
    /// <summary>
    /// Router-side command bus. Binds a <see cref="RouterSocket"/>, dispatches every
    /// inbound message to the processor registered for its protocol, and sends
    /// commands back to peers that have registered a session.
    /// </summary>
    /// <remarks>
    /// Inbound and outbound payloads both pass through <see cref="Pipeline"/>; an
    /// interceptor drops a message by making the pipeline return <c>null</c>.
    /// NetMQ sockets are not thread-safe, so the router socket is only ever touched
    /// on the poller thread: outbound messages are handed over through a
    /// <see cref="NetMQQueue{T}"/> that the same poller drains.
    /// </remarks>
    public class CommandBusClient : IDisposable
    {
        private RouterSocket? _routerSocket;
        private NetMQPoller? _poller;

        // Outbound hand-off queue; drained on the poller thread (see DrainSendQueue).
        private NetMQQueue<NetMQMessage>? _sendQueue;

        private volatile bool _isRunning;

        // Guards the start/stop lifecycle and the publication of _sendQueue.
        private readonly object _disposeLock = new object();

        /// <summary>Process-wide singleton instance.</summary>
        public static CommandBusClient Instance { get; } = new CommandBusClient();

        // Protocol identifier -> processor handling that protocol.
        private readonly Dictionary<string, IProtocolProcessor> _processors = new();

        // Requests awaiting a CommandResult, keyed by request id.
        // NOTE(review): the generic arguments were missing from the reviewed text. The
        // value type follows from HandleResponse; the key type is assumed to be string
        // (the type of CommandResult.RequestId) - confirm against SHH.Contracts.
        private readonly ConcurrentDictionary<string, TaskCompletionSource<CommandResult>> _pendingRequests =
            new ConcurrentDictionary<string, TaskCompletionSource<CommandResult>>();

        // Instance id -> routing identity frame of the connected peer.
        private readonly ConcurrentDictionary<string, byte[]> _sessions =
            new ConcurrentDictionary<string, byte[]>();

        /// <summary>Interceptor pipeline applied to every received and sent payload.</summary>
        public InterceptorPipeline Pipeline { get; } = new InterceptorPipeline();

        /// <summary>Raised through <see cref="RaiseServerRegistered"/>.</summary>
        public event Action<RegisterPayload>? OnServerRegistered;

        /// <summary>Raised through <see cref="RaiseDeviceStatusReport"/>.</summary>
        // NOTE(review): the list element type was missing from the reviewed text;
        // StatusEventPayload is an assumption - confirm against SHH.Contracts.
        public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;

        /// <summary>Raised through <see cref="RaiseCommandReceived"/>.</summary>
        public event Action<CommandPayload>? OnCommandReceived;

        /// <summary>
        /// Registers <paramref name="processor"/> under its
        /// <see cref="IProtocolProcessor.ProtocolType"/>, replacing any processor
        /// previously registered for the same protocol.
        /// </summary>
        /// <param name="processor">The processor to register.</param>
        public void RegisterProcessor(IProtocolProcessor processor)
        {
            _processors[processor.ProtocolType] = processor;
        }

        /// <summary>
        /// Binds the router socket on <c>tcp://*:{port}</c>, registers the built-in
        /// processors and starts polling. Does nothing when already running.
        /// </summary>
        /// <param name="port">TCP port to bind.</param>
        public void Start(int port)
        {
            if (_isRunning)
            {
                return;
            }

            lock (_disposeLock)
            {
                // Re-check under the lock so two concurrent callers cannot both bind.
                if (_isRunning)
                {
                    return;
                }

                // Build everything in locals first so that a failure (for example the
                // port already being in use) releases what was created instead of
                // leaving half-initialised fields behind.
                var socket = new RouterSocket();
                NetMQQueue<NetMQMessage>? queue = null;
                NetMQPoller? poller = null;
                try
                {
                    socket.Bind($"tcp://*:{port}");
                    socket.ReceiveReady += OnReceiveReady;

                    var sendQueue = new NetMQQueue<NetMQMessage>();
                    queue = sendQueue;
                    sendQueue.ReceiveReady += (_, args) => DrainSendQueue(socket, args.Queue);

                    // --- built-in processors ---
                    this.RegisterProcessor(new RegisterProcessor(this));
                    this.RegisterProcessor(new StatusBatchProcessor(this));
                    this.RegisterProcessor(new CommandResultProcessor(this));
                    this.RegisterProcessor(new CommandProcessor(this));

                    poller = new NetMQPoller { socket, sendQueue };
                    poller.RunAsync();
                }
                catch
                {
                    poller?.Dispose();
                    queue?.Dispose();
                    socket.Dispose();
                    throw;
                }

                _routerSocket = socket;
                _sendQueue = queue;
                _poller = poller;
                _isRunning = true;
            }
        }

        // NetMQ event handlers are synchronous (void). Awaiting the asynchronous
        // interceptor pipeline therefore requires async void, which is acceptable only
        // because this is a top-level event handler; every exception is caught below.
        private async void OnReceiveReady(object? sender, NetMQSocketEventArgs e)
        {
            try
            {
                NetMQMessage msg = new NetMQMessage();

                // 1. Try to receive one multipart message.
                if (!e.Socket.TryReceiveMultipartMessage(ref msg))
                {
                    return;
                }

                // 2. Frame check. A Router receiving from a Dealer sees
                //    [Identity] [Protocol] [Data], so at least 3 frames are expected.
                if (msg.FrameCount < 3)
                {
                    return;
                }

                // ToByteArray() yields exactly the frame's bytes; the identity is kept in
                // the session table, so it must not carry any slack from the raw buffer.
                byte[] identity = msg[0].ToByteArray();      // Frame 0: routing id
                string protocol = msg[1].ConvertToString();  // Frame 1: protocol identifier
                byte[] rawData = msg[2].ToByteArray();       // Frame 2: raw payload

                // Inbound interception: a null context means an interceptor dropped it.
                var ctx = await Pipeline.ExecuteReceiveAsync(protocol, rawData);
                if (ctx is null)
                {
                    return;
                }

                // Dispatch using the protocol/data as rewritten by the pipeline.
                if (_processors.TryGetValue(ctx.Protocol, out var processor))
                {
                    processor.Process(identity, ctx.Data);
                }
                else
                {
                    Debug.WriteLine($"[Bus] 未知协议: {ctx.Protocol}");
                }
            }
            catch (Exception ex)
            {
                Debug.WriteLine($"[Bus-Err] {ex.Message}");
            }
        }

        // Sends every queued outbound message on the router socket. Invoked by the
        // poller, i.e. on the only thread that is allowed to touch the socket.
        private static void DrainSendQueue(RouterSocket socket, NetMQQueue<NetMQMessage> queue)
        {
            while (queue.TryDequeue(out var outgoing, TimeSpan.Zero))
            {
                try
                {
                    socket.SendMultipartMessage(outgoing);
                }
                catch (Exception ex)
                {
                    // One failed send must not stop the poller or the remaining messages.
                    Debug.WriteLine($"[Bus-Err] {ex.Message}");
                }
            }
        }

        // --- Internal entry points used by the processors ---

        internal void UpdateSession(string instanceId, byte[] identity) => _sessions[instanceId] = identity;

        internal void RaiseServerRegistered(RegisterPayload p) => OnServerRegistered?.Invoke(p);

        // NOTE(review): element type assumed, see OnDeviceStatusReport.
        internal void RaiseDeviceStatusReport(List<StatusEventPayload> i) => OnDeviceStatusReport?.Invoke(i);

        internal void RaiseCommandReceived(CommandPayload payload) => OnCommandReceived?.Invoke(payload);

        // Completes the pending request that matches result.RequestId, if there is one.
        internal void HandleResponse(CommandResult result)
        {
            if (_pendingRequests.TryRemove(result.RequestId, out var tcs))
            {
                tcs.TrySetResult(result);
            }
        }

        /// <summary>
        /// Serializes <paramref name="payload"/>, runs it through the outbound
        /// interceptors and queues it for delivery to the peer registered as
        /// <paramref name="instanceId"/>.
        /// </summary>
        /// <remarks>
        /// The message is silently dropped when the instance has no session, when an
        /// interceptor rejects it, or when the bus is not running. The returned task
        /// completes once the message has been queued; the socket send itself happens
        /// afterwards on the poller thread.
        /// </remarks>
        /// <param name="instanceId">Instance id previously stored via <c>UpdateSession</c>.</param>
        /// <param name="payload">Command to send.</param>
        public async Task SendInternalAsync(string instanceId, CommandPayload payload)
        {
            if (!_sessions.TryGetValue(instanceId, out byte[]? identity))
            {
                return;
            }

            // 1. Serialize.
            byte[] rawData = MessagePackSerializer.Serialize(payload);

            // 2. Outbound interception: a null context means the message was dropped.
            var ctx = await Pipeline.ExecuteSendAsync(payload.Protocol, rawData);
            if (ctx is null)
            {
                return;
            }

            // 3. Build [Identity] [Protocol] [Data] from the interceptor output.
            var msg = new NetMQMessage();
            msg.Append(identity);
            msg.Append(ctx.Protocol);
            msg.Append(ctx.Data);

            // 4. Hand over to the poller thread. The lock only covers the enqueue, so a
            //    concurrent Stop() cannot dispose the queue underneath this call.
            lock (_disposeLock)
            {
                _sendQueue?.Enqueue(msg);
            }
        }

        /// <summary>
        /// Stops polling and releases the socket, queue and poller. Safe to call more
        /// than once and before <see cref="Start"/>; messages still queued are discarded.
        /// </summary>
        public void Stop()
        {
            NetMQPoller? poller;
            RouterSocket? socket;
            NetMQQueue<NetMQMessage>? queue;

            // Detach the resources under the lock, but tear them down outside it:
            // poller.Stop() waits for the poller thread, and a handler running on that
            // thread may itself be waiting for this lock in SendInternalAsync.
            lock (_disposeLock)
            {
                _isRunning = false;

                poller = _poller;
                socket = _routerSocket;
                queue = _sendQueue;

                _poller = null;
                _routerSocket = null;
                _sendQueue = null;
            }

            if (poller is not null)
            {
                poller.Stop();
                poller.Dispose();
            }

            if (socket is not null)
            {
                socket.ReceiveReady -= OnReceiveReady;
                socket.Dispose();
            }

            queue?.Dispose();
        }

        /// <summary>Equivalent to <see cref="Stop"/>.</summary>
        public void Dispose() => Stop();
    }
}