Files
Ayay/SHH.MjpegPlayer/GrpcImpls/Handlers/MessageBus.cs

137 lines
4.8 KiB
C#
Raw Normal View History

using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Diagnostics;
namespace SHH.MjpegPlayer
{
/// <summary>
/// 消息总线中心 (纯 gRpc 架构)
/// 职责:解耦 gRpc 接收端与业务处理层,提供基于主题(Topic)的事件发布与统一的指令下发路由。
/// </summary>
public class MessageBus : IDisposable
{
#region
/// <summary>
/// 消息总线全局唯一实例
/// </summary>
public static MessageBus Instance { get; } = new MessageBus();
/// <summary>
/// 私有构造函数
/// </summary>
private MessageBus() { }
#endregion
#region (Topics)
/// <summary>
/// 1. 注册主题:当远程分析节点成功建立逻辑连接时触发。
/// 订阅者通常为 DeviceConfigHandler用于启动初始化配置同步。
/// </summary>
public event Action<RegisterPayload>? OnServerRegistered;
/// <summary>
/// 2. 状态主题:当收到远程节点批量上报的设备在线/离线状态时触发。
/// 订阅者通常为 DeviceStatusHandler用于更新 UI 状态。
/// </summary>
public event Action<List<StatusEventPayload>>? OnDeviceStatusReport;
#endregion
#region ( GatewayService )
/// <summary>
/// 发布节点注册事件:将 gRpc 接收到的原始注册请求推送到业务层
/// </summary>
/// <param name="p">注册载荷信息</param>
public void RaiseServerRegistered(RegisterPayload p)
{
if (p == null) return;
// 调试日志:跟踪节点上线流程
Debug.WriteLine($"[Bus] 发布注册事件: 节点ID = {p.InstanceId}");
// 执行所有已订阅该主题的业务逻辑
OnServerRegistered?.Invoke(p);
}
/// <summary>
/// 发布状态报告事件:将 gRpc 接收到的设备状态批量推送到业务层
/// </summary>
/// <param name="items">设备状态变更列表</param>
public void RaiseDeviceStatusReport(List<StatusEventPayload> items)
{
if (items == null || items.Count == 0) return;
// 执行所有已订阅状态同步的业务逻辑
OnDeviceStatusReport?.Invoke(items);
}
#endregion
#region ( Handler )
/// <summary>
/// 统一指令下发路由:自动定位目标节点的物理 gRpc 流并推送指令载荷
/// </summary>
/// <param name="instanceId">目标分析节点的唯一识别码</param>
/// <param name="payload">要发送的业务指令负载</param>
/// <returns>异步任务</returns>
public async Task SendInternalAsync(string instanceId, CommandPayload payload)
{
// 1. 获取由 GrpcSessionManager 维护的物理长连接流
var stream = GrpcSessionManager.Instance.GetSession(instanceId);
// 2. 健壮性检查:若连接不存在则终止下发
if (stream == null)
{
Debug.WriteLine($"[Bus Warning] 指令下发终止:节点 {instanceId} 尚未建立物理连接。");
return;
}
try
{
// 3. 契约转换:将业务层 CommandPayload 转换为 gRpc 生成的 Protobuf 契约对象
var protoMsg = new CommandPayloadProto
{
Protocol = payload.Protocol,
CmdCode = payload.CmdCode,
JsonParams = payload.JsonParams,
RequestId = payload.RequestId,
TimestampTicks = payload.Timestamp.Ticks
};
// 4. 执行异步推送
await stream.WriteAsync(protoMsg);
Debug.WriteLine($"[Bus] 指令推送成功 -> 目标: {instanceId}, 指令码: {payload.CmdCode}");
}
catch (Exception ex)
{
// 5. 异常处理:若推送失败,通常意味着网络链路已断开
Debug.WriteLine($"[Bus Error] 推送异常: {ex.Message},正在执行物理连接清理...");
// 立即移除失效会话,防止后续指令继续掉入“黑洞”
GrpcSessionManager.Instance.RemoveSession(instanceId);
}
}
#endregion
#region
/// <summary>
/// 释放总线资源
/// </summary>
public void Dispose()
{
// 清理所有事件订阅,防止内存泄漏
OnServerRegistered = null;
OnDeviceStatusReport = null;
}
#endregion
}
}