Files
Ayay/SHH.CameraService/GrpcImpls/ImageFactory/ImageMonitorController.cs

241 lines
10 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Ayay.SerilogLogs;
using Microsoft.Extensions.Hosting;
using OpenCvSharp;
using Serilog;
using SHH.CameraSdk; // 引用 SDK 核心
using SHH.Contracts;
using System.Diagnostics;
using TurboJpegWrapper;
namespace SHH.CameraService;
/// <summary>
/// 图像监控采集控制器 (流媒体分发引擎)
/// <para>功能:监听全局图像采集总线,对图像进行实时 JPG 编码,并动态分发至云端、大屏等订阅目标。</para>
/// <para>设计模式:发布-订阅模式 + 扇出 (Fan-out) 分发。</para>
/// </summary>
public class ImageMonitorController : BackgroundService
{
private ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core);
// 注入所有注册的目标(云端、大屏等),实现动态分发
private readonly IEnumerable<StreamTarget> _targets;
// 编码参数JPG 质量 75 (平衡画质与带宽)
// 工业经验75 是甜点,体积只有 100 的 1/3肉眼几无区别。
// 如果您确实需要 100请注意带宽压力。此处我保留您要求的 100但建议未来调优。
private readonly int[] _encodeParams = { (int)ImwriteFlags.JpegQuality, 100 };
/// <summary>
/// 构造函数
/// </summary>
/// <param name="targets"></param>
public ImageMonitorController(IEnumerable<StreamTarget> targets)
{
_targets = targets;
}
/// <summary>
/// 启动后台服务:挂载事件总线
/// </summary>
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_sysLog.Information("[Core] 启动流媒体采集引擎...");
// =========================================================
// 订阅逻辑:接入 "上帝模式" (God Mode)
// =========================================================
// 理由gRpc 需要无差别地获取所有设备的图像。
GlobalStreamDispatcher.OnGlobalFrame += ProcessFrame;
_sysLog.Information($"[StreamWorker] 已挂载至全局广播总线,正在监听帧信息.");
var tcs = new TaskCompletionSource();
stoppingToken.Register(() =>
{
// 停止时反注册,防止静态事件内存泄漏
GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame;
_sysLog.Information("[Core] 流媒体采集引擎已断开全局广播连接.");
tcs.SetResult();
});
return tcs.Task;
}
/// <summary>
/// [回调函数] 处理实时帧
/// <para>注意:此方法由 SDK 采集线程池触发,必须保持极速处理,严禁在内部执行 IO 等耗时阻塞操作。</para>
/// </summary>
/// <param name="deviceId">设备唯一标识 ID</param>
/// <param name="frame">包含原始图像(InternalMat)和处理后图像(TargetMat)的帧数据</param>
private void ProcessFrame(long deviceId, SmartFrame frame)
{
try
{
// 1. 基础校验 (合法性检查)
if (frame == null || frame.InternalMat.Empty()) return;
long startTick = Stopwatch.GetTimestamp();
// =========================================================
// 2. 一次编码 (One Encode) - CPU 消耗点
// =========================================================
// 理由:在这里同步编码是最安全的,因为出了这个函数 frame 内存就会失效。
// 且只编一次,后续分发给 10 个目标也只用这一份数据。
byte[]? jpgBytes = null;
// 如果有更小的图片, 原始图片不压缩, 除非有特殊需求
if (frame.TargetMat == null)
{
jpgBytes = SdkGlobal.UseTurboJpegWrapper
? TurboEncodeImage(frame.InternalMat)
: EncodeImage(frame.InternalMat);
}
// 双流支持:如果存在处理后的 AI 图,也一并编码
byte[]? targetBytes = null;
if (frame.TargetMat != null && !frame.TargetMat.Empty())
{
targetBytes = SdkGlobal.UseTurboJpegWrapper
? TurboEncodeImage(frame.TargetMat)
: EncodeImage(frame.TargetMat);
}
// =========================================================
// 3. 构建 Payload (数据载荷)
// =========================================================
var payload = new VideoPayload
{
CameraId = deviceId.ToString(),
CaptureTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
OriginalImageBytes = jpgBytes, // 引用赋值
OriginalWidth = frame.InternalWidth,
OriginalHeight = frame.InnernalHeight,
TargetImageBytes = targetBytes, // 引用赋值
TargetWidth = frame.TargetWidth,
TargetHeight = frame.TargetHeight,
DispatchTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
// 添加订阅者
payload.SubscriberIds.AddRange(frame.SubscriberIds);
// 计算转码耗时(ms)
double processMs = (Stopwatch.GetTimestamp() - startTick) * 1000.0 / Stopwatch.Frequency;
payload.Diagnostics["encode_ms"] = Math.Round(processMs, 2);
// =========================================================
// 4. 动态扇出 (Dynamic Fan-Out) - 内存消耗极低
// =========================================================
// 遍历所有目标,往各自独立的管道里写数据。
// 实现了"物理隔离":一个管道满了(云端卡顿),不影响另一个管道(大屏流畅)。
foreach (var target in _targets)
{
bool ok = target.Channel.WriteLog(payload);
if (!ok)
{
// 如果这里打印,说明管道由于某种原因被关闭了(通常是程序正在退出)
_sysLog.Warning($"[ImageMonitor] 管道写入失败,目标: {target.Config.Name}");
}
}
}
catch (Exception ex)
{
// 极少发生的内存错误,打印日志但不抛出,避免崩溃 SDK 线程
_sysLog.Error($"[ImageMonitor] 采集处理异常: {ex.Message}");
}
}
/// <summary>
/// 调用 OpenCV 进行内存级图片编码
/// </summary>
/// <param name="mat">待编码的 OpenCV Mat 矩阵</param>
/// <returns>JPG 字节数组</returns>
private byte[]? EncodeImage(Mat mat)
{
if (mat == null || mat.Empty())
return null;
// ImEncode 将 Mat 编码为一维字节数组 (托管内存)
Cv2.ImEncode(".jpg", mat, out byte[] buf, _encodeParams);
return buf;
}
// 建议将转换器定义为类成员,避免重复创建(内部持有句柄)
private static readonly ThreadLocal<TJCompressor> _encoderPool = new(() => new TJCompressor());
/// <summary>
/// TurboJPEG 快速编码
/// </summary>
/// <param name="mat"></param>
/// <returns></returns>
private byte[]? TurboEncodeImage(Mat mat)
{
// 1. 空引用与销毁状态防御
if (mat == null || mat.Empty() || mat.IsDisposed)
return Array.Empty<byte>();
try
{
// 2. 线程安全防护 (如果不用 ThreadLocal至少保留 lock)
var encoder = _encoderPool.Value;
if (encoder == null)
{
_sysLog.Error("[Perf] ThreadLocal 编码器实例初始化失败,降级使用 OpenCV.");
return EncodeImage(mat); // 自动降级,保证业务不中断
}
// 3. 内存连续性确保
// 保持原逻辑:不连续则 Clone这是最稳妥的零拷贝退守方案已通过您的严格测试
if (!mat.IsContinuous())
{
using var continuousMat = mat.Clone();
return encoder.Compress(continuousMat.Data, (int)continuousMat.Step(),
continuousMat.Width, continuousMat.Height,
// 2026-01-31 解决黄色变蓝色问题
// 原因:经实测当前 Mat 内存排布为 RGB原 BGR 参数导致红蓝通道反转
TJPixelFormats.TJPF_RGB,
TJSubsamplingOptions.TJSAMP_420, 95, TJFlags.NONE);
}
// 执行并行编码
// 注意TJPF_BGR 确保了 OpenCV 默认内存排布,防止色偏
return encoder.Compress(mat.Data, (int)mat.Step(), mat.Width, mat.Height,
// 2026-01-31 解决黄色变蓝色问题
// 修正像素格式为 RGB匹配底层数据流确保工业视频颜色还原准确
TJPixelFormats.TJPF_RGB,
TJSubsamplingOptions.TJSAMP_420, 95, TJFlags.NONE);
}
catch (ObjectDisposedException)
{
// 自动降级,保证业务不中断
SdkGlobal.DisableTurboJpegAcceleration();
return EncodeImage(mat);
}
catch (Exception ex)
{
// 4. 记录异常但不让采集线程崩掉
_sysLog.Error(ex, "[Perf] TurboJpeg 编码失败,请检查依赖或内存状态");
// 自动降级,保证业务不中断
SdkGlobal.DisableTurboJpegAcceleration();
return EncodeImage(mat);
}
}
/// <summary>
/// 释放资源
/// </summary>
public override void Dispose()
{
GlobalStreamDispatcher.OnGlobalFrame -= ProcessFrame;
if (_encoderPool.IsValueCreated)
{
// 严谨做法:由于 ThreadLocal 无法直接遍历销毁所有线程的实例,
// 建议通过清理当前线程并由 GC 处理剩余部分,或在更高级的对象池中管理 Dispose。
_encoderPool.Dispose();
}
base.Dispose();
}
}