158 lines
6.4 KiB
C#
158 lines
6.4 KiB
C#
using MessagePack;
|
||
using NetMQ;
|
||
using NetMQ.Sockets;
|
||
using SHH.CameraDashboard.Services.Processors;
|
||
using SHH.Contracts;
|
||
using System.Collections.Concurrent;
|
||
using System.Diagnostics;
|
||
|
||
namespace SHH.CameraDashboard
|
||
{
|
||
/// <summary>
/// Command bus endpoint built on a NetMQ <see cref="RouterSocket"/>.
/// Incoming multipart messages are passed through the receive side of
/// <see cref="Pipeline"/> and then dispatched to the <see cref="IProtocolProcessor"/>
/// registered for their protocol; outgoing commands are passed through the send
/// side of the pipeline and routed to a peer by the identity recorded for it.
/// </summary>
public class CommandBusClient : IDisposable
{
    private RouterSocket? _routerSocket;
    private NetMQPoller? _poller;
    private volatile bool _isRunning;

    // Guards the start/stop lifecycle (socket and poller creation and teardown).
    private readonly object _disposeLock = new object();

    /// <summary>Shared singleton instance.</summary>
    public static CommandBusClient Instance { get; } = new CommandBusClient();

    // Protocol identifier -> processor. Concurrent because RegisterProcessor is public
    // and may be called while the receive path is looking processors up.
    private readonly ConcurrentDictionary<string, IProtocolProcessor> _processors = new();

    // Request id -> completion source, completed by HandleResponse.
    private readonly ConcurrentDictionary<string, TaskCompletionSource<CommandResult>> _pendingRequests
        = new ConcurrentDictionary<string, TaskCompletionSource<CommandResult>>();

    // Instance id -> ROUTER identity frame used to address that peer.
    private readonly ConcurrentDictionary<string, byte[]> _sessions
        = new ConcurrentDictionary<string, byte[]>();

    /// <summary>Interceptor pipeline applied to every inbound and outbound message.</summary>
    public InterceptorPipeline Pipeline { get; } = new InterceptorPipeline();

    /// <summary>Raised through <see cref="RaiseServerRegistered"/>.</summary>
    public event Action<RegisterPayload>? OnServerRegistered;

    /// <summary>Raised through <see cref="RaiseDeviceStatusReport"/>.</summary>
    public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;

    /// <summary>Raised through <see cref="RaiseCommandReceived"/>.</summary>
    public event Action<CommandPayload>? OnCommandReceived;

    /// <summary>
    /// Registers <paramref name="processor"/> for its <c>ProtocolType</c>, replacing
    /// any processor previously registered for the same protocol.
    /// </summary>
    /// <param name="processor">Processor to register.</param>
    public void RegisterProcessor(IProtocolProcessor processor)
    {
        _processors[processor.ProtocolType] = processor;
    }

    /// <summary>
    /// Binds the router socket on <c>tcp://*:{port}</c>, registers the built-in
    /// processors and starts the poller. Does nothing when already running.
    /// </summary>
    /// <param name="port">TCP port to bind.</param>
    public void Start(int port)
    {
        lock (_disposeLock)
        {
            // Checked inside the lock so that two concurrent callers cannot both bind.
            if (_isRunning)
            {
                return;
            }

            var socket = new RouterSocket();
            try
            {
                socket.Bind($"tcp://*:{port}");
            }
            catch
            {
                // Do not leak the socket (or leave it in _routerSocket) when the bind fails.
                socket.Dispose();
                throw;
            }

            socket.ReceiveReady += OnReceiveReady;

            // Built-in processors.
            RegisterProcessor(new RegisterProcessor(this));
            RegisterProcessor(new StatusBatchProcessor(this));
            RegisterProcessor(new CommandResultProcessor(this));
            RegisterProcessor(new CommandProcessor(this));

            var poller = new NetMQPoller { socket };
            _routerSocket = socket;
            _poller = poller;

            poller.RunAsync();
            _isRunning = true;
        }
    }

    /// <summary>
    /// Receive callback attached to the router socket.
    /// </summary>
    /// <remarks>
    /// NetMQ event handlers are synchronous (<c>void</c>); <c>async void</c> is used
    /// here only because this is a top-level event handler that has to await the
    /// interceptor pipeline. Every exception is caught so it cannot escape the handler.
    /// NOTE(review): processors run on whichever thread resumes after the await —
    /// presumably the poller thread; confirm.
    /// </remarks>
    private async void OnReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        try
        {
            NetMQMessage msg = new NetMQMessage();

            // 1. Try to receive a complete multipart message.
            if (!e.Socket.TryReceiveMultipartMessage(ref msg))
            {
                return;
            }

            // 2. Frame check. A ROUTER receiving from a DEALER sees
            //    [Identity] [Protocol] [Data], so at least three frames are required.
            if (msg.FrameCount < 3)
            {
                return;
            }

            // ToByteArray() (rather than Buffer) so the identity kept for routing is
            // always exactly message-sized, consistent with how the payload is read.
            byte[] identity = msg[0].ToByteArray();     // Frame 0: routing identity
            string protocol = msg[1].ConvertToString(); // Frame 1: protocol identifier
            byte[] rawData = msg[2].ToByteArray();      // Frame 2: raw payload

            // Inbound interception: a null context means the message was intercepted.
            var ctx = await Pipeline.ExecuteReceiveAsync(protocol, rawData);
            if (ctx == null)
            {
                return;
            }

            // Dispatch using the protocol and data as rewritten by the pipeline.
            if (_processors.TryGetValue(ctx.Protocol, out var processor))
            {
                processor.Process(identity, ctx.Data);
            }
            else
            {
                Debug.WriteLine($"[Bus] 未知协议: {ctx.Protocol}");
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"[Bus-Err] {ex.Message}");
        }
    }

    // --- Internal entry points called by the processors ---

    /// <summary>Records the routing identity to use for <paramref name="instanceId"/>.</summary>
    internal void UpdateSession(string instanceId, byte[] identity) => _sessions[instanceId] = identity;

    /// <summary>Raises <see cref="OnServerRegistered"/>.</summary>
    internal void RaiseServerRegistered(RegisterPayload p) => OnServerRegistered?.Invoke(p);

    /// <summary>Raises <see cref="OnDeviceStatusReport"/>.</summary>
    internal void RaiseDeviceStatusReport(List<StatusEventPayload> i) => OnDeviceStatusReport?.Invoke(i);

    /// <summary>Raises <see cref="OnCommandReceived"/>.</summary>
    internal void RaiseCommandReceived(CommandPayload payload) => OnCommandReceived?.Invoke(payload);

    /// <summary>
    /// Completes the pending request whose id matches <c>result.RequestId</c>.
    /// Results with no matching pending request are ignored.
    /// </summary>
    internal void HandleResponse(CommandResult result)
    {
        if (_pendingRequests.TryRemove(result.RequestId, out var tcs))
        {
            tcs.TrySetResult(result);
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/>, runs it through the send side of
    /// <see cref="Pipeline"/> and sends it to the peer registered under
    /// <paramref name="instanceId"/>.
    /// </summary>
    /// <remarks>
    /// Nothing is sent when the instance has no recorded session, when the pipeline
    /// intercepts the message (returns null), or when the bus is not started.
    /// NetMQ sockets are not thread-safe, so the send is executed as a task on the
    /// poller (a <see cref="TaskScheduler"/>) instead of on the caller's thread;
    /// an exception thrown by the send surfaces through the returned task.
    /// NOTE(review): a send queued while the bus is being stopped may never run,
    /// leaving the returned task incomplete — confirm this is acceptable at shutdown.
    /// </remarks>
    /// <param name="instanceId">Instance id previously passed to <see cref="UpdateSession"/>.</param>
    /// <param name="payload">Command to send.</param>
    public async Task SendInternalAsync(string instanceId, CommandPayload payload)
    {
        if (!_sessions.TryGetValue(instanceId, out byte[]? identity))
        {
            return;
        }

        // 1. Serialize.
        byte[] rawData = MessagePackSerializer.Serialize(payload);

        // 2. Outbound interception: a null context means the message was intercepted.
        var ctx = await Pipeline.ExecuteSendAsync(payload.Protocol, rawData);
        if (ctx == null)
        {
            return;
        }

        // Take a consistent snapshot of the transport; Stop() clears both fields.
        RouterSocket? socket;
        NetMQPoller? poller;
        lock (_disposeLock)
        {
            socket = _routerSocket;
            poller = _poller;
        }

        if (socket == null || poller == null)
        {
            return;
        }

        // 3. Build [Identity] [Protocol] [Data] from the pipeline's output.
        var msg = new NetMQMessage();
        msg.Append(identity);
        msg.Append(ctx.Protocol);
        msg.Append(ctx.Data);

        // 4. Send on the poller thread, the thread that owns the socket.
        var sendTask = new Task(() => socket.SendMultipartMessage(msg));
        sendTask.Start(poller);
        await sendTask;
    }

    /// <summary>
    /// Stops the poller and releases the socket. Safe to call repeatedly and before
    /// <see cref="Start"/>; the bus can be started again afterwards.
    /// </summary>
    public void Stop()
    {
        NetMQPoller? poller;
        RouterSocket? socket;

        // Detach the transport under the lock, but tear it down outside of it:
        // stopping the poller can block until the poller thread has exited.
        lock (_disposeLock)
        {
            poller = _poller;
            socket = _routerSocket;
            _poller = null;
            _routerSocket = null;
            _isRunning = false;
        }

        if (poller != null)
        {
            if (poller.IsRunning)
            {
                poller.Stop();
            }

            poller.Dispose();
        }

        if (socket != null)
        {
            socket.ReceiveReady -= OnReceiveReady;
            socket.Dispose();
        }
    }

    /// <summary>Equivalent to <see cref="Stop"/>.</summary>
    public void Dispose() => Stop();
}
|
||
} |