From 0103f3c6700a7c3992346638c36034bac180968d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Barczy=C5=84ski?= <104033489+WojciechBarczynski@users.noreply.github.com> Date: Mon, 4 Mar 2024 15:01:31 +0100 Subject: [PATCH] Audio mixing (#410) --- compositor_pipeline/src/audio_mixer.rs | 20 +- .../src/audio_mixer/internal_audio_mixer.rs | 180 ++++++++++++++---- compositor_pipeline/src/audio_mixer/types.rs | 20 +- compositor_pipeline/src/pipeline.rs | 17 +- compositor_pipeline/src/pipeline/decoder.rs | 19 +- .../src/pipeline/decoder/opus_decoder.rs | 59 +++--- compositor_pipeline/src/pipeline/input/rtp.rs | 2 - .../src/pipeline/input/rtp/depayloader.rs | 59 +++--- .../src/pipeline/output/rtp.rs | 6 +- .../src/pipeline/output/rtp/payloader.rs | 29 +-- .../src/pipeline/pipeline_input.rs | 11 +- compositor_pipeline/src/pipeline/rtp.rs | 6 +- examples/audio.rs | 117 +++++++++--- schemas/register.schema.json | 97 +++------- schemas/scene.schema.json | 7 + src/api.rs | 2 +- src/types.rs | 2 + src/types/from_audio.rs | 35 ++-- src/types/from_register_request.rs | 33 ++-- src/types/register_request.rs | 22 --- 20 files changed, 445 insertions(+), 298 deletions(-) diff --git a/compositor_pipeline/src/audio_mixer.rs b/compositor_pipeline/src/audio_mixer.rs index ef7713911..e00a2c317 100644 --- a/compositor_pipeline/src/audio_mixer.rs +++ b/compositor_pipeline/src/audio_mixer.rs @@ -5,7 +5,7 @@ use tracing::trace; use self::{ internal_audio_mixer::InternalAudioMixer, - types::{AudioMixingParams, AudioSamplesSet, OutputSamples}, + types::{AudioChannels, AudioMixingParams, AudioSamplesSet, OutputSamples}, }; mod internal_audio_mixer; @@ -15,8 +15,10 @@ pub mod types; pub(super) struct AudioMixer(Arc>); impl AudioMixer { - pub fn new() -> Self { - Self(Arc::new(Mutex::new(InternalAudioMixer::new()))) + pub fn new(output_sample_rate: u32) -> Self { + Self(Arc::new(Mutex::new(InternalAudioMixer::new( + output_sample_rate, + )))) } pub fn mix_samples(&self, samples_set: 
AudioSamplesSet) -> OutputSamples { @@ -24,8 +26,16 @@ impl AudioMixer { self.0.lock().unwrap().mix_samples(samples_set) } - pub fn register_output(&self, output_id: OutputId, audio: AudioMixingParams) { - self.0.lock().unwrap().register_output(output_id, audio) + pub fn register_output( + &self, + output_id: OutputId, + audio: AudioMixingParams, + channels: AudioChannels, + ) { + self.0 + .lock() + .unwrap() + .register_output(output_id, audio, channels) } pub fn unregister_output(&self, output_id: &OutputId) { diff --git a/compositor_pipeline/src/audio_mixer/internal_audio_mixer.rs b/compositor_pipeline/src/audio_mixer/internal_audio_mixer.rs index 1872c8910..86d5299aa 100644 --- a/compositor_pipeline/src/audio_mixer/internal_audio_mixer.rs +++ b/compositor_pipeline/src/audio_mixer/internal_audio_mixer.rs @@ -1,61 +1,175 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + cmp::{max, min}, + collections::HashMap, + sync::Arc, +}; use compositor_render::{error::UpdateSceneError, OutputId}; -use super::types::{AudioMixingParams, AudioSamplesSet, OutputSamples}; +use crate::audio_mixer::types::{AudioSamples, AudioSamplesBatch}; + +use super::types::{AudioChannels, AudioMixingParams, AudioSamplesSet, OutputSamples}; #[derive(Debug)] struct OutputInfo { audio: AudioMixingParams, - last_batch_pts: Option, + channels: AudioChannels, } #[derive(Debug)] pub(super) struct InternalAudioMixer { outputs: HashMap, + output_sample_rate: u32, } impl InternalAudioMixer { - pub fn new() -> Self { + pub fn new(output_sample_rate: u32) -> Self { Self { outputs: HashMap::new(), + output_sample_rate, } } pub fn mix_samples(&mut self, samples_set: AudioSamplesSet) -> OutputSamples { - let get_batch = |output_info: &OutputInfo| { - let input_id = &output_info.audio.inputs.first()?.input_id; - samples_set.samples.get(input_id)?.iter().find(|batch| { - let last_pts = match output_info.last_batch_pts { - Some(pts) => pts, - None => return true, + trait Sample { + fn 
add_assign(&mut self, rhs: Self); + fn div(self, other: i32) -> Self; + fn mul(self, other: f32) -> Self; + } + + fn mix_output<T: Default + Sample + Copy, F: Fn(&AudioSamplesBatch) -> Vec<T>>( + output_info: &OutputInfo, + output_sample_rate: u32, + samples_set: &AudioSamplesSet, + get_samples: F, + ) -> Vec<T> { + let samples_count = samples_set.duration().as_secs_f64() * output_sample_rate as f64; + let mut mixed = vec![T::default(); samples_count as usize]; + for input in &output_info.audio.inputs { + let Some(input_batches) = samples_set.samples.get(&input.input_id) else { + continue; }; + input_batches.iter().for_each(|batch| { + let batch_samples = get_samples(batch); + let batch_duration_before_output_start = batch + .start_pts + .saturating_sub(samples_set.start_pts) + .as_secs_f64(); + + let start_sample_index = + (batch_duration_before_output_start * output_sample_rate as f64) as usize; + let end_sample_index = + min(start_sample_index + batch_samples.len(), mixed.len()); + + for i in start_sample_index..end_sample_index { + mixed[i].add_assign(batch_samples[i - start_sample_index].mul(input.volume)); + } + }) + } + mixed + .iter() + .map(|sample| sample.div(max(1, output_info.audio.inputs.len() as i32))) + .collect() + } + + fn get_mono(audio_samples_batch: &AudioSamplesBatch) -> Vec<i32> { + match audio_samples_batch.samples.as_ref() { + AudioSamples::Mono(mono_samples) => { + mono_samples.iter().map(|s| *s as i32).collect() + } + AudioSamples::Stereo(stereo_samples) => stereo_samples + .iter() + .map(|(l, r)| (*l as i32 + *r as i32) / 2) + .collect(), + } + } + + fn get_stereo(audio_samples_batch: &AudioSamplesBatch) -> Vec<(i32, i32)> { + match audio_samples_batch.samples.as_ref() { + AudioSamples::Mono(mono_samples) => mono_samples + .iter() + .map(|s| (*s as i32, *s as i32)) + .collect(), + AudioSamples::Stereo(stereo_samples) => stereo_samples + .iter() + .map(|(l, r)| (*l as i32, *r as i32)) + .collect(), + } + } + + impl Sample for i32 { + fn add_assign(&mut self, rhs: Self) { + *self += rhs + } + + fn div(self, other: i32) -> Self 
{ + self / other + } + + fn mul(self, other: f32) -> Self { + (self as f32 * other) as i32 + } + } + + impl Sample for (i32, i32) { + fn add_assign(&mut self, rhs: Self) { + *self = (self.0 + rhs.0, self.1 + rhs.1) + } + + fn div(self, other: i32) -> Self { + (self.0 / other, self.1 / other) + } - batch.start_pts > last_pts - }) - }; - - let samples = self - .outputs - .iter_mut() - .filter_map(|(output_id, output_info)| { - let batch = get_batch(output_info)?.clone(); - output_info.last_batch_pts = Some(batch.start_pts); - Some((output_id.clone(), batch)) - }) - .collect(); - - OutputSamples(samples) + fn mul(self, other: f32) -> Self { + ( + (self.0 as f32 * other) as i32, + (self.1 as f32 * other) as i32, + ) + } + } + + let mut output_samples = HashMap::new(); + + for (output_id, output_info) in &self.outputs { + let samples = match output_info.channels { + AudioChannels::Mono => { + let mixed: Vec<i32> = + mix_output(output_info, self.output_sample_rate, &samples_set, get_mono); + let samples = mixed.iter().map(|s| *s as i16).collect(); + AudioSamples::Mono(samples) + } + AudioChannels::Stereo => { + let mixed: Vec<(i32, i32)> = mix_output( + output_info, + self.output_sample_rate, + &samples_set, + get_stereo, + ); + let samples = mixed.iter().map(|(l, r)| (*l as i16, *r as i16)).collect(); + AudioSamples::Stereo(samples) + } + }; + output_samples.insert( + output_id.clone(), + AudioSamplesBatch { + samples: Arc::new(samples), + start_pts: samples_set.start_pts, + sample_rate: self.output_sample_rate, + }, + ); + } + + OutputSamples(output_samples) } - pub fn register_output(&mut self, output_id: OutputId, audio: AudioMixingParams) { - self.outputs.insert( - output_id, - OutputInfo { - audio, - last_batch_pts: None, - }, - ); + pub fn register_output( + &mut self, + output_id: OutputId, + audio: AudioMixingParams, + channels: AudioChannels, + ) { + self.outputs + .insert(output_id, OutputInfo { audio, channels }); } pub fn unregister_output(&mut self, output_id: 
&OutputId) { diff --git a/compositor_pipeline/src/audio_mixer/types.rs b/compositor_pipeline/src/audio_mixer/types.rs index 897f18907..1192d17ff 100644 --- a/compositor_pipeline/src/audio_mixer/types.rs +++ b/compositor_pipeline/src/audio_mixer/types.rs @@ -10,6 +10,8 @@ pub struct AudioMixingParams { #[derive(Debug, Clone)] pub struct InputParams { pub input_id: InputId, + // [0, 1] range of input volume + pub volume: f32, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -35,6 +37,18 @@ pub struct AudioSamplesBatch { pub sample_rate: u32, } +#[derive(Clone)] +pub enum AudioSamples { + Mono(Vec), + Stereo(Vec<(i16, i16)>), +} + +impl AudioSamplesSet { + pub fn duration(&self) -> Duration { + self.end_pts.saturating_sub(self.start_pts) + } +} + impl AudioSamplesBatch { pub fn end_pts(&self) -> Duration { self.start_pts @@ -42,12 +56,6 @@ impl AudioSamplesBatch { } } -#[derive(Clone)] -pub enum AudioSamples { - Mono(Vec), - Stereo(Vec<(i16, i16)>), -} - impl AudioSamples { pub fn len(&self) -> usize { match self { diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index 29da1887e..c134d6491 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -130,7 +130,7 @@ impl Pipeline { inputs: HashMap::new(), queue: Queue::new(opts.queue_options), renderer, - audio_mixer: AudioMixer::new(), + audio_mixer: AudioMixer::new(opts.output_sample_rate), is_started: false, download_dir, output_sample_rate: opts.output_sample_rate, @@ -160,8 +160,12 @@ impl Pipeline { return Err(RegisterInputError::AlreadyRegistered(input_id)); } - let (pipeline_input, receiver, port) = - new_pipeline_input(&input_id, input_options, &self.download_dir)?; + let (pipeline_input, receiver, port) = new_pipeline_input( + &input_id, + input_options, + &self.download_dir, + self.output_sample_rate, + )?; self.inputs.insert(input_id.clone(), pipeline_input.into()); self.queue.add_input(&input_id, receiver, queue_options); @@ -203,8 
+207,11 @@ impl Pipeline { self.outputs.insert(output_id.clone(), Arc::new(output)); if let Some(audio_opts) = audio.clone() { - self.audio_mixer - .register_output(output_id.clone(), audio_opts.initial); + self.audio_mixer.register_output( + output_id.clone(), + audio_opts.initial, + audio_opts.channels, + ); } self.update_output( diff --git a/compositor_pipeline/src/pipeline/decoder.rs b/compositor_pipeline/src/pipeline/decoder.rs index a475f3cf4..1d5a77032 100644 --- a/compositor_pipeline/src/pipeline/decoder.rs +++ b/compositor_pipeline/src/pipeline/decoder.rs @@ -1,8 +1,4 @@ -use crate::{ - audio_mixer::types::{AudioChannels, AudioSamplesBatch}, - error::DecoderInitError, - queue::PipelineEvent, -}; +use crate::{audio_mixer::types::AudioSamplesBatch, error::DecoderInitError, queue::PipelineEvent}; use self::{fdk_aac::FdkAacDecoder, ffmpeg_h264::H264FfmpegDecoder, opus_decoder::OpusDecoder}; @@ -37,6 +33,7 @@ impl Decoder { input_id: InputId, chunks: ChunksReceiver, decoder_options: DecoderOptions, + output_sample_rate: u32, ) -> Result<(Self, DecodedDataReceiver), DecoderInitError> { let DecoderOptions { video: video_decoder_opt, @@ -66,7 +63,13 @@ impl Decoder { if let (Some(opt), Some(audio_receiver)) = (audio_decoder_opt, audio_receiver) { let (sender, receiver) = bounded(10); ( - Some(AudioDecoder::new(opt, audio_receiver, sender, input_id)?), + Some(AudioDecoder::new( + opt, + output_sample_rate, + audio_receiver, + sender, + input_id, + )?), Some(receiver), ) } else { @@ -94,6 +97,7 @@ pub enum AudioDecoder { impl AudioDecoder { pub fn new( opts: AudioDecoderOptions, + output_sample_rate: u32, chunks_receiver: Receiver>, samples_sender: Sender>, input_id: InputId, @@ -101,6 +105,7 @@ impl AudioDecoder { match opts { AudioDecoderOptions::Opus(opus_opt) => Ok(AudioDecoder::Opus(OpusDecoder::new( opus_opt, + output_sample_rate, chunks_receiver, samples_sender, input_id, @@ -150,8 +155,6 @@ pub enum AudioDecoderOptions { #[derive(Debug, Clone, PartialEq, 
Eq)] pub struct OpusDecoderOptions { - pub sample_rate: u32, - pub channels: AudioChannels, pub forward_error_correction: bool, } diff --git a/compositor_pipeline/src/pipeline/decoder/opus_decoder.rs b/compositor_pipeline/src/pipeline/decoder/opus_decoder.rs index 1c236913d..126dff54a 100644 --- a/compositor_pipeline/src/pipeline/decoder/opus_decoder.rs +++ b/compositor_pipeline/src/pipeline/decoder/opus_decoder.rs @@ -5,7 +5,7 @@ use crossbeam_channel::{Receiver, Sender}; use log::error; use crate::{ - audio_mixer::types::{AudioChannels, AudioSamples, AudioSamplesBatch}, + audio_mixer::types::{AudioSamples, AudioSamplesBatch}, error::DecoderInitError, pipeline::structs::EncodedChunk, queue::PipelineEvent, @@ -18,15 +18,24 @@ pub struct OpusDecoder; impl OpusDecoder { pub fn new( opts: OpusDecoderOptions, + output_sample_rate: u32, chunks_receiver: Receiver>, sample_sender: Sender>, input_id: InputId, ) -> Result { - let decoder = opus::Decoder::new(opts.sample_rate, opts.channels.into())?; + let decoder = opus::Decoder::new(output_sample_rate, opus::Channels::Stereo)?; std::thread::Builder::new() .name(format!("opus decoder {}", input_id.0)) - .spawn(move || Self::run_decoding_thread(decoder, opts, chunks_receiver, sample_sender)) + .spawn(move || { + Self::run_decoding_thread( + decoder, + opts, + output_sample_rate, + chunks_receiver, + sample_sender, + ) + }) .unwrap(); Ok(Self) @@ -35,6 +44,7 @@ impl OpusDecoder { fn run_decoding_thread( mut decoder: opus::Decoder, opts: OpusDecoderOptions, + output_sample_rate: u32, chunks_receiver: Receiver>, sample_sender: Sender>, ) { @@ -49,35 +59,32 @@ impl OpusDecoder { break; } }; - let decoded_samples_count = match decoder.decode(&chunk.data, &mut buffer, false) { - Ok(samples_count) => samples_count, - Err(err) => { - error!("Failed to decode opus packet: {}", err); - continue; - } - }; - - let samples = match opts.channels { - AudioChannels::Mono => { - let samples = 
buffer.iter().take(decoded_samples_count).cloned().collect(); - AudioSamples::Mono(samples) - } - AudioChannels::Stereo => { - let mut samples = Vec::with_capacity(decoded_samples_count / 2); - for i in 0..decoded_samples_count { - samples.push((buffer[2 * i], buffer[2 * i + 1])); + let decoded_samples_count = + match decoder.decode(&chunk.data, &mut buffer, opts.forward_error_correction) { + Ok(samples_count) => samples_count, + Err(err) => { + error!("Failed to decode opus packet: {}", err); + continue; } - AudioSamples::Stereo(samples) - } - }; + }; + + let mut decoded_samples = Vec::with_capacity(decoded_samples_count / 2); + for i in 0..decoded_samples_count { + decoded_samples.push((buffer[2 * i], buffer[2 * i + 1])); + } + + let samples = AudioSamples::Stereo(decoded_samples); - let samples = AudioSamplesBatch { + let samples_batch = AudioSamplesBatch { samples: Arc::new(samples), start_pts: chunk.pts, - sample_rate: opts.sample_rate, + sample_rate: output_sample_rate, }; - if sample_sender.send(PipelineEvent::Data(samples)).is_err() { + if sample_sender + .send(PipelineEvent::Data(samples_batch)) + .is_err() + { return; }; } diff --git a/compositor_pipeline/src/pipeline/input/rtp.rs b/compositor_pipeline/src/pipeline/input/rtp.rs index ee247b35e..b38ef2762 100644 --- a/compositor_pipeline/src/pipeline/input/rtp.rs +++ b/compositor_pipeline/src/pipeline/input/rtp.rs @@ -58,13 +58,11 @@ pub struct RtpReceiverOptions { #[derive(Debug, Clone, PartialEq, Eq)] pub struct InputVideoStream { pub options: decoder::VideoDecoderOptions, - pub payload_type: u8, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct InputAudioStream { pub options: decoder::AudioDecoderOptions, - pub payload_type: u8, } pub struct OutputAudioStream { diff --git a/compositor_pipeline/src/pipeline/input/rtp/depayloader.rs b/compositor_pipeline/src/pipeline/input/rtp/depayloader.rs index 3f059e301..f0a53ebaf 100644 --- a/compositor_pipeline/src/pipeline/input/rtp/depayloader.rs +++ 
b/compositor_pipeline/src/pipeline/input/rtp/depayloader.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use log::error; use rtp::{ codecs::{h264::H264Packet, opus::OpusPacket}, packetizer::Depacketizer, @@ -7,7 +8,7 @@ use rtp::{ use crate::pipeline::{ decoder, - rtp::PayloadType, + rtp::{AUDIO_PAYLOAD_TYPE, VIDEO_PAYLOAD_TYPE}, structs::{AudioCodec, EncodedChunk, EncodedChunkKind, VideoCodec}, }; @@ -21,27 +22,22 @@ pub enum DepayloaderNewError { pub(crate) struct Depayloader { /// (Depayloader, payload type) - pub video: Option<(VideoDepayloader, PayloadType)>, - pub audio: Option<(AudioDepayloader, PayloadType)>, + pub video: Option, + pub audio: Option, } impl Depayloader { pub fn new(stream: &RtpStream) -> Result { - let video = stream.video.as_ref().map(|video| { - ( - VideoDepayloader::new(&video.options), - PayloadType(video.payload_type), - ) - }); - - let audio = match stream.audio.as_ref() { - Some(audio) => Some(( - AudioDepayloader::new(&audio.options)?, - PayloadType(audio.payload_type), - )), - - None => None, - }; + let video = stream + .video + .as_ref() + .map(|video| VideoDepayloader::new(&video.options)); + + let audio = stream + .audio + .as_ref() + .map(|audio| AudioDepayloader::new(&audio.options)) + .transpose()?; Ok(Self { video, audio }) } @@ -50,20 +46,21 @@ impl Depayloader { &mut self, packet: rtp::packet::Packet, ) -> Result, DepayloadingError> { - let pty = packet.header.payload_type; - if let Some((video_depayloader, video_payload_type)) = &mut self.video { - if video_payload_type.0 == pty { - return video_depayloader.depayload(packet); - } - } - - if let Some((audio_depayloader, audio_payload_type)) = &mut self.audio { - if audio_payload_type.0 == pty { - return audio_depayloader.depayload(packet); - } + match packet.header.payload_type { + VIDEO_PAYLOAD_TYPE => match self.video.as_mut() { + Some(video_depayloader) => video_depayloader.depayload(packet), + None => Err(DepayloadingError::BadPayloadType( + packet.header.payload_type, + 
)), + }, + AUDIO_PAYLOAD_TYPE => match self.audio.as_mut() { + Some(audio_depayloader) => audio_depayloader.depayload(packet), + None => Err(DepayloadingError::BadPayloadType( + packet.header.payload_type, + )), + }, + other => Err(DepayloadingError::BadPayloadType(other)), } - - Err(DepayloadingError::BadPayloadType(pty)) } } diff --git a/compositor_pipeline/src/pipeline/output/rtp.rs b/compositor_pipeline/src/pipeline/output/rtp.rs index 43529ff77..c549a863f 100644 --- a/compositor_pipeline/src/pipeline/output/rtp.rs +++ b/compositor_pipeline/src/pipeline/output/rtp.rs @@ -13,7 +13,7 @@ use webrtc_util::Marshal; use crate::{ error::OutputInitError, pipeline::{ - rtp::{bind_to_requested_port, BindToPortError, PayloadType, RequestedPort}, + rtp::{bind_to_requested_port, BindToPortError, RequestedPort}, structs::EncodedChunk, AudioCodec, Port, VideoCodec, }, @@ -47,8 +47,8 @@ struct RtpContext { pub struct RtpSenderOptions { pub output_id: OutputId, pub connection_options: RtpConnectionOptions, - pub video: Option<(VideoCodec, PayloadType)>, - pub audio: Option<(AudioCodec, PayloadType)>, + pub video: Option, + pub audio: Option, } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/compositor_pipeline/src/pipeline/output/rtp/payloader.rs b/compositor_pipeline/src/pipeline/output/rtp/payloader.rs index faf9183dc..1ad975947 100644 --- a/compositor_pipeline/src/pipeline/output/rtp/payloader.rs +++ b/compositor_pipeline/src/pipeline/output/rtp/payloader.rs @@ -5,7 +5,7 @@ use rand::Rng; use rtp::codecs::{h264::H264Payloader, opus::OpusPayloader}; use crate::pipeline::{ - rtp::PayloadType, + rtp::{AUDIO_PAYLOAD_TYPE, VIDEO_PAYLOAD_TYPE}, structs::{EncodedChunk, EncodedChunkKind}, AudioCodec, VideoCodec, }; @@ -71,7 +71,6 @@ pub struct Payloader { enum VideoPayloader { H264 { payloader: H264Payloader, - payload_type: PayloadType, context: RtpStreamContext, }, } @@ -79,19 +78,15 @@ enum VideoPayloader { enum AudioPayloader { Opus { payloader: OpusPayloader, - 
payload_type: PayloadType, context: RtpStreamContext, }, } impl Payloader { - pub fn new( - video: Option<(VideoCodec, PayloadType)>, - audio: Option<(AudioCodec, PayloadType)>, - ) -> Self { + pub fn new(video: Option, audio: Option) -> Self { Self { - video: video.map(|(codec, payload_type)| VideoPayloader::new(codec, payload_type)), - audio: audio.map(|(codec, payload_type)| AudioPayloader::new(codec, payload_type)), + video: video.map(VideoPayloader::new), + audio: audio.map(AudioPayloader::new), } } @@ -134,11 +129,10 @@ impl Payloader { } impl VideoPayloader { - pub fn new(codec: VideoCodec, payload_type: PayloadType) -> Self { + pub fn new(codec: VideoCodec) -> Self { match codec { VideoCodec::H264 => Self::H264 { payloader: H264Payloader::default(), - payload_type, context: RtpStreamContext::new(), }, } @@ -158,14 +152,13 @@ impl VideoPayloader { match self { VideoPayloader::H264 { ref mut payloader, - ref payload_type, ref mut context, } => payload( payloader, context, chunk, mtu, - payload_type, + VIDEO_PAYLOAD_TYPE, H264_CLOCK_RATE, ), } @@ -173,11 +166,10 @@ impl VideoPayloader { } impl AudioPayloader { - pub fn new(codec: AudioCodec, payload_type: PayloadType) -> Self { + pub fn new(codec: AudioCodec) -> Self { match codec { AudioCodec::Opus => Self::Opus { payloader: OpusPayloader, - payload_type, context: RtpStreamContext::new(), }, AudioCodec::Aac => panic!("Aac audio output is not supported yet"), @@ -198,14 +190,13 @@ impl AudioPayloader { match self { AudioPayloader::Opus { ref mut payloader, - ref payload_type, ref mut context, } => payload( payloader, context, chunk, mtu, - payload_type, + AUDIO_PAYLOAD_TYPE, OPUS_CLOCK_RATE, ), } @@ -217,7 +208,7 @@ fn payload( context: &mut RtpStreamContext, chunk: EncodedChunk, mtu: usize, - payload_type: &PayloadType, + payload_type: u8, clock_rate: u32, ) -> Result, PayloadingError> { let payloads = payloader.payload(mtu, &chunk.data)?; @@ -232,7 +223,7 @@ fn payload( padding: false, extension: false, 
marker: i == packets_amount - 1, // marker needs to be set on the last packet of each frame - payload_type: payload_type.0, + payload_type, sequence_number: context.next_sequence_number, timestamp: (chunk.pts.as_secs_f64() * clock_rate as f64) as u32, ssrc: context.ssrc, diff --git a/compositor_pipeline/src/pipeline/pipeline_input.rs b/compositor_pipeline/src/pipeline/pipeline_input.rs index 38ec28136..e4e16d6f0 100644 --- a/compositor_pipeline/src/pipeline/pipeline_input.rs +++ b/compositor_pipeline/src/pipeline/pipeline_input.rs @@ -14,14 +14,19 @@ pub(super) fn new_pipeline_input( input_id: &InputId, input_options: InputOptions, download_dir: &Path, + output_sample_rate: u32, ) -> Result<(PipelineInput, DecodedDataReceiver, Option), RegisterInputError> { let (input, chunks_receiver, decoder_options, port) = input::Input::new(input_options, download_dir) .map_err(|e| RegisterInputError::InputError(input_id.clone(), e))?; - let (decoder, decoded_data_receiver) = - decoder::Decoder::new(input_id.clone(), chunks_receiver, decoder_options) - .map_err(|e| RegisterInputError::DecoderError(input_id.clone(), e))?; + let (decoder, decoded_data_receiver) = decoder::Decoder::new( + input_id.clone(), + chunks_receiver, + decoder_options, + output_sample_rate, + ) + .map_err(|e| RegisterInputError::DecoderError(input_id.clone(), e))?; let pipeline_input = PipelineInput { input, decoder }; Ok((pipeline_input, decoded_data_receiver, port)) diff --git a/compositor_pipeline/src/pipeline/rtp.rs b/compositor_pipeline/src/pipeline/rtp.rs index 118790edd..4ddb271c6 100644 --- a/compositor_pipeline/src/pipeline/rtp.rs +++ b/compositor_pipeline/src/pipeline/rtp.rs @@ -2,6 +2,9 @@ use std::net; use super::Port; +pub(crate) const VIDEO_PAYLOAD_TYPE: u8 = 96; +pub(crate) const AUDIO_PAYLOAD_TYPE: u8 = 97; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TransportProtocol { Udp, @@ -14,9 +17,6 @@ pub(super) enum BindToPortError { AllPortsAlreadyInUse { lower_bound: u16, upper_bound: 
u16 }, } -#[derive(Debug, Clone)] -pub struct PayloadType(pub u8); - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RequestedPort { Exact(u16), diff --git a/examples/audio.rs b/examples/audio.rs index 319a6b9b4..8dd53692c 100644 --- a/examples/audio.rs +++ b/examples/audio.rs @@ -4,7 +4,7 @@ use serde_json::json; use std::{ env, fs, process::{Command, Stdio}, - thread, + thread::{self}, time::Duration, }; use video_compositor::{config::config, http, logger, types::Resolution}; @@ -14,9 +14,12 @@ use crate::common::write_video_audio_example_sdp_file; #[path = "./common/common.rs"] mod common; -const SAMPLE_FILE_URL: &str = +const BUNNY_FILE_URL: &str = "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; -const SAMPLE_FILE_PATH: &str = "examples/assets/BigBuckBunny.mp4"; +const SINTEL_FILE_URL: &str = + "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4"; +const BUNNY_FILE_PATH: &str = "examples/assets/BigBuckBunny.mp4"; +const SINTEL_FILE_PATH: &str = "examples/assets/Sintel.mp4"; const VIDEO_RESOLUTION: Resolution = Resolution { width: 1280, height: 720, @@ -38,7 +41,7 @@ fn main() { fn start_example_client_code() -> Result<()> { info!("[example] Start listening on output port."); - let output_sdp = write_video_audio_example_sdp_file("127.0.0.1", 8002, 8010)?; + let output_sdp = write_video_audio_example_sdp_file("127.0.0.1", 8002, 8004)?; Command::new("ffplay") .args(["-protocol_whitelist", "file,rtp,udp", &output_sdp]) .stdout(Stdio::null()) @@ -47,19 +50,24 @@ fn start_example_client_code() -> Result<()> { thread::sleep(Duration::from_secs(2)); info!("[example] Download sample."); - let sample_path = env::current_dir()?.join(SAMPLE_FILE_PATH); - fs::create_dir_all(sample_path.parent().unwrap())?; - common::ensure_downloaded(SAMPLE_FILE_URL, &sample_path)?; + let bunny_path = env::current_dir()?.join(BUNNY_FILE_PATH); + fs::create_dir_all(bunny_path.parent().unwrap())?; + 
common::ensure_downloaded(BUNNY_FILE_URL, &bunny_path)?; + + info!("[example] Download sample."); + let sintel_path = env::current_dir()?.join(SINTEL_FILE_PATH); + fs::create_dir_all(sintel_path.parent().unwrap())?; + common::ensure_downloaded(SINTEL_FILE_URL, &sintel_path)?; info!("[example] Send register input request."); common::post(&json!({ "type": "register", "entity_type": "rtp_input_stream", "input_id": "input_1", - "port": 8004, + "port": 8006, "video": { "codec": "h264" - } + }, }))?; info!("[example] Send register input request."); @@ -67,12 +75,32 @@ fn start_example_client_code() -> Result<()> { "type": "register", "entity_type": "rtp_input_stream", "input_id": "input_2", - "port": 8006, + "port": 8008, "audio": { - "codec": "opus", - "sample_rate": 48_000, - "channels": "stereo", - } + "codec": "opus" + }, + }))?; + + info!("[example] Send register input request."); + common::post(&json!({ + "type": "register", + "entity_type": "rtp_input_stream", + "input_id": "input_3", + "port": 8010, + "video": { + "codec": "h264" + }, + }))?; + + info!("[example] Send register input request."); + common::post(&json!({ + "type": "register", + "entity_type": "rtp_input_stream", + "input_id": "input_4", + "port": 8012, + "audio": { + "codec": "opus" + }, }))?; info!("[example] Send register output request."); @@ -89,9 +117,17 @@ fn start_example_client_code() -> Result<()> { }, "encoder_preset": "medium", "initial": { - "id": "input_1", - "type": "input_stream", - "input_id": "input_1", + "type": "tiles", + "children": [ + { + "type": "input_stream", + "input_id": "input_1" + }, + { + "type": "input_stream", + "input_id": "input_3" + } + ] }, "resolution": { "width": VIDEO_RESOLUTION.width, "height": VIDEO_RESOLUTION.height }, } @@ -102,11 +138,14 @@ fn start_example_client_code() -> Result<()> { "type": "register", "entity_type": "output_stream", "output_id": "output_2", - "port": 8010, + "port": 8004, "ip": "127.0.0.1", "audio": { "initial": { - "inputs": 
[{"input_id": "input_2"}] + "inputs": [ + {"input_id": "input_2", "volume": 0.3}, + {"input_id": "input_4"} + ] }, "channels": "stereo" } @@ -119,9 +158,10 @@ fn start_example_client_code() -> Result<()> { "type": "start", }))?; + let path = sintel_path.clone(); Command::new("ffmpeg") .args(["-stream_loop", "-1", "-re", "-i"]) - .arg(sample_path.clone()) + .arg(path.clone()) .args([ "-an", "-c:v", @@ -130,20 +170,51 @@ fn start_example_client_code() -> Result<()> { "rtp", "-bsf:v", "h264_mp4toannexb", - "rtp://127.0.0.1:8004?rtcpport=8004", + "rtp://127.0.0.1:8006?rtcpport=8006", ]) .spawn()?; + let path = sintel_path.clone(); Command::new("ffmpeg") .args(["-re", "-i"]) - .arg(sample_path) + .arg(path) .args([ "-vn", "-c:a", "libopus", "-f", "rtp", - "rtp://127.0.0.1:8006?rtcpport=8006", + "rtp://127.0.0.1:8008?rtcpport=8008", + ]) + .spawn()?; + + let path = bunny_path.clone(); + Command::new("ffmpeg") + .args(["-stream_loop", "-1", "-re", "-i"]) + .arg(path.clone()) + .args([ + "-an", + "-c:v", + "copy", + "-f", + "rtp", + "-bsf:v", + "h264_mp4toannexb", + "rtp://127.0.0.1:8010?rtcpport=8010", + ]) + .spawn()?; + + let path = bunny_path.clone(); + Command::new("ffmpeg") + .args(["-re", "-i"]) + .arg(path) + .args([ + "-vn", + "-c:a", + "libopus", + "-f", + "rtp", + "rtp://127.0.0.1:8012?rtcpport=8012", ]) .spawn()?; diff --git a/schemas/register.schema.json b/schemas/register.schema.json index f66885991..43b279d67 100644 --- a/schemas/register.schema.json +++ b/schemas/register.schema.json @@ -464,15 +464,6 @@ "type": "null" } ] - }, - "rtp_payload_type": { - "description": "(**default=`96`**) Value of payload type field in received RTP packets.\n\nPackets with different payload type won't be treated as video and included in composing. Values should be in [0, 64] or [96, 255]. Values in range [65, 95] can't be used. 
For more information, see [RFC](https://datatracker.ietf.org/doc/html/rfc5761#section-4) Packets with different payload type won't be treated as video and included in composing.", - "type": [ - "integer", - "null" - ], - "format": "uint8", - "minimum": 0.0 } }, "additionalProperties": false @@ -490,10 +481,6 @@ }, "InputRtpAudioOptions": { "type": "object", - "required": [ - "channels", - "sample_rate" - ], "properties": { "codec": { "description": "(**default=`\"opus\"`**) Audio codec.", @@ -506,29 +493,6 @@ } ] }, - "sample_rate": { - "description": "Sample rate. If the specified sample rate doesn't match real sample rate, audio won't be mixed properly.", - "type": "integer", - "format": "uint32", - "minimum": 0.0 - }, - "channels": { - "description": "Audio channels.", - "allOf": [ - { - "$ref": "#/definitions/AudioChannels" - } - ] - }, - "rtp_payload_type": { - "description": "(**default=`97`**) Value of payload type field in received RTP packets.\n\nPackets with different payload type won't be treated as audio and included in mixing. Values should be in range [0, 64] or [96, 255]. Values in range [65, 95] can't be used. For more information, check out [RFC](https://datatracker.ietf.org/doc/html/rfc5761#section-4).", - "type": [ - "integer", - "null" - ], - "format": "uint8", - "minimum": 0.0 - }, "forward_error_correction": { "description": "(**default=`false`**) Specifies whether the stream uses forward error correction. It's specific for Opus codec. 
For more information, check out [RFC](https://datatracker.ietf.org/doc/html/rfc6716#section-2.1.7).", "type": [ @@ -550,24 +514,6 @@ } ] }, - "AudioChannels": { - "oneOf": [ - { - "description": "Mono audio (single channel).", - "type": "string", - "enum": [ - "mono" - ] - }, - { - "description": "Stereo audio (two channels).", - "type": "string", - "enum": [ - "stereo" - ] - } - ] - }, "OutputId": { "type": "string" }, @@ -587,15 +533,6 @@ }, "initial": { "$ref": "#/definitions/Component" - }, - "rtp_payload_type": { - "description": "(**default=`96`**)", - "type": [ - "integer", - "null" - ], - "format": "uint8", - "minimum": 0.0 } }, "additionalProperties": false @@ -1836,15 +1773,6 @@ "null" ] }, - "rtp_payload_type": { - "description": "(**default=`97`**)", - "type": [ - "integer", - "null" - ], - "format": "uint8", - "minimum": 0.0 - }, "encoder_preset": { "description": "(**default=\"voip\"**) Specifies preset for audio output encoder.", "anyOf": [ @@ -1881,9 +1809,34 @@ "properties": { "input_id": { "$ref": "#/definitions/InputId" + }, + "volume": { + "type": [ + "number", + "null" + ], + "format": "float" } } }, + "AudioChannels": { + "oneOf": [ + { + "description": "Mono audio (single channel).", + "type": "string", + "enum": [ + "mono" + ] + }, + { + "description": "Stereo audio (two channels).", + "type": "string", + "enum": [ + "stereo" + ] + } + ] + }, "AudioEncoderPreset": { "oneOf": [ { diff --git a/schemas/scene.schema.json b/schemas/scene.schema.json index 639a53c2f..5fdb77fa1 100644 --- a/schemas/scene.schema.json +++ b/schemas/scene.schema.json @@ -1263,6 +1263,13 @@ "properties": { "input_id": { "$ref": "#/definitions/InputId" + }, + "volume": { + "type": [ + "number", + "null" + ], + "format": "float" } } } diff --git a/src/api.rs b/src/api.rs index 99b6e2cf3..052cdf13d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -165,7 +165,7 @@ impl Api { Some(component) => Some(component.try_into()?), None => None, }; - let audio = update.audio.map(|a| 
a.into()); + let audio = update.audio.map(|a| a.try_into()).transpose()?; match update.schedule_time_ms { Some(schedule_time_ms) => { diff --git a/src/types.rs b/src/types.rs index e6dcd1bcd..c4160dff4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -82,6 +82,8 @@ pub struct Audio { #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] pub struct InputAudio { input_id: InputId, + // (**default=`1.0`**) float in [0, 1] range representing input volume + volume: Option, } impl Display for InputId { diff --git a/src/types/from_audio.rs b/src/types/from_audio.rs index b8e69a5a2..cd33a8ec6 100644 --- a/src/types/from_audio.rs +++ b/src/types/from_audio.rs @@ -1,23 +1,30 @@ -use super::{Audio, InputAudio}; +use super::{Audio, InputAudio, TypeError}; -impl From