using Ayay.SerilogLogs; using Serilog; namespace SHH.MjpegPlayer { /// /// RTMP 推流参数同步服务器 /// 职责:定期将本地图片通道信息同步至流媒体服务器,并获取最新的 RTMP 推流地址。 /// public class RtmpPushServer { private static ILogger _sysLog = Log.ForContext("SourceContext", LogModules.Core); #region Instance /// /// 获取 RTMP 推流处理器的全局单例实例 /// public static RtmpPushServer Instance { get; } = new RtmpPushServer(); // 私有构造函数防止外部 new private RtmpPushServer() { } #endregion #region Start /// /// 启动 RTMP 推流任务 (对接新架构 TaskManager) /// public void Start() { // Optimized: 使用 TaskManager.Run 替代旧的线程启动方式,实现任务可视化管理 TaskManager.Run("RtmpPushSync", "Monitor", async (token) => { try { _sysLog.Information("RTMP 推流同步任务正在启动..."); // 1. 初始化延迟:稍作延迟,等待系统其他组件初始化完成 await Task.Delay(2000, token); // 2. 配置校验 if (!MjpegStatics.Cfg.UseRtmpServer) { _sysLog.Warning("配置未启用 RTMP 服务,推流任务已跳过"); return; } #region 启动连接检测 bool isConnected = false; while (!isConnected && !token.IsCancellationRequested) { try { var cfg = MjpegStatics.Cfg; var testItems = new List { new { deviceIp = "", deviceId = "", algCode = "" } }; var result = testItems.PostJson(cfg.RtmpServerDjhUri); if (result != null && result.IsSuccess) { _sysLog.Information("流媒体服务接口检测通过: {Uri}", cfg.RtmpServerDjhUri); isConnected = true; } else { _sysLog.Warning("检测流媒体服务接口失败, 30秒后再试... Uri: {Uri}", cfg.RtmpServerDjhUri); await Task.Delay(30000, token); } } catch (Exception ex) { _sysLog.Error(ex, "流媒体服务接口检测异常"); await Task.Delay(5000, token); } } #endregion #region 周期性同步任务 (代替 OnDoTaskAction) while (!token.IsCancellationRequested) { // 执行业务同步逻辑 await SyncRtmpParametersAsync(token); // 每 50ms 执行一次 (对应原 Start 中的 50ms 频率) await Task.Delay(50, token); } #endregion } catch (OperationCanceledException) { _sysLog.Information("RTMP 推流同步任务已正常取消"); } catch (Exception ex) { _sysLog.Fatal(ex, "RTMP 推流任务发生致命错误"); } }); } #endregion #region 核心业务逻辑 private async Task SyncRtmpParametersAsync(CancellationToken token) { try { var cfg = MjpegStatics.Cfg; var imgChns = MjpegStatics.ImageChannels; var pushItems = new List(); // 构建上报数据 foreach (var kvp in imgChns.Channels) { if (token.IsCancellationRequested) return; var imgChn = kvp.Value; if (imgChn == null || !imgChn.UseRtmp) continue; pushItems.Add(new { deviceIp = string.IsNullOrEmpty(imgChn.IpAddress) ? "127.0.0.1" : imgChn.IpAddress, deviceId = imgChn.DeviceId.ToString(), algCode = imgChn.Type }); } if (pushItems.Count > 0) { // 使用 await 配合异步扩展,释放线程池线程 // 如果你的 NetHttpExtension 已支持异步,则直接 await。 // 否则使用 await Task.Run(() => pushItems.PostJson(...)) 暂时过渡。 var result = await pushItems.PostJsonAsync(cfg.RtmpServerDjhUri, 2000); if (result?.rtmpVoList != null && result.IsSuccess) { foreach (var item in result.rtmpVoList) { var channel = imgChns.Get(item.deviceId, item.algCode); if (channel != null && channel.RtmpUri != item.rtmp) { // 假设后续我们会统计实例中的成功次数 channel.RtmpUri = item.rtmp; } } } } } catch (Exception ex) { // 使用统一的 Serilog 对象输出结构化日志 _sysLog.Error(ex, "SyncRtmpParametersAsync 执行异常"); } } #endregion } }