using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Configuration;
using NetMQ;
using NetMQ.Sockets;
using Newtonsoft.Json;
using SHH.CameraSdk;
using SHH.Contracts;
namespace SHH.CameraService
{
///
/// 双模指令总线服务 (Enterprise V2)
/// 核心职责:建立 TCP 指令通道,接收客户端指令并分发给 CameraManager
/// 增强特性:
/// 1. 支持双模:被动监听 (Bind) 与 主动投递 (Connect)
/// 2. 幂等性控制:利用 MemoryCache 防止客户端重试导致的重复执行
/// 3. 顺序一致性:利用时间戳防止指令乱序
///
public class CommandBusService : BackgroundService
{
#region --- 1. 字段与依赖 ---
private readonly CameraManager _cameraManager;
private readonly IConfiguration _config;
private readonly IMemoryCache _cache; // 核心:用于请求去重
private readonly int _processId;
// 运行状态标志
private volatile bool _isRunning = false;
// 两种模式的 Socket (互斥存在)
private ResponseSocket? _repSocket; // 模式A: 被动监听 (Server-Listening)
private DealerSocket? _dealerSocket; // 模式B: 主动投递 (Server-Dialing)
// 顺序一致性锁:记录每个设备最后处理的指令时间戳
// Key: TargetId (设备ID), Value: Timestamp (最后执行时间)
private readonly Dictionary _deviceLastCmdTime = new();
#endregion
#region --- 2. 构造函数 ---
///
/// 构造函数 (注意:必须在 Program.cs 注册 AddMemoryCache)
///
public CommandBusService(CameraManager manager, IConfiguration config, IMemoryCache cache)
{
_cameraManager = manager;
_config = config;
_cache = cache;
// 获取当前进程 ID (默认为 1)
_processId = _config.GetValue("ProcessId", 1);
}
#endregion
#region --- 3. 核心生命周期 ---
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// 在后台线程启动,避免阻塞 Web 主线程
return Task.Run(() =>
{
_isRunning = true;
// 1. 读取网络策略
// 优先读取配置中的主动目标,如果没有则回退到被动监听
string? activeTargetIp = _config["Network:ActiveTargets:0:Ip"];
bool isActiveMode = !string.IsNullOrEmpty(activeTargetIp);
try
{
if (isActiveMode)
{
// === 模式 B: 主动投递 (Server Connects Client) ===
// 场景:服务器在内网,主动连接公网/固定IP的客户端
int cmdPort = _config.GetValue("Network:ActiveTargets:0:CmdPort", 7000);
string addr = $"tcp://{activeTargetIp}:{cmdPort}";
RunActiveMode(addr, stoppingToken);
}
else
{
// === 模式 A: 被动监听 (Server Binds Port) ===
// 场景:服务器有固定IP,等待客户端连接
int basePort = _config.GetValue("Network:Passive:CmdPortBase", 7000);
int listenPort = basePort + (_processId - 1);
string addr = $"tcp://*:{listenPort}";
RunPassiveMode(addr, stoppingToken);
}
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus] 致命错误停止: {ex.Message}");
}
finally
{
_isRunning = false;
CleanupSockets();
}
}, stoppingToken);
}
private void CleanupSockets()
{
try
{
_repSocket?.Dispose();
_dealerSocket?.Dispose();
}
catch { /* 忽略销毁时的异常 */ }
}
#endregion
#region --- 4. 模式实现:被动监听 (Passive) ---
private void RunPassiveMode(string address, CancellationToken token)
{
using (_repSocket = new ResponseSocket())
{
_repSocket.Bind(address);
Console.WriteLine($"[CmdBus] [被动模式] 指令监听已启动: {address}");
while (!token.IsCancellationRequested)
{
try
{
// 1. 阻塞等待请求 (超时1秒以便响应 Cancel 信号)
if (!_repSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson))
continue;
// 2. 处理业务 (带 去重 + ID回填 逻辑)
CommandResult result = this.ProcessRequest(reqJson);
// 3. 发送回执
// 注意:REP 模式必须发送应答,即使 result 为 null (Fire-and-Forget) 也建议发一个空 ACK 防止 Socket 状态错乱
// 但为了协议统一,建议 Passive 模式下总是返回结果
string respJson = result != null ? JsonConvert.SerializeObject(result) : "{}";
_repSocket.SendFrame(respJson);
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus-Passive] 异常: {ex.Message}");
}
}
}
}
#endregion
#region --- 5. 模式实现:主动投递 (Active) ---
private void RunActiveMode(string address, CancellationToken token)
{
// 外层循环:断线重连机制
while (!token.IsCancellationRequested)
{
try
{
using (_dealerSocket = new DealerSocket())
{
Console.WriteLine($"[CmdBus] [主动模式] 正在连接指令中心: {address}");
_dealerSocket.Connect(address);
// ★★★ 关键步骤:连接成功后,立即发送【身份注册包】 ★★★
// 客户端收到这个包后,才能在界面上显示"设备在线"
SendRegistration(_dealerSocket);
// 内层循环:消息收发
while (!token.IsCancellationRequested)
{
// 1. 接收指令
// DealerSocket 是异步全双工的,这里即使没收到消息也不会阻塞发送
if (!_dealerSocket.TryReceiveFrameString(TimeSpan.FromSeconds(1), out string reqJson))
{
// 空闲周期,可在此处添加心跳发送逻辑 (Ping)
continue;
}
// 2. 处理业务 (带 去重 + ID回填 逻辑)
CommandResult result = this.ProcessRequest(reqJson);
// 3. 发送结果 (QoS控制)
// 如果结果为 null,说明指令是 Fire-and-Forget (无需回执),则不发送网络包节省带宽
if (result != null)
{
_dealerSocket.SendFrame(JsonConvert.SerializeObject(result));
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus-Active] 连接中断或异常: {ex.Message}");
// 避免死循环狂刷 CPU,等待 3 秒再重连
Thread.Sleep(3000);
}
}
}
///
/// 发送身份注册包 (Active 模式专用)
///
private void SendRegistration(DealerSocket socket)
{
try
{
// 计算实际端口信息
int portOffset = _processId - 1;
var regInfo = new ServerRegistrationDto
{
ProcessId = _processId,
InstanceId = $"Gateway_{_processId}",
ServerIp = GetLocalIpAddress(),
WebApiPort = 5000 + portOffset,
VideoPort = 5555 + portOffset,
CmdPort = 7000 + portOffset,
StartTime = DateTime.Now,
Description = "Active Mode Connection (V2)"
};
// 封装信封 (系统级指令)
var payload = new CommandPayload
{
CmdCode = "SERVER_REGISTER",
TargetId = "SYSTEM",
JsonParams = JsonConvert.SerializeObject(regInfo),
RequestId = Guid.NewGuid().ToString("N"),
RequireAck = false // 注册包通常不需要回执,只要连上就行
};
socket.SendFrame(JsonConvert.SerializeObject(payload));
Console.WriteLine($"[CmdBus] 身份注册包已发送 -> {regInfo.ServerIp}:{regInfo.WebApiPort}");
}
catch (Exception ex)
{
Console.WriteLine($"[CmdBus] 注册包发送失败: {ex.Message}");
}
}
private string GetLocalIpAddress()
{
try
{
var host = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
foreach (var ip in host.AddressList)
{
if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)
return ip.ToString();
}
}
catch { }
return "127.0.0.1";
}
#endregion
#region --- 6. 协议处理核心 (★★ V2 核心增强 ★★) ---
///
/// 统一处理请求协议:去重 -> 排序 -> 执行 -> 回填 ID
///
private CommandResult ProcessRequest(string json)
{
if (string.IsNullOrWhiteSpace(json)) return CommandResult.Fail("Empty Request");
CommandPayload? payload;
try { payload = JsonConvert.DeserializeObject(json); }
catch { return CommandResult.Fail("Invalid JSON Protocol"); }
if (payload == null) return CommandResult.Fail("Null Payload");
// =========================================================
// A. 【幂等性检查】(Idempotency Check)
// =========================================================
// 查缓存:如果这个 RequestId 10秒内处理过,直接返回上次的结果
// 这样即使客户端重试发了 10 次,业务逻辑也只跑 1 次
if (_cache.TryGetValue(payload.RequestId, out CommandResult cachedResult))
{
Console.WriteLine($"[Dedup] 拦截重复请求: {payload.RequestId} (Retry: {payload.RetryCount})");
return cachedResult;
}
// =========================================================
// B. 【顺序一致性检查】(Order Guarantee)
// =========================================================
// 防止乱序:比如先发的“停止”因为网络卡顿,比后发的“开始”晚到
if (payload.TargetId != "SYSTEM")
{
lock (_deviceLastCmdTime)
{
if (_deviceLastCmdTime.TryGetValue(payload.TargetId, out DateTime lastTime))
{
if (payload.Timestamp < lastTime)
{
Console.WriteLine($"[Order] 丢弃乱序指令: {payload.CmdCode}");
return CommandResult.Fail("Order Violation: Stale Command Dropped");
}
}
_deviceLastCmdTime[payload.TargetId] = payload.Timestamp;
}
}
// =========================================================
// C. 【业务执行】
// =========================================================
CommandResult result;
try
{
// 调用纯逻辑层
result = CommandBusProcessor.ProcessBusinessLogic(_cameraManager, payload);
}
catch (Exception ex)
{
result = CommandResult.Fail($"Internal Logic Error: {ex.Message}");
}
// =========================================================
// D. 【闭环回填】
// =========================================================
// 必须把身份证号贴回去,不然客户端不知道这是谁的回执
result.RequestId = payload.RequestId;
// =========================================================
// E. 【存入缓存】
// =========================================================
// 缓存 10 秒,覆盖客户端的重试窗口
_cache.Set(payload.RequestId, result, TimeSpan.FromSeconds(10));
// =========================================================
// F. 【QoS 过滤】
// =========================================================
// 如果客户端说不需要回信,返回 null
if (!payload.RequireAck)
{
return null;
}
return result;
}
#endregion
}
}