Files
Ayay/SHH.CameraService/GrpcImpls/ImageProcs/GrpcSenderWorker.cs

102 lines
4.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SHH.Contracts.Grpc;
namespace SHH.CameraService;
/// <summary>
/// Background worker that pushes video frames to a gRPC endpoint.
/// It drains the in-memory queue of a single <see cref="StreamTarget"/>, opens a
/// client stream via <c>UploadVideoStream</c>, and writes every sufficiently fresh
/// frame to it as a <see cref="VideoFrameRequest"/>.
/// </summary>
public class GrpcSenderWorker : BackgroundService
{
    /// <summary>Frames older than this (in milliseconds, measured against UTC now) are dropped instead of sent.</summary>
    private const long MaxFrameAgeMilliseconds = 1000;

    /// <summary>Pause between a failed push attempt and the next reconnect.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private readonly StreamTarget _target;
    private readonly ILogger<GrpcSenderWorker> _logger;
    private readonly string _grpcUrl;

    /// <summary>
    /// Creates a worker bound to one stream target.
    /// </summary>
    /// <param name="target">Target whose queue is drained and whose configured endpoint is dialed.</param>
    /// <param name="logger">Logger used for lifecycle and failure messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="target"/> or <paramref name="logger"/> is null.</exception>
    public GrpcSenderWorker(StreamTarget target, ILogger<GrpcSenderWorker> logger)
    {
        _target = target ?? throw new ArgumentNullException(nameof(target));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        // Adapt the configured address: e.g. tcp://localhost:9001 becomes http://localhost:9001.
        // Only the scheme prefix is rewritten; host and port are passed through unchanged.
        _grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://");
    }

    /// <summary>
    /// Runs the push loop until the host requests shutdown or the source queue is completed.
    /// On any other failure the error is logged and the connection is re-established after
    /// <see cref="ReconnectDelay"/>.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("[gRPC Worker] 启动。目标: {TargetName}, 地址: {GrpcUrl}", _target.Config.Name, _grpcUrl);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // 1. Establish the channel.
                using var channel = GrpcChannel.ForAddress(_grpcUrl);
                var client = new GatewayProvider.GatewayProviderClient(channel);

                // 2. Open the client stream (UploadVideoStream is defined in the proto contract).
                using var call = client.UploadVideoStream(cancellationToken: stoppingToken);
                _logger.LogInformation("[gRPC Worker] 已开启视频推送流: {TargetName}", _target.Config.Name);

                // 3. Core pump: read frames from the in-memory queue and forward them.
                await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken))
                {
                    // Keep the pipe flowing: drop backlog frames that are already too old.
                    var delay = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - payload.CaptureTimestamp;
                    if (delay > MaxFrameAgeMilliseconds)
                    {
                        continue;
                    }

                    // Map the business DTO onto the generated gRPC request type.
                    var request = new VideoFrameRequest
                    {
                        CameraId = payload.CameraId ?? "0",
                        CaptureTimestamp = payload.CaptureTimestamp,
                        OriginalWidth = payload.OriginalWidth,
                        OriginalHeight = payload.OriginalHeight,
                        HasOriginalImage = payload.HasOriginalImage,
                        HasTargetImage = payload.HasTargetImage,
                        // Convert byte[] to protobuf ByteString (CopyFrom copies the buffer).
                        OriginalImageBytes = payload.OriginalImageBytes != null
                            ? ByteString.CopyFrom(payload.OriginalImageBytes)
                            : ByteString.Empty,
                        TargetImageBytes = payload.TargetImageBytes != null
                            ? ByteString.CopyFrom(payload.TargetImageBytes)
                            : ByteString.Empty
                    };

                    // AddRange rejects a null sequence; a frame without subscribers must not
                    // tear down the whole stream.
                    if (payload.SubscriberIds != null)
                    {
                        request.SubscriberIds.AddRange(payload.SubscriberIds);
                    }

                    // Diagnostics travel as map<string, string>; values are stringified, null becomes "".
                    if (payload.Diagnostics != null)
                    {
                        foreach (var kv in payload.Diagnostics)
                        {
                            request.Diagnostics.Add(kv.Key, kv.Value?.ToString() ?? "");
                        }
                    }

                    // 4. Send to the receiving service.
                    await call.RequestStream.WriteAsync(request);
                }

                // The queue was completed by its writer: finish the stream and wait for the
                // server's reply so that disposing the call does not cancel it prematurely.
                await call.RequestStream.CompleteAsync();
                await call.ResponseAsync;

                // No more frames will ever arrive; looping again would only spin opening
                // and closing empty streams.
                break;
            }
            catch (Exception) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown in progress. Depending on channel options gRPC reports this either
                // as OperationCanceledException or as RpcException(Cancelled); both mean "stop".
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "[gRPC Worker] 推送链路异常5秒后重连: {Message}", ex.Message);
            }

            // Back off before reconnecting. Done outside the catch block so that a shutdown
            // request during the wait ends the loop instead of escaping as an exception.
            try
            {
                await Task.Delay(ReconnectDelay, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}