视频SDK新协议签入

This commit is contained in:
2026-01-15 09:31:57 +08:00
parent 3f8e42e560
commit 81580a8f55
14 changed files with 844 additions and 472 deletions

View File

@@ -1,84 +1,93 @@
using Microsoft.Extensions.Hosting;
using NetMQ;
using NetMQ.Sockets;
using Google.Protobuf;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SHH.Contracts.Grpc;
namespace SHH.CameraService;
/// <summary>
/// NetMQ 发送工作者
/// 职责:从指定目标的 VideoDataChannel 读取 Payload通过 ZeroMQ 发送出去
/// gRPC 视频流发送工作者
/// 职责:监听特定的 StreamTarget 队列,建立 gRPC 客户端流并持续推送图片
/// </summary>
public class NetMqSenderWorker : BackgroundService
public class GrpcSenderWorker : BackgroundService
{
private readonly StreamTarget _target;
private readonly ILogger<GrpcSenderWorker> _logger;
private readonly string _grpcUrl;
// 构造函数注入特定的目标对象 (由 Program.cs 的工厂方法提供)
public NetMqSenderWorker(StreamTarget target)
public GrpcSenderWorker(StreamTarget target, ILogger<GrpcSenderWorker> logger)
{
_target = target;
_logger = logger;
// 自动适配地址:将配置的 tcp://localhost:9001 转换为 http://localhost:9001
// 并且严格使用你验证成功的 localhost
_grpcUrl = _target.Config.Endpoint.Replace("tcp://", "http://");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 增加重启保护
_logger.LogInformation($"[gRPC Worker] 启动。目标: {_target.Config.Name}, 地址: {_grpcUrl}");
while (!stoppingToken.IsCancellationRequested)
{
try
{
Console.WriteLine($"[NetMqSender] 连接至: {_target.Config.Endpoint}");
// 1. 建立通道
using var channel = GrpcChannel.ForAddress(_grpcUrl);
var client = new GatewayProvider.GatewayProviderClient(channel);
using var clientSocket = new PublisherSocket();
clientSocket.Options.SendHighWatermark = 1000;
// 关键:增加 TCP 保活,防止防火墙静默断开长连接
clientSocket.Options.TcpKeepalive = true;
clientSocket.Options.TcpKeepaliveIdle = TimeSpan.FromSeconds(5);
// 2. 开启客户端流 (UploadVideoStream 是在 proto 中定义的)
using var call = client.UploadVideoStream(cancellationToken: stoppingToken);
clientSocket.Connect(_target.Config.Endpoint);
_logger.LogInformation($"[gRPC Worker] 已开启视频推送流: {_target.Config.Name}");
int frameCount = 0;
// 使用更稳健的读取方式
// 3. 核心搬运循环:从内存队列 (Channel) 读取数据
await foreach (var payload in _target.Channel.Reader.ReadAllAsync(stoppingToken))
{
try
// 将业务 DTO 转换为 gRPC 原生 Request
var request = new VideoFrameRequest
{
// 1. 构造消息 (内部执行了 MessagePack 序列化)
var msg = payload.ToNetMqMessage();
CameraId = payload.CameraId ?? "Unknown",
CaptureTimestamp = payload.CaptureTimestamp,
OriginalWidth = payload.OriginalWidth,
OriginalHeight = payload.OriginalHeight,
HasOriginalImage = payload.HasOriginalImage,
HasTargetImage = payload.HasTargetImage,
// 2. 发送
bool sent = clientSocket.TrySendMultipartMessage(msg);
// ★ 核心:将 byte[] 转换为 gRPC 的 ByteString (高性能)
OriginalImageBytes = payload.OriginalImageBytes != null
? ByteString.CopyFrom(payload.OriginalImageBytes)
: ByteString.Empty,
if (!sent)
TargetImageBytes = payload.TargetImageBytes != null
? ByteString.CopyFrom(payload.TargetImageBytes)
: ByteString.Empty
};
// 处理诊断信息 map<string, string>
if (payload.Diagnostics != null)
{
foreach (var kv in payload.Diagnostics)
{
Console.WriteLine($"[NetMqSender] 发送缓冲区满,丢弃帧: {payload.CameraId}");
// ★ 如果没有发送成功,建议显式清理消息帧,防止内存滞留
msg.Clear();
}
else
{
frameCount++;
if (frameCount % 100 == 0)
Console.WriteLine($"[NetMqSender] 已搬运 100 帧至缓冲区.");
request.Diagnostics.Add(kv.Key, kv.Value?.ToString() ?? "");
}
}
catch (Exception ex)
{
Console.WriteLine($"[NetMqSender] 内部循环异常: {ex.Message}");
}
// 4. 发送至 AiVideo
await call.RequestStream.WriteAsync(request);
}
// 正常结束流
await call.RequestStream.CompleteAsync();
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
// ★★★ 核心改进:捕获异常并等待重试 ★★★
// 防止因为一次内存溢出或网络波动导致整个 BackgroundService 永久停止
Console.WriteLine($"[NetMqSender] 发生致命异常5秒后尝试重建连接: {ex.Message}");
_logger.LogError($"[gRPC Worker] 推送链路异常5秒后重连: {ex.Message}");
await Task.Delay(5000, stoppingToken);
}
finally
{
// 确保每次循环退出(无论是异常还是正常)都清理环境
NetMQConfig.Cleanup(false);
}
}
}
}