using Ayay.SerilogLogs;
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Serilog;
using SHH.Contracts.Grpc;

namespace SHH.CameraService;

/// <summary>
/// gRPC video stream sender worker.
/// Responsibility: drains the queue of one <see cref="StreamTarget"/>, opens a gRPC
/// client stream (<c>UploadVideoStream</c>) and keeps pushing frames through it,
/// reconnecting after a fixed delay whenever the link fails.
/// </summary>
public class GrpcSenderWorker : BackgroundService
{
    /// <summary>
    /// Frames whose capture timestamp is more than this many milliseconds behind the
    /// current UTC Unix time are dropped instead of sent.
    /// </summary>
    private const long MaxFrameAgeMs = 1000;

    /// <summary>Pause between a link failure and the next connection attempt.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private readonly ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);
    private readonly StreamTarget _target;
    private readonly string _grpcUrl;

    /// <summary>
    /// Creates a worker bound to a single stream target.
    /// </summary>
    /// <param name="target">The target whose queue is drained and whose endpoint is dialed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
    public GrpcSenderWorker(StreamTarget target)
    {
        _target = target ?? throw new ArgumentNullException(nameof(target));

        // Address adaptation: a configured "tcp://host:port" endpoint is rewritten to
        // "http://host:port" so it can be handed to GrpcChannel.ForAddress.
        // NOTE(review): the original author's note says only "localhost" was verified to work.
        _grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://");
    }

    /// <summary>
    /// Main loop: connect, pump frames from the in-memory queue into the gRPC request
    /// stream, and reconnect after <see cref="ReconnectDelay"/> on failure. Returns when
    /// <paramref name="stoppingToken"/> is cancelled or when the source queue has been
    /// completed and fully drained.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _gRpcLog.Information($"[gRpc] 视频流发送业务启动, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // 1. Open the channel.
                using var channel = GrpcChannel.ForAddress(_grpcUrl);
                var client = new GatewayProvider.GatewayProviderClient(channel);

                // 2. Open the client stream (UploadVideoStream is declared in the proto).
                using var call = client.UploadVideoStream(cancellationToken: stoppingToken);

                _gRpcLog.Information($"[gRpc] 已开启视频推送流, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");

                // 3. Core pump loop: read payloads from the in-memory queue (Channel).
                await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken))
                {
                    // Keep the stream fresh: skip frames that sat in the backlog for
                    // longer than MaxFrameAgeMs.
                    var ageMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp;
                    if (ageMs > MaxFrameAgeMs)
                    {
                        continue;
                    }

                    // Map the business DTO onto the generated gRPC request message.
                    var request = new VideoFrameRequest
                    {
                        CameraId = payload.CameraId ?? "0",
                        CaptureTimestamp = payload.CaptureTimestamp,
                        OriginalWidth = payload.OriginalWidth,
                        OriginalHeight = payload.OriginalHeight,
                        HasOriginalImage = payload.HasOriginalImage,
                        HasTargetImage = payload.HasTargetImage,

                        // byte[] -> ByteString (copies the buffer); a missing image is sent as empty.
                        OriginalImageBytes = payload.OriginalImageBytes != null
                            ? ByteString.CopyFrom(payload.OriginalImageBytes)
                            : ByteString.Empty,
                        TargetImageBytes = payload.TargetImageBytes != null
                            ? ByteString.CopyFrom(payload.TargetImageBytes)
                            : ByteString.Empty
                    };

                    // AddRange rejects a null argument, so guard it the same way as Diagnostics below.
                    if (payload.SubscriberIds != null)
                    {
                        request.SubscriberIds.AddRange(payload.SubscriberIds);
                    }

                    // Diagnostics map: values are stringified, null values become "".
                    if (payload.Diagnostics != null)
                    {
                        foreach (var kv in payload.Diagnostics)
                        {
                            request.Diagnostics.Add(kv.Key, kv.Value?.ToString() ?? "");
                        }
                    }

                    // 4. Send to AiVideo.
                    await call.RequestStream.WriteAsync(request);
                }

                // ReadAllAsync only finishes normally once the queue has been completed
                // and drained: close the request stream gracefully.
                await call.RequestStream.CompleteAsync();

                // A completed queue never yields items again; looping here would just
                // reconnect in a tight spin, so the worker ends instead.
                _gRpcLog.Information($"[gRpc] 视频流队列已关闭, 发送业务结束, 目标: {_target.Config.Name}, 地址: {_grpcUrl}");
                break;
            }
            catch (Exception) when (stoppingToken.IsCancellationRequested)
            {
                // Host shutdown. Depending on where it is observed, cancellation may
                // surface as OperationCanceledException or as a gRPC error, so the
                // token (not the exception type) decides.
                break;
            }
            catch (Exception ex)
            {
                _gRpcLog.Warning($"[gRpc] 视频推送流链路异常, 目标: {_target.Config.Name}, 地址: {_grpcUrl}, 5秒后重连: {ex.Message}.");

                try
                {
                    await Task.Delay(ReconnectDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested during the back-off: leave quietly rather than
                    // letting the cancellation escape from ExecuteAsync.
                    break;
                }
            }
        }
    }
}