diff --git a/ts/@live-compositor/core/src/compositor.ts b/ts/@live-compositor/core/src/compositor.ts index 31cf12a76..db26c6ba5 100644 --- a/ts/@live-compositor/core/src/compositor.ts +++ b/ts/@live-compositor/core/src/compositor.ts @@ -35,6 +35,7 @@ export class LiveCompositor { const apiRequest = intoRegisterOutput(request, output.scene()); const result = await this.api.registerOutput(outputId, apiRequest); this.outputs[outputId] = output; + await output.ready(); return result; } @@ -45,13 +46,19 @@ export class LiveCompositor { } public async registerInput(inputId: string, request: RegisterInput): Promise { - const result = await this.api.registerInput(inputId, intoRegisterInput(request)); - this.store.addInput({ inputId }); - return result; + return this.store.runBlocking(async updateStore => { + const result = await this.api.registerInput(inputId, intoRegisterInput(request)); + updateStore({ type: 'add_input', input: { inputId } }); + return result; + }); } public async unregisterInput(inputId: string): Promise { - return this.api.unregisterInput(inputId); + return this.store.runBlocking(async updateStore => { + const result = this.api.unregisterInput(inputId); + updateStore({ type: 'remove_input', inputId }); + return result; + }); } public async registerShader(shaderId: string, request: Api.ShaderSpec): Promise { diff --git a/ts/@live-compositor/core/src/event.ts b/ts/@live-compositor/core/src/event.ts index d662f1366..a31145002 100644 --- a/ts/@live-compositor/core/src/event.ts +++ b/ts/@live-compositor/core/src/event.ts @@ -7,17 +7,35 @@ export function onCompositorEvent(store: InstanceContextStore, rawEvent: unknown if (!event) { return; } else if (event.type === CompositorEventType.VIDEO_INPUT_DELIVERED) { - store.updateInput({ inputId: event.inputId, videoState: 'ready' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, videoState: 'ready' }, + }); } else if (event.type === CompositorEventType.VIDEO_INPUT_PLAYING) { - store.updateInput({ inputId: event.inputId, videoState: 'playing' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, videoState: 'playing' }, + }); } else if (event.type === CompositorEventType.VIDEO_INPUT_EOS) { - store.updateInput({ inputId: event.inputId, videoState: 'finished' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, videoState: 'finished' }, + }); } else if (event.type === CompositorEventType.AUDIO_INPUT_DELIVERED) { - store.updateInput({ inputId: event.inputId, audioState: 'ready' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, audioState: 'ready' }, + }); } else if (event.type === CompositorEventType.AUDIO_INPUT_PLAYING) { - store.updateInput({ inputId: event.inputId, audioState: 'playing' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, audioState: 'playing' }, + }); } else if (event.type === CompositorEventType.AUDIO_INPUT_EOS) { - store.updateInput({ inputId: event.inputId, audioState: 'finished' }); + store.dispatchUpdate({ + type: 'update_input', + input: { inputId: event.inputId, audioState: 'finished' }, + }); } } diff --git a/ts/@live-compositor/core/src/output.ts b/ts/@live-compositor/core/src/output.ts index 9dc52ff33..e6f6f060a 100644 --- a/ts/@live-compositor/core/src/output.ts +++ b/ts/@live-compositor/core/src/output.ts @@ -13,8 +13,8 @@ class Output { outputId: string; outputCtx: OutputContext; outputShutdownStateStore: OutputShutdownStateStore; - initialized: boolean = false; + shouldUpdateWhenReady: boolean = false; throttledUpdate: () => void; videoRenderer?: Renderer; initialAudioConfig?: Outputs.AudioInputsConfiguration; @@ -28,9 +28,16 @@ class Output { this.api = api; this.outputId = outputId; this.outputShutdownStateStore = new OutputShutdownStateStore(); - this.initialAudioConfig = registerRequest.audio?.initial; + this.shouldUpdateWhenReady = false; + this.throttledUpdate = () => { + this.shouldUpdateWhenReady = true; + }; + + if (registerRequest.audio) { + this.initialAudioConfig = registerRequest.audio.initial ?? { inputs: [] }; + } - const onUpdate = () => this.throttledUpdate?.(); + const onUpdate = () => this.throttledUpdate(); this.outputCtx = new _liveCompositorInternals.OutputContext(onUpdate, !!registerRequest.audio); if (registerRequest.video) { @@ -47,10 +54,6 @@ class Output { idPrefix: `${outputId}-`, }); } - - this.throttledUpdate = throttle(async () => { - await api.updateScene(this.outputId, this.scene()); - }, 30); } public scene(): { video?: Api.Video; audio?: Api.Audio } { @@ -67,6 +70,15 @@ class Output { // callback before it is called this.outputShutdownStateStore.close(); } + + public async ready() { + this.throttledUpdate = throttle(async () => { + await this.api.updateScene(this.outputId, this.scene()); + }, 30); + if (this.shouldUpdateWhenReady) { + this.throttledUpdate(); + } + } } // External store to share shutdown information between React tree diff --git a/ts/examples/src/audio.tsx b/ts/examples/src/audio.tsx index 7b44c00b9..71e924ac4 100644 --- a/ts/examples/src/audio.tsx +++ b/ts/examples/src/audio.tsx @@ -75,7 +75,6 @@ async function run() { type: 'opus', channels: 'stereo', }, - initial: { inputs: [] }, }, }); gstReceiveTcpStream('127.0.0.1', 8001); diff --git a/ts/examples/src/dynamic-inputs.tsx b/ts/examples/src/dynamic-inputs.tsx index 4827195b9..db4210f69 100644 --- a/ts/examples/src/dynamic-inputs.tsx +++ b/ts/examples/src/dynamic-inputs.tsx @@ -32,8 +32,10 @@ function InputTile({ inputId }: { inputId: string }) { - - Input ID: {inputId} + + + Input ID: {inputId} + ); diff --git a/ts/examples/src/dynamic-outputs.tsx b/ts/examples/src/dynamic-outputs.tsx new file mode 100644 index 000000000..ce3ee1cb2 --- /dev/null +++ b/ts/examples/src/dynamic-outputs.tsx @@ -0,0 +1,120 @@ +import LiveCompositor from '@live-compositor/node'; +import { Text, InputStream, Tiles, Rescaler, View, useInputStreams } from 'live-compositor'; +import { downloadAllAssets, gstReceiveTcpStream, sleep } from './utils'; +import path from 'path'; +import fs from 'fs-extra'; + +function ExampleApp() { + const inputs = useInputStreams(); + return ( + + {Object.values(inputs).map(input => ( + + ))} + + ); +} + +function InputTile({ inputId }: { inputId: string }) { + return ( + + + + + + + Input ID: {inputId} + + + + ); +} + +async function run() { + await fs.mkdirp(path.join(__dirname, '../.workingdir')); + await downloadAllAssets(); + const compositor = await LiveCompositor.create(); + + const RESOLUTION = { + width: 1920, + height: 1080, + } as const; + const VIDEO_ENCODER_OPTS = { + type: 'ffmpeg_h264', + preset: 'ultrafast', + } as const; + + await compositor.registerOutput('output_stream', { + type: 'rtp_stream', + port: 8001, + transportProtocol: 'tcp_server', + video: { + encoder: VIDEO_ENCODER_OPTS, + resolution: RESOLUTION, + root: , + }, + audio: { + encoder: { + type: 'opus', + channels: 'stereo', + }, + }, + }); + gstReceiveTcpStream('127.0.0.1', 8001); + await compositor.registerOutput('output_recording', { + type: 'mp4', + serverPath: path.join(__dirname, '../.workingdir/dynamic_outputs_recording.mp4'), + video: { + encoder: VIDEO_ENCODER_OPTS, + resolution: RESOLUTION, + root: , + }, + audio: { + encoder: { + type: 'aac', + channels: 'stereo', + }, + }, + }); + + await compositor.registerInput('input_1', { + type: 'mp4', + serverPath: path.join(__dirname, '../.assets/BigBuckBunny.mp4'), + }); + console.log( + 'Start LiveCompositor pipeline with single input ("input_1") and two outputs (RTP "output_stream" and MP4 "output_recording").' + ); + await compositor.start(); + + await sleep(10_000); + console.log('Connect new input ("input_2") and start new output to MP4 "output_recording_part2"'); + await compositor.registerInput('input_2', { + type: 'mp4', + serverPath: path.join(__dirname, '../.assets/ElephantsDream.mp4'), + }); + await compositor.registerOutput('output_recording_part2', { + type: 'mp4', + serverPath: path.join(__dirname, '../.workingdir/dynamic_outputs_recording_10s.mp4'), + video: { + encoder: VIDEO_ENCODER_OPTS, + resolution: RESOLUTION, + root: , + }, + audio: { + encoder: { + type: 'aac', + channels: 'stereo', + }, + }, + }); + + await sleep(10_000); + console.log('Stop output "output_recording"'); + await compositor.unregisterOutput('output_recording'); + + await sleep(10_000); + console.log('Stop all remaining outputs.'); + await compositor.unregisterOutput('output_recording_part2'); + await compositor.unregisterOutput('output_stream'); +} +run(); diff --git a/ts/examples/src/news-ticker.tsx b/ts/examples/src/news-ticker.tsx deleted file mode 100644 index d5fcd2992..000000000 --- a/ts/examples/src/news-ticker.tsx +++ /dev/null @@ -1,157 +0,0 @@ -import LiveCompositor from '@live-compositor/node'; -import { View, Text } from 'live-compositor'; -import { useEffect, useState, useId } from 'react'; -import { ffplayStartPlayerAsync, sleep } from './utils'; - -const EXAMPLE_TEXTS = [ - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', - 'Example text', - 'Another example text', -]; - -const TEXT_CHUNK_SIZE = 400; - -type NewsTickerProps = { - text: string[]; - width: number; - /** Transition time to move entire width */ - durationMs: number; -}; - -function NewsTicker(props: NewsTickerProps) { - const [tickerState, setTickerState] = useState<{ - offset: number; - lastTextIndex: number; - chunks: { text: string; id: number }[]; - }>({ - offset: 0, - lastTextIndex: 0, - chunks: [], - }); - - useEffect(() => { - const timeout = setTimeout(() => { - const offsetForTimeout = (1000 / props.durationMs) * props.width; - - const chunksToRemove = Math.floor(tickerState.offset / TEXT_CHUNK_SIZE); - const offset = tickerState.offset - chunksToRemove * TEXT_CHUNK_SIZE + offsetForTimeout; - - const chunks = tickerState.chunks.slice(chunksToRemove); - - const chunksToAdd = Math.ceil( - (props.width * 2 - chunks.length * TEXT_CHUNK_SIZE) / TEXT_CHUNK_SIZE - ); - - let lastTextIndex = tickerState.lastTextIndex; - for (let i = 0; i < chunksToAdd; i++) { - chunks.push({ - text: props.text[lastTextIndex % props.text.length], - id: lastTextIndex, - }); - lastTextIndex = lastTextIndex + 1; - } - - setTickerState({ - chunks, - offset, - lastTextIndex, - }); - }, 1000); - return () => { - clearInterval(timeout); - }; - }, [tickerState]); - - return ( - - {tickerState.chunks.map(({ text, id }, index) => { - return ( - - ); - })} - - ); -} - -function NewsTickerItem(props: { text: string; offset: number }) { - const id = useId(); - return ( - - - {props.text} - - - ); -} - -function ExampleApp() { - return ( - - - - - - - - - - - - ); -} - -async function run() { - const compositor = await LiveCompositor.create(); - - ffplayStartPlayerAsync('127.0.0.1', 8001); - await sleep(2000); - - await compositor.registerOutput('output_1', { - type: 'rtp_stream', - port: 8001, - ip: '127.0.0.1', - transportProtocol: 'udp', - video: { - encoder: { - type: 'ffmpeg_h264', - preset: 'ultrafast', - }, - resolution: { - width: 1920, - height: 1080, - }, - root: , - }, - }); - - await compositor.start(); -} -run(); diff --git a/ts/examples/src/utils.ts b/ts/examples/src/utils.ts index 4392e2262..f7f16aafa 100644 --- a/ts/examples/src/utils.ts +++ b/ts/examples/src/utils.ts @@ -66,7 +66,7 @@ interface SpawnPromise extends Promise { function spawn(command: string, args: string[]): SpawnPromise { console.log(`Spawning: ${command} ${args.join(' ')}`); const child = nodeSpawn(command, args, { - stdio: 'ignore', + stdio: process.env.DEBUG ? 'inherit' : 'ignore', }); return new Promise((resolve, reject) => { @@ -123,11 +123,11 @@ a=rtcp-mux ); } -export async function sleep(timeout_ms: number): Promise { +export async function sleep(timeoutMs: number): Promise { await new Promise(res => { setTimeout(() => { res(); - }, timeout_ms); + }, timeoutMs); }); } diff --git a/ts/live-compositor/src/components/Text.ts b/ts/live-compositor/src/components/Text.ts index be93dcc9c..4dfe6015b 100644 --- a/ts/live-compositor/src/components/Text.ts +++ b/ts/live-compositor/src/components/Text.ts @@ -1,5 +1,6 @@ import * as Api from '../api'; import { createCompositorComponent, SceneComponent } from '../component'; +import { intoApiRgbaColor } from './common'; export type TextProps = { children?: (string | number)[] | string | number; @@ -38,13 +39,13 @@ export type TextProps = { */ lineHeight?: number; /** - * (**default=`"#FFFFFFFF"`**) Font color in `#RRGGBBAA` format. + * (**default=`"#FFFFFFFF"`**) Font color in `#RRGGBBAA` or `#RRGGBB` format. */ - colorRgba?: Api.RGBAColor; + color?: string; /** - * (**default=`"#00000000"`**) Background color in `#RRGGBBAA` format. + * (**default=`"#00000000"`**) Background color in `#RRGGBBAA` or `#RRGGBB` format. */ - backgroundColorRgba?: Api.RGBAColor; + backgroundColor?: string; /** * (**default=`"Verdana"`**) Font family. Provide [family-name](https://www.w3.org/TR/2018/REC-css-fonts-3-20180920/#family-name-value) * for a specific font. "generic-family" values like e.g. "sans-serif" will not work. @@ -81,8 +82,8 @@ function sceneBuilder(props: TextProps, children: SceneComponent[]): Api.Compone max_height: props.maxHeight, font_size: props.fontSize, line_height: props.lineHeight, - color_rgba: props.colorRgba, - background_color_rgba: props.backgroundColorRgba, + color_rgba: props.color && intoApiRgbaColor(props.color), + background_color_rgba: props.backgroundColor && intoApiRgbaColor(props.backgroundColor), font_family: props.fontFamily, style: props.style, align: props.align, diff --git a/ts/live-compositor/src/components/Tiles.ts b/ts/live-compositor/src/components/Tiles.ts index fb9e98b0b..0872f6757 100644 --- a/ts/live-compositor/src/components/Tiles.ts +++ b/ts/live-compositor/src/components/Tiles.ts @@ -24,9 +24,9 @@ export type TilesProps = { */ height?: number; /** - * (**default=`"#00000000"`**) Background color in a `"#RRGGBBAA"` format. + * (**default=`"#00000000"`**) Background color in a `"#RRGGBBAA"` or `"#RRGGBB"` format. */ - backgroundColorRgba?: Api.RGBAColor; + backgroundColor?: string; /** * (**default=`"16:9"`**) Aspect ratio of a tile in `"W:H"` format, where W and H are integers. */ @@ -63,7 +63,7 @@ function sceneBuilder(props: TilesProps, children: SceneComponent[]): Api.Compon children: children.map(sceneComponentIntoApi), width: props.width, height: props.height, - background_color_rgba: props.backgroundColorRgba, + background_color_rgba: props.backgroundColor, tile_aspect_ratio: props.tileAspectRatio, margin: props.margin, padding: props.padding, diff --git a/ts/live-compositor/src/components/View.ts b/ts/live-compositor/src/components/View.ts index 0668b643a..025465d45 100644 --- a/ts/live-compositor/src/components/View.ts +++ b/ts/live-compositor/src/components/View.ts @@ -1,6 +1,6 @@ import * as Api from '../api'; import { createCompositorComponent, SceneComponent, sceneComponentIntoApi } from '../component'; -import { intoApiTransition, Transition } from './common'; +import { intoApiRgbaColor, intoApiTransition, Transition } from './common'; export type ViewProps = { /** @@ -65,9 +65,9 @@ export type ViewProps = { */ overflow?: Api.Overflow; /** - * (**default=`"#00000000"`**) Background color in a `"#RRGGBBAA"` format. + * (**default=`"#00000000"`**) Background color in a `"#RRGGBBAA"` or `"#RRGGBB"`format. */ - backgroundColorRgba?: Api.RGBAColor; + backgroundColor?: string; }; const View = createCompositorComponent(sceneBuilder); @@ -91,7 +91,7 @@ function sceneBuilder(props: ViewProps, children: SceneComponent[]): Api.Compone transition: props.transition && intoApiTransition(props.transition), overflow: props.overflow, - background_color_rgba: props.backgroundColorRgba, + background_color_rgba: props.backgroundColor && intoApiRgbaColor(props.backgroundColor), }; } diff --git a/ts/live-compositor/src/components/common.ts b/ts/live-compositor/src/components/common.ts index 44278b633..09700d5a5 100644 --- a/ts/live-compositor/src/components/common.ts +++ b/ts/live-compositor/src/components/common.ts @@ -27,10 +27,6 @@ export type EasingFunction = | { functionName: 'bounce' } | { functionName: 'cubic-bezier'; - /** - * @minItems 4 - * @maxItems 4 - */ points: [number, number, number, number]; }; @@ -51,3 +47,13 @@ export function intoApiEasingFunction(easing: EasingFunction): Api.EasingFunctio throw new Error(`Invalid LiveCompositor.EasingFunction ${easing}`); } } + +const rgbRegExp = /^#[0-9a-fA-F]{6}$/; + +export function intoApiRgbaColor(color: string): Api.RGBAColor { + if (rgbRegExp.test(color)) { + return `${color}FF`; + } else { + return color; + } +} diff --git a/ts/live-compositor/src/context/instanceContextStore.ts b/ts/live-compositor/src/context/instanceContextStore.ts index febebe7e9..61dd619a3 100644 --- a/ts/live-compositor/src/context/instanceContextStore.ts +++ b/ts/live-compositor/src/context/instanceContextStore.ts @@ -1,11 +1,18 @@ import * as Api from '../api'; +export type StreamState = 'ready' | 'playing' | 'finished'; + export type InputStreamInfo = { inputId: string; - videoState?: 'ready' | 'playing' | 'finished'; - audioState?: 'ready' | 'playing' | 'finished'; + videoState?: StreamState; + audioState?: StreamState; }; +type UpdateAction = + | { type: 'update_input'; input: InputStreamInfo } + | { type: 'add_input'; input: InputStreamInfo } + | { type: 'remove_input'; inputId: string }; + type InstanceContext = { inputs: Record; }; @@ -15,25 +22,64 @@ export class InstanceContextStore { inputs: {}, }; private onChangeCallbacks: Set<() => void> = new Set(); + private eventQueue?: UpdateAction[]; + + /** + * Apply update immediately if there are no `runBlocking` calls in progress. + * Otherwise wait for `runBlocking call to finish`. + */ + public dispatchUpdate(update: UpdateAction) { + if (this.eventQueue) { + this.eventQueue.push(update); + } else { + this.applyUpdate(update); + } + } - public addInput(input: InputStreamInfo) { + /** + * No dispatch events will be processed while `fn` function executes. + * Argument passed to the callback should be used instead of `this.dispatchUpdate` + * to update the store from inside `fn` + */ + public async runBlocking( + fn: (update: (action: UpdateAction) => void) => Promise + ): Promise { + this.eventQueue = []; + try { + return await fn(a => this.applyUpdate(a)); + } finally { + for (const event of this.eventQueue) { + this.applyUpdate(event); + } + this.eventQueue = undefined; + } + } + + private applyUpdate(update: UpdateAction) { + if (update.type === 'add_input') { + this.addInput(update.input); + } else if (update.type === 'update_input') { + this.updateInput(update.input); + } else if (update.type === 'remove_input') { + this.removeInput(update.inputId); + } + } + + private addInput(input: InputStreamInfo) { if (this.context.inputs[input.inputId]) { - console.warn(`Input ${input.inputId} already exists. Overriding old context.`); + console.warn(`Adding input ${input.inputId}. Input already exists.`); } this.context = { ...this.context, - inputs: { - ...this.context.inputs, - [input.inputId]: input, - }, + inputs: { ...this.context.inputs, [input.inputId]: input }, }; this.signalUpdate(); } - public updateInput(update: Partial & { inputId: string }) { + private updateInput(update: InputStreamInfo) { const oldInput = this.context.inputs[update.inputId]; if (!oldInput) { - console.warn('Trying to update input that does not exists.'); + console.warn(`Updating input ${update.inputId}. Input does not exist.`); return; } this.context = { @@ -46,6 +92,13 @@ export class InstanceContextStore { this.signalUpdate(); } + private removeInput(inputId: string) { + const inputs = { ...this.context.inputs }; + delete inputs[inputId]; + this.context = { ...this.context, inputs }; + this.signalUpdate(); + } + private signalUpdate() { for (const cb of this.onChangeCallbacks) { cb(); diff --git a/ts/live-compositor/src/types/registerOutput.ts b/ts/live-compositor/src/types/registerOutput.ts index 519eba1f0..2c292015d 100644 --- a/ts/live-compositor/src/types/registerOutput.ts +++ b/ts/live-compositor/src/types/registerOutput.ts @@ -113,7 +113,7 @@ export type OutputRtpAudioOptions = { /** * Initial audio mixer configuration for output. */ - initial: AudioInputsConfiguration; + initial?: AudioInputsConfiguration; }; export interface OutputMp4AudioOptions { @@ -132,7 +132,7 @@ export interface OutputMp4AudioOptions { /** * Initial audio mixer configuration for output. */ - initial: AudioInputsConfiguration; + initial?: AudioInputsConfiguration; } export type RtpAudioEncoderOptions = {