diff --git a/doc/servermd/StreamAPI.md b/doc/servermd/StreamAPI.md index 8783d7989..f66589b53 100644 --- a/doc/servermd/StreamAPI.md +++ b/doc/servermd/StreamAPI.md @@ -19,7 +19,7 @@ Because it's REST, management clients can be implemented by different programmin To enable stream API, add experimental targets `stream-service` and `customized-agent` during packing. `stream-service` is the module that provide stream related API. `customized-agent` is the module that provide server side customization for stream related API. After packing, the stream API configuration is in stream_service/service.toml. -Edit portal/portal.toml, set `stream_engine_name` to the same value of stream API configuration. +Edit portal/portal.toml, set `stream_engine_name` to the same value of stream API configuration(`service.name` or `scheduler.name` in stream_service/service.toml). Edit management_api/management_api.toml, set `stream_engine` and `control_agent` to the same values of stream API configuration. Start OWT service with updated configuration, stream API should be enabled. @@ -236,7 +236,9 @@ request body: object(ListQuery): { - KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + query: { + KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + } } response body: @@ -273,7 +275,9 @@ request body: type: string(publishType), // E.g, "streaming", "video", ... participant: string(participantId), // Or use domain name as participant ID. media: object(MediaTrack) | object(MediaInfo), - info: object(TypeSpecificInfo) + info: object(TypeSpecificInfo), + connection: object(ConnectionInfo) | undefined, // For "streaming" + processor: string(processorId) | undefined, // For "audio", "video" } object(MediaTrack) { // For WebRTC publications tracks: [ object(TrackInfo) ], @@ -287,6 +291,11 @@ request body: parameters: object(VideoParameters) } } + object(ConnectionInfo) { // For streaming publications + url: string(streamingUrl), + transportProtocol: "tcp" | "udp", + bufferSize: number(bufferSize), + }, For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room). For *format* and *parameters*, refers to [REST API](RESTAPI.md#53-streams-StreamAPIsection53). @@ -367,7 +376,9 @@ request body: object(ListQuery): { - KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + query: { + KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + } } response body: @@ -404,7 +415,9 @@ request body: type: string(subscribeType), // E.g, "streaming", "video", ... participant: string(participantId), // Or use domain name as participant ID. media: object(MediaTrack) | object(MediaInfo), - info: object(TypeSpecificInfo) + info: object(TypeSpecificInfo), + connection: object(ConnectionInfo) | undefined, // For "streaming", "recording" + processor: string(processorId) | undefined, // For "audio", "video", "analytics" } object(MediaTrack) { // For WebRTC subscriptions tracks: [ object(TrackInfo) ], @@ -413,12 +426,21 @@ request body: audio: { from: string(sourceAudioId), // Could be publication ID or source track ID format: object(AudioFormat), - }, + } | boolean(enable), video: { from: string(sourceVideoId), // Could be publication ID or source track ID format: object(VideoFormat), parameters: object(VideoParameters) - } + } | boolean(enable) + } + object(ConnectionInfo) { + container: "mkv" | "mp4" | undefined, // For "recording" + url: string(url) | undefined, // For "streaming" + algorithm: string(algorithmName) | undefined, // For "analytics" + video: { // For "analytics" + format: object(VideoFormat), // Analytics output video format + parameters: object(VideoParameters), // Analytics output video parameters + } | undefined, } For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room). @@ -497,7 +519,9 @@ request body: object(ListQuery): { - KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + query: { + KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + } } response body: @@ -577,6 +601,14 @@ request body: id: string(analyticsId) } + // For "sip" type processor + sip: { + server: string(serverHost), + user: string(sipUser), + password: string(sipPasswd) + }, + stream: string(outgoingSipStream) + For *object(Region)*, refers to [REST API](RESTAPI.md#51-rooms-StreamAPIsection51). response body: @@ -605,7 +637,109 @@ response body: **Empty** -## 5.5 Nodes {#StreamAPIsection5_5} +## 5.5 Participants {#StreamAPIsection5_5} +Description:
+Participants represents owner of publications and subscriptions in OWT server.
+ +Resources: + +- /v1.1/stream-engine/participants +- /v1.1/stream-engine/participants/{participantId} + +Data Model: + + Object(Participant) { + id: string(ParticipantID), + domain: string(domainName), // For example, room ID + portal: string(portalId), + notifying: boolean(notifyOthers), // Notify other participants about join/leave. + } + +### List Participants {#StreamAPIsection5_5_1} +**GET ${host}/v1.1/stream-engine/participants** +**GET ${host}/v1.1/stream-engine/participants/{participantId}** + +Description:
+List participants in stream engine.
+ +request body: + +| type | content | +|:-------------|:-------| +| json | object(ListQuery) | + +**Note**: Definition of *ListQuery*.
+ + object(ListQuery): + { + query: { + KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + } + } + +response body: + +| type | content | +|:-------------|:-------| +| json | Object(ListResult) | + +**Note**: Definition of *ListResult*.
+ + object(ListResult): + { + total: number(ListSize), + start: number(offsetInList), + data: [ object(Participant) ] + } + +### Create Participant {#StreamAPIsection5_5_2} +**POST ${host}/v1.1/stream-engine/participants** + +Description:
+Create a participant with configuration.
+ +request body: + +| type | content | +|:-------------|:-------| +| json | object(ParticipantRequest) | + +**Note**: Definition of *ParticipantRequest*.
+ + Object(ParticipantRequest) { + id: string(ParticipantID), + domain: string(domainName), + notifying: boolean(notifyOthers), // Notify other participants about join/leave. + } + +response body: + +| type | content | +|:-------------|:-------| +| json | object(IdObject) | + +**Note**: Definition of *IdObject*.
+ + Object(IdObject) { + id: string(createdParticipantId) + } + +### Delete Participant {#StreamAPIsection5_5_3} +**DELETE ${host}/v1.1/stream-engine/participants/{participantId}** + +Description:
+Drop the specified participant, all related publications and subscriptions will be stopped as well.
+ +request body: + + **Empty** + +response body: + + **Empty** + + +## 5.6 Nodes {#StreamAPIsection5_6} Description:
Node represents working process in stream engine.
@@ -624,7 +758,7 @@ Data Model: streamAddr: {ip: string(host), port: number(port)} } -### List Nodes {#StreamAPIsection5_5_1} +### List Nodes {#StreamAPIsection5_6_1} **GET ${host}/v1.1/stream-engine/nodes** Description:
@@ -640,7 +774,9 @@ request body: object(ListQuery): { - KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + query: { + KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}` + } } response body: diff --git a/source/agent/analytics/index.js b/source/agent/analytics/index.js index 9ff1ebe7c..3db7467bd 100644 --- a/source/agent/analytics/index.js +++ b/source/agent/analytics/index.js @@ -210,6 +210,9 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) { this.connectionclose = () => { destroyStream(options.controller, newStreamId); + // Notify stream engine if needed + const data = {id: newStreamId}; + notifyStatus(options.controller, connectionId, 'onStreamRemoved', data); } inputs[connectionId] = true; @@ -259,11 +262,10 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) { } } - // For Stream Engine, onSessionProgress(id, name, data) + generateStream(options.controller, newStreamId, streamInfo); + // Notify stream engine if needed streamInfo.id = newStreamId; - notifyStatus(controller, connectionId, 'onNewStream', streamInfo); - - // generateStream(options.controller, newStreamId, streamInfo); + notifyStatus(controller, connectionId, 'onStreamAdded', streamInfo); } catch (e) { log.error("Parse stream added data with error:", e); } diff --git a/source/agent/conference/rpcRequest.js b/source/agent/conference/rpcRequest.js index 72c774da0..7f9d9dfb5 100644 --- a/source/agent/conference/rpcRequest.js +++ b/source/agent/conference/rpcRequest.js @@ -574,10 +574,12 @@ const RpcRequest = function(rpcChannel, listener) { }; that.addSipNode = function (workerNode) { - grpcNode[workerNode] = grpcTools.startClient( - 'sip', - workerNode - ); + if (enableGrpc) { + grpcNode[workerNode] = grpcTools.startClient( + 'sip', + workerNode + ); + } } return that; diff --git a/source/agent/customized/conferenceLite.js b/source/agent/customized/conferenceLite.js index c23b96122..258d10942 100644 --- a/source/agent/customized/conferenceLite.js +++ b/source/agent/customized/conferenceLite.js @@ -55,7 +55,7 @@ class AggregatedStream { this.streams = []; this.processor = null; } - addSetting(id, setting) { + addSetting(id, setting, data) { const stream = this.streams.find((stream) => { if (this.type === 'audio') { return isAudioSettingMatch(stream.setting, setting); @@ -64,10 +64,12 @@ class AggregatedStream { } }); if (!stream) { - this.streams.push({id, setting}); + this.streams.push({id, setting, data}); return true; + } else { + stream.id = id; + return false; } - return false; } getSetting(setting) { const stream = this.streams.find((stream) => { @@ -77,7 +79,7 @@ class AggregatedStream { return isVideoSettingMatch(stream.setting, setting); } }); - return stream?.id || null; + return stream; } getSettings() { return this.streams.map((stream) => { @@ -225,15 +227,6 @@ class Conference { initProms.push(p); this.videos.set(mixedId, new AggregatedStream(mixedId, 'video')); } - const pubReq = { - id: mixedId, - type: 'virtual', - data: mixedPub, - domain: roomId, - participant: roomId - }; - // initProms.push(rpcChannel.makeRPC( - // STREAM_ENGINE, 'publish', [pubReq])); this.mixedStreams.set(mixedId, mixedPub); } await Promise.all(initProms); @@ -252,14 +245,7 @@ class Conference { // Publish from portal async publish(req) { log.debug('publish:', req); - if (req.type === 'virtual') { - return req; - } // Validate request - if (!this.participants.has(req.participant) && - req.participant !== this.roomId) { - throw new Error('Invalid participant ID'); - } req.id = req.id || randomUUID().replace(/-/g, ''); if (req.type !== 'video' && req.type !== 'audio') { req.info = Object.assign(req.info || {}, { @@ -296,7 +282,18 @@ class Conference { formatPreference }; // Check subscribe source - if (req.type === 'webrtc') { + if (req.type !== 'audio' && req.type !== 'video') { + if (req.type !== 'webrtc') { + req.media.tracks = []; + if (req.media.audio) { + req.media.audio.type = 'audio'; + req.media.tracks.push(req.media.audio); + } + if (req.media.video) { + req.media.video.type = 'video'; + req.media.tracks.push(req.media.video); + } + } // Save track from for later linkup for (const track of req.media.tracks) { const tmpTrackId = track.id ?? req.id; @@ -409,54 +406,16 @@ class Conference { if (from && this.audios.has(from)) { log.debug('Pending link from:', track.id, from); const stream = this.audios.get(from); - const mappedId = stream.getSetting(track); - if (mappedId) { - track.from = mappedId; + const audioSetting = {format: track.format}; + const mapped = stream.getSetting(audioSetting); + if (mapped) { + track.from = await mapped.data; + log.debug('Update track audio from:', track.id, track.from); } else { - // Generate audio - if (!stream.processor) { - if (this.mixedStreams.has(from)) { - log.error('Audio mixer is not ready'); - throw new Error('Audio mixer is not ready'); - } - const txReq = { - type: 'audio', - id: this.roomId + '-' + from, - transcoding: {id: from}, - domain: this.roomId, - participant: this.roomId - }; - stream.processor = {id: txReq.id}; - stream.processor = await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'addProcessor', [txReq]); - // Add input - const audioOption = { - from, - format: stream.getSettings()[0].format, - } - const inputReq = { - type: 'audio', - id: randomUUID().replace(/-/g, ''), - media: {audio: audioOption}, - processor: stream.processor.id, - participant: this.roomId, - info: {owner: ''} - }; - await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'subscribe', [inputReq]); - } - const genReq = { - type: 'audio', - id: randomUUID().replace(/-/g, ''), - media: {audio: {format: track.format}}, - processor: stream.processor.id, - participant: this.roomId, - info: {owner: '', hidden: true} - }; - const outputId = await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'publish', [genReq]); - stream.addSetting(outputId.id, {format: track.format}); - track.from = outputId.id; + const transcode = this._transcodeAudio( + stream, from, audioSetting); + stream.addSetting(null, audioSetting, transcode); + track.from = await transcode; log.debug('Update track audio from:', track.id, track.from); } hasUpdate = true; @@ -474,59 +433,19 @@ class Conference { } if (from && this.videos.has(from)) { const stream = this.videos.get(from); - const mappedId = stream.getSetting(track); - if (mappedId) { - track.from = mappedId; + const videoSetting = { + format: track.format, + parameters: track.parameters, + }; + const mapped = stream.getSetting(videoSetting); + if (mapped) { + track.from = await mapped.data; + log.debug('Update track video from:', track.id, track.from); } else { - // Generate video - if (!stream.processor) { - if (this.mixedStreams.has(from)) { - log.error('Video mixer is not ready'); - throw new Error('Video mixer is not ready'); - } - const txReq = { - type: 'video', - id: this.roomId + '-' + from, - transcoding: {id: from}, - domain: this.roomId, - participant: this.roomId, - }; - stream.processor = {id: txReq.id}; - stream.processor = await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'addProcessor', [txReq]); - // Add input - const videoOption = { - from, - format: stream.getSettings()[0].format, - parameters: stream.getSettings()[0].parameters, - } - const inputReq = { - type: 'video', - id: randomUUID().replace(/-/g, ''), - media: {video: videoOption}, - processor: stream.processor.id, - participant: this.roomId, - info: {owner: ''} - }; - await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'subscribe', [inputReq]); - } - const videoSetting = { - format: track.format, - parameters: track.parameters, - }; - const genReq = { - type: 'video', - id: randomUUID().replace(/-/g, ''), - media: {video: videoSetting}, - processor: stream.processor.id, - participant: this.roomId, - info: {owner: '', hidden: true} - }; - const outputId = await this.rpcChannel.makeRPC( - STREAM_ENGINE, 'publish', [genReq]); - stream.addSetting(outputId.id, videoSetting); - track.from = outputId.id; + const transcode = this._transcodeVideo( + stream, from, videoSetting); + stream.addSetting(null, videoSetting, transcode); + track.from = await transcode; log.debug('Update track video from:', track.id, track.from); } hasUpdate = true; @@ -559,6 +478,102 @@ class Conference { req.id = req.id || randomUUID().replace(/-/g, ''); return req; } + + async _transcodeAudio(stream, from, audioSetting) { + // Generate audio + if (!stream.processor) { + if (this.mixedStreams.has(from)) { + log.error('Audio mixer is not ready'); + throw new Error('Audio mixer is not ready'); + } + const txReq = { + type: 'audio', + id: this.roomId + '-' + from, + transcoding: {id: from}, + domain: this.roomId, + participant: this.roomId + }; + stream.processor = {id: txReq.id}; + stream.processor = await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'addProcessor', [txReq]); + // Add input + const audioOption = { + from, + format: stream.getSettings()[0].format, + } + const inputReq = { + type: 'audio', + id: randomUUID().replace(/-/g, ''), + media: {audio: audioOption}, + processor: stream.processor.id, + participant: this.roomId, + info: {owner: ''} + }; + await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'subscribe', [inputReq]); + } + const genReq = { + type: 'audio', + id: randomUUID().replace(/-/g, ''), + media: {audio: {format: audioSetting.format}}, + processor: stream.processor.id, + participant: this.roomId, + info: {owner: '', hidden: true} + }; + const outputId = await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'publish', [genReq]); + stream.addSetting(outputId.id, audioSetting); + log.debug('Transcode audio completed:', stream.id, outputId.id, audioSetting); + } + + async _transcodeVideo(stream, from, videoSetting) { + // Generate video + if (!stream.processor) { + if (this.mixedStreams.has(from)) { + log.error('Video mixer is not ready'); + throw new Error('Video mixer is not ready'); + } + const txReq = { + type: 'video', + id: this.roomId + '-' + from, + transcoding: {id: from}, + domain: this.roomId, + participant: this.roomId, + }; + stream.processor = {id: txReq.id}; + stream.processor = await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'addProcessor', [txReq]); + // Add input + const videoOption = { + from, + format: stream.getSettings()[0].format, + parameters: stream.getSettings()[0].parameters, + } + const inputReq = { + type: 'video', + id: randomUUID().replace(/-/g, ''), + media: {video: videoOption}, + processor: stream.processor.id, + participant: this.roomId, + info: {owner: ''} + }; + await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'subscribe', [inputReq]); + } + const genReq = { + type: 'video', + id: randomUUID().replace(/-/g, ''), + media: {video: videoSetting}, + processor: stream.processor.id, + participant: this.roomId, + info: {owner: '', hidden: true} + }; + const outputId = await this.rpcChannel.makeRPC( + STREAM_ENGINE, 'publish', [genReq]); + stream.addSetting(outputId.id, videoSetting); + log.debug('Transcode video completed:', stream.id, outputId.id, videoSetting); + return outputId.id; + } } module.exports.Conference = Conference; diff --git a/source/agent/sip/index.js b/source/agent/sip/index.js index 6c773e3f4..9e7ed2d13 100644 --- a/source/agent/sip/index.js +++ b/source/agent/sip/index.js @@ -247,6 +247,7 @@ module.exports = function (rpcC, selfRpcId, parentRpcId, clusterWorkerIP) { var router = new InternalConnectionRouter(global.config.internal); if (enableGRPC) { + erizo.id = null; const grpcTools = require('./grpcTools'); cluster_name = global.config?.cluster?.grpc_host || 'localhost:10080'; makeRPC = function (_, node, method, args, onOk, onError) { @@ -713,7 +714,9 @@ module.exports = function (rpcC, selfRpcId, parentRpcId, clusterWorkerIP) { that.init = function(options, callback) { log.debug('init SipGateway:', options.sip_server, options.sip_user); - erizo.id = rpcC.rpcAddress + if (!erizo.id) { + erizo.id = rpcClient.rpcAddress + } if (typeof options.room_id !== 'string' || options.room_id === '') { log.error('Invalid room id'); diff --git a/source/agent/sip/log4cxx.properties b/source/agent/sip/log4cxx.properties index 8f75a46dc..97a9f0900 100644 --- a/source/agent/sip/log4cxx.properties +++ b/source/agent/sip/log4cxx.properties @@ -11,6 +11,9 @@ log4j.appender.A1.layout.ConversionPattern=%d - %p: %c - %m%n # The raw UDP and TCP transports which are used for the connection between the Gateway and AVS. log4j.logger.owt.RawTransport=INFO +log4j.logger.owt.TransportSession=INFO +log4j.logger.owt.TransportServer=INFO +log4j.logger.owt.TransportClient=INFO # If the SctpTransport log is set to debug, heavy IO would affact the connections log4j.logger.owt.SctpTransport=INFO diff --git a/source/management_api/resource/v1.1/index.js b/source/management_api/resource/v1.1/index.js index 74ee22b96..17833ee56 100644 --- a/source/management_api/resource/v1.1/index.js +++ b/source/management_api/resource/v1.1/index.js @@ -12,6 +12,7 @@ const tokensResource = require('../v1/tokensResource'); const publicationsResource = require('./publicationsResource'); const subscriptionsResource = require('./subscriptionsResource'); const processorsResource = require('./processorsResource'); +const participantsResource = require('./participantsResource'); const routerV1 = require('../v1'); // Stream(including external streaming-in) management @@ -44,6 +45,11 @@ router.get('/stream-engine/processors/:processors', processorsResource.get); router.post('/stream-engine/processors', processorsResource.add); router.delete('/stream-engine/processors/:processor', processorsResource.delete); +router.get('/stream-engine/participants', participantsResource.getList); +router.get('/stream-engine/participants/:participant', participantsResource.get); +router.post('/stream-engine/participants', participantsResource.add); +router.delete('/stream-engine/participants/:participant', participantsResource.delete); + // Same as previous version router.use(routerV1); diff --git a/source/management_api/resource/v1.1/participantsResource.js b/source/management_api/resource/v1.1/participantsResource.js new file mode 100644 index 000000000..061739589 --- /dev/null +++ b/source/management_api/resource/v1.1/participantsResource.js @@ -0,0 +1,75 @@ +// Copyright (C) <2021> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +'use strict'; +const { query } = require('express'); +const e = require('../../errors'); + +// Logger +const log = require('../../logger').logger.getLogger('ParticipantsResource'); +const rpc = require('../../rpc/rpc'); + +const STREAM_SERVICE_ID = global.config.cluster.stream_engine; + +function callStreamService(methodName, args, callback) { + rpc.callRpc(STREAM_SERVICE_ID, methodName, args, {callback: function(ret) { + if (ret === 'timeout' || ret === 'error') { + callback(ret); + } else { + callback(null, ret); + } + }}); +} + +exports.getList = function (req, res, next) { + log.debug('Representing participants for service ', req.authData.service._id); + const query = req.body?.query || {}; + callStreamService('getParticipants', [{query}], (err, rets) => { + if (err) { + next(new e.CloudError('Failed to get participants')); + } else { + res.send(rets); + } + }); +}; + +exports.get = function (req, res, next) { + log.debug('Representing publication:', req.params.participant); + const query = {id: req.params.participant}; + callStreamService('getParticipants', [{query}], (err, rets) => { + if (err) { + next(new e.CloudError('Failed to get participants')); + } else { + res.send(rets[0]); + } + }); +}; + +exports.add = function (req, res, next) { + log.debug('Join for service ', req.authData.service._id); + const data = { + id: req.body?.id, + domain: req.body?.domain, + portal: req.body?.portal, + notifying: req.body?.notifying + }; + callStreamService('join', [data], (err, ret) => { + if (err) { + next(new e.CloudError('Failed to join')); + } else { + res.send(ret); + } + }); +}; + +exports.delete = function (req, res, next) { + log.debug('Leave for service ', req.params.participant, req.authData.service._id); + callStreamService('leave', [{id: req.params.participant}], (err, ret) => { + if (err) { + next(new e.CloudError('Failed to leave')); + } else { + res.send(ret); + } + }); +}; diff --git a/source/management_api/resource/v1.1/processorsResource.js b/source/management_api/resource/v1.1/processorsResource.js index 80c630b71..ed79b6246 100644 --- a/source/management_api/resource/v1.1/processorsResource.js +++ b/source/management_api/resource/v1.1/processorsResource.js @@ -3,12 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 'use strict'; -const requestHandler = require('../../requestHandler'); const e = require('../../errors'); // Logger const log = require('../../logger').logger.getLogger('ProcessorsResource'); - const rpc = require('../../rpc/rpc'); const STREAM_SERVICE_ID = global.config.cluster.stream_engine; @@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) { } exports.getList = function (req, res, next) { - log.debug('Representing processors for domain ', req.params.domain, - 'and service', req.authData.service._id); - callStreamService('getProcessors', [{}], (err, pubs) => { + log.debug('Representing processors for service', req.authData.service._id); + const query = req.body?.query || {}; + callStreamService('getProcessors', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get subscriptions')); } else { @@ -39,7 +37,7 @@ exports.get = function (req, res, next) { log.debug('Representing processor:', req.params.processor, ' for domain ', req.params.domain); const query = {id: req.params.processor}; - callStreamService('getProcessors', [query], (err, pubs) => { + callStreamService('getProcessors', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get processors')); } else { diff --git a/source/management_api/resource/v1.1/publicationsResource.js b/source/management_api/resource/v1.1/publicationsResource.js index 89ab720ca..a7ec6fa07 100644 --- a/source/management_api/resource/v1.1/publicationsResource.js +++ b/source/management_api/resource/v1.1/publicationsResource.js @@ -3,12 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 'use strict'; -const requestHandler = require('../../requestHandler'); const e = require('../../errors'); // Logger const log = require('../../logger').logger.getLogger('PublicationsResource'); - const rpc = require('../../rpc/rpc'); const STREAM_SERVICE_ID = global.config.cluster.stream_engine; @@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) { } exports.getList = function (req, res, next) { - log.debug('Representing publications for domain ', req.params.domain, - 'and service', req.authData.service._id); - callStreamService('getPublications', [{}], (err, pubs) => { + log.debug('Representing publications for service', req.authData.service._id); + const query = req.body?.query || {}; + callStreamService('getPublications', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get publications')); } else { @@ -39,7 +37,7 @@ exports.get = function (req, res, next) { log.debug('Representing publication:', req.params.publication, ' for domain ', req.params.domain); const query = {id: req.params.publication}; - callStreamService('getPublications', [query], (err, pubs) => { + callStreamService('getPublications', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get publications')); } else { diff --git a/source/management_api/resource/v1.1/subscriptionsResource.js b/source/management_api/resource/v1.1/subscriptionsResource.js index ec9f82d08..9c480dc41 100644 --- a/source/management_api/resource/v1.1/subscriptionsResource.js +++ b/source/management_api/resource/v1.1/subscriptionsResource.js @@ -3,12 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 'use strict'; -const requestHandler = require('../../requestHandler'); const e = require('../../errors'); // Logger const log = require('../../logger').logger.getLogger('SubscriptionsResource'); - const rpc = require('../../rpc/rpc'); const STREAM_SERVICE_ID = global.config.cluster.stream_engine; @@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) { } exports.getList = function (req, res, next) { - log.debug('Representing subscriptions for domain ', req.params.domain, - 'and service', req.authData.service._id); - callStreamService('getSubscriptions', [{}], (err, pubs) => { + log.debug('Representing subscriptions for service', req.authData.service._id); + const query = req.body?.query || {}; + callStreamService('getSubscriptions', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get subscriptions')); } else { @@ -39,7 +37,7 @@ exports.get = function (req, res, next) { log.debug('Representing subscription:', req.params.subscription, ' for domain ', req.params.domain); const query = {id: req.params.subscription}; - callStreamService('getSubscriptions', [query], (err, pubs) => { + callStreamService('getSubscriptions', [{query}], (err, pubs) => { if (err) { next(new e.CloudError('Failed to get subscriptions')); } else { diff --git a/source/portal/portal.toml b/source/portal/portal.toml index f39fa3029..29ce3531b 100644 --- a/source/portal/portal.toml +++ b/source/portal/portal.toml @@ -21,7 +21,6 @@ cors = ["*"] # Setup as GRPC server #enable_grpc = true -#customized_controller = "conference2" #stream_engine_name = "stream-service" [cluster] diff --git a/source/sip_portal/sipErizoHelper.js b/source/sip_portal/sipErizoHelper.js index 9624df92d..69ff066e9 100644 --- a/source/sip_portal/sipErizoHelper.js +++ b/source/sip_portal/sipErizoHelper.js @@ -48,7 +48,7 @@ module.exports = function (spec) { onError(err); } else { const addr = result.info.ip + ':' + result.info.grpcPort; - onOk(addr); + onOk({id: addr}); } }); } else if (method === 'getNode') { @@ -167,14 +167,13 @@ module.exports = function (spec) { if (attempt <= 0) { return on_failed('Failed in scheduling a sip agent.'); } - makeRPC( rpcClient, cluster, 'schedule', ['sip', for_whom/*FIXME: use room_id as taskId temporarily, should use for_whom instead later.*/, 'preference'/*FIXME: should fill-in actual preference*/, 10 * 1000], function (result) { - on_ok({id: result}); + on_ok(result); keepTrying = false; }, function (reason) { if (keepTrying) { diff --git a/source/stream_service/controllers.json b/source/stream_service/controllers.json index 34cdea8d2..ee598901d 100644 --- a/source/stream_service/controllers.json +++ b/source/stream_service/controllers.json @@ -19,6 +19,11 @@ "requirePath": "./controllers/streamingController", "test": {} }, + "recording": { + "className": "StreamingController", + "requirePath": "./controllers/streamingController", + "test": {} + }, "analytics": { "className": "AnalyticsController", "requirePath": "./controllers/analyticsController", @@ -28,10 +33,5 @@ "className": "QuicController", "requirePath": "./controllers/quicController", "test": {} - }, - "virtual": { - "className": "VirtualController", - "requirePath": "./controllers/virtualController", - "test": {} } } \ No newline at end of file diff --git a/source/stream_service/controllers/analyticsController.js b/source/stream_service/controllers/analyticsController.js index f2dfaa52a..450e2ec3a 100644 --- a/source/stream_service/controllers/analyticsController.js +++ b/source/stream_service/controllers/analyticsController.js @@ -196,8 +196,8 @@ class AnalyticsController extends TypeController { onSessionProgress(id, type, data) { switch(type) { - case 'onNewStream': { - log.debug('onNewStream:', id, type, data); + case 'onStreamAdded': { + log.debug('onStreamAdded:', id, type, data); const session = this.sessions.get(id); const outputConfig = { id: data.id, @@ -213,6 +213,12 @@ class AnalyticsController extends TypeController { .catch((e) => log.debug('Failed to create session on progress:', e)); break; } + case 'onStreamRemoved': { + log.debug('onStreamRemoved:', id, type, data); + this.removeSession(id, 'in', 'onStreamRemoved') + .catch((e) => log.debug('Failed to remove session on progress:', e)); + break; + } default: log.warn('Unknown progress type:', type); break; diff --git a/source/stream_service/controllers/audioController.js b/source/stream_service/controllers/audioController.js index 045b7b5d4..825775e10 100644 --- a/source/stream_service/controllers/audioController.js +++ b/source/stream_service/controllers/audioController.js @@ -63,7 +63,8 @@ class AudioController extends TypeController { const locality = await this.getWorkerNode( 'audio', audioConfig.domain, audioConfig.id, mediaPreference); if (audioConfig.mixing) { - const amixer = new Processor(audioConfig.id, 'amixer', audioConfig); + const amixer = new Processor(audioConfig.id, 'audio', audioConfig); + amixer.label = 'amixer'; amixer.locality = locality; amixer.domain = audioConfig.domain; const mixConfig = audioConfig.mixing; @@ -78,7 +79,8 @@ class AudioController extends TypeController { this.processors.set(audioConfig.id, amixer); return amixer; } else if (audioConfig.transcoding) { - const atranscoder = new Processor(audioConfig.id, 'axcoder', audioConfig); + const atranscoder = new Processor(audioConfig.id, 'audio', audioConfig); + atranscoder.label = 'axcoder'; atranscoder.locality = locality; atranscoder.domain = audioConfig.domain; const transcodeConfig = audioConfig.transcoding; @@ -93,7 +95,8 @@ class AudioController extends TypeController { this.processors.set(audioConfig.id, atranscoder); return atranscoder; } else if (audioConfig.selecting) { - const aselector = new Processor(audioConfig.id, 'aselector', audioConfig); + const aselector = new Processor(audioConfig.id, 'audio', audioConfig); + aselector.label = 'aselector'; aselector.locality = locality; aselector.domain = audioConfig.domain; const selectorConfig = audioConfig.selecting; @@ -103,10 +106,15 @@ class AudioController extends TypeController { // Create publication for active audio streams after return process.nextTick(() => { for (const streamId of selectorConfig.activeStreamIds) { + const format = { + codec: 'opus', + sampleRate: 48000, + channelNum: 2 + }; const publication = new Publication(output.id, 'audio', sessionConfig.info); publication.domain = audioConfig.domain; publication.locality = locality; - const audioTrack = {id: streamId, format: {codec: 'opus'}}; + const audioTrack = {id: streamId, format}; publication.source.audio.push(audioTrack); aselector.outputs.audio.push(audioTrack); this.emit('session-established', streamId, publication); diff --git a/source/stream_service/controllers/rtcController.js b/source/stream_service/controllers/rtcController.js index 495ebafde..8ca36a3f9 100644 --- a/source/stream_service/controllers/rtcController.js +++ b/source/stream_service/controllers/rtcController.js @@ -238,8 +238,7 @@ class RtcController extends TypeController { throw new Error(`Cannot find track for mute/unmute: ${id}`); } } else if (config.operation === 'update') { - this.emit('session-updated', config.id, - {type: 'update', data: Subscription.from(config.data)}); + super.controlSession(direction, config); } else { throw new Error(`Operation not supported: ${config.operation}`); } diff --git a/source/stream_service/controllers/typeController.js b/source/stream_service/controllers/typeController.js index bc75e0ab3..24ce56a0e 100644 --- a/source/stream_service/controllers/typeController.js +++ b/source/stream_service/controllers/typeController.js @@ -6,6 +6,7 @@ const { EventEmitter } = require('events'); const log = require('../logger').logger.getLogger('TypeController'); +const {Publication, Subscription} = require('../stateTypes') /* Events * 'session-established': (id, Publication|Subscription) @@ -54,6 +55,16 @@ class TypeController extends EventEmitter { const args = [locality.node, {room: domain, task: taskId}]; return this.makeRPC(locality.agent, 'recycleNode', args); } + + async controlSession(direction, config) { + if (config.operation === 'update') { + const data = direction === 'in' ? + Publication.from(config.data) : Subscription.from(config.data); + this.emit('session-updated', config.id, {type: 'update', data}); + } else { + throw new Error(`Unknown control operation: ${config.operation}`); + } + } } exports.TypeController = TypeController; diff --git a/source/stream_service/controllers/videoController.js b/source/stream_service/controllers/videoController.js index e388c6a7e..c1a063022 100644 --- a/source/stream_service/controllers/videoController.js +++ b/source/stream_service/controllers/videoController.js @@ -26,6 +26,20 @@ function videoFormatStr(format) { return str; } +function toVideoFormat(str) { + const i = str.indexOf(str); + if (i < 0) { + return { + codec: str + }; + } else { + return { + codec: str.substring(0, i), + profile: str.substring(i + 1) + }; + } +} + /* Events * 'session-established': (id, Publication|Subscription) * 'session-updated': (id, Publication|Subscription) @@ -60,13 +74,16 @@ class VideoController extends TypeController { const locality = await this.getWorkerNode( 'video', videoConfig.domain, videoConfig.id, mediaPreference); if (videoConfig.mixing) { - const vmixer = new Processor(videoConfig.id, 'vmixer', videoConfig); + const vmixer = new Processor(videoConfig.id, 'video', videoConfig); + vmixer.label = 'vmixer'; vmixer.locality = locality; vmixer.domain = videoConfig.domain; const mixConfig = videoConfig.mixing; try { - await this.makeRPC(locality.node, 'init', + const ret = await this.makeRPC(locality.node, 'init', ['mixing', mixConfig, mixConfig.id, this.selfId, mixConfig.view]); + log.debug('Mixer init:', ret); + vmixer.codecs = ret.codecs; } catch (e) { this.recycleWorkerNode(locality, videoConfig.domain, videoConfig.id) .catch((err) => log.debug('Failed to recycleNode:', err)); @@ -76,13 +93,16 @@ class VideoController extends TypeController { return vmixer; } else if (videoConfig.transcoding) { - const vtranscoder = new Processor(videoConfig.id, 'vxcoder', videoConfig); + const vtranscoder = new Processor(videoConfig.id, 'video', videoConfig); + vtranscoder.label = 'vxcoder'; vtranscoder.locality = locality; vtranscoder.domain = videoConfig.domain; const transcodeConfig = videoConfig.transcoding; try { - await this.makeRPC(locality.node, 'init', - ['transcoding', transcodeConfig, transcodeConfig.id, this.selfId, '']) + const ret = await this.makeRPC(locality.node, 'init', + ['transcoding', transcodeConfig, transcodeConfig.id, this.selfId, '']); + log.debug('Transcoder init:', ret); + vtranscoder.codecs = ret.codecs; } catch (e) { this.recycleWorkerNode(locality, videoConfig.domain, videoConfig.id) .catch((err) => log.debug('Failed to recycleNode:', err)); @@ -163,14 +183,24 @@ class VideoController extends TypeController { // output = {id, resolution, framerate, bitrate, keyFrameInterval} const output = await this.makeRPC(processor.locality.node, 'generate', [videoFormatStr(format), resolution, framerate, bitrate, keyFrameInterval]); + log.debug('Get output:', output); sessionConfig.id = output.id; + const outputVideo = { + format: toVideoFormat(processor.codecs?.encode?.[0]), + parameters: { + resolution: output.resolution, + framerate: output.framerate, + bitrate: output.bitrate, + keyFrameInterval: output.keyFrameInterval, + } + } this.sessions.set(output.id, session); // Create publication const publication = new Publication(output.id, 'video', sessionConfig.info); publication.domain = processor.domain; publication.locality = processor.locality; publication.participant = sessionConfig.participant; - const videoTrack = Object.assign({id: output.id}, sessionConfig.media.video); + const videoTrack = Object.assign({id: output.id}, outputVideo); publication.source.video.push(videoTrack); processor.outputs.video.push(videoTrack); const ret = Promise.resolve(output.id); @@ -183,18 +213,25 @@ class VideoController extends TypeController { } else { // Add input const inputId = sessionConfig.id; + if (!sessionConfig.media?.video) { + return Promise.reject('No media.video for mixer input'); + } + if (!sessionConfig.media.video.format) { + sessionConfig.media.video.format = + toVideoFormat(processor.codecs?.decode?.[0]); + } const inputConfig = { controller: this.selfId, publisher: sessionConfig.info?.owner || 'common', video: { - codec: videoFormatStr(sessionConfig.media?.video?.format) + codec: videoFormatStr(sessionConfig.media.video.format) }, }; await this.makeRPC(processor.locality.node, 'publish', [inputId, 'internal', inputConfig]); this.sessions.set(inputId, session); // Create subscription - const subscription = new Subscription(inputId, 'video', sessionConfig.info); + const subscription = new Subscription(sessionConfig.id, 'video', sessionConfig.info); subscription.domain = processor.domain; subscription.locality = processor.locality; subscription.participant = processor.participant; @@ -210,18 +247,15 @@ class VideoController extends TypeController { async removeSession(id, direction, reason) { if (this.sessions.has(id)) { - const rpcChannel = this.rpcChannel; const session = this.sessions.get(id); const processor = this.processors.get(session.processor); if (!processor) { throw new Error(`Processor for ${id} not found`); } - if (session.direction === 'in') { // Degenerate - rpcChannel.makeRPC(processor.locality.node, 'degenerate', [id]) + this.makeRPC(processor.locality.node, 'degenerate', [id]) .catch((e) => log.debug('degenerate:', e)); - const idx = processor.outputs.video.findIndex((track) => track.id === id); if (idx >= 0) { processor.outputs.video.splice(idx, 1); @@ -232,7 +266,7 @@ class VideoController extends TypeController { log.debug('session:', session); // Let cutoff do remove-input const inputId = session.media?.video?.from; - rpcChannel.makeRPC(processor.locality.node, 'unpublish', [inputId]) + this.makeRPC(processor.locality.node, 'unpublish', [inputId]) .catch((e) => log.debug('ignore unpublish callback')); const idx = processor.inputs.video.findIndex((track) => track.id === id); if (idx >= 0) { diff --git a/source/stream_service/dist.json b/source/stream_service/dist.json index 90c9492dc..6cc34c608 100644 --- a/source/stream_service/dist.json +++ b/source/stream_service/dist.json @@ -23,6 +23,7 @@ "../common/rpcChannel.js", "../common/rpcStarter.js", "../common/grpcTools.js", + "../common/formatUtil.js", "../protos/protoConfig.json", "../protos/*.proto", "../../scripts/release/initauth.js", diff --git a/source/stream_service/scheduler.js b/source/stream_service/scheduler.js index 75150c8b9..b6db2ebc4 100644 --- a/source/stream_service/scheduler.js +++ b/source/stream_service/scheduler.js @@ -19,6 +19,7 @@ function stringHash(str) { } const CHECK_INTERVAL = 60 * 1000; // 1 min +const MAX_RPC_FAILS = 5; class ServiceScheduler { static supportedMethods = [ @@ -37,6 +38,7 @@ class ServiceScheduler { 'removeProcessor', 'getProcessors', 'onSessionSignaling', + 'getParticipants', ]; constructor(rpcChannel, stateStores) { this.rpcChannel = rpcChannel; @@ -44,6 +46,7 @@ class ServiceScheduler { this.checkAliveTimer = setInterval(() => { this.checkService(); }, CHECK_INTERVAL); + this.failureCounts = new Map(); // Node => count } async scheduleService(req) { @@ -58,18 +61,24 @@ class ServiceScheduler { const scheduled = await this.stateStores.read('scheduleMaps', {_id: hash}); if (scheduled) { log.debug('scheduled:', scheduled); - return scheduled.node; - } else { - const node = serviceNodes[hash % serviceNodes.length].id; - log.debug('node:', node); - try { - const map = {_id: hash, node}; - await this.stateStores.create('scheduleMaps', map); - } catch (e) { - log.debug('Failed to update schedule map:', e?.message); + if (!serviceNodes.find((node) => (node.id === scheduled.node))) { + // Out of date + log.debug('Node out of date:', scheduled); + this.stateStores.delete('scheduleMaps', {_id: hash}) + .catch((e) => log.debug('Failed to delete scheduleMaps:', hash)); + } else { + return scheduled.node; } - return node; } + const node = serviceNodes[hash % serviceNodes.length].id; + log.debug('node:', node); + try { + const map = {_id: hash, node}; + await this.stateStores.create('scheduleMaps', map); + } catch (e) { + log.debug('Failed to update schedule map:', e?.message); + } + return node; } // Check service availability @@ -78,13 +87,18 @@ class ServiceScheduler { this.stateStores.readMany('streamEngineNodes', {}) .then(async (ret) => { const serviceNodes = ret.data || []; - const req = {id: 'non-existent'}; + const req = {query: {_id: 'non-existent'}}; for (const service of serviceNodes) { try { await this.rpcChannel.makeRPC( service.id, 'getNodes', [req]); + this.failureCounts.delete(service.id); } catch (e) { log.warn('Failed to call service node:', service.id); + if (!this.failureCounts.has(service.id)) { + this.failureCounts.set(service.id, 1); + } + this._handleCheckFailure(service.id); } } }).catch((e) => { @@ -97,20 +111,38 @@ class ServiceScheduler { const self = this; for (const method of ServiceScheduler.supportedMethods) { api[method] = async function (req, callback) { + let serviceNode = null; try { - const serviceNode = await self.scheduleService(req); + serviceNode = await self.scheduleService(req); log.debug('Schedule req:', req, serviceNode); const ret = await self.rpcChannel.makeRPC( serviceNode, method, [req]); log.debug('Schedule ret:', ret, serviceNode); callback('callback', ret); } catch (e) { + if (serviceNode) { + self._handleCheckFailure(serviceNode); + } callback('callback', 'error', e?.message || e); } }; } return api; } + + _handleCheckFailure(id) { + if (!this.failureCounts.has(id)) { + return; + } + let count = this.failureCounts.get(id) + 1; + if (count >= MAX_RPC_FAILS) { + this.stateStores.delete('streamEngineNodes', {id}) + .catch((e) => log.info('Fail to clean service node:', id)) + this.failureCounts.delete(id); + } else { + this.failureCounts.set(service.id, count); + } + } } module.exports.ServiceScheduler = ServiceScheduler; diff --git a/source/stream_service/stateTypes.js b/source/stream_service/stateTypes.js index fa1cc3ff5..5b27a487f 100644 --- a/source/stream_service/stateTypes.js +++ b/source/stream_service/stateTypes.js @@ -6,6 +6,8 @@ 'use strict'; +const {calcResolution} = require('./formatUtil'); + // Domain and its controller node class Domain { constructor(id, node) { @@ -139,15 +141,12 @@ class Publication { } } const videoOptional = info.optional?.video; + const params = videoOptional?.parameters; if (videoOptional) { // Check info optional format if (Array.isArray(videoOptional.format)) { optional.video.format = videoOptional.format; } - const params = videoOptional.params; - if (Array.isArray(params?.resolution)) { - optional.video.parameters.resolution = params.resolution; - } if (Array.isArray(params?.bitrate)) { optional.video.parameters.bitrate = params.bitrate; } @@ -158,11 +157,26 @@ class Publication { optional.video.parameters.keyFrameInterval = params.keyFrameInterval; } } + const generateResolutions = (base) => { + if (Array.isArray(params?.resolution)) { + const resolutions = params.resolution.map((res) => { + return calcResolution(res, base); + }).filter((res) => { + return res.width <= base.width && res.height <= base.height; + }); + return resolutions; + } + return []; + }; this.source.audio.forEach((track) => { tracks.push(Object.assign( {type: 'audio', optional: optional.audio}, track)); }); this.source.video.forEach((track) => { + if (track.parameters?.resolution) { + optional.video.parameters.resolution = + generateResolutions(track.parameters.resolution); + } tracks.push(Object.assign( {type: 'video', optional: optional.video}, track)); }); diff --git a/source/stream_service/streamService.js b/source/stream_service/streamService.js index 7844ee650..39e1a5a8e 100644 --- a/source/stream_service/streamService.js +++ b/source/stream_service/streamService.js @@ -259,7 +259,7 @@ function streamEngine(rpcClient) { // Link subscription tracks to their subscribed source const linkSubscription = async function (subscription) { // Linkup - log.debug('linkSubscription:', JSON.stringify(subscription)); + log.debug('linkSubscription:', subscription.id); // SubTrack => SubSource {audio, video, data} const links = new Map(); const updatePros = []; @@ -710,6 +710,8 @@ function streamEngine(rpcClient) { // Ongoing session const req = publishings.get(id) || subscribings.get(id); controllers[req.type].onSessionProgress(id, name, data); + } else if (controllers[data?.type]) { + controllers[data?.type].onSessionProgress(id, name, data); } else { // log.warn('Unknown SessionProgress:', id, name, data); } @@ -719,7 +721,7 @@ function streamEngine(rpcClient) { // Interface for portal signaling that.onSessionSignaling = function (req, callback) { log.debug('onSessionSignaling:', req); - const type = (publishings.get(req.id) || subscribings.get(req.id))?.type; + const type = req.type || 'webrtc'; controllers[type].onClientTransportSignaling(req.id, req.signaling) .then(() => { callback('callback', 'ok'); @@ -772,6 +774,16 @@ function streamEngine(rpcClient) { callback('callback', 'error', err && err.message); }); }; + that.getParticipants = function (filter, callback) { + log.debug('getParticipants:', filter, callback); + const query = filter?.query || {}; + stateStores.readMany('participants', query).then((ret) => { + callback('callback', ret); + }).catch((e) => { + log.debug('Get participants error:', e, e?.stack); + callback('callback', 'error', e?.message); + }); + }; // Interfaces for publication that.publish = function(req, callback) { log.debug('publish:', req.type, req); @@ -973,7 +985,7 @@ function streamEngine(rpcClient) { } const removed = await stateStores.delete('processors', {id: procId}); if (removed) { - await controllers[req.type].removeProcessor(procId); + await controllers[proc.type].removeProcessor(procId); } callback('callback', 'ok'); }).catch((err) => { @@ -1014,6 +1026,7 @@ function streamEngine(rpcClient) { await stateStores.delete('publications', {}); await stateStores.delete('subscriptions', {}); await stateStores.delete('sourceTracks', {}); + await stateStores.delete('processors', {}); } } catch (e) { log.debug('Clean state stores:', e);