using NetMQ; using NetMQ.Sockets; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System.Collections.Concurrent; using System.Diagnostics; using System.Text; namespace SHH.CameraDashboard; /// /// [Dashboard端] 指令控制服务 /// 职责:监听 6001 端口,接收 CameraService 的注册/心跳,并下发控制指令。 /// public class CommandServer : IDisposable { // 单例模式 public static CommandServer Instance { get; } = new CommandServer(); // ================================================================= // 事件定义 // ================================================================= // 当有新设备注册成功时触发 (UI 可以订阅这个来刷新列表) public event Action? OnClientRegistered; // 当收到通用业务消息时触发 public event Action? OnMessageReceived; // ================================================================= // 内部成员 // ================================================================= private RouterSocket? _routerSocket; private NetMQPoller? _poller; private NetMQQueue? _sendQueue; // 在线设备表 (Key: Identity/AppId) // 线程安全字典,存储客户端的详细信息(包括视频地址) private readonly ConcurrentDictionary _clients = new(); public int ListenPort { get; private set; } public bool IsRunning => _poller != null && _poller.IsRunning; // 获取当前所有在线客户端的副本 public List GetClients() => _clients.Values.ToList(); private CommandServer() { } public void Start(int port) { ListenPort = port; if (IsRunning) return; try { // 1. 初始化 Router Socket _routerSocket = new RouterSocket(); _routerSocket.Bind($"tcp://*:{ListenPort}"); // 监听所有网卡 _routerSocket.ReceiveReady += OnSocketReady; // 2. 初始化发送队列 (确保 UI 线程可以安全发送) _sendQueue = new NetMQQueue(); _sendQueue.ReceiveReady += OnQueueReady; // 3. 启动 Poller _poller = new NetMQPoller { _routerSocket, _sendQueue }; _poller.RunAsync(); Console.WriteLine($"[Dashboard] 指令服务启动,监听: tcp://*:{ListenPort}"); } catch (Exception ex) { Console.WriteLine($"[Dashboard] 指令端口绑定失败: {ex.Message}"); throw; } } /// /// [Poller线程] 处理网络接收 /// private void OnSocketReady(object? sender, NetMQSocketEventArgs e) { try { // Router 接收逻辑: // Frame 1: 发送者的 Identity (NetMQ 自动处理) // Frame 2: 真实数据 // 1. 读取身份 (Identity) var identityBytes = e.Socket.ReceiveFrameBytes(); string serviceId = Encoding.UTF8.GetString(identityBytes); // e.g., "CameraApp_01" // 2. 读取消息内容 // 兼容性处理:有些 Dealer 实现可能会发空帧,这里做个简单尝试 // 如果发现在 Identity 后紧跟的是空帧,则再读一帧 // 但在我们目前的 Dealer 实现中,是直接发的 JSON string message = e.Socket.ReceiveFrameString(); if (string.IsNullOrWhiteSpace(message)) { if (e.Socket.HasIn) message = e.Socket.ReceiveFrameString(); } // 3. 协议解析与业务分发 ProcessMessage(serviceId, message, identityBytes); } catch (Exception ex) { Debug.WriteLine($"[Command Receive Error] {ex.Message}"); } } /// /// 核心业务逻辑处理 /// private void ProcessMessage(string serviceId, string json, byte[] identityBytes) { try { // 尝试解析基础结构 var jObj = JObject.Parse(json); string action = jObj["Action"]?.ToString() ?? "Unknown"; // 更新最后心跳时间 (如果已存在) if (_clients.TryGetValue(serviceId, out var existingClient)) { existingClient.LastHeartbeat = DateTime.Now; } // ★★★ 处理注册握手 ★★★ if (action == "Register") { HandleRegistration(serviceId, jObj, identityBytes); } else { // 其他业务消息,透传给上层 Console.WriteLine($"[指令] From {serviceId}: {json}"); OnMessageReceived?.Invoke(serviceId, json); } } catch (JsonException) { Console.WriteLine($"[指令] 收到非 JSON 消息 From {serviceId}: {json}"); } } /// /// 处理注册逻辑 /// private void HandleRegistration(string serviceId, JObject jObj, byte[] identityBytes) { var payload = jObj["Payload"]; if (payload == null) return; // 1. 提取客户端信息 var client = new ConnectedClient { ServiceId = serviceId, Ip = payload["Ip"]?.ToString() ?? "Unknown", // ★★★ 解析新字段 ★★★ WebPort = payload["WebPort"]?.Value() ?? 5000, Version = payload["Version"]?.ToString() ?? "Unknown", Pid = payload["Pid"]?.Value() ?? 0, TargetVideoNodes = payload["TargetVideoNodes"]?.ToObject>() ?? new List(), LastHeartbeat = DateTime.Now }; // 2. 存入内存表 (Add or Update) _clients.AddOrUpdate(serviceId, client, (key, old) => client); Console.WriteLine($"[注册成功] {serviceId}"); // 3. 回复 ACK (握手确认) // 告诉客户端:我收到你的注册了,连接建立成功 var ackPacket = new { Action = "ACK", Message = $"Registered {serviceId}", Time = DateTime.Now }; string ackJson = JsonConvert.SerializeObject(ackPacket); // 直接在 Poller 线程发回,不需要走 Queue (因为拥有 Socket 所有权) _routerSocket?.SendMoreFrame(identityBytes).SendFrame(ackJson); // 4. 通知 UI 更新列表 OnClientRegistered?.Invoke(client); } /// /// [Poller线程] 处理发送队列 /// private void OnQueueReady(object? sender, NetMQQueueEventArgs e) { if (_routerSocket == null) return; if (e.Queue.TryDequeue(out var packet, TimeSpan.Zero)) { // Router 发送:[Identity] [Data] _routerSocket.SendMoreFrame(packet.TargetId) .SendFrame(packet.JsonData); Console.WriteLine($"[发送] To {packet.TargetId}: {packet.JsonData}"); } } /// /// [公共API] 向指定 Service 发送指令 /// public void SendCommand(string targetServiceId, object commandData) { if (_sendQueue == null) return; var json = JsonConvert.SerializeObject(commandData); _sendQueue.Enqueue(new CommandPacket { TargetId = targetServiceId, JsonData = json }); } public void Dispose() { _poller?.Stop(); _poller?.Dispose(); _routerSocket?.Dispose(); _sendQueue?.Dispose(); } // ============================================================= // 数据模型 // ============================================================= private class CommandPacket { public string TargetId { get; set; } = ""; public string JsonData { get; set; } = ""; } }