Skip to content
This repository has been archived by the owner on Oct 8, 2024. It is now read-only.

Commit

Permalink
Fix stream API issues (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
starwarfan authored Mar 6, 2023
1 parent 1246a33 commit d26b35e
Show file tree
Hide file tree
Showing 23 changed files with 559 additions and 207 deletions.
158 changes: 147 additions & 11 deletions doc/servermd/StreamAPI.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Because it's REST, management clients can be implemented by different programmin
To enable stream API, add experimental targets `stream-service` and `customized-agent` during packing. `stream-service` is the module that provide stream related API. `customized-agent` is the module that provide server side customization for stream related API.

After packing, the stream API configuration is in stream_service/service.toml.
Edit portal/portal.toml, set `stream_engine_name` to the same value of stream API configuration.
Edit portal/portal.toml, set `stream_engine_name` to the same value of stream API configuration(`service.name` or `scheduler.name` in stream_service/service.toml).
Edit management_api/management_api.toml, set `stream_engine` and `control_agent` to the same values of stream API configuration.

Start OWT service with updated configuration, stream API should be enabled.
Expand Down Expand Up @@ -236,7 +236,9 @@ request body:

object(ListQuery):
{
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
query: {
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
}
}

response body:
Expand Down Expand Up @@ -273,7 +275,9 @@ request body:
type: string(publishType), // E.g, "streaming", "video", ...
participant: string(participantId), // Or use domain name as participant ID.
media: object(MediaTrack) | object(MediaInfo),
info: object(TypeSpecificInfo)
info: object(TypeSpecificInfo),
connection: object(ConnectionInfo) | undefined, // For "streaming"
processor: string(processorId) | undefined, // For "audio", "video"
}
object(MediaTrack) { // For WebRTC publications
tracks: [ object(TrackInfo) ],
Expand All @@ -287,6 +291,11 @@ request body:
parameters: object(VideoParameters)
}
}
object(ConnectionInfo) { // For streaming publications
url: string(streamingUrl),
transportProtocol: "tcp" | "udp",
bufferSize: number(bufferSize),
},

For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room).
For *format* and *parameters*, refers to [REST API](RESTAPI.md#53-streams-StreamAPIsection53).
Expand Down Expand Up @@ -367,7 +376,9 @@ request body:

object(ListQuery):
{
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
query: {
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
}
}

response body:
Expand Down Expand Up @@ -404,7 +415,9 @@ request body:
type: string(subscribeType), // E.g, "streaming", "video", ...
participant: string(participantId), // Or use domain name as participant ID.
media: object(MediaTrack) | object(MediaInfo),
info: object(TypeSpecificInfo)
info: object(TypeSpecificInfo),
connection: object(ConnectionInfo) | undefined, // For "streaming", "recording"
processor: string(processorId) | undefined, // For "audio", "video", "analytics"
}
object(MediaTrack) { // For WebRTC subscriptions
tracks: [ object(TrackInfo) ],
Expand All @@ -413,12 +426,21 @@ request body:
audio: {
from: string(sourceAudioId), // Could be publication ID or source track ID
format: object(AudioFormat),
},
} | boolean(enable),
video: {
from: string(sourceVideoId), // Could be publication ID or source track ID
format: object(VideoFormat),
parameters: object(VideoParameters)
}
} | boolean(enable)
}
object(ConnectionInfo) {
container: "mkv" | "mp4" | undefined, // For "recording"
url: string(url) | undefined, // For "streaming"
algorithm: string(algorithmName) | undefined, // For "analytics"
video: { // For "analytics"
format: object(VideoFormat), // Analytics output video format
parameters: object(VideoParameters), // Analytics output video parameters
} | undefined,
}

For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room).
Expand Down Expand Up @@ -497,7 +519,9 @@ request body:

object(ListQuery):
{
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
query: {
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
}
}

response body:
Expand Down Expand Up @@ -577,6 +601,14 @@ request body:
id: string(analyticsId)
}

// For "sip" type processor
sip: {
server: string(serverHost),
user: string(sipUser),
password: string(sipPasswd)
},
stream: string(outgoingSipStream)

For *object(Region)*, refers to [REST API](RESTAPI.md#51-rooms-StreamAPIsection51).

response body:
Expand Down Expand Up @@ -605,7 +637,109 @@ response body:

**Empty**

## 5.5 Nodes {#StreamAPIsection5_5}
## 5.5 Participants {#StreamAPIsection5_5}
Description:<br>
Participants represents owner of publications and subscriptions in OWT server.<br>

Resources:

- /v1.1/stream-engine/participants
- /v1.1/stream-engine/participants/{participantId}

Data Model:

Object(Participant) {
id: string(ParticipantID),
domain: string(domainName), // For example, room ID
portal: string(portalId),
notifying: boolean(notifyOthers), // Notify other participants about join/leave.
}

### List Participants {#StreamAPIsection5_5_1}
**GET ${host}/v1.1/stream-engine/participants**
**GET ${host}/v1.1/stream-engine/participants/{participantId}**

Description:<br>
List participants in stream engine.<br>

request body:

| type | content |
|:-------------|:-------|
| json | object(ListQuery) |

**Note**: Definition of *ListQuery*.<br>

object(ListQuery):
{
query: {
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
}
}

response body:

| type | content |
|:-------------|:-------|
| json | Object(ListResult) |

**Note**: Definition of *ListResult*.<br>

object(ListResult):
{
total: number(ListSize),
start: number(offsetInList),
data: [ object(Participant) ]
}

### Create Participant {#StreamAPIsection5_5_2}
**POST ${host}/v1.1/stream-engine/participants**

Description:<br>
Create a participant with configuration.<br>

request body:

| type | content |
|:-------------|:-------|
| json | object(ParticipantRequest) |

**Note**: Definition of *ParticipantRequest*.<br>

Object(ParticipantRequest) {
id: string(ParticipantID),
domain: string(domainName),
notifying: boolean(notifyOthers), // Notify other participants about join/leave.
}

response body:

| type | content |
|:-------------|:-------|
| json | object(IdObject) |

**Note**: Definition of *IdObject*.<br>

Object(IdObject) {
id: string(createdParticipantId)
}

### Delete Participant {#StreamAPIsection5_5_3}
**DELETE ${host}/v1.1/stream-engine/participants/{participantId}**

Description:<br>
Drop the specified participant, all related publications and subscriptions will be stopped as well.<br>

request body:

**Empty**

response body:

**Empty**


## 5.6 Nodes {#StreamAPIsection5_6}
Description:<br>
Node represents working process in stream engine.<br>

Expand All @@ -624,7 +758,7 @@ Data Model:
streamAddr: {ip: string(host), port: number(port)}
}

### List Nodes {#StreamAPIsection5_5_1}
### List Nodes {#StreamAPIsection5_6_1}
**GET ${host}/v1.1/stream-engine/nodes**

Description:<br>
Expand All @@ -640,7 +774,9 @@ request body:

object(ListQuery):
{
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
query: {
KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
}
}

response body:
Expand Down
10 changes: 6 additions & 4 deletions source/agent/analytics/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {

this.connectionclose = () => {
destroyStream(options.controller, newStreamId);
// Notify stream engine if needed
const data = {id: newStreamId};
notifyStatus(options.controller, connectionId, 'onStreamRemoved', data);
}
inputs[connectionId] = true;

Expand Down Expand Up @@ -259,11 +262,10 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
}
}

// For Stream Engine, onSessionProgress(id, name, data)
generateStream(options.controller, newStreamId, streamInfo);
// Notify stream engine if needed
streamInfo.id = newStreamId;
notifyStatus(controller, connectionId, 'onNewStream', streamInfo);

// generateStream(options.controller, newStreamId, streamInfo);
notifyStatus(controller, connectionId, 'onStreamAdded', streamInfo);
} catch (e) {
log.error("Parse stream added data with error:", e);
}
Expand Down
10 changes: 6 additions & 4 deletions source/agent/conference/rpcRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,12 @@ const RpcRequest = function(rpcChannel, listener) {
};

that.addSipNode = function (workerNode) {
grpcNode[workerNode] = grpcTools.startClient(
'sip',
workerNode
);
if (enableGrpc) {
grpcNode[workerNode] = grpcTools.startClient(
'sip',
workerNode
);
}
}

return that;
Expand Down
Loading

0 comments on commit d26b35e

Please sign in to comment.