using Grpc.Core; using SHH.Contracts; using SHH.Contracts.Grpc; namespace SHH.MjpegPlayer { /// /// gRpc 网关服务 /// 职责:作为服务端通讯入口,负责接收客户端(分析节点)的所有 gRpc 请求,将其转译为内部业务载荷, /// 并通过消息总线 MessageBus 分发至对应的业务处理器。 /// public class GatewayService : GatewayProvider.GatewayProviderBase { #region 1. 逻辑身份注册 (Unary 调用) /// /// 处理分析节点的注册请求 /// /// 包含节点实例 ID 和服务器 IP 的请求对象 /// gRpc 调用上下文 /// 操作成功响应 public override Task RegisterInstance(RegisterRequest request, ServerCallContext context) { // 1. 将 Protobuf 契约对象转换为业务层的 RegisterPayload (DTO) // 职责:将外部传输格式映射为内部业务模型,实现协议与业务逻辑的解耦 var payload = new RegisterPayload { // 身份标识映射 ProcessId = request.ProcessId, InvokeProcId = request.InvokeProcessId, InstanceId = request.InstanceId, Version = request.Version, // 网络诊断信息映射 ServerIp = request.ServerIp, WebApiPort = request.WebapiPort, GrpcPort = request.GrpcPort, // 运行时状态映射 // 注意:将 int64 类型的 Ticks 转换为 C# 的 DateTime 对象 StartTime = new DateTime(request.StartTimeTicks), Description = request.Description }; // 2. 将注册载荷抛给总线,触发如 DeviceConfigHandler 的配置初始化逻辑 // 职责:通过中介者模式分发事件,网关层不需要知道谁在处理这些数据 MessageBus.Instance.RaiseServerRegistered(payload); return Task.FromResult(new GenericResponse { Success = true }); } #endregion #region 2. 指令下发长连接 (Server Streaming) /// /// 建立并维持一个从服务器向客户端单向推送指令的长连接通道 /// /// 连接请求(包含 InstanceId) /// 响应流,用于后续异步推送指令 /// gRpc 调用上下文 /// 异步任务 public override async Task OpenCommandChannel(CommandStreamRequest request, IServerStreamWriter responseStream, ServerCallContext context) { // 1. 物理流登记:将此响应流句柄存入 GrpcSessionManager,以便 MessageBus 随时调用 GrpcSessionManager.Instance.RegisterSession(request.InstanceId, responseStream); try { // 2. 挂起连接:利用 Task.Delay(-1) 配合取消令牌无限期挂起连接,直到客户端断开 await Task.Delay(-1, context.CancellationToken); } catch (OperationCanceledException) { // 客户端主动取消连接属于正常预期,无需抛出异常 } finally { // 3. 物理流清理:当连接断开时,必须从会话管理器中移除,防止下发指令时产生死连接 GrpcSessionManager.Instance.RemoveSession(request.InstanceId); } } #endregion #region 3. 设备状态批量上报 (Unary 调用) /// /// 接收来自分析节点的相机在线/离线状态批量上报 /// /// 包含多个设备状态项的请求对象 /// gRpc 调用上下文 /// 操作成功响应 public override Task ReportStatusBatch(StatusBatchRequest request, ServerCallContext context) { if (request.Items == null || !request.Items.Any()) return Task.FromResult(new GenericResponse { Success = true }); // 1. 数据映射:将 Proto 集合转换为业务层的 StatusEventPayload 列表 var payloads = request.Items.Select(item => new StatusEventPayload { CameraId = item.CameraId, IsOnline = item.IsOnline, Reason = item.Reason, Timestamp = request.Timestamp }).ToList(); // 2. 路由分发:通过总线发布状态主题,驱动 DeviceStatusHandler 执行同步 MessageBus.Instance.RaiseDeviceStatusReport(payloads); return Task.FromResult(new GenericResponse { Success = true }); } #endregion #region 4. 视频流传输接收 (Client Streaming) /// /// 接收分析节点持续推送的视频帧数据流 /// /// 客户端异步流读取器 /// gRpc 调用上下文 /// 流关闭后的最终响应 public override async Task UploadVideoStream(IAsyncStreamReader requestStream, ServerCallContext context) { // 1. 持续读取客户端推送的每一帧数据,直到流关闭或被取消 while (await requestStream.MoveNext(context.CancellationToken)) { var frame = requestStream.Current; // 2. 将 Protobuf 帧数据转换为业务视频载荷 VideoPayload // 注意:ByteString 需要显式调用 ToByteArray 转换 var videoPayload = new VideoPayload { CameraId = frame.CameraId, CaptureTimestamp = frame.CaptureTimestamp, OriginalWidth = frame.OriginalWidth, OriginalHeight = frame.OriginalHeight, OriginalImageBytes = frame.OriginalImageBytes.ToByteArray(), TargetImageBytes = frame.TargetImageBytes.ToByteArray(), TargetWidth = frame.TargetWidth, TargetHeight = frame.TargetHeight, SubscriberIds = frame.SubscriberIds.ToList(), HasOriginalImage = true }; // 3. 导流:将图像数据直接投递给图像分发控制器进行 UI 渲染或二次处理 ImageMonitorController.Instance.ReceivePayload(videoPayload); } return new GenericResponse { Success = true, Message = "Video stream ended" }; } #endregion } }