Skip to content

Commit

Permalink
Initial Support for Spacebar WebRTC
Browse files Browse the repository at this point in the history
Signed-off-by: Christopher Lentocha <[email protected]>
  • Loading branch information
CE1CECL committed Dec 28, 2023
1 parent 105b3de commit 1e0f8f9
Show file tree
Hide file tree
Showing 18 changed files with 66,652 additions and 949 deletions.
67,297 changes: 66,472 additions & 825 deletions assets/schemas.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,16 @@
"@spacebar/api": "dist/api",
"@spacebar/cdn": "dist/cdn",
"@spacebar/gateway": "dist/gateway",
"@spacebar/util": "dist/util"
"@spacebar/util": "dist/util",
"@spacebar/webrtc": "dist/webrtc"
},
"optionalDependencies": {
"erlpack": "^0.1.4",
"medooze-media-server": "^0.129.9",
"nodemailer-mailgun-transport": "^2.1.5",
"nodemailer-mailjet-transport": "github:n0script22/nodemailer-mailjet-transport",
"nodemailer-sendgrid-transport": "github:Maria-Golomb/nodemailer-sendgrid-transport",
"semantic-sdp": "^3.26.0",
"sqlite3": "^5.1.6"
}
}
10 changes: 9 additions & 1 deletion src/bundle/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import http from "http";
import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway";
import { CDNServer } from "@spacebar/cdn";
import * as Webrtc from "@spacebar/webrtc";
import express from "express";
import { green, bold } from "picocolors";
import { Config, initDatabase, Sentry } from "@spacebar/util";
Expand All @@ -36,12 +37,14 @@ server.on("request", app);
const api = new Api.SpacebarServer({ server, port, production, app });
const cdn = new CDNServer({ server, port, production, app });
const gateway = new Gateway.Server({ server, port, production });
const webrtc = new Webrtc.Server({ server, port, production });

process.on("SIGTERM", async () => {
console.log("Shutting down due to SIGTERM");
await gateway.stop();
await cdn.stop();
await api.stop();
await webrtc.stop();
server.close();
Sentry.close();
});
Expand All @@ -54,7 +57,12 @@ async function main() {
await new Promise((resolve) =>
server.listen({ port }, () => resolve(undefined)),
);
await Promise.all([api.start(), cdn.start(), gateway.start()]);
await Promise.all([
api.start(),
cdn.start(),
gateway.start(),
webrtc.start(),
]);

Sentry.errorHandler(app);

Expand Down
1 change: 1 addition & 0 deletions src/gateway/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class Server {
}

this.server.on("upgrade", (request, socket, head) => {
if (request.url?.includes("voice")) return;
this.ws.handleUpgrade(request, socket, head, (socket) => {
this.ws.emit("connection", socket, request);
});
Expand Down
2 changes: 1 addition & 1 deletion src/gateway/opcodes/VoiceStateUpdate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ export async function onVoiceStateUpdate(this: WebSocket, data: Payload) {
guild_id: voiceState.guild_id,
endpoint: guildRegion.endpoint,
},
guild_id: voiceState.guild_id,
user_id: this.user_id,
} as VoiceServerUpdateEvent);
}
}
4 changes: 2 additions & 2 deletions src/gateway/util/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

// import { VoiceOPCodes } from "@spacebar/webrtc";
import { VoiceOPCodes } from "@spacebar/webrtc";

export enum OPCODES {
Dispatch = 0,
Expand Down Expand Up @@ -63,7 +63,7 @@ export enum CLOSECODES {
}

export interface Payload {
op: OPCODES /* | VoiceOPCodes */;
op: OPCODES | VoiceOPCodes;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
d?: any;
s?: number;
Expand Down
4 changes: 2 additions & 2 deletions src/gateway/util/WebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { Intents, ListenEventOpts, Permissions } from "@spacebar/util";
import WS from "ws";
import { Deflate, Inflate } from "fast-zlib";
import { Capabilities } from "./Capabilities";
// import { Client } from "@spacebar/webrtc";
import { Client } from "@spacebar/webrtc";

export interface WebSocket extends WS {
version: number;
Expand All @@ -42,5 +42,5 @@ export interface WebSocket extends WS {
member_events: Record<string, () => unknown>;
listen_options: ListenEventOpts;
capabilities?: Capabilities;
// client?: Client;
webrtcClient?: Client;
}
2 changes: 1 addition & 1 deletion src/util/entities/VoiceState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export class VoiceState extends BaseClass {
@Column({ nullable: true })
self_stream?: boolean;

@Column()
@Column({ nullable: true })
self_video: boolean;

@Column()
Expand Down
20 changes: 11 additions & 9 deletions src/webrtc/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import { closeDatabase, Config, initDatabase, initEvent } from "@spacebar/util";
import dotenv from "dotenv";
import http from "http";
import MediaServer from "medooze-media-server";
import ws from "ws";
import { Connection } from "./events/Connection";
dotenv.config();
Expand Down Expand Up @@ -48,18 +49,18 @@ export class Server {
});
}

// this.server.on("upgrade", (request, socket, head) => {
// if (!request.url?.includes("voice")) return;
// this.ws.handleUpgrade(request, socket, head, (socket) => {
// // @ts-ignore
// socket.server = this;
// this.ws.emit("connection", socket, request);
// });
// });
this.server.on("upgrade", (request, socket, head) => {
if (!request.url?.includes("voice")) return;
this.ws.handleUpgrade(request, socket, head, (socket) => {
// @ts-ignore
socket.server = this;
this.ws.emit("connection", socket, request);
});
});

this.ws = new ws.Server({
maxPayload: 1024 * 1024 * 100,
server: this.server,
noServer: true,
});
this.ws.on("connection", Connection);
this.ws.on("error", console.error);
Expand All @@ -77,6 +78,7 @@ export class Server {

async stop() {
closeDatabase();
MediaServer.terminate();
this.server.close();
}
}
5 changes: 1 addition & 4 deletions src/webrtc/events/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ const PayloadSchema = {

export async function onMessage(this: WebSocket, buffer: Buffer) {
try {
var data: Payload = JSON.parse(buffer.toString());
const data: Payload = JSON.parse(buffer.toString());
if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id)
return this.close(CLOSECODES.Not_authenticated);

// @ts-ignore
const OPCodeHandler = OPCodeHandlers[data.op];
if (!OPCodeHandler) {
// @ts-ignore
console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]);
// TODO: if all opcodes are implemented comment this out:
// this.close(CloseCodes.Unknown_opcode);
Expand All @@ -49,7 +47,6 @@ export async function onMessage(this: WebSocket, buffer: Buffer) {
data.op as VoiceOPCodes,
)
) {
// @ts-ignore
console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]);
}

Expand Down
4 changes: 2 additions & 2 deletions src/webrtc/opcodes/BackendVersion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { Send, WebSocket } from "@spacebar/gateway";
import { VoiceOPCodes } from "../util";

export async function onBackendVersion(this: WebSocket, data: Payload) {
export async function onBackendVersion(this: WebSocket) {
await Send(this, {
op: VoiceOPCodes.VOICE_BACKEND_VERSION,
d: { voice: "0.8.43", rtc_worker: "0.3.26" },
Expand Down
22 changes: 15 additions & 7 deletions src/webrtc/opcodes/Identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
} from "@spacebar/util";
import { endpoint, getClients, VoiceOPCodes, PublicIP } from "@spacebar/webrtc";
import SemanticSDP from "semantic-sdp";
const defaultSDP = require("./sdp.json");
import defaultSDP from "./sdp.json";

export async function onIdentify(this: WebSocket, data: Payload) {
clearTimeout(this.readyTimeout);
Expand All @@ -47,7 +47,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
}),
);

this.client = {
this.webrtcClient = {
websocket: this,
out: {
tracks: new Map(),
Expand All @@ -61,24 +61,32 @@ export async function onIdentify(this: WebSocket, data: Payload) {
channel_id: voiceState.channel_id,
};

const clients = getClients(voiceState.channel_id)!;
clients.add(this.client);
const clients = getClients(voiceState.channel_id);
clients.add(this.webrtcClient);

this.on("close", () => {
clients.delete(this.client!);
if (this.webrtcClient) clients.delete(this.webrtcClient);
});

await Send(this, {
op: VoiceOPCodes.READY,
d: {
streams: [
// { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false }
// {
// type: "video",
// ssrc: this.webrtcClient.in.video_ssrc,
// rtx_ssrc: this.webrtcClient.in.rtx_ssrc,
// rid: "100",
// quality: 100,
// active: false,
// },
],
ssrc: -1,
ssrc: 1,
port: endpoint.getLocalPort(),
modes: [
"aead_aes256_gcm_rtpsize",
"aead_aes256_gcm",
"aead_xchacha20_poly1305_rtpsize",
"xsalsa20_poly1305_lite_rtpsize",
"xsalsa20_poly1305_lite",
"xsalsa20_poly1305_suffix",
Expand Down
54 changes: 35 additions & 19 deletions src/webrtc/opcodes/SelectProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,28 @@
import { Payload, Send, WebSocket } from "@spacebar/gateway";
import { SelectProtocolSchema, validateSchema } from "@spacebar/util";
import { PublicIP, VoiceOPCodes, endpoint } from "@spacebar/webrtc";
import SemanticSDP, { MediaInfo, SDPInfo } from "semantic-sdp";
import MediaServer from "medooze-media-server";
import SemanticSDP, { MediaInfo } from "semantic-sdp";
import DefaultSDP from "./sdp.json";

export async function onSelectProtocol(this: WebSocket, payload: Payload) {
if (!this.client) return;
if (!this.webrtcClient) return;

const data = validateSchema(
"SelectProtocolSchema",
payload.d,
) as SelectProtocolSchema;

const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!);
this.client.sdp!.setICE(offer.getICE());
this.client.sdp!.setDTLS(offer.getDTLS());
const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp);
//@ts-ignore
offer.getMedias()[0].type = "audio";
this.webrtcClient.sdp.setICE(offer.getICE());
this.webrtcClient.sdp.setDTLS(offer.getDTLS());

const transport = endpoint.createTransport(this.client.sdp!);
this.client.transport = transport;
transport.setRemoteProperties(this.client.sdp!);
transport.setLocalProperties(this.client.sdp!);
const transport = endpoint.createTransport(this.webrtcClient.sdp);
this.webrtcClient.transport = transport;
transport.setRemoteProperties(this.webrtcClient.sdp);
transport.setLocalProperties(this.webrtcClient.sdp);

const dtls = transport.getLocalDTLSInfo();
const ice = transport.getLocalICEInfo();
Expand All @@ -45,21 +49,33 @@ export async function onSelectProtocol(this: WebSocket, payload: Payload) {
const candidates = transport.getLocalCandidates();
const candidate = candidates[0];

// discord answer
/*
m=audio 50026 ICE/SDP\n
a=fingerprint:sha-256 4A:79:94:16:44:3F:BD:05:41:5A:C7:20:F3:12:54:70:00:73:5D:33:00:2D:2C:80:9B:39:E1:9F:2D:A7:49:87\n
c=IN IP4 66.22.206.174\n
a=rtcp:50026\n
a=ice-ufrag:XxnE\n
a=ice-pwd:GLQatPT3Q9dCZVVgVf3J1F\n
a=fingerprint:sha-256 4A:79:94:16:44:3F:BD:05:41:5A:C7:20:F3:12:54:70:00:73:5D:33:00:2D:2C:80:9B:39:E1:9F:2D:A7:49:87\n
a=candidate:1 1 UDP 4261412862 66.22.206.174 50026 typ host\n
*/

const answer =
`m=audio ${port} ICE/SDP` +
`a=fingerprint:${fingerprint}` +
`c=IN IP4 ${PublicIP}` +
`a=rtcp:${port}` +
`a=ice-ufrag:${ice.getUfrag()}` +
`a=ice-pwd:${ice.getPwd()}` +
`a=fingerprint:${fingerprint}` +
`a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host`;
`m=audio ${port} ICE/SDP\n` +
`a=fingerprint:${fingerprint}\n` +
`c=IN IP4 ${PublicIP}\n` +
`a=rtcp:${port}\n` +
`a=ice-ufrag:${ice.getUfrag()}\n` +
`a=ice-pwd:${ice.getPwd()}\n` +
`a=fingerprint:${fingerprint}\n` +
`a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host\n`;

await Send(this, {
op: VoiceOPCodes.SESSION_DESCRIPTION,
d: {
video_codec: "H264",
sdp: answer,
video_codec: "VP8",
sdp: answer.toString(),
media_session_id: this.session_id,
audio_codec: "opus",
},
Expand Down
9 changes: 5 additions & 4 deletions src/webrtc/opcodes/Speaking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import { getClients, VoiceOPCodes } from "../util";
// {"speaking":1,"delay":5,"ssrc":2805246727}

export async function onSpeaking(this: WebSocket, data: Payload) {
if (!this.client) return;
if (!this.webrtcClient) return;

getClients(this.client.channel_id).forEach((client) => {
if (client === this.client) return;
const ssrc = this.client!.out.tracks.get(client.websocket.user_id);
getClients(this.webrtcClient.channel_id).forEach((client) => {
if (client === this.webrtcClient) return;
if (!this.webrtcClient) return;
const ssrc = this.webrtcClient.out.tracks.get(client.websocket.user_id);

Send(client.websocket, {
op: VoiceOPCodes.SPEAKING,
Expand Down
Loading

0 comments on commit 1e0f8f9

Please sign in to comment.