Files
Ayay/SHH.CameraService/GrpcImpls/Handlers/DeviceStatusHandler.cs
2026-01-16 15:17:23 +08:00

169 lines
5.9 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Ayay.SerilogLogs;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Serilog;
using SHH.CameraSdk;
using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Collections.Concurrent;
namespace SHH.CameraService;
/// <summary>
/// Device status monitoring worker (gRPC edition).
/// Tracks the latest known status of every camera and, whenever a status changes or the
/// heartbeat interval elapses, pushes the full status snapshot as one batch to every
/// configured endpoint over gRPC.
/// </summary>
public class DeviceStatusHandler : BackgroundService
{
    /// <summary>How often the loop checks whether a broadcast is due.</summary>
    private static readonly TimeSpan CheckInterval = TimeSpan.FromSeconds(1);

    /// <summary>Per-call deadline applied to each status report RPC.</summary>
    private static readonly TimeSpan CallDeadline = TimeSpan.FromSeconds(2);

    /// <summary>A snapshot is re-sent as a heartbeat once this many milliseconds passed since the last successful send.</summary>
    private const long HeartbeatIntervalMs = 2000;

    private readonly ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);
    private readonly CameraManager _manager;
    private readonly ServiceConfig _config;

    // Status store: CameraId -> latest status payload.
    private readonly ConcurrentDictionary<string, StatusEventPayload> _stateStore = new();

    // One channel per endpoint URL, reused across ticks; creating a channel per call
    // forces a new connection every time.
    private readonly ConcurrentDictionary<string, GrpcChannel> _channels = new();

    // Set by the SDK callback, consumed by the broadcast loop.
    private volatile bool _isDirty = false;

    // Environment.TickCount64 value of the last successful send; only touched by the broadcast loop.
    private long _lastSendTick = 0;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="manager">Camera manager used to enumerate devices and to subscribe to status changes.</param>
    /// <param name="config">Service configuration providing the list of command endpoints to report to.</param>
    public DeviceStatusHandler(
        CameraManager manager,
        ServiceConfig config)
    {
        _manager = manager;
        _config = config;
    }

    /// <summary>
    /// Seeds the local status cache, subscribes to SDK status changes and then runs the
    /// periodic check/broadcast loop until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // 1. Seed the local cache: every known device starts as offline.
        foreach (var dev in _manager.GetAllDevices())
        {
            UpdateLocalState(dev.Id, false, "Service Init");
        }

        // 2. Subscribe to SDK status change events. Everything after this point runs inside
        //    the try block so the handler is always unsubscribed again.
        _manager.OnDeviceStatusChanged += OnSdkStatusChanged;
        try
        {
            _gRpcLog.Information($"[gRpc] 状态上报已启动,配置节点数: {_config.CommandEndpoints.Count}");

            // 3. Periodic loop; the timer is disposable and is released when the loop ends.
            using var timer = new PeriodicTimer(CheckInterval);
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                await CheckAndBroadcastAsync(stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Pass the exception to the logger so the cause and stack trace are recorded.
            _gRpcLog.Error(ex, "[gRpc] 状态上报运行异常");
        }
        finally
        {
            _manager.OnDeviceStatusChanged -= OnSdkStatusChanged;
            DisposeChannels();
        }
    }

    /// <summary>
    /// SDK status change callback: records the new state and flags that a broadcast is needed.
    /// </summary>
    /// <param name="deviceId">Identifier of the device whose status changed.</param>
    /// <param name="isOnline">New online flag.</param>
    /// <param name="reason">Reason text supplied by the SDK.</param>
    private void OnSdkStatusChanged(long deviceId, bool isOnline, string reason)
    {
        // Order matters: the state must be stored before the flag is raised, because the
        // broadcast loop clears the flag before it takes its snapshot.
        UpdateLocalState(deviceId, isOnline, reason);
        _isDirty = true;
    }

    /// <summary>
    /// Stores the latest status of a device in the local cache, stamped with the current UTC time.
    /// </summary>
    /// <param name="deviceId">Identifier of the device.</param>
    /// <param name="isOnline">Online flag to store.</param>
    /// <param name="reason">Reason text to store.</param>
    private void UpdateLocalState(long deviceId, bool isOnline, string reason)
    {
        string cameraId = deviceId.ToString();
        _stateStore[cameraId] = new StatusEventPayload
        {
            CameraId = cameraId,
            IsOnline = isOnline,
            Reason = reason,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };
    }

    /// <summary>
    /// Sends the current status snapshot to every configured endpoint when there are pending
    /// changes or the heartbeat interval has elapsed since the last successful send.
    /// Failures of individual endpoints are logged and do not stop the remaining endpoints.
    /// </summary>
    /// <param name="ct">Cancellation token forwarded to each RPC.</param>
    private async Task CheckAndBroadcastAsync(CancellationToken ct)
    {
        long now = Environment.TickCount64;

        // Policy: send when something changed (dirty) or as a forced heartbeat.
        bool wasDirty = _isDirty;
        bool heartbeatDue = now - _lastSendTick > HeartbeatIntervalMs;
        if (!(wasDirty || heartbeatDue) || !_config.CommandEndpoints.Any())
        {
            return;
        }

        // Clear the flag BEFORE taking the snapshot. A change that arrives while the batch
        // is being sent raises the flag again and is picked up on the next tick, instead of
        // being wiped by a late reset.
        _isDirty = false;

        // 1. Build the gRPC request from the in-memory snapshot.
        var request = new StatusBatchRequest
        {
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };
        foreach (var item in _stateStore.Values)
        {
            request.Items.Add(new StatusEventItem
            {
                CameraId = item.CameraId,
                IsOnline = item.IsOnline,
                Reason = item.Reason,
            });
        }

        // 2. Send to every endpoint.
        bool anyDelivered = false;
        foreach (var endpoint in _config.CommandEndpoints)
        {
            try
            {
                string grpcUrl = endpoint.Uri.Replace("tcp://", "http://").Trim();

                // Reuse one channel per URL; it is disposed when the service stops.
                var channel = _channels.GetOrAdd(grpcUrl, static url => GrpcChannel.ForAddress(url));
                var client = new GatewayProvider.GatewayProviderClient(channel);

                // Diagnostics: name of the generated type that declares the client class.
                var serviceName = client.GetType().DeclaringType?.Name ?? "Unknown";
                _gRpcLog.Debug("[gRpc] 准备调用端点: {Url}", grpcUrl);
                _gRpcLog.Debug("[gRpc] 客户端契约服务名: {Service}", serviceName);

                var response = await client.ReportStatusBatchAsync(request,
                    deadline: DateTime.UtcNow.Add(CallDeadline), cancellationToken: ct);

                if (response.Success)
                {
                    _gRpcLog.Information("[gRpc] 设备状态上报成功, 共计: {Count} 个, Url: {Url}", request.Items.Count, grpcUrl);
                    _gRpcLog.Debug("[gRpc] 设备状态上报成功: {Url} Items:{Items}", grpcUrl, request.Items);
                    anyDelivered = true;
                    _lastSendTick = Environment.TickCount64;
                }
                else
                {
                    // The server answered but rejected the batch; do not let this pass silently.
                    _gRpcLog.Warning("[gRpc] 设备状态上报未成功 (服务端返回 Success=false), Url: {Url}", grpcUrl);
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && ct.IsCancellationRequested)
            {
                // The host is shutting down; this is not an error. The caller's loop ends
                // on its next timer wait.
                break;
            }
            catch (RpcException ex)
            {
                // Log the detailed RPC status.
                _gRpcLog.Error("[gRpc] StatusCode: {Code}, Detail: {Detail}", ex.StatusCode, ex.Status.Detail);

                // Unimplemented usually means the service path does not match.
                if (ex.StatusCode == StatusCode.Unimplemented)
                {
                    _gRpcLog.Error("[gRpc] 请检查服务端是否注册了名为 'GatewayProvider' 的服务,且其 package 声明与客户端一致。");
                }
            }
            catch (Exception ex)
            {
                _gRpcLog.Error(ex, "[gRpc] 非 RPC 异常: {Msg}", ex.Message);
            }
        }

        // Nothing was delivered: keep the pending change flagged so the next tick retries.
        if (!anyDelivered && wasDirty)
        {
            _isDirty = true;
        }
    }

    /// <summary>
    /// Disposes and forgets every cached gRPC channel.
    /// </summary>
    private void DisposeChannels()
    {
        foreach (var url in _channels.Keys)
        {
            if (_channels.TryRemove(url, out var channel))
            {
                channel.Dispose();
            }
        }
    }
}