From 3f8e42e56050a5ef5b97763c261dd66e0206daf8 Mon Sep 17 00:00:00 2001 From: wilson Date: Mon, 12 Jan 2026 18:27:58 +0800 Subject: [PATCH] =?UTF-8?q?NetMQ=20=E5=8D=8F=E8=AE=AE=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=91=84=E5=83=8F=E5=A4=B4=E5=A2=9E=E3=80=81=E5=88=A0?= =?UTF-8?q?=E3=80=81=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SHH.CameraDashboard/App.xaml.cs | 209 +++++++---------- .../Services/CommandBusClient.cs | 35 ++- .../Abstractions/Models/VideoSourceConfig.cs | 1 + .../Controllers/CamerasController.cs | 7 +- .../Controllers/Dto/DeviceUpdateDto.cs | 24 +- SHH.CameraSdk/Core/Manager/CameraManager.cs | 2 + SHH.CameraSdk/Core/SdkGlobal.cs | 13 ++ .../Core/Services/ConnectivitySentinel.cs | 109 ++++++--- .../Core/Services/FileStorageService.cs | 10 +- .../Drivers/HikVision/HikVideoSource.cs | 3 +- .../Core/CmdClients/CommandClientWorker.cs | 125 ++++++----- .../Core/CmdClients/RemoveCameraHandler.cs | 84 +++++++ .../Core/CmdClients/SyncCameraHandler.cs | 212 ++++++++++++------ .../NetSenders/NetMQProtocolExtensions.cs | 2 +- .../Core/NetSenders/NetMqSenderWorker.cs | 76 ++++--- .../Core/NetSenders/NetworkStreamingWorker.cs | 8 +- .../Core/NetSenders/VideoDataChannel.cs | 7 +- SHH.CameraService/Program.cs | 1 + SHH.Contracts/CameraConfigDto.cs | 4 +- SHH.Contracts/Commands/ProtocolHeaders.cs | 4 +- 20 files changed, 604 insertions(+), 332 deletions(-) create mode 100644 SHH.CameraSdk/Core/SdkGlobal.cs create mode 100644 SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs diff --git a/SHH.CameraDashboard/App.xaml.cs b/SHH.CameraDashboard/App.xaml.cs index 3c720c8..d9b571a 100644 --- a/SHH.CameraDashboard/App.xaml.cs +++ b/SHH.CameraDashboard/App.xaml.cs @@ -21,6 +21,29 @@ namespace SHH.CameraDashboard #endregion + #region OnExit + + /// + /// 退出时执行 + /// + /// + protected override void OnExit(ExitEventArgs e) + { + // 1. 显式停止通讯总线 + CommandBusClient.Instance.Stop(); + + // 2. 如果你有其他的单例服务需要清理(比如视频解码库),也放在这里 + base.OnExit(e); + + // 3. 终极保底:如果程序在清理逻辑执行后 3 秒还没消失,强制杀掉进程 + // 防止某些第三方 DLL(如海康 SDK)的线程卡死 + Task.Delay(3000).ContinueWith(_ => + { + System.Diagnostics.Process.GetCurrentProcess().Kill(); + }); + } + + #endregion protected override async void OnStartup(StartupEventArgs e) { @@ -99,142 +122,70 @@ namespace SHH.CameraDashboard { Console.WriteLine($"[自动化] 新服务上线: {client.InstanceId}"); - Task.Run(async () => - { - await Task.Delay(500); + //Task.Run(async () => + //{ + // await Task.Delay(500); - // 1. 构建业务配置对象 - var cameraConfig = new CameraConfigDto - { - Id = 17798, - Name = "206摄像头", - Location = "404办公室", - IpAddress = "172.16.41.88", - Username = "admin", - Password = "abcd1234", - Port = 8000, - ChannelIndex = 1, - StreamType = 0, - Brand = DeviceBrand.HikVision.GetHashCode(), // 对应 DeviceBrand 枚举 - RenderHandle = 0, // 初始化为0 - MainboardIp = "", // 留空 - MainboardPort = 0, - RtspPath = "" - }; + // // 1. 构建业务配置对象 + // var cameraConfig = new CameraConfigDto + // { + // Id = 17798, + // Name = "206摄像头", + // Location = "404办公室", + // IpAddress = "172.16.41.88", + // Username = "admin", + // Password = "abcd1234", + // Port = 8000, + // ChannelIndex = 1, + // StreamType = 0, + // Brand = DeviceBrand.HikVision.GetHashCode(), // 对应 DeviceBrand 枚举 + // RenderHandle = 0, // 初始化为0 + // MainboardIp = "", // 留空 + // MainboardPort = 0, + // RtspPath = "" + // }; - // ★ 新增:一并带上订阅要求 ★ - cameraConfig.AutoSubscriptions = new List - { - // 第一条:显示帧,要求 8 帧 - new CameraConfigSubscribeDto { - AppId = "UI_Display", - Type = 0, - TargetFps = 8, - Memo = "显示帧" - }, - // 第二条:分析帧,要求 1 帧 - new CameraConfigSubscribeDto { - AppId = "AI_Analysis", - Type = 0, - Memo = "分析帧", - TargetFps = 1 - } - }; + // // ★ 新增:一并带上订阅要求 ★ + // cameraConfig.AutoSubscriptions = new List + // { + // // 第一条:显示帧,要求 8 帧 + // new CameraConfigSubscribeDto { + // AppId = "UI_Display", + // Type = 0, + // TargetFps = 8, + // Memo = "显示帧" + // }, + // // 第二条:分析帧,要求 1 帧 + // new CameraConfigSubscribeDto { + // AppId = "AI_Analysis", + // Type = 0, + // Memo = "分析帧", + // TargetFps = 1 + // } + // }; - // 2. 构造指令包 - var command = new CommandPayload - { - Protocol = ProtocolHeaders.Command, - CmdCode = ProtocolHeaders.SyncCamera, - TargetId = client.InstanceId, - RequestId = Guid.NewGuid().ToString("N"), + // // 2. 构造指令包 + // var command = new CommandPayload + // { + // Protocol = ProtocolHeaders.Command, + // CmdCode = ProtocolHeaders.SyncCamera, + // TargetId = client.InstanceId, + // RequestId = Guid.NewGuid().ToString("N"), - // ★ 修正 1: 使用 JsonParams 属性名,并将对象序列化为 JSON 字符串 ★ - // 因为你的 DTO 定义 JsonParams 是 string 类型 - JsonParams = JsonHelper.Serialize(cameraConfig), + // // ★ 修正 1: 使用 JsonParams 属性名,并将对象序列化为 JSON 字符串 ★ + // // 因为你的 DTO 定义 JsonParams 是 string 类型 + // JsonParams = JsonHelper.Serialize(cameraConfig), - // ★ 修正 2: Timestamp 直接赋值 DateTime 对象 ★ - // 因为你的 DTO 定义 Timestamp 是 DateTime 类型 - Timestamp = DateTime.Now, + // // ★ 修正 2: Timestamp 直接赋值 DateTime 对象 ★ + // // 因为你的 DTO 定义 Timestamp 是 DateTime 类型 + // Timestamp = DateTime.Now, - RequireAck = true - }; + // RequireAck = true + // }; - // 3. 发送 - await CommandBusClient.Instance.SendInternalAsync(client.InstanceId, command); - }); - } - - /// - /// 在程序启动时订阅事件 - /// - /// - private void SetupAutomaticConfiguration(ConnectedClient obj) - { - // 监听注册事件:每当有 Service (CommandClientWorker) 连上来注册成功 - CommandServer.Instance.OnClientRegistered += (client) => - { - Console.WriteLine($"[自动化] 检测到新服务上线: {client.ServiceId} ({client.Ip})"); - - // 放到线程池去执行,避免阻塞 UI 或网络接收线程 - Task.Run(async () => - { - // 1. 稍微延时一点点 (500ms),给 Service 一点喘息时间准备接收指令 - await Task.Delay(500); - - // 2. 构造您指定的“206摄像头”配置 - var cameraConfig = new CameraConfigDto - { - Id = 17798, - Name = "206摄像头", - Location = "404办公室", - IpAddress = "172.16.41.88", - Username = "admin", - Password = "abcd1234", - Port = 8000, - ChannelIndex = 1, - StreamType = 0, - Brand = DeviceBrand.HikVision.GetHashCode(), // 对应 DeviceBrand 枚举 - RenderHandle = 0, // 初始化为0 - MainboardIp = "", // 留空 - MainboardPort = 0, - RtspPath = "" - }; - - // ★ 新增:一并带上订阅要求 ★ - cameraConfig.AutoSubscriptions = new List - { - // 第一条:显示帧,要求 8 帧 - new CameraConfigSubscribeDto { - AppId = "UI_Display", - Type = 0, - TargetFps = 8, - Memo = "显示帧" - }, - // 第二条:分析帧,要求 1 帧 - new CameraConfigSubscribeDto { - AppId = "AI_Analysis", - Type = 0, - Memo = "分析帧", - TargetFps = 1 - } - }; - - // 3. 封装协议包 - var commandPacket = new - { - Action = "SyncCamera", // 告诉 Service 执行什么动作 - Payload = cameraConfig, // 数据载荷 - Time = DateTime.Now - }; - - // 4. 定向发送 - // client.ServiceId 就是那个 "CameraApp_01" - CommandServer.Instance.SendCommand(client.ServiceId, commandPacket); - - Console.WriteLine($"[自动化] 已向 {client.ServiceId} 下发配置: 206摄像头"); - }); - }; + // // 3. 发送 + // await CommandBusClient.Instance.SendInternalAsync(client.InstanceId, command); + //}); } /// diff --git a/SHH.CameraDashboard/Services/CommandBusClient.cs b/SHH.CameraDashboard/Services/CommandBusClient.cs index acc272a..054e702 100644 --- a/SHH.CameraDashboard/Services/CommandBusClient.cs +++ b/SHH.CameraDashboard/Services/CommandBusClient.cs @@ -147,10 +147,37 @@ namespace SHH.CameraDashboard public void Stop() { - _isRunning = false; - _poller?.Stop(); - _poller?.Dispose(); - _routerSocket?.Dispose(); + // 增加锁,防止重复释放 + lock (_disposeLock) + { + if (!_isRunning) return; + _isRunning = false; + + Console.WriteLine("[Bus] 正在释放 NetMQ 资源..."); + + // 1. 停止 Poller + if (_poller != null) + { + _poller.Stop(); + _poller.Dispose(); + _poller = null; + } + + // 2. 释放 Socket + if (_routerSocket != null) + { + _routerSocket.Close(); + _routerSocket.Dispose(); + _routerSocket = null; + } + + // 3. ★★★ 解决残留的关键:强制清理静态环境 ★★★ + // 参数为 true 会等待后台 I/O 线程完成(可能卡住), + // 参数为 false 则强制放弃未完成的 I/O 直接关闭。 + NetMQConfig.Cleanup(false); + + Console.WriteLine("[Bus] NetMQ 资源已安全释放。"); + } } public void Dispose() => Stop(); diff --git a/SHH.CameraSdk/Abstractions/Models/VideoSourceConfig.cs b/SHH.CameraSdk/Abstractions/Models/VideoSourceConfig.cs index 1b218e3..833e847 100644 --- a/SHH.CameraSdk/Abstractions/Models/VideoSourceConfig.cs +++ b/SHH.CameraSdk/Abstractions/Models/VideoSourceConfig.cs @@ -62,6 +62,7 @@ public class VideoSourceConfig #endregion + #region --- 2. 厂商扩展配置 (Vendor-Specific Extensions) --- /// diff --git a/SHH.CameraSdk/Controllers/CamerasController.cs b/SHH.CameraSdk/Controllers/CamerasController.cs index 23644e8..4dcc7df 100644 --- a/SHH.CameraSdk/Controllers/CamerasController.cs +++ b/SHH.CameraSdk/Controllers/CamerasController.cs @@ -232,7 +232,7 @@ public class CamerasController : ControllerBase ChannelIndex = dto.ChannelIndex, Brand = dto.Brand, RtspPath = dto.RtspPath, - + RenderHandle = dto.RenderHandle, // ========================================== // 2. 热更新参数 (运行时属性) @@ -243,10 +243,7 @@ public class CamerasController : ControllerBase MainboardIp = dto.MainboardIp, MainboardPort = dto.MainboardPort, - RenderHandle = dto.RenderHandle, - // 注意:通常句柄是通过 bind-handle 接口单独绑定的, - // 但如果 ConfigDto 里包含了上次保存的句柄,也可以映射 - // RenderHandle = dto.RenderHandle, + // ========================================== // 3. 图像处理参数 diff --git a/SHH.CameraSdk/Controllers/Dto/DeviceUpdateDto.cs b/SHH.CameraSdk/Controllers/Dto/DeviceUpdateDto.cs index 558194f..deb624b 100644 --- a/SHH.CameraSdk/Controllers/Dto/DeviceUpdateDto.cs +++ b/SHH.CameraSdk/Controllers/Dto/DeviceUpdateDto.cs @@ -44,15 +44,9 @@ public class DeviceUpdateDto public string RtspPath { get; set; } = string.Empty; - /// 关联的主板IP (用于联动控制) - [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)?$", - ErrorMessage = "请输入合法的IPv4地址")] - public string MainboardIp { get; set; } - = string.Empty; - - /// 关联的主板端口 - [Range(1, 65535, ErrorMessage = "主板端口号必须在 1-65535 范围内")] - public int MainboardPort { get; set; } + /// 渲染句柄 (IntPtr 的 Long 形式) + [Range(0, long.MaxValue, ErrorMessage = "渲染句柄必须是非负整数")] + public long RenderHandle { get; set; } // ============================================================================== // 2. 热更新参数 (Hot Update) @@ -71,9 +65,15 @@ public class DeviceUpdateDto [Range(0, 1, ErrorMessage = "码流类型只能是 0(主码流) 或 1(子码流)")] public int? StreamType { get; set; } - /// 渲染句柄 (IntPtr 的 Long 形式) - [Range(0, long.MaxValue, ErrorMessage = "渲染句柄必须是非负整数")] - public long RenderHandle { get; set; } + /// 关联的主板IP (用于联动控制) + [RegularExpression(@"^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[01]?\d\d?)?$", + ErrorMessage = "请输入合法的IPv4地址")] + public string MainboardIp { get; set; } + = string.Empty; + + /// 关联的主板端口 + [Range(1, 65535, ErrorMessage = "主板端口号必须在 1-65535 范围内")] + public int MainboardPort { get; set; } // ============================================================================== // 3. 图像处理参数 (Image Processing - Hot Update) diff --git a/SHH.CameraSdk/Core/Manager/CameraManager.cs b/SHH.CameraSdk/Core/Manager/CameraManager.cs index e2fad47..67efd07 100644 --- a/SHH.CameraSdk/Core/Manager/CameraManager.cs +++ b/SHH.CameraSdk/Core/Manager/CameraManager.cs @@ -281,6 +281,8 @@ public class CameraManager : IDisposable, IAsyncDisposable newConfig.Username != oldConfig.Username || newConfig.Password != oldConfig.Password || newConfig.ChannelIndex != oldConfig.ChannelIndex || + newConfig.RtspPath != oldConfig.RtspPath || + newConfig.RenderHandle != oldConfig.RenderHandle || newConfig.Brand != oldConfig.Brand; if (needColdRestart) diff --git a/SHH.CameraSdk/Core/SdkGlobal.cs b/SHH.CameraSdk/Core/SdkGlobal.cs new file mode 100644 index 0000000..5ddc02c --- /dev/null +++ b/SHH.CameraSdk/Core/SdkGlobal.cs @@ -0,0 +1,13 @@ +namespace SHH.CameraSdk +{ + /// + /// SDk 全局 + /// + public class SdkGlobal + { + /// + /// 是否保存摄像头配置 + /// + public static bool SaveCameraConfigEnable { get; set; } = false; + } +} \ No newline at end of file diff --git a/SHH.CameraSdk/Core/Services/ConnectivitySentinel.cs b/SHH.CameraSdk/Core/Services/ConnectivitySentinel.cs index 2e94728..7ecd4a1 100644 --- a/SHH.CameraSdk/Core/Services/ConnectivitySentinel.cs +++ b/SHH.CameraSdk/Core/Services/ConnectivitySentinel.cs @@ -1,4 +1,6 @@ -using System.Drawing; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Drawing; using System.Net.NetworkInformation; namespace SHH.CameraSdk; @@ -9,23 +11,32 @@ namespace SHH.CameraSdk; /// 1. 低耦合:不依赖具体驱动,只依赖接口 /// 2. 高性能:使用 Parallel.ForEachAsync 实现受控并行 /// 3. 智能策略:播放中不Ping,空闲时才Ping +/// 4. 稳定性:基于“持续断联时间”判定离线,防止网络瞬抖 /// public class ConnectivitySentinel { - private readonly CameraManager _manager; // [cite: 329] + private readonly CameraManager _manager; // private readonly PeriodicTimer _timer; private readonly CancellationTokenSource _cts = new(); - // [关键] 状态缓存:用于“去重”。 - // 只有当状态真的从 true 变 false (或反之) 时,才通知 Manager。 - // 防止每 3 秒发一次 "在线" 骚扰上层。 + // [关键] 状态缓存:用于“去重”上报 private readonly ConcurrentDictionary _lastStates = new(); + // [新增] 故障计时器:记录设备“首次探测失败”的时间点 + // Key: DeviceId, Value: 首次失败时间 + private readonly ConcurrentDictionary _failureStartTimes = new(); + // [关键配置] 最大并发度 - // 建议值:CPU 核心数 * 4,或者固定 16-32 - // 50 个摄像头,设为 16,意味着分 4 批完成,总耗时极短 private const int MAX_PARALLELISM = 16; + // [配置] 判定离线的持续时间阈值 (秒) + // 只有连续 Ping 不通超过 30秒,才认定为断线 + private const int OFFLINE_DURATION_THRESHOLD = 30; + + // [配置] 单次 Ping 的超时时间 (毫秒) + // 设为 1000ms,保证一轮检查快速结束,不依赖 Ping 的默认 5秒 超时 + private const int PING_TIMEOUT = 1000; + public ConnectivitySentinel(CameraManager manager) { _manager = manager; @@ -44,11 +55,9 @@ public class ConnectivitySentinel while (await _timer.WaitForNextTickAsync(_cts.Token)) { // 1. 获取当前所有设备的快照 - // CameraManager.GetAllDevices() 返回的是 BaseVideoSource,它实现了 IDeviceConnectivity var devices = _manager.GetAllDevices().Cast(); // 2. [核心回答] 受控并行执行 - // .NET 6+ 提供的超级 API,专门解决“一下子 50 个”的问题 await Parallel.ForEachAsync(devices, new ParallelOptions { MaxDegreeOfParallelism = MAX_PARALLELISM, @@ -66,37 +75,84 @@ public class ConnectivitySentinel private async Task CheckSingleDeviceAsync(IDeviceConnectivity device) { - bool isAlive = false; + // 1. 获取“瞬时”连通性 (Raw Status) + bool isResponsive = false; - // [智能策略]:如果设备正在取流,直接检查帧心跳(省流模式) + // [智能策略]:如果设备正在取流,优先检查帧心跳 if (device.Status == VideoSourceStatus.Playing || device.Status == VideoSourceStatus.Streaming) { long now = Environment.TickCount64; - // 5秒内有帧,就算在线 - isAlive = (now - device.LastFrameTick) < 5000; + // 5秒内有帧,就算瞬时在线 + isResponsive = (now - device.LastFrameTick) < 5000; + + // [双重保障] 如果帧心跳断了,立即 Ping 确认,防止只是解码卡死而非断网 + if (!isResponsive) + { + isResponsive = await PingAsync(device.IpAddress); + } } else { // [主动探测]:空闲或离线时,发射 ICMP Ping - isAlive = await PingAsync(device.IpAddress); + isResponsive = await PingAsync(device.IpAddress); } - // [状态注入]:将探测结果“注入”回设备 - device.SetNetworkStatus(isAlive); + // 2. [核心逻辑] 基于持续时间的稳定性判定 (Stable Status) + bool isLogicallyOnline; + + if (isResponsive) + { + // --- 情况 A: 瞬时探测通了 --- + // 只要通一次,立即清除故障计时,认为设备在线 + _failureStartTimes.TryRemove(device.Id, out _); + isLogicallyOnline = true; + } + else + { + // --- 情况 B: 瞬时探测失败 --- + // 记录或获取“首次失败时间” + var nowTime = DateTime.Now; + var firstFailureTime = _failureStartTimes.GetOrAdd(device.Id, nowTime); + + // 计算已经持续失败了多久 + var failureDuration = (nowTime - firstFailureTime).TotalSeconds; + + if (failureDuration >= OFFLINE_DURATION_THRESHOLD) + { + // 只有持续失败超过 30秒,才“真的”判定为离线 + isLogicallyOnline = false; + } + else + { + // 还没到 30秒,处于“抖动观察期” + // 策略:维持上一次的已知状态(如果之前是在线,就假装还在线;之前是离线,就继续离线) + // 这样可以防止网络微小抖动导致的 Status 频繁跳变 + isLogicallyOnline = _lastStates.TryGetValue(device.Id, out bool last) ? last : true; + + // 调试日志 (可选) + // Console.WriteLine($"[Sentinel] 设备 {device.Id} 瞬时异常,观察中: {failureDuration:F1}s / {OFFLINE_DURATION_THRESHOLD}s"); + } + } + + // [状态注入]:将经过时间滤波后的“稳定状态”注入回设备 + device.SetNetworkStatus(isLogicallyOnline); // 3. [状态去重与上报] - // 获取上一次的状态,如果没记录过,假设它之前是反状态(强制第一次上报) - bool lastState = _lastStates.TryGetValue(device.Id, out bool val) ? val : !isAlive; + // 获取上一次上报的状态,默认为反状态以触发首次上报 + bool lastReported = _lastStates.TryGetValue(device.Id, out bool val) ? val : !isLogicallyOnline; - if (lastState != isAlive) + if (lastReported != isLogicallyOnline) { // 记录新状态 - _lastStates[device.Id] = isAlive; + _lastStates[device.Id] = isLogicallyOnline; - // ★★★ 核心动作:只通知 Manager,不做任何网络操作 ★★★ - _manager.NotifyStatusChange(device.Id, isAlive, "网络连通性哨兵检测结论"); + // 构造原因描述 + string reason = isLogicallyOnline + ? "网络探测恢复" + : $"持续断连超过{OFFLINE_DURATION_THRESHOLD}秒"; - // Console.WriteLine($"[Sentinel] 诊断变化: {device.Id} -> {isAlive}"); + // ★★★ 核心动作:通知 Manager ★★★ + _manager.NotifyStatusChange(device.Id, isLogicallyOnline, reason); } } @@ -106,8 +162,11 @@ public class ConnectivitySentinel try { using var ping = new Ping(); - // 超时设为 800ms,快速失败,避免拖慢整体批次 - var reply = await ping.SendPingAsync(ip, 800); + // [修改] 超时设为 1000ms (1秒) + // 理由:我们要快速探测,不要等待 5秒。 + // 即使 Ping 因为网络延迟用了 4秒 才返回,Ping 类也会在 1秒 时抛出超时, + // 这会被视为一次“瞬时失败”,然后由外层的 30秒 时间窗口来容错。 + var reply = await ping.SendPingAsync(ip, PING_TIMEOUT); return reply.Status == IPStatus.Success; } catch diff --git a/SHH.CameraSdk/Core/Services/FileStorageService.cs b/SHH.CameraSdk/Core/Services/FileStorageService.cs index a103448..64c88ca 100644 --- a/SHH.CameraSdk/Core/Services/FileStorageService.cs +++ b/SHH.CameraSdk/Core/Services/FileStorageService.cs @@ -55,8 +55,11 @@ public class FileStorageService : IStorageService await _configLock.WaitAsync(); try { - var json = JsonSerializer.Serialize(configs, _jsonOptions); - await File.WriteAllTextAsync(_devicesPath, json); + if (SdkGlobal.SaveCameraConfigEnable) + { + var json = JsonSerializer.Serialize(configs, _jsonOptions); + await File.WriteAllTextAsync(_devicesPath, json); + } } catch (Exception ex) { @@ -72,6 +75,9 @@ public class FileStorageService : IStorageService await _configLock.WaitAsync(); try { + if (!SdkGlobal.SaveCameraConfigEnable) + return new List(); + var json = await File.ReadAllTextAsync(_devicesPath); if (string.IsNullOrWhiteSpace(json)) return new List(); diff --git a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs index eba3e99..1678ed6 100644 --- a/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs +++ b/SHH.CameraSdk/Drivers/HikVision/HikVideoSource.cs @@ -386,7 +386,8 @@ public class HikVideoSource : BaseVideoSource, // ========================================================================= // 【修正】删除这里的 GlobalStreamDispatcher.Dispatch! // 严禁在这里分发,因为这时的图是“生的”,还没经过 Pipeline 处理。 - // =========================================================================GlobalStreamDispatcher.Dispatch(Id, smartFrame); + // ========================================================================= + //GlobalStreamDispatcher.Dispatch(Id, smartFrame); // 4. [分发] 将决策结果传递给处理中心 // decision.TargetAppIds 包含了 "谁需要这一帧" 的信息 diff --git a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs index dabbf55..123138c 100644 --- a/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs +++ b/SHH.CameraService/Core/CmdClients/CommandClientWorker.cs @@ -1,10 +1,11 @@ -using MessagePack; +using System.Text; +using MessagePack; using Microsoft.Extensions.Hosting; using NetMQ; +using NetMQ.Monitoring; // ★ 1. 必须引用 Monitoring 命名空间 using NetMQ.Sockets; using SHH.CameraSdk; using SHH.Contracts; -using System.Text; namespace SHH.CameraService; @@ -16,6 +17,10 @@ public class CommandClientWorker : BackgroundService // 管理多个 Socket private readonly List _sockets = new(); + + // ★ 2. 新增:保存 Monitor 列表,防止被 GC 回收 + private readonly List _monitors = new(); + private NetMQPoller? _poller; public CommandClientWorker( @@ -34,71 +39,53 @@ public class CommandClientWorker : BackgroundService if (!_config.ShouldConnect || _config.CommandEndpoints.Count == 0) return; - // 1. 建立连接 (但不立即启动 Poller) _poller = new NetMQPoller(); + // ------------------------------------------------------------- + // 核心修改区:建立连接并挂载监控器 + // ------------------------------------------------------------- foreach (var ep in _config.CommandEndpoints) { try { var socket = new DealerSocket(); - // 建议加上 Socket 索引或 UUID 以防服务端认为 Identity 冲突 - // 或者保持原样,取决于服务端逻辑。通常同一个 AppId 连不同 Server 是没问题的。 socket.Options.Identity = Encoding.UTF8.GetBytes(_config.AppId); - socket.Connect(ep.Uri); + var monitorUrl = $"inproc://monitor_{Guid.NewGuid():N}"; + var monitor = new NetMQMonitor(socket, monitorUrl, SocketEvents.Connected); + + monitor.Connected += async (s, args) => + { + Console.WriteLine($"[指令] 网络连接建立: {ep.Uri} -> 正在补发注册包..."); + await SendRegisterAsync(socket); + }; + + // ★★★ 修正点:使用 AttachToPoller 代替 Add ★★★ + // 错误写法: _poller.Add(monitor); + monitor.AttachToPoller(_poller); + + // 依然需要保存引用,防止被 GC 回收 + _monitors.Add(monitor); + + socket.Connect(ep.Uri); socket.ReceiveReady += OnSocketReceiveReady; _sockets.Add(socket); _poller.Add(socket); - Console.WriteLine($"[指令] 建立通道: {ep.Uri}"); + Console.WriteLine($"[指令] 通道初始化完成: {ep.Uri} (带自动重连监控)"); + } + catch (Exception ex) + { + Console.WriteLine($"[指令] 连接初始化异常: {ex.Message}"); } - catch (Exception ex) { Console.WriteLine($"[指令] 连接异常: {ex.Message}"); } } if (_sockets.Count == 0) return; // ================================================================= - // 2. 发送注册包 (在 Poller 启动前发送,绝对线程安全) + // 6. 绑定 ACK 逻辑 (保持不变) // ================================================================= - var registerPayload = new RegisterPayload - { - Protocol = ProtocolHeaders.ServerRegister, - InstanceId = _config.AppId, - ProcessId = Environment.ProcessId, - Version = "1.0.0", - ServerIp = "127.0.0.1", - WebApiPort = _config.BasePort, - StartTime = DateTime.Now - }; - - try - { - byte[] regData = MessagePackSerializer.Serialize(registerPayload); - var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); - - if (ctx != null) - { - foreach (var socket in _sockets) - { - // 此时 Poller 还没跑,主线程发送是安全的 - socket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); - } - Console.WriteLine($"[指令] 注册包已广播至 {_sockets.Count} 个目标"); - } - } - catch (Exception ex) - { - Console.WriteLine($"[指令] 注册失败: {ex.Message}"); - } - - // ================================================================= - // 3. 绑定 ACK 逻辑 - // ================================================================= - // 关键修正:直接使用 async void,不要包裹在 Task.Run 中! - // 因为 OnResponseReady 是由 Dispatcher 触发的,而 Dispatcher 是由 Poller 线程触发的。 - // 所以这里就在 Poller 线程内,可以直接操作 Socket。 _dispatcher.OnResponseReady += async (result) => { try @@ -122,8 +109,11 @@ public class CommandClientWorker : BackgroundService }; // ================================================================= - // 4. 启动 Poller (开始监听接收) + // 7. 启动 Poller // ================================================================= + // 注意:我们不需要手动发第一次注册包了, + // 因为 Poller 启动后,底层 TCP 会建立连接,从而触发 monitor.Connected 事件, + // 事件里会自动发送注册包。这就是“自动档”的好处。 _poller.RunAsync(); // 阻塞直到取消 @@ -135,12 +125,49 @@ public class CommandClientWorker : BackgroundService // 清理 _poller.Stop(); _poller.Dispose(); + foreach (var m in _monitors) m.Dispose(); // 释放监控器 foreach (var s in _sockets) s.Dispose(); } + // ================================================================= + // ★ 8. 抽离出的注册包发送逻辑 (供 Monitor 调用) + // ================================================================= + private async Task SendRegisterAsync(DealerSocket targetSocket) + { + try + { + var registerPayload = new RegisterPayload + { + Protocol = ProtocolHeaders.ServerRegister, + InstanceId = _config.AppId, + ProcessId = Environment.ProcessId, + Version = "1.0.0", + ServerIp = "127.0.0.1", // 建议优化:获取本机真实IP + WebApiPort = _config.BasePort, + StartTime = DateTime.Now + }; + + byte[] regData = MessagePackSerializer.Serialize(registerPayload); + + // 执行拦截器 + var ctx = await _pipeline.ExecuteSendAsync(ProtocolHeaders.ServerRegister, regData); + + if (ctx != null) + { + // 直接向触发事件的那个 Socket 发送 + // DealerSocket 允许在连接未完全就绪时 Send,它会缓存直到网络通畅 + targetSocket.SendMoreFrame(ctx.Protocol).SendFrame(ctx.Data); + // Console.WriteLine($"[指令] 身份注册包已推入队列: {targetSocket.Options.Identity}"); + } + } + catch (Exception ex) + { + Console.WriteLine($"[指令] 注册包发送失败: {ex.Message}"); + } + } + private async void OnSocketReceiveReady(object? sender, NetMQSocketEventArgs e) { - // 这里的代码运行在 Poller 线程 NetMQMessage incomingMsg = new NetMQMessage(); if (e.Socket.TryReceiveMultipartMessage(ref incomingMsg)) { @@ -154,8 +181,6 @@ public class CommandClientWorker : BackgroundService var ctx = await _pipeline.ExecuteReceiveAsync(rawProtocol, rawData); if (ctx != null) { - // DispatchAsync 会同步触发 OnResponseReady, - // 从而在同一个线程内完成 ACK 发送,线程安全且高效。 await _dispatcher.DispatchAsync(ctx.Protocol, ctx.Data); } } diff --git a/SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs b/SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs new file mode 100644 index 0000000..43fec85 --- /dev/null +++ b/SHH.CameraService/Core/CmdClients/RemoveCameraHandler.cs @@ -0,0 +1,84 @@ +using Newtonsoft.Json.Linq; +using SHH.CameraSdk; +using SHH.Contracts; + +namespace SHH.CameraService +{ + /// + /// 移除设备指令处理器 + /// + public class RemoveCameraHandler : ICommandHandler + { + private readonly CameraManager _cameraManager; + + /// + /// 指令名称 + /// + public string ActionName => ProtocolHeaders.Remove_Camera; + + /// + /// 构造函数 + /// + /// + public RemoveCameraHandler(CameraManager cameraManager) + { + _cameraManager = cameraManager; + } + + /// + /// 处理指令 + /// + /// + public async Task ExecuteAsync(JToken payload) + { + long deviceId = 0; + + try + { + // 1. 增强型 ID 解析 + if (payload.Type == JTokenType.Object) + { + // 兼容大小写不敏感的解析 + var idToken = payload["Id"] ?? payload["id"]; + if (idToken != null) deviceId = idToken.Value(); + } + else if (payload.Type == JTokenType.Integer || payload.Type == JTokenType.String) + { + // 兼容字符串形式的 ID + long.TryParse(payload.ToString(), out deviceId); + } + + if (deviceId <= 0) + { + Console.WriteLine($"[{ActionName}] 收到无效指令: ID解析失败 ({payload})"); + return; + } + + // 2. 预检查 + var device = _cameraManager.GetDevice(deviceId); + if (device == null) + { + Console.WriteLine($"[{ActionName}] 设备 {deviceId} 已经不在管理池中,无需操作。"); + return; + } + + // 3. 安全移除 + // 这里建议增加审计日志,记录谁触发了删除(如果协议里有用户信息的话) + device.AddAuditLog("收到远程指令:彻底移除设备"); + Console.WriteLine($"[{ActionName}] 正在安全移除设备: {deviceId} ({device.Config.Name})"); + + // CameraManager 内部会:StopAsync -> DisposeAsync -> TryRemove -> SaveChanges + await _cameraManager.RemoveDeviceAsync(deviceId); + + Console.WriteLine($"[{ActionName}] 设备 {deviceId} 已彻底清理并从持久化库中移除。"); + + // 4. (可选) 此处可以调用 CommandDispatcher 发送 Success ACK + } + catch (Exception ex) + { + // 捕获异常,防止影响全局 Socket 轮询 + Console.WriteLine($"[{ActionName}] 移除设备 {deviceId} 过程中发生致命错误: {ex.Message}"); + } + } + } +} \ No newline at end of file diff --git a/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs b/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs index 59a8dbc..217e681 100644 --- a/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs +++ b/SHH.CameraService/Core/CmdClients/SyncCameraHandler.cs @@ -1,103 +1,177 @@ using Newtonsoft.Json.Linq; -using SHH.CameraSdk; // 引用包含 FrameController 和 FrameRequirement 的命名空间 +using SHH.CameraSdk; using SHH.Contracts; namespace SHH.CameraService; +/// +/// 同步设备配置处理器 +/// public class SyncCameraHandler : ICommandHandler { private readonly CameraManager _cameraManager; - public string ActionName => ProtocolHeaders.SyncCamera; + /// + /// 命令名称 + /// + public string ActionName => ProtocolHeaders.Sync_Camera; + /// + /// 构造函数 + /// + /// public SyncCameraHandler(CameraManager cameraManager) { _cameraManager = cameraManager; } + /// + /// 执行处理 + /// + /// + /// public async Task ExecuteAsync(JToken payload) { - // 1. 解析配置 + // 1. 反序列化配置 DTO var dto = payload.ToObject(); if (dto == null) return; - // 2. 添加设备到管理器 (这一步是必须的,不然没有 Device 就没有 Controller) - var videoConfig = new VideoSourceConfig - { - Id = dto.Id, - Name = dto.Name, - IpAddress = dto.IpAddress, - Port = dto.Port, - Username = dto.Username, - Password = dto.Password, - ChannelIndex = dto.ChannelIndex, - StreamType = dto.StreamType, - Brand = (DeviceBrand)dto.Brand, - RenderHandle = (IntPtr)dto.RenderHandle, - MainboardIp = dto.MainboardIp, - MainboardPort = dto.MainboardPort, - // 必须给个默认值,防止空引用 - VendorArguments = new Dictionary(), - }; - - // 如果设备不存在才添加,如果已存在,后续逻辑会直接获取 - if (_cameraManager.GetDevice(videoConfig.Id) == null) - { - _cameraManager.AddDevice(videoConfig); - } - - // 3. 核心:直接获取设备实例 + // 2. 尝试获取现有设备 var device = _cameraManager.GetDevice(dto.Id); - if (device == null) - { - Console.WriteLine($"[SyncError] 设备 {dto.Id} 创建失败,无法执行自动订阅。"); - return; - } - // 4. 拿到你的“宝贝”控制器 (FrameController) - var controller = device.Controller; - if (controller == null) + if (device != null) { - Console.WriteLine($"[SyncError] 设备 {dto.Id} 不支持流控调度 (Controller is null)。"); - return; - } + // ========================================================= + // 场景 A: 设备已存在 -> 执行智能更新 (Smart Update) + // ========================================================= + Console.WriteLine($"[Sync] 更新设备配置: {dto.Id} ({dto.Name})"); - // 5. 暴力注册订阅需求 (Loop AutoSubscriptions) - if (dto.AutoSubscriptions != null && dto.AutoSubscriptions.Count > 0) - { - foreach (var subItem in dto.AutoSubscriptions) + // 将全量配置映射为部分更新 DTO + var updateDto = new DeviceUpdateDto { - // 生成 AppId (照抄你给的逻辑) - string finalAppId = string.IsNullOrWhiteSpace(subItem.AppId) - ? $"SUB_{Guid.NewGuid().ToString("N").Substring(0, 8).ToUpper()}" - : subItem.AppId; + // --- 冷更新参数 (变更会触发重启) --- + IpAddress = dto.IpAddress, + Port = dto.Port, + Username = dto.Username, + Password = dto.Password, + ChannelIndex = dto.ChannelIndex, + Brand = dto.Brand, + RtspPath = dto.RtspPath, + RenderHandle = dto.RenderHandle, // long 类型直接赋值 - Console.WriteLine($"[自动化] 正在注册流控: {finalAppId}, 目标: {subItem.TargetFps} FPS"); + // --- 热更新参数 (变更立即生效) --- + Name = dto.Name, + Location = dto.Location, + StreamType = dto.StreamType, - // 构造 FrameRequirement 对象 (完全匹配你 FrameController 的入参) - // 这里的属性赋值对应你代码里 req.Type, req.SavePath 等逻辑 - var requirement = new FrameRequirement + MainboardIp = dto.MainboardIp, + MainboardPort = dto.MainboardPort, + + // --- 图像处理参数 (热更新) --- + AllowCompress = dto.AllowCompress, + AllowExpand = dto.AllowExpand, + TargetResolution = dto.TargetResolution, + EnhanceImage = dto.EnhanceImage, + UseGrayscale = dto.UseGrayscale + }; + + // 调用 Manager 的核心更新逻辑 (它会自动判断是 Stop->Start 还是直接应用) + await _cameraManager.UpdateDeviceConfigAsync(dto.Id, updateDto); + } + else + { + // ========================================================= + // 场景 B: 设备不存在 -> 执行新增 (Add New) + // ========================================================= + Console.WriteLine($"[Sync] 新增设备: {dto.Id} ({dto.Name})"); + + // 构造全新的设备配置 + var newConfig = new VideoSourceConfig + { + Id = dto.Id, + Name = dto.Name, + Brand = (DeviceBrand)dto.Brand, // int -> Enum 强转 + IpAddress = dto.IpAddress, + Port = dto.Port, + Username = dto.Username, + Password = dto.Password, + ChannelIndex = dto.ChannelIndex, + StreamType = dto.StreamType, + RtspPath = dto.RtspPath, + MainboardIp = dto.MainboardIp, + MainboardPort = dto.MainboardPort, + RenderHandle = (IntPtr)dto.RenderHandle, // long -> IntPtr 转换 + ConnectionTimeoutMs = 5000 // 默认超时 + }; + + // 添加到管理器池 + _cameraManager.AddDevice(newConfig); + + // 重新获取引用以进行后续操作 + device = _cameraManager.GetDevice(dto.Id); + + } + + // ★★★ 核心修复:统一处理“运行意图” ★★★ + if (device != null) + { + // 将 DTO 的立即执行标志直接同步给设备的运行意图 + device.IsRunning = dto.ImmediateExecution; + + if (dto.ImmediateExecution) + { + // 情况 1: 收到“启动”指令 + if (!device.IsOnline) // 只有没在线时才点火 { - AppId = finalAppId, - TargetFps = subItem.TargetFps, // 8帧 或 1帧 - Type = (SubscriptionType)subItem.Type, // 业务类型 (LocalWindow, NetworkTrans...) - Memo = subItem.Memo ?? "Auto Sync", - - // 其它字段给默认空值,防止 Controller 内部逻辑报错 - Handle = "", - SavePath = "" - }; - - // ★★★ 见证奇迹的时刻:直接调用 Register ★★★ - controller.Register(requirement); + Console.WriteLine($"[Sync] 指令:立即启动设备 {dto.Id}"); + _ = device.StartAsync(); + } + } + else + { + // 情况 2: 收到“停止”指令 (即 ImmediateExecution = false) + if (device.IsOnline) // 只有在线时才熄火 + { + Console.WriteLine($"[Sync] 指令:立即停止设备 {dto.Id}"); + _ = device.StopAsync(); + } } } - //// 6. 启动设备 - //// 你的积分算法会在 device 内部的推流循环中被 MakeDecision 调用 - if (dto.ImmediateExecution) - await device.StartAsync(); + // ========================================================= + // 3. 处理自动订阅策略 (Auto Subscriptions) + // ========================================================= + // 无论新增还是更新,都确保订阅策略是最新的 + if (device != null && dto.AutoSubscriptions != null) + { + var controller = device.Controller; + if (controller != null) + { + foreach (var sub in dto.AutoSubscriptions) + { + // 如果没有 AppId,生成一个临时的(通常 Dashboard 会下发固定的 AppId) + string appId = string.IsNullOrWhiteSpace(sub.AppId) + ? $"AUTO_{Guid.NewGuid().ToString("N")[..8]}" + : sub.AppId; - Console.WriteLine($"[SyncSuccess] 设备 {dto.Id} 同步完成,策略已下发。"); + // 构造流控需求 + var req = new FrameRequirement + { + AppId = appId, + TargetFps = sub.TargetFps, + Type = (SubscriptionType)sub.Type, // int -> Enum + Memo = sub.Memo ?? "Sync Auto", + + // 自动订阅通常不包含具体的 Handle 或 SavePath,除非协议里带了 + // 如果需要支持网络转发,这里可以扩展映射 sub.TargetIp 等 + Handle = "", + SavePath = "" + }; + + // 注册到帧控制器 + controller.Register(req); + } + } + } } } \ No newline at end of file diff --git a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs index f6d70f0..709ad8d 100644 --- a/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs +++ b/SHH.CameraService/Core/NetSenders/NetMQProtocolExtensions.cs @@ -53,7 +53,7 @@ namespace SHH.CameraService /// public static VideoPayload ToVideoPayload(this NetMQMessage msg) { - if (msg == null || msg.FrameCount < 4) return null; + if (msg == null || msg.FrameCount < 2) return null; // Frame 0 Check if (msg[0].ConvertToString() != PROTOCOL_HEADER) return null; diff --git a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs index da5ea0f..3f29db3 100644 --- a/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs +++ b/SHH.CameraService/Core/NetSenders/NetMqSenderWorker.cs @@ -19,43 +19,65 @@ public class NetMqSenderWorker : BackgroundService } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - Console.WriteLine($"[NetMqSender] 正在连接至服务端: {_target.Config.Endpoint} ..."); - - // ★★★ 修正点:必须使用 PublisherSocket 来配合接收端的 SubscriberSocket ★★★ - // 虽然是 Connect 模式,Publisher 依然可以 Connect - using var clientSocket = new PublisherSocket(); - - // 设置高水位 (HWM) - // 对于 Publisher,如果队列满了,默认行为就是丢弃旧数据,这非常符合视频流需求 - clientSocket.Options.SendHighWatermark = 1000; - - // 主动连接 - clientSocket.Connect(_target.Config.Endpoint); - - Console.WriteLine("[NetMqSender] 连接成功,开始从通道搬运数据..."); - - await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken)) + // 增加重启保护 + while (!stoppingToken.IsCancellationRequested) { try { - var msg = payload.ToNetMqMessage(); + Console.WriteLine($"[NetMqSender] 连接至: {_target.Config.Endpoint}"); - // 发送消息 - // PublisherSocket 的 TrySend 如果没人订阅或者队列满了,通常不会阻塞,而是直接丢弃或返回 - // 注意:PUB 模式下,第一帧 ("SHH_V1") 会被当作订阅的主题 (Topic)。 - // 你的接收端订阅了 "" (空字符串),所以能收到以任何字符串开头的数据。 - bool sent = clientSocket.TrySendMultipartMessage(msg); + using var clientSocket = new PublisherSocket(); + clientSocket.Options.SendHighWatermark = 1000; + // 关键:增加 TCP 保活,防止防火墙静默断开长连接 + clientSocket.Options.TcpKeepalive = true; + clientSocket.Options.TcpKeepaliveIdle = TimeSpan.FromSeconds(5); - if (!sent) + clientSocket.Connect(_target.Config.Endpoint); + + int frameCount = 0; + + // 使用更稳健的读取方式 + await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken)) { - // 这种情况通常意味着网络断了且 HWM 队列也满了 - Console.WriteLine($"[NetMqSender] 警告: 发送队列已满,正在丢帧..."); - msg.Clear(); // 手动清理(可选) + try + { + // 1. 构造消息 (内部执行了 MessagePack 序列化) + var msg = payload.ToNetMqMessage(); + + // 2. 发送 + bool sent = clientSocket.TrySendMultipartMessage(msg); + + if (!sent) + { + Console.WriteLine($"[NetMqSender] 发送缓冲区满,丢弃帧: {payload.CameraId}"); + // ★ 如果没有发送成功,建议显式清理消息帧,防止内存滞留 + msg.Clear(); + } + else + { + frameCount++; + if (frameCount % 100 == 0) + Console.WriteLine($"[NetMqSender] 已搬运 100 帧至缓冲区."); + } + } + catch (Exception ex) + { + Console.WriteLine($"[NetMqSender] 内部循环异常: {ex.Message}"); + } } } + catch (OperationCanceledException) { break; } catch (Exception ex) { - Console.WriteLine($"[NetMqSender] 异常: {ex.Message}"); + // ★★★ 核心改进:捕获异常并等待重试 ★★★ + // 防止因为一次内存溢出或网络波动导致整个 BackgroundService 永久停止 + Console.WriteLine($"[NetMqSender] 发生致命异常,5秒后尝试重建连接: {ex.Message}"); + await Task.Delay(5000, stoppingToken); + } + finally + { + // 确保每次循环退出(无论是异常还是正常)都清理环境 + NetMQConfig.Cleanup(false); } } } diff --git a/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs b/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs index 0fa6466..4547b17 100644 --- a/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs +++ b/SHH.CameraService/Core/NetSenders/NetworkStreamingWorker.cs @@ -106,8 +106,12 @@ public class NetworkStreamingWorker : BackgroundService // 实现了"物理隔离":一个管道满了(云端卡顿),不影响另一个管道(大屏流畅)。 foreach (var target in _targets) { - // WriteLog 是非阻塞的。满了就丢弃,返回 false。 - target.Channel.WriteLog(payload); + bool ok = target.Channel.WriteLog(payload); + if (!ok) + { + // 如果这里打印,说明管道由于某种原因被关闭了(通常是程序正在退出) + Console.WriteLine($"[DEBUG] 管道写入失败,目标: {target.Config.Name}"); + } } } catch (Exception ex) diff --git a/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs b/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs index ab1a910..2e8c2e0 100644 --- a/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs +++ b/SHH.CameraService/Core/NetSenders/VideoDataChannel.cs @@ -26,10 +26,11 @@ namespace SHH.CameraService /// /// [生产者] 写入一个封装好的数据包 (非阻塞) /// - public void WriteLog(VideoPayload payload) + public bool WriteLog(VideoPayload payload) // 改为返回 bool { - // TryWrite 永远不会等待,满了就丢旧的写入新的,返回 true - _channel.Writer.TryWrite(payload); + // TryWrite 在 DropOldest 模式下虽然几乎总是返回 true, + // 但如果 Channel 被 Complete (关闭) 了,它会返回 false。 + return _channel.Writer.TryWrite(payload); } /// diff --git a/SHH.CameraService/Program.cs b/SHH.CameraService/Program.cs index e5113a2..3ba6013 100644 --- a/SHH.CameraService/Program.cs +++ b/SHH.CameraService/Program.cs @@ -92,6 +92,7 @@ public class Program // 2. 注册具体的指令处理器 (每写一个新的 Handler,就在这里注册一下,或者用反射批量注册) builder.Services.AddSingleton(); + builder.Services.AddSingleton(); // ============================================================= // 6. 构建与管道配置 diff --git a/SHH.Contracts/CameraConfigDto.cs b/SHH.Contracts/CameraConfigDto.cs index 875f615..3bdaaad 100644 --- a/SHH.Contracts/CameraConfigDto.cs +++ b/SHH.Contracts/CameraConfigDto.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using Newtonsoft.Json; +using System.Collections.Generic; using System.ComponentModel.DataAnnotations; namespace SHH.Contracts @@ -137,6 +138,7 @@ namespace SHH.Contracts /// /// 是否立即执行 /// + [JsonProperty("ImmediateExecution")] // 确保 JSON 里的这个 key 能精准对应到这个属性 public bool ImmediateExecution { get; set; } } diff --git a/SHH.Contracts/Commands/ProtocolHeaders.cs b/SHH.Contracts/Commands/ProtocolHeaders.cs index c7ea18b..7379478 100644 --- a/SHH.Contracts/Commands/ProtocolHeaders.cs +++ b/SHH.Contracts/Commands/ProtocolHeaders.cs @@ -8,6 +8,8 @@ public const string Command = "COMMAND"; public const string CommandResult = "COMMAND_RESULT"; - public const string SyncCamera = "Sync_Camera"; + public const string Sync_Camera = "Sync_Camera"; + + public const string Remove_Camera = "Remove_Camera"; } } \ No newline at end of file