Files
Ayay/SHH.CameraService/GrpcImpls/Handlers/DeviceStatusHandler.cs

233 lines
9.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Ayay.SerilogLogs;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Hosting;
using Serilog;
using SHH.CameraSdk;
using SHH.Contracts;
using SHH.Contracts.Grpc;
using System.Collections.Concurrent;
namespace SHH.CameraService;
/// <summary>
/// Device status monitoring worker (gRPC version).
/// Tracks camera status and, whenever a status change is pending or the heartbeat
/// interval has elapsed, reports the full status snapshot as one batch over gRPC to
/// every configured endpoint.
/// </summary>
public class DeviceStatusHandler : BackgroundService
{
    /// <summary>Seconds between two evaluations of the "should we send" condition.</summary>
    private const int CheckIntervalSeconds = 1;

    /// <summary>Milliseconds without a successful send after which a heartbeat send is forced.</summary>
    private const long HeartbeatIntervalMs = 2000;

    /// <summary>Deadline applied to every gRPC call, in seconds.</summary>
    private const int CallDeadlineSeconds = 2;

    private readonly ILogger _gRpcLog = Log.ForContext("SourceContext", LogModules.gRpc);
    private readonly CameraManager _manager;
    private readonly ServiceConfig _config;

    // State store: CameraId -> latest status payload. Updated by the SDK status callback, read by the send loop.
    private readonly ConcurrentDictionary<string, StatusEventPayload> _stateStore = new();

    // Online flag of each camera as of the last acknowledged send; used to log only status flips.
    private readonly Dictionary<string, bool> _lastPublishedStates = new();

    // One channel per endpoint URL, reused across sends instead of being rebuilt for every call.
    // Only touched from the send loop and from its shutdown path.
    private readonly Dictionary<string, GrpcChannel> _channels = new();

    // Set by the SDK callback when a status changed and has not been reported yet.
    private volatile bool _isDirty = false;

    // Environment.TickCount64 value of the last acknowledged send.
    private long _lastSendTick = 0;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="manager">Camera manager that lists the devices and raises status-change events.</param>
    /// <param name="config">Service configuration providing the endpoints to report to.</param>
    public DeviceStatusHandler(
        CameraManager manager,
        ServiceConfig config)
    {
        _manager = manager;
        _config = config;
    }

    /// <summary>
    /// Background loop: seeds the local state cache, subscribes to SDK status changes and
    /// checks once per <see cref="CheckIntervalSeconds"/> whether a batch has to be sent.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // 1. Seed the local state cache: every known device starts as offline.
        foreach (var dev in _manager.GetAllDevices())
        {
            UpdateLocalState(dev.Id, dev.Config.IpAddress, false, "Service Init");
        }

        // 3. Periodic check loop; the timer is disposed when the loop ends.
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(CheckIntervalSeconds));
        try
        {
            // 2. Subscribe to SDK status changes. Done inside the try so that the finally
            //    block always removes the handler again.
            _manager.OnDeviceStatusChanged += OnSdkStatusChanged;
            _gRpcLog.Information("[gRpc] 状态上报已启动,配置节点数: {EndpointCount}", _config.CommandEndpoints.Count);

            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                await CheckAndBroadcastAsync(stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Pass the exception itself so type, message and stack trace reach the log.
            _gRpcLog.Error(ex, "[gRpc] 状态上报运行异常");
        }
        finally
        {
            _manager.OnDeviceStatusChanged -= OnSdkStatusChanged;
            DisposeChannels();
        }
    }

    /// <summary>
    /// SDK status-change callback: stores the new state and flags it as pending.
    /// </summary>
    private void OnSdkStatusChanged(long deviceId, string ipAddress, bool isOnline, string reason)
    {
        UpdateLocalState(deviceId, ipAddress, isOnline, reason);
        _isDirty = true;
    }

    /// <summary>
    /// Replaces the cached state of one device with a freshly timestamped payload.
    /// </summary>
    private void UpdateLocalState(long deviceId, string ipAddress, bool isOnline, string reason)
    {
        string cameraId = deviceId.ToString();
        _stateStore[cameraId] = new StatusEventPayload
        {
            CameraId = cameraId,
            IpAddress = ipAddress,
            IsOnline = isOnline,
            Reason = reason,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };
    }

    /// <summary>
    /// Sends the current snapshot to every configured endpoint when a change is pending
    /// or more than <see cref="HeartbeatIntervalMs"/> passed since the last acknowledged send.
    /// </summary>
    /// <param name="ct">Cancels in-flight calls when the service is stopping.</param>
    private async Task CheckAndBroadcastAsync(CancellationToken ct)
    {
        bool wasDirty = _isDirty;
        bool heartbeatDue = Environment.TickCount64 - _lastSendTick > HeartbeatIntervalMs;
        if (!(wasDirty || heartbeatDue) || !_config.CommandEndpoints.Any())
        {
            return;
        }

        // Reset the flag BEFORE the snapshot is taken: a change that arrives while the batch
        // is in flight sets it again and is sent on the next tick instead of being wiped.
        _isDirty = false;

        var request = BuildRequest();
        bool anySucceeded = false;

        foreach (var endpoint in _config.CommandEndpoints)
        {
            try
            {
                string grpcUrl = endpoint.Uri.Replace("tcp://", "http://").Trim();
                if (await SendToEndpointAsync(grpcUrl, request, ct))
                {
                    anySucceeded = true;
                    _lastSendTick = Environment.TickCount64;
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && ct.IsCancellationRequested)
            {
                // The host is stopping; a cancelled call is expected, not an error.
                break;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Same as above for channels configured to throw OperationCanceledException.
                break;
            }
            catch (RpcException ex)
            {
                // Key diagnostic: the detailed status carried by the RpcException.
                _gRpcLog.Error("[gRpc] StatusCode: {Code}, Detail: {Detail}, Uri:{Uri}", ex.StatusCode, ex.Status.Detail, endpoint.Uri);

                // Unimplemented usually means the service path does not match.
                if (ex.StatusCode == StatusCode.Unimplemented)
                {
                    _gRpcLog.Error("[gRpc] 请检查服务端是否注册了名为 'GatewayProvider' 的服务,且其 package 声明与客户端一致。");
                }
            }
            catch (Exception ex)
            {
                _gRpcLog.Error(ex, "[gRpc] 非 RPC 异常: {Msg}", ex.Message);
            }
        }

        if (!anySucceeded && wasDirty)
        {
            // Nobody acknowledged the batch: keep the change pending for the next tick.
            _isDirty = true;
        }
    }

    /// <summary>
    /// Converts the in-memory state snapshot into the protobuf batch request.
    /// </summary>
    private StatusBatchRequest BuildRequest()
    {
        var request = new StatusBatchRequest
        {
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        foreach (var item in _stateStore.Values)
        {
            request.Items.Add(new StatusEventItem
            {
                CameraId = item.CameraId,
                IsOnline = item.IsOnline,
                Reason = item.Reason,
            });
        }

        return request;
    }

    /// <summary>
    /// Sends the batch to one endpoint and writes the change/detail logs when it is acknowledged.
    /// </summary>
    /// <param name="grpcUrl">Endpoint address, already converted to an http:// URL.</param>
    /// <param name="request">Batch to send.</param>
    /// <param name="ct">Cancels the call when the service is stopping.</param>
    /// <returns><c>true</c> when the endpoint answered with <c>Success</c>; otherwise <c>false</c>.</returns>
    private async Task<bool> SendToEndpointAsync(string grpcUrl, StatusBatchRequest request, CancellationToken ct)
    {
        var client = new GatewayProvider.GatewayProviderClient(GetOrCreateChannel(grpcUrl));

        // Diagnostic: name of the type that declares the generated client, logged so the
        // service contract the client targets can be compared with the server side.
        var serviceName = client.GetType().DeclaringType?.Name ?? "Unknown";
        _gRpcLog.Debug("[gRpc] 准备调用端点: {Url}, 客户端契约服务名: {Service}", grpcUrl, serviceName);

        var response = await client.ReportStatusBatchAsync(request,
            deadline: DateTime.UtcNow.AddSeconds(CallDeadlineSeconds), cancellationToken: ct);

        if (!response.Success)
        {
            // Previously ignored silently; surface it so a rejecting endpoint is visible.
            _gRpcLog.Warning("[gRpc] 端点返回 Success=false, Url: {Url}", grpcUrl);
            return false;
        }

        LogStateChanges(request, grpcUrl);
        LogSnapshotDetails(request, grpcUrl);
        return true;
    }

    /// <summary>
    /// Returns the cached channel for <paramref name="grpcUrl"/>, creating it on first use.
    /// </summary>
    private GrpcChannel GetOrCreateChannel(string grpcUrl)
    {
        if (!_channels.TryGetValue(grpcUrl, out var channel))
        {
            channel = GrpcChannel.ForAddress(grpcUrl);
            _channels[grpcUrl] = channel;
        }

        return channel;
    }

    /// <summary>
    /// Disposes every cached channel; called once when the background loop ends.
    /// </summary>
    private void DisposeChannels()
    {
        foreach (var channel in _channels.Values)
        {
            channel.Dispose();
        }

        _channels.Clear();
    }

    /// <summary>
    /// Logs (Information) the cameras whose online flag differs from the last acknowledged
    /// send and records the new flags. The record is shared by all endpoints, so a flip is
    /// logged for the first endpoint that acknowledges it.
    /// </summary>
    private void LogStateChanges(StatusBatchRequest request, string grpcUrl)
    {
        var diffList = new List<string>();
        foreach (var item in request.Items)
        {
            bool known = _lastPublishedStates.TryGetValue(item.CameraId, out bool lastStatus);
            if (known && lastStatus == item.IsOnline)
            {
                continue; // No flip, nothing to report.
            }

            // The protobuf item carries no IP, so take it from the in-memory store.
            _stateStore.TryGetValue(item.CameraId, out var payload);
            string ip = payload?.IpAddress ?? "Unknown IP";
            string statusText = item.IsOnline ? "上线" : "离线";
            diffList.Add($"[{item.CameraId}({ip})] {statusText}");

            // Remember the reported state for the next comparison.
            _lastPublishedStates[item.CameraId] = item.IsOnline;
        }

        if (diffList.Any())
        {
            _gRpcLog.Information("[gRpc] 设备状态变更: {DiffDetails}, Url: {Url}",
                string.Join(", ", diffList), grpcUrl);
        }
    }

    /// <summary>
    /// Logs (Debug) the online/offline totals and the cameras in each group.
    /// </summary>
    private void LogSnapshotDetails(StatusBatchRequest request, string grpcUrl)
    {
        // Skip building the lists when nobody will see the message.
        if (!_gRpcLog.IsEnabled(Serilog.Events.LogEventLevel.Debug))
        {
            return;
        }

        var onlineDetails = request.Items
            .Where(x => x.IsOnline)
            .Select(x => DescribeCamera(x.CameraId))
            .ToList();
        var offlineDetails = request.Items
            .Where(x => !x.IsOnline)
            .Select(x => DescribeCamera(x.CameraId))
            .ToList();

        var detailParts = new List<string>
        {
            $"其中在线 {onlineDetails.Count} 个",
            $"离线 {offlineDetails.Count} 个"
        };
        if (offlineDetails.Any())
        {
            detailParts.Add($"离线设备【{string.Join(",", offlineDetails)}】");
        }
        if (onlineDetails.Any())
        {
            detailParts.Add($"在线设备【{string.Join(",", onlineDetails)}】");
        }

        string detailMsg = string.Join("", detailParts);
        _gRpcLog.Debug("[gRpc] 设备状态上报详细: {Url} 总数:{Count} {Detail}",
            grpcUrl,
            request.Items.Count,
            detailMsg);
    }

    /// <summary>
    /// Formats a camera as "id(ip)"; the IP is looked up in the state store because the
    /// protobuf item does not carry it.
    /// </summary>
    private string DescribeCamera(string cameraId)
    {
        _stateStore.TryGetValue(cameraId, out var payload);
        return $"{cameraId}({payload?.IpAddress ?? "N/A"})";
    }
}