diff --git a/README.md b/README.md index 54bd3584..fbb7efb4 100644 --- a/README.md +++ b/README.md @@ -653,6 +653,7 @@ relay: { ## Dynamic push When the local server receives a publish request. Automatically push the stream to the edge server. +Use autoRetry flag to prevent relay end from edge disconnection when publisher is still alive. ``` relay: { @@ -662,6 +663,7 @@ relay: { app: 'live', mode: 'push', edge: 'rtmp://192.168.0.10', + autoRetry: true } ] } diff --git a/README_CN.md b/README_CN.md index 5b3336d4..66c8dc89 100644 --- a/README_CN.md +++ b/README_CN.md @@ -547,6 +547,7 @@ relay: { ### 动态推流 当本地服务器收到一个发布请求,自动将这个流推送到边缘服务器。 +设置 autoRetry 为 true ,当与边缘服务器断连时,如果请求发布者依然活跃,动态推流会自动重试。 ``` relay: { @@ -556,6 +557,7 @@ relay: { app: 'live', mode: 'push', edge: 'rtmp://192.168.0.10', + autoRetry: true } ] } diff --git a/src/node_relay_server.js b/src/node_relay_server.js index 8206e0bc..8d41d64d 100644 --- a/src/node_relay_server.js +++ b/src/node_relay_server.js @@ -13,12 +13,15 @@ const fs = require('fs'); const querystring = require('querystring'); const _ = require('lodash'); +const DYNAMIC_PUSH_AUTO_RETRY_TIME_MS = 5000; + class NodeRelayServer { constructor(config) { this.config = config; this.staticCycle = null; this.staticSessions = new Map(); this.dynamicSessions = new Map(); + this.dynamicPushRetryHandlers = new Map();; } async run() { @@ -206,7 +209,20 @@ class NodeRelayServer { let session = new NodeRelaySession(conf); session.id = id; session.on('end', (id) => { + const handler = this.dynamicPushRetryHandlers.get(id); + if (handler) clearTimeout(handler); this.dynamicSessions.delete(id); + this.dynamicPushRetryHandlers.delete(id); + }); + session.on('retry', (id) => { + Logger.log('[relay dynamic push] [retry] start after' + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS + 'id=' + id, inPath, 'to', ouPath); + const handler = setTimeout( + () => { + session.run(); + Logger.log('[relay dynamic push] [retry] start, id=' + id, inPath, 'to', conf.ouPath); + }, + DYNAMIC_PUSH_AUTO_RETRY_TIME_MS); + this.dynamicPushRetryHandlers.set(id, handler); }); this.dynamicSessions.set(id, session); session.run(); diff --git a/src/node_relay_session.js b/src/node_relay_session.js index db6de1db..09be87ac 100644 --- a/src/node_relay_session.js +++ b/src/node_relay_session.js @@ -5,6 +5,7 @@ // const Logger = require('./node_core_logger'); const NodeCoreUtils = require('./node_core_utils'); +const context = require('./node_core_ctx'); const EventEmitter = require('events'); const { spawn } = require('child_process'); @@ -52,7 +53,15 @@ class NodeRelaySession extends EventEmitter { this.ffmpeg_exec.on('close', (code) => { Logger.log('[relay end] id=' + this.id, 'code=' + code); - this.emit('end', this.id); + const publisherAlive = context.sessions.get(this.id)?.isPublishing; + if (this.conf.autoRetry) { + Logger.log('[relay end - detect autoRetry] publisher if alive: ' + !!publisherAlive); + if (publisherAlive && this.conf.mode === 'push') { + this.emit('retry', this.id, this.conf.inPath, this.conf.ouPath); + } + } else { + this.emit('end', this.id); + } }); }