Files
Ayay/SHH.CameraService/GrpcImpls/Handlers/GatewayService.cs
2026-01-16 14:30:42 +08:00

102 lines
4.2 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Ayay.SerilogLogs;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Serilog;
using SHH.CameraSdk;
using SHH.Contracts.Grpc; // 引用 Proto 生成的命名空间
namespace SHH.CameraService
{
/// <summary>
/// gRpc 指令接收后台服务
/// 职责:
/// 1. 维护与 AiVideo 的 gRpc 长连接。
/// 2. 完成节点逻辑注册。
/// 3. 监听 Server Streaming 指令流并移交给 Dispatcher。
/// </summary>
public class GatewayService : BackgroundService
{
private readonly ServiceConfig _config;
private readonly CommandDispatcher _dispatcher;
public GatewayService(
ServiceConfig config,
CommandDispatcher dispatcher)
{
_config = config;
_dispatcher = dispatcher;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);
// 预留系统启动缓冲时间,确保数据库和 SDK 已就绪
gRpcLog.Information("[gRpc] 指令接收服务启动,等待环境预热...");
await Task.Delay(3000, stoppingToken);
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 1. 地址适配:将 tcp 转换为 http并将 127.0.0.1 修正为 localhost 解决 Unimplemented 异常
var ep = _config.CommandEndpoints.First();
string targetUrl = ep.Uri.Replace("tcp://", "http://").Replace("127.0.0.1", "localhost");
using var channel = GrpcChannel.ForAddress(targetUrl);
var client = new GatewayProvider.GatewayProviderClient(channel);
// --- 第一步:发起节点逻辑注册 (Unary) ---
gRpcLog.Information("[gRpc] 正在发起逻辑注册: {Url}", targetUrl);
var regResp = await client.RegisterInstanceAsync(new RegisterRequest
{
InstanceId = _config.AppId,
Version = "2.0.0-grpc",
ServerIp = "127.0.0.1",
StartTimeTicks = DateTime.Now.Ticks
}, cancellationToken: stoppingToken);
if (regResp.Success)
{
gRpcLog.Information("[gRpc] 注册成功, 正在建立双向指令通道...");
// --- 第二步:开启 Server Streaming 指令流 ---
using var call = client.OpenCommandChannel(new CommandStreamRequest
{
InstanceId = _config.AppId
}, cancellationToken: stoppingToken);
// --- 第三步:循环读取服务端推送的指令 ---
// 只要服务端流未断开,此处会一直阻塞等待新消息
while (await call.ResponseStream.MoveNext(stoppingToken))
{
var protoMsg = call.ResponseStream.Current;
// 核心变更:不再直接处理业务,而是通过分发器进行路由
// 使用 _ = 异步处理,避免某个 Handler 执行过慢导致指令流阻塞
_ = _dispatcher.DispatchAsync(protoMsg);
}
}
}
catch (OperationCanceledException)
{
// 响应系统正常退出信号
break;
}
catch (RpcException ex)
{
gRpcLog.Debug("[gRpc] RPC 异常 (Status: {Code}): {Msg}", ex.StatusCode, ex.Message);
// 链路异常,进入重连等待阶段
await Task.Delay(5000, stoppingToken);
}
catch (Exception ex)
{
gRpcLog.Debug("[gRpc] 非预期链路异常: {Msg}5秒后尝试重连", ex.Message);
await Task.Delay(5000, stoppingToken);
}
}
}
}
}