using Grpc.Core; using SHH.Contracts.Grpc; using System.Collections.Concurrent; namespace SHH.MjpegPlayer { /// /// gRpc 会话管理器 /// 职责:专门负责维护、检索和清理所有远程客户端(分析节点)的 gRpc 指令下发物理通道 (Stream)。 /// 它是连接“业务逻辑”与“物理传输”的桥梁,确保指令能准确投递到对应的连接流中。 /// public class GrpcSessionManager { #region 单例模式 /// /// 获取会话管理器的全局单例实例。 /// public static GrpcSessionManager Instance { get; } = new GrpcSessionManager(); /// /// 私有构造函数,防止外部实例化。 /// private GrpcSessionManager() { } #endregion #region 内部存储 /// /// 物理流存储字典 /// Key: 远程服务实例唯一 ID (InstanceId) /// Value: gRpc 双向流或服务端推送流的写入器句柄 (IServerStreamWriter) /// 使用 ConcurrentDictionary 确保在多客户端并发连接/断开时的线程安全性。 /// private readonly ConcurrentDictionary> _sessionStreams = new ConcurrentDictionary>(); #endregion #region 公共管理接口 /// /// 注册/更新物理物理通道。 /// 当客户端调用 OpenCommandChannel 并成功建立 Server Streaming 连接时,由 GatewayService 调用此方法。 /// /// 客户端实例唯一标识 /// 该客户端对应的 gRpc 响应流句柄 public void RegisterSession(string instanceId, IServerStreamWriter responseStream) { // 1. 参数校验:无效 ID 不予处理 if (string.IsNullOrEmpty(instanceId)) return; // 2. 登记或覆盖物理流: // 如果客户端异常断开后迅速重连,此处会覆盖旧的流句柄,确保指令始终通过最新的管道下发。 _sessionStreams[instanceId] = responseStream; // 3. 记录日志:便于运维监控连接状态 Console.WriteLine($"[Session] 物理通道就绪通知 -> 节点 ID: {instanceId}, 当前在线总数: {_sessionStreams.Count}"); } /// /// 移除物理通道。 /// 当 gRpc 连接由于网络波动、客户端崩溃或主动关闭而断开时,由 GatewayService 的 finally 块调用。 /// /// 要注销的客户端实例 ID public void RemoveSession(string instanceId) { // 1. 参数校验 if (string.IsNullOrEmpty(instanceId)) return; // 2. 安全移除:若 ID 存在则移除并释放相关内部引用 if (_sessionStreams.TryRemove(instanceId, out _)) { Console.WriteLine($"[Session] 物理通道移除通知 -> 节点 ID: {instanceId}, 剩余在线总数: {_sessionStreams.Count}"); } } /// /// 检索目标节点的物理流句柄。 /// 供 MessageBus 使用,它是指令下发前定位物理路径的关键步骤。 /// /// 目标节点的唯一 ID /// 返回对应的 IServerStreamWriter 实例;若节点不在线则返回 null public IServerStreamWriter GetSession(string instanceId) { // 1. 参数校验 if (string.IsNullOrEmpty(instanceId)) return null; // 2. 尝试从缓存字典中获取流句柄 _sessionStreams.TryGetValue(instanceId, out var stream); return stream; } /// /// 检查指定节点是否处于物理连接状态。 /// /// 实例 ID /// True 表示物理通道已建立 public bool IsSessionActive(string instanceId) { return !string.IsNullOrEmpty(instanceId) && _sessionStreams.ContainsKey(instanceId); } #endregion } }