From 293b47c667980d16c97515e20c36e9a845624fbb Mon Sep 17 00:00:00 2001 From: Lu Xiuming Date: Fri, 30 Sep 2022 17:26:38 +0800 Subject: [PATCH 1/3] feat(relay/dynamic_push): support autoRetry for dynamic push sessions --- src/node_relay_server.js | 16 ++++++++++++++++ src/node_relay_session.js | 11 ++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/node_relay_server.js b/src/node_relay_server.js index 8206e0bc..6d9eb52b 100644 --- a/src/node_relay_server.js +++ b/src/node_relay_server.js @@ -13,12 +13,15 @@ const fs = require('fs'); const querystring = require('querystring'); const _ = require('lodash'); +const DYNAMIC_PUSH_AUTO_RETRY_TIME_MS = 5000; + class NodeRelayServer { constructor(config) { this.config = config; this.staticCycle = null; this.staticSessions = new Map(); this.dynamicSessions = new Map(); + this.dynamicPushRetryHandlers = new Map();; } async run() { @@ -206,7 +209,20 @@ class NodeRelayServer { let session = new NodeRelaySession(conf); session.id = id; session.on('end', (id) => { + const handler = this.dynamicPushRetryHandlers.get(id); + if (handler) clearTimeout(handler); this.dynamicSessions.delete(id); + this.dynamicPushRetryHandlers.delete(id); + }); + session.on('retry', (id) => { + Logger.log('[relay dynamic push] [retry] start after' + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS + 'id=' + id, conf.inPath, 'to', conf.ouPath); + const handler = setTimeout( + () => { + session.run(); + Logger.log('[relay dynamic push] [retry] start, id=' + id, conf.inPath, 'to', conf.ouPath); + }, + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS); + this.dynamicPushRetryHandlers.set(id, handler); }); this.dynamicSessions.set(id, session); session.run(); diff --git a/src/node_relay_session.js b/src/node_relay_session.js index db6de1db..99824acf 100644 --- a/src/node_relay_session.js +++ b/src/node_relay_session.js @@ -5,6 +5,7 @@ // const Logger = require('./node_core_logger'); const NodeCoreUtils = require('./node_core_utils'); +const context = require('./node_core_ctx'); const EventEmitter = require('events'); const { spawn } = require('child_process'); @@ -52,7 +53,15 @@ class NodeRelaySession extends EventEmitter { this.ffmpeg_exec.on('close', (code) => { Logger.log('[relay end] id=' + this.id, 'code=' + code); - this.emit('end', this.id); + const publisherAlive = context.sessions.get(this.id)?.isPublishing; + if (this.conf.autoRetry) { + Logger.log('[relay end - detect autoRetry] publisher if alive: ' + !!publisherAlive); + if (publisherAlive && this.conf.mode === 'push') { + this.emit('retry', this.id); + } + } else { + this.emit('end', this.id); + } }); } From ed3c1035685d003e09b9ab8de84a03aa76170ee0 Mon Sep 17 00:00:00 2001 From: Lu Xiuming Date: Sat, 1 Oct 2022 11:42:00 +0800 Subject: [PATCH 2/3] doc(README): introduce relay autoRetry --- README.md | 2 ++ README_CN.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/README.md b/README.md index 54bd3584..fbb7efb4 100644 --- a/README.md +++ b/README.md @@ -653,6 +653,7 @@ relay: { ## Dynamic push When the local server receives a publish request. Automatically push the stream to the edge server. +Use autoRetry flag to prevent relay end from edge disconnection when publisher is still alive. ``` relay: { @@ -662,6 +663,7 @@ relay: { app: 'live', mode: 'push', edge: 'rtmp://192.168.0.10', + autoRetry: true } ] } diff --git a/README_CN.md b/README_CN.md index 5b3336d4..66c8dc89 100644 --- a/README_CN.md +++ b/README_CN.md @@ -547,6 +547,7 @@ relay: { ### 动态推流 当本地服务器收到一个发布请求,自动将这个流推送到边缘服务器。 +设置 autoRetry 为 true ,当与边缘服务器断连时,如果请求发布者依然活跃,动态推流会自动重试。 ``` relay: { @@ -556,6 +557,7 @@ relay: { app: 'live', mode: 'push', edge: 'rtmp://192.168.0.10', + autoRetry: true } ] } From 8a962297a9d7fd1092bd1ec51653861ebf4d53f5 Mon Sep 17 00:00:00 2001 From: Lu Xiuming Date: Fri, 25 Nov 2022 17:44:23 +0800 Subject: [PATCH 3/3] fix(relay): avoid loop variable --- src/node_relay_server.js | 4 ++-- src/node_relay_session.js | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/node_relay_server.js b/src/node_relay_server.js index 6d9eb52b..8d41d64d 100644 --- a/src/node_relay_server.js +++ b/src/node_relay_server.js @@ -215,11 +215,11 @@ class NodeRelayServer { this.dynamicPushRetryHandlers.delete(id); }); session.on('retry', (id) => { - Logger.log('[relay dynamic push] [retry] start after' + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS + 'id=' + id, conf.inPath, 'to', conf.ouPath); + Logger.log('[relay dynamic push] [retry] start after' + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS + 'id=' + id, inPath, 'to', ouPath); const handler = setTimeout( () => { session.run(); - Logger.log('[relay dynamic push] [retry] start, id=' + id, conf.inPath, 'to', conf.ouPath); + Logger.log('[relay dynamic push] [retry] start, id=' + id, inPath, 'to', conf.ouPath); }, DYNAMIC_PUSH_AUTO_RETRY_TIME_MS); this.dynamicPushRetryHandlers.set(id, handler); diff --git a/src/node_relay_session.js b/src/node_relay_session.js index 99824acf..09be87ac 100644 --- a/src/node_relay_session.js +++ b/src/node_relay_session.js @@ -57,7 +57,7 @@ class NodeRelaySession extends EventEmitter { if (this.conf.autoRetry) { Logger.log('[relay end - detect autoRetry] publisher if alive: ' + !!publisherAlive); if (publisherAlive && this.conf.mode === 'push') { - this.emit('retry', this.id); + this.emit('retry', this.id, this.conf.inPath, this.conf.ouPath); } } else { this.emit('end', this.id);