Skip to content

Commit

Permalink
Audio mixing (#410)
Browse files Browse the repository at this point in the history
  • Loading branch information
WojciechBarczynski authored Mar 4, 2024
1 parent cf18427 commit 0103f3c
Show file tree
Hide file tree
Showing 20 changed files with 445 additions and 298 deletions.
20 changes: 15 additions & 5 deletions compositor_pipeline/src/audio_mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::trace;

use self::{
internal_audio_mixer::InternalAudioMixer,
types::{AudioMixingParams, AudioSamplesSet, OutputSamples},
types::{AudioChannels, AudioMixingParams, AudioSamplesSet, OutputSamples},
};

mod internal_audio_mixer;
Expand All @@ -15,17 +15,27 @@ pub mod types;
pub(super) struct AudioMixer(Arc<Mutex<InternalAudioMixer>>);

impl AudioMixer {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(InternalAudioMixer::new())))
pub fn new(output_sample_rate: u32) -> Self {
Self(Arc::new(Mutex::new(InternalAudioMixer::new(
output_sample_rate,
))))
}

/// Mixes one set of input samples into per-output samples.
///
/// Emits a `trace!` event with the incoming set, then locks the shared
/// `InternalAudioMixer` and forwards the call to it.
///
/// # Panics
///
/// Panics if the mutex guarding the internal mixer is poisoned.
pub fn mix_samples(&self, samples_set: AudioSamplesSet) -> OutputSamples {
trace!(set=?samples_set, "Mixing samples");
self.0.lock().unwrap().mix_samples(samples_set)
}

pub fn register_output(&self, output_id: OutputId, audio: AudioMixingParams) {
self.0.lock().unwrap().register_output(output_id, audio)
pub fn register_output(
&self,
output_id: OutputId,
audio: AudioMixingParams,
channels: AudioChannels,
) {
self.0
.lock()
.unwrap()
.register_output(output_id, audio, channels)
}

pub fn unregister_output(&self, output_id: &OutputId) {
Expand Down
180 changes: 147 additions & 33 deletions compositor_pipeline/src/audio_mixer/internal_audio_mixer.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,175 @@
use std::{collections::HashMap, time::Duration};
use std::{
cmp::{max, min},
collections::HashMap,
sync::Arc,
};

use compositor_render::{error::UpdateSceneError, OutputId};

use super::types::{AudioMixingParams, AudioSamplesSet, OutputSamples};
use crate::audio_mixer::types::{AudioSamples, AudioSamplesBatch};

use super::types::{AudioChannels, AudioMixingParams, AudioSamplesSet, OutputSamples};

#[derive(Debug)]
struct OutputInfo {
audio: AudioMixingParams,
last_batch_pts: Option<Duration>,
channels: AudioChannels,
}

/// Mixer state: the registered outputs and the sample rate used for every
/// mixed batch.
#[derive(Debug)]
pub(super) struct InternalAudioMixer {
// Mixing configuration for each registered output, keyed by output id.
outputs: HashMap<OutputId, OutputInfo>,
// Sample rate (samples per second) used to size and label mixed batches.
output_sample_rate: u32,
}

impl InternalAudioMixer {
pub fn new() -> Self {
pub fn new(output_sample_rate: u32) -> Self {
Self {
outputs: HashMap::new(),
output_sample_rate,
}
}

pub fn mix_samples(&mut self, samples_set: AudioSamplesSet) -> OutputSamples {
let get_batch = |output_info: &OutputInfo| {
let input_id = &output_info.audio.inputs.first()?.input_id;
samples_set.samples.get(input_id)?.iter().find(|batch| {
let last_pts = match output_info.last_batch_pts {
Some(pts) => pts,
None => return true,
/// Arithmetic needed to mix a sample, implemented for mono (`i32`) and
/// stereo (`(i32, i32)`) accumulators.
trait Sample {
// Accumulates `rhs` into `self`.
fn add_assign(&mut self, rhs: Self);
// Divides the sample by an integer; used to average over the input count.
fn div(self, other: i32) -> Self;
// Scales the sample by a floating-point factor (the input volume).
fn mul(self, other: f32) -> Self;
}

/// Mixes all inputs assigned to a single output into one buffer of samples.
///
/// The buffer holds `samples_set.duration() * output_sample_rate` samples
/// (truncated towards zero). Every batch of every configured input is converted
/// with `get_samples`, scaled by the input volume and accumulated at the offset
/// of the batch start relative to `samples_set.start_pts`. Samples that would
/// land past the end of the buffer are dropped. Finally each accumulated sample
/// is divided by the number of configured inputs (at least 1).
///
/// NOTE(review): a batch that starts before `samples_set.start_pts` is placed at
/// offset 0 without skipping its leading samples — confirm that the queue never
/// produces such batches.
fn mix_output<T: Clone + Copy + Default + Sample, F: Fn(&AudioSamplesBatch) -> Vec<T>>(
    output_info: &OutputInfo,
    output_sample_rate: u32,
    samples_set: &AudioSamplesSet,
    get_samples: F,
) -> Vec<T> {
    let samples_count = samples_set.duration().as_secs_f64() * output_sample_rate as f64;
    let mut mixed = vec![T::default(); samples_count as usize];

    for input in &output_info.audio.inputs {
        let Some(input_batches) = samples_set.samples.get(&input.input_id) else {
            continue;
        };

        for batch in input_batches.iter() {
            let batch_samples = get_samples(batch);
            let batch_offset_secs = batch
                .start_pts
                .saturating_sub(samples_set.start_pts)
                .as_secs_f64();

            let start_sample_index = (batch_offset_secs * output_sample_rate as f64) as usize;
            let end_sample_index = min(start_sample_index + batch_samples.len(), mixed.len());

            // `get_mut` returns `None` when the batch starts past the end of the
            // output buffer; such a batch contributes nothing.
            let Some(window) = mixed.get_mut(start_sample_index..end_sample_index) else {
                continue;
            };

            // Pair each output slot with the batch sample at the same offset
            // *within the batch*. Indexing the batch with the output index would
            // read the wrong samples or run out of bounds.
            for (mixed_sample, batch_sample) in window.iter_mut().zip(batch_samples) {
                mixed_sample.add_assign(batch_sample.mul(input.volume));
            }
        }
    }

    // Average over the configured inputs; `max` guards against division by zero.
    let inputs_count = max(1, output_info.audio.inputs.len() as i32);
    mixed
        .into_iter()
        .map(|sample| sample.div(inputs_count))
        .collect()
}

/// Returns the samples of a batch as mono `i32` values.
///
/// Mono input is widened as-is; stereo input is downmixed by averaging the
/// left and right channel of every frame.
fn get_mono(audio_samples_batch: &AudioSamplesBatch) -> Vec<i32> {
    let downmix = |&(left, right): &(i16, i16)| (i32::from(left) + i32::from(right)) / 2;

    match audio_samples_batch.samples.as_ref() {
        AudioSamples::Mono(mono) => mono.iter().copied().map(i32::from).collect(),
        AudioSamples::Stereo(stereo) => stereo.iter().map(downmix).collect(),
    }
}

/// Returns the samples of a batch as stereo `(left, right)` pairs of `i32`.
///
/// Mono input is duplicated into both channels; stereo input is widened as-is.
fn get_stereo(audio_samples_batch: &AudioSamplesBatch) -> Vec<(i32, i32)> {
    let duplicate = |&sample: &i16| {
        let widened = i32::from(sample);
        (widened, widened)
    };
    let widen = |&(left, right): &(i16, i16)| (i32::from(left), i32::from(right));

    match audio_samples_batch.samples.as_ref() {
        AudioSamples::Mono(mono) => mono.iter().map(duplicate).collect(),
        AudioSamples::Stereo(stereo) => stereo.iter().map(widen).collect(),
    }
}

/// Mono accumulator: a single channel stored as `i32`.
impl Sample for i32 {
    fn add_assign(&mut self, rhs: Self) {
        *self += rhs
    }

    /// Integer division, matching the stereo implementation.
    ///
    /// Panics if `other` is 0.
    fn div(self, other: i32) -> Self {
        self / other
    }

    /// Scales in `f32` and converts back to `i32` with an `as` cast.
    fn mul(self, other: f32) -> Self {
        (self as f32 * other) as i32
    }
}

impl Sample for (i32, i32) {
fn add_assign(&mut self, rhs: Self) {
*self = (self.0 + rhs.0, self.1 + rhs.1)
}

fn div(self, other: i32) -> Self {
(self.0 / other, self.1 / other)
}

batch.start_pts > last_pts
})
};

let samples = self
.outputs
.iter_mut()
.filter_map(|(output_id, output_info)| {
let batch = get_batch(output_info)?.clone();
output_info.last_batch_pts = Some(batch.start_pts);
Some((output_id.clone(), batch))
})
.collect();

OutputSamples(samples)
fn mul(self, other: f32) -> Self {
(
(self.0 as f32 * other) as i32,
(self.1 as f32 * other) as i32,
)
}
}

let mut output_samples = HashMap::new();

for (output_id, output_info) in &self.outputs {
let samples = match output_info.channels {
AudioChannels::Mono => {
let mixed: Vec<i32> =
mix_output(output_info, self.output_sample_rate, &samples_set, get_mono);
let samples = mixed.iter().map(|s| *s as i16).collect();
AudioSamples::Mono(samples)
}
AudioChannels::Stereo => {
let mixed: Vec<(i32, i32)> = mix_output(
output_info,
self.output_sample_rate,
&samples_set,
get_stereo,
);
let samples = mixed.iter().map(|(l, r)| (*l as i16, *r as i16)).collect();
AudioSamples::Stereo(samples)
}
};
output_samples.insert(
output_id.clone(),
AudioSamplesBatch {
samples: Arc::new(samples),
start_pts: samples_set.start_pts,
sample_rate: self.output_sample_rate,
},
);
}

OutputSamples(output_samples)
}

pub fn register_output(&mut self, output_id: OutputId, audio: AudioMixingParams) {
self.outputs.insert(
output_id,
OutputInfo {
audio,
last_batch_pts: None,
},
);
pub fn register_output(
&mut self,
output_id: OutputId,
audio: AudioMixingParams,
channels: AudioChannels,
) {
self.outputs
.insert(output_id, OutputInfo { audio, channels });
}

pub fn unregister_output(&mut self, output_id: &OutputId) {
Expand Down
20 changes: 14 additions & 6 deletions compositor_pipeline/src/audio_mixer/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub struct AudioMixingParams {
/// Mixing parameters of a single input.
#[derive(Debug, Clone)]
pub struct InputParams {
/// Id of the input these parameters apply to.
pub input_id: InputId,
/// Volume of the input, expected to be in the `[0, 1]` range.
pub volume: f32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -35,19 +37,25 @@ pub struct AudioSamplesBatch {
pub sample_rate: u32,
}

#[derive(Clone)]
pub enum AudioSamples {
Mono(Vec<i16>),
Stereo(Vec<(i16, i16)>),
}

impl AudioSamplesSet {
    /// Length of the time span between `start_pts` and `end_pts`.
    ///
    /// Saturates to zero when `end_pts` is earlier than `start_pts`.
    pub fn duration(&self) -> Duration {
        let Self {
            start_pts, end_pts, ..
        } = self;
        end_pts.saturating_sub(*start_pts)
    }
}

impl AudioSamplesBatch {
    /// Presentation timestamp at which this batch ends: `start_pts` plus the
    /// sample count divided by `sample_rate`.
    ///
    /// `Duration::from_secs_f64` panics on non-finite input, so a `sample_rate`
    /// of 0 makes this method panic.
    pub fn end_pts(&self) -> Duration {
        let batch_secs = self.samples.len() as f64 / self.sample_rate as f64;
        self.start_pts + Duration::from_secs_f64(batch_secs)
    }
}

/// Decoded 16-bit audio samples, either single-channel or two-channel.
#[derive(Clone)]
pub enum AudioSamples {
/// One `i16` value per sample.
Mono(Vec<i16>),
/// One pair of `i16` values per sample (the mixer reads them as left, right).
Stereo(Vec<(i16, i16)>),
}

impl AudioSamples {
pub fn len(&self) -> usize {
match self {
Expand Down
17 changes: 12 additions & 5 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Pipeline {
inputs: HashMap::new(),
queue: Queue::new(opts.queue_options),
renderer,
audio_mixer: AudioMixer::new(),
audio_mixer: AudioMixer::new(opts.output_sample_rate),
is_started: false,
download_dir,
output_sample_rate: opts.output_sample_rate,
Expand Down Expand Up @@ -160,8 +160,12 @@ impl Pipeline {
return Err(RegisterInputError::AlreadyRegistered(input_id));
}

let (pipeline_input, receiver, port) =
new_pipeline_input(&input_id, input_options, &self.download_dir)?;
let (pipeline_input, receiver, port) = new_pipeline_input(
&input_id,
input_options,
&self.download_dir,
self.output_sample_rate,
)?;

self.inputs.insert(input_id.clone(), pipeline_input.into());
self.queue.add_input(&input_id, receiver, queue_options);
Expand Down Expand Up @@ -203,8 +207,11 @@ impl Pipeline {
self.outputs.insert(output_id.clone(), Arc::new(output));

if let Some(audio_opts) = audio.clone() {
self.audio_mixer
.register_output(output_id.clone(), audio_opts.initial);
self.audio_mixer.register_output(
output_id.clone(),
audio_opts.initial,
audio_opts.channels,
);
}

self.update_output(
Expand Down
19 changes: 11 additions & 8 deletions compositor_pipeline/src/pipeline/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use crate::{
audio_mixer::types::{AudioChannels, AudioSamplesBatch},
error::DecoderInitError,
queue::PipelineEvent,
};
use crate::{audio_mixer::types::AudioSamplesBatch, error::DecoderInitError, queue::PipelineEvent};

use self::{fdk_aac::FdkAacDecoder, ffmpeg_h264::H264FfmpegDecoder, opus_decoder::OpusDecoder};

Expand Down Expand Up @@ -37,6 +33,7 @@ impl Decoder {
input_id: InputId,
chunks: ChunksReceiver,
decoder_options: DecoderOptions,
output_sample_rate: u32,
) -> Result<(Self, DecodedDataReceiver), DecoderInitError> {
let DecoderOptions {
video: video_decoder_opt,
Expand Down Expand Up @@ -66,7 +63,13 @@ impl Decoder {
if let (Some(opt), Some(audio_receiver)) = (audio_decoder_opt, audio_receiver) {
let (sender, receiver) = bounded(10);
(
Some(AudioDecoder::new(opt, audio_receiver, sender, input_id)?),
Some(AudioDecoder::new(
opt,
output_sample_rate,
audio_receiver,
sender,
input_id,
)?),
Some(receiver),
)
} else {
Expand Down Expand Up @@ -94,13 +97,15 @@ pub enum AudioDecoder {
impl AudioDecoder {
pub fn new(
opts: AudioDecoderOptions,
output_sample_rate: u32,
chunks_receiver: Receiver<PipelineEvent<EncodedChunk>>,
samples_sender: Sender<PipelineEvent<AudioSamplesBatch>>,
input_id: InputId,
) -> Result<Self, DecoderInitError> {
match opts {
AudioDecoderOptions::Opus(opus_opt) => Ok(AudioDecoder::Opus(OpusDecoder::new(
opus_opt,
output_sample_rate,
chunks_receiver,
samples_sender,
input_id,
Expand Down Expand Up @@ -150,8 +155,6 @@ pub enum AudioDecoderOptions {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpusDecoderOptions {
pub sample_rate: u32,
pub channels: AudioChannels,
pub forward_error_correction: bool,
}

Expand Down
Loading

0 comments on commit 0103f3c

Please sign in to comment.