Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable buffer size #512

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

PetrGlad
Copy link

@PetrGlad PetrGlad commented Oct 1, 2023

I needed to configure smaller buffer size to reduce playback delays (for a MIDI/VST application), and could not find a better way than to patch rodio. Notes:

  • This is a breaking change, I needed to pass the buffer size to new_output_stream_with_format, it is likely possible to rewrite this, keeping that function intact if necessary.
  • Using references to pass parameters is not strictly required, but the stream constructor does not need to own them, and it made it a bit easier to required some parameters after calling
  • SampleFormat probably should be a part of StreamConfig. This will streamline the construction parameters. It is not clear to me what this affects, though.

The version from this patch works for me. I am willing to rework the request if there is a better idea how to do this.

@dvdsk dvdsk added the breaking Proposed change that would break the public API label Mar 27, 2024
@vyfor vyfor mentioned this pull request Jul 4, 2024
src/stream.rs Show resolved Hide resolved
@dvdsk
Copy link
Collaborator

dvdsk commented Sep 15, 2024

Just to give you all an update, configurable buffer size will be merged one day. Its just that I have my plate full atm and want to do as many breaking changes at once, such that I can write a porting/update guide and do an announcement on the various platforms. Before I do that I want to overhaul

  • The stream being returned (needing to keep a variable you do not use is annoying and a rodio footgun).
  • The Sink only processing changes every 5ms
  • The compile time features are confusing (no clear distinction between containers & codecs)

And that will take a while once I have the time or someone else pitches in. Therefore thanks @PetrGlad for this PR, I suggest anyone who urgently needs this change to rely on their fork or this PR for now.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 16, 2024

@PetrGlad I've been listing all the breaking changes planned. What is your opinion on replacing all the try_from factory functions with a single builder?

@PetrGlad
Copy link
Author

@dvdsk Yes, this looks to me like a good idea, especially if the builder instance can be copied/passed around. In my case I was happy with the defaults and only wanted to change the buffer size. In that case a builder can help setting the defaults. Maybe the builder should have methods that can provide replacements for OutputStream::try_from_device (something like ConfigBuilder::from(conf: SupportedStreamConfig)):

let conf_builder = rodio::OutputConfigBuilder::from(cpal_suported_config)
    .buffer_size(13);
let output = rodio::OutputStream::open(conf_builder); 

or

let conf = rodio::OutputConfigBuilder::from(cpal_suported_config)
    .buffer_size(13)
    .build();
let output = rodio::OutputStream::open(conf); 

Another option could be to add all possible options to some config struct in rodio and implement Default, From<cpal::SupportedStreamConfig>, and From<cpal::StreamConfig> for it to fill defaults, and let users to adjust this struct's fields before they pass it to the stream initialization function. I think of the necessary parameters only stream's sample format is not part of cpal::StreamConfig. If there are not too many parameters this could be simpler than builder.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 18, 2024

Most builders are named following Builder. So here that would be OutputStreamBuilder.

Then there is the question of the factory name for the builder. I see new and default used a lot. default seems to make the most sense since we are replacing:

    /// Return a new stream & handle using the default output device.
    ///
    /// On failure will fallback to trying any non-default output devices.
    pub fn try_default() -> ..  {..}

    /// Returns a new stream & handle using the given output device and the default output
    /// configuration.
    pub fn try_from_device(device: &cpal::Device) -> .. {..}

    /// Returns a new stream & handle using the given device and stream config.
    ///
    /// If the supplied `SupportedStreamConfig` is invalid for the device this function will
    /// fail to create an output stream and instead return a `StreamError`
    pub fn try_from_device_config(
        device: &cpal::Device,
        config: SupportedStreamConfig) -> ..  {..}

The first would become:
OutputStreamBuilder::default().build()
then the second becomes:
OutputStreamBuilder::default().with_device(device).build()
then:
OutputStreamBuilder::default().with_device(device).with(config).build()
and finally something to configure the buffer size

Instead of build we could use something like open as it is more semantically correct. build is well known however and serves as a second beacon to the reader that this is a builder pattern. This aids readability.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 18, 2024

Instead of build we could use something like open as it is more semantically correct. build is well known however and serves as a second beacon to the reader that this is a builder pattern. This aids readability.

on second thought open might help bring the idea of a stream across... I have no clue what is better, ill leave that too you :)

@PetrGlad
Copy link
Author

PetrGlad commented Sep 20, 2024

@dvdsk I will sketch the new initialization in a separate branch.
Also it is tempting to get rid of separate reference that is used to keep the stream open. But this seems to be a bigger change than I was prepared for. I think to have that that users should have Arc to the stream if they need it. They may downgrade it themselves if they want to. If I were designing it the output stream would be a sink for single source and one would have to use a mixer to play several sounds at once, and use source-sink connections everywhere else. That probably can be not super efficient, though.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 20, 2024

I wish I had not looked inside, because now I have questions :)

It is not obvious why UniformSourceIterator should be re-bootstrapped periodically, unless the Source may produce different stream formats in different frames. If that is true, this decision complicates the implementation a lot, I would limited a Source's output to always be of the same format. Or was there other reason for this?
Why is there a need to split sources into frames at all?

In the mixer code

/// The input of the mixer.
pub struct DynamicMixerController<S> {
....
     ////// The doc says this is mixer's input but these fields
    ////// are for output format. Maybe they should be renamed.
    channels: u16,
    sample_rate: u32,
}

By the way why the mixer is called "dynamic"? Because sources can be added dynamically not at the initialization time?

The following requires the states of self.pending_sources and self.has_pending to be consistent.

self.pending_sources
            .lock()
            .unwrap()
            .push(Box::new(uniform_source) as Box<_>);
self.has_pending.store(true, Ordering::SeqCst);

Is has_pending only for optimization here? Since its value can be inferred from self.pending_sources.is_empty() and self.pending_sources is already behind a mutex.
I suspect that races are possible here. I think at least we should keep the lock to self.pending_sources while the flag is updated.
If adding sources and play should be in different threads, I'd replaced this with a channel. Clients then can add new sources to this channel and playback side can include them at it's own pace.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

Also it is tempting to get rid of separate reference that is used to keep the stream open

That is planned, I suggest we do that in a different PR #614

I am really sorry it seems like I have edited one of your messages instead of quote replying..... I ll see if I can restore things

Edit: fixed

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

Also it is tempting to get rid of separate reference that is used to keep the stream open.

That is planned, I suggest we do that in a different PR #614

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

Why is there a need to split sources into frames at all?

Some examples:

  • Some audio compression formats dynamically change the sample rate
  • A single audio file might contain multiple 'tracks' some of which might be mono
  • A source could be a composed source that chains together multiple audio files each with different sample rate and a different number of channels

You should read: https://docs.rs/rodio/latest/rodio/source/trait.Source.html It helped me out when I began with rodio

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

By the way why the mixer is called "dynamic"? Because sources can be added dynamically not at the initialization time?

I think so, I have never really read the mixer source

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

Is has_pending only for optimization here?

Yes, since its needed in the Iterator::next call. That is called sample_time * n_channels times a second which tends to be a lot :). A Mutex is a bit slower then checking an atomic

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 20, 2024

I suspect that races are possible here. I think at least we should keep the lock to self.pending_sources while the flag is updated.

There is only one DynamicMixerController at the time (it can not be cloned), therefore I think its safe.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 23, 2024

@dvdsk Regarding DynamicMixerController, as I understand DynamicMixerController part is used from clients' code and thread, while DynamicMixer is used from cpal's thread. So races are possible but harmless here since normally the source is polled frequently. The worst case scenario, when new source may be stuck indefinitely is avoided because start_pending_sources holds the self.input.pending_sources lock until after the flag is updated:

fn start_pending_sources(&mut self) {
  let mut pending = grabbing_the_lock_here();
  ..........
  /* By the way, this line makes `self.still_pending` as a field of `Self` useless
   unless it is there to avoid extra memory allocations.
   The remaining sources are shoved back to the controller right away.
   One can use iteration+swap_remove instead  (see an alternative version below).
  */
  std::mem::swap(&mut self.still_pending, &mut pending);

  let has_pending = !pending.is_empty();
  // If  not the lock the new sources could have been added here while has_pending==false.
  /* I am not familiar with atomics Ordering semantics, but if the following line
    should ensure the lock is released after the flag update is complete.
  */
  self.input.has_pending.store(has_pending, Ordering::SeqCst);
  // The 'pending' lock is released here        
}

I would still keep the lock longer here just to be on the safe side:

pub fn add<T>(&self, source: T)
where
    T: Source<Item=S> + Send + 'static,
{
    let uniform_source = UniformSourceIterator::new(source, self.channels, self.sample_rate);
    let mut pending = self.pending_sources
        .lock()
        .unwrap(); // By the way, this could return an error instead of panicking.
    pending.push(Box::new(uniform_source) as Box<_>);
    self.has_pending.store(true, Ordering::SeqCst);
}

Regarding the filter loop implementation, it could have been the following (DynamicMixer::still_pending can be removed then)

let mut i = 0;
while i < pending.len() {
    let in_step = self.sample_count % pending[i].channels() as usize == 0;
    if in_step {
        self.current_sources.push(pending.swap_remove(i));
    } else {
        i += 1;
    }
}

I see the same pattern with swapping vectors is used in DynamicMixer::sum_current_sources. I could only imagine that keeping Self::current_sources can only be useful to avoid repeatable memory allocations. Maybe that implementation is faster but I could also imagine this version

fn sum_current_sources(&mut self) -> S {
    let mut sum = S::zero_value();
    let mut i = 0;
    while i < self.current_sources.len() {
        if let Some(value) = self.current_sources[i].next() {
            sum = sum.saturating_add(value);
            i += 1;
        } else {
            self.current_sources.swap_remove(i);
        }
    }
    sum
}

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 23, 2024

I could only imagine that keeping Self::current_sources can only be useful to avoid repeatable memory allocations. Maybe that implementation is faster but I could also imagine this version

Or this, throwing out the separate function entirely:

    #[inline]
    fn next(&mut self) -> Option<S> {
        if self.input.has_pending.load(Ordering::SeqCst) {
            self.start_pending_sources();
        }

        self.sample_count += 1;

        /* let sum = self.sum_current_sources();

        if self.current_sources.is_empty() {
            None
        } else {
            Some(sum)
        }*/
        self.current_sources
              .iter_mut()
              .filter_map(Source::next)
              .reduce(S::zero_value(), |sum, source| sum.saturating_add(source) );
    }

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 23, 2024

regarding start_pending_sources. The only reason to keep the sources around (the pushing them back to still_pending) seems to be to synchronize channels.

A less lazy approach does away with all that complexity.

fn start_pending_sources(&mut self) {
  let mut pending = ...
  for source in drain {
     skip samples till in sync.
     add to current sources.
  }
}

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 23, 2024

yeah, the dynamic mixer could use a refactor. There are some tests but we would ideally have some benchmarks before replacing stuff in there. Audio can be tricky and I did not write/benchmark nor test this. I do not know why certain choices where made but it all does seem to work, even though I would design it differently.

If you want to discuss this further, or refactor this code (you are welcome too though we would need benchmarks preferable automated). Please open a new issue since its kinda unrelated to making the buffer size configurable.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 23, 2024

This will calculate the output value,

self.current_sources
  .iter_mut()
  .filter_map(Source::next)
  .reduce(S::zero_value(), |sum, source| sum.saturating_add(source) );

but as I understand we also should discard sources that are completed (the ones that returned None). This is a side effect of sum_current_sources that I tried to keep in the suggested version.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 23, 2024

regarding start_pending_sources. The only reason to keep the sources around (the pushing them back to still_pending) seems to be to synchronize channels.

A less lazy approach does away with all that complexity.

fn start_pending_sources(&mut self) {
  let mut pending = ...
  for source in drain {
     skip samples till in sync.
     add to current sources.
  }
}

Yes, I think my version does this.

let mut i = 0;
while i < pending.len() {
    let in_step = self.sample_count % pending[i].channels() as usize == 0;
    if in_step {
        self.current_sources.push(pending.swap_remove(i));
    } else {
        i += 1;
    }
}

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

we also should discard sources that are completed (the ones that returned None).

only because that was used in next() for self.current_sources.is_empty(). I removed the need for that. The iterator chain lives directly in next(). The is empty then return None check is removed since the iterator chain will return None anyway once every source does.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

Yes, I think my version does this.

I'll clarify what I meant by writing it out.

fn start_pending_sources(&mut self) {
    let mut pending = self.input.pending_sources.lock().unwrap();

    for mut source in pending.drain(..) {
        let to_skip = self.sample_count % source.channels() as usize;
        for _ in 0..to_skip {
            source.next();
        }
        self.current_sources.push(source);
    }
    // false positive/negative is okay so ordering can be relaxed
    // this just needs to be atomic
    self.input.has_pending.store(false, Ordering::Relaxed);
}

@PetrGlad
Copy link
Author

PetrGlad commented Sep 24, 2024

@dvdsk Ah, got it. Yes this should work. I wonder if users may care about those skipped samples.
BTW, since a Source is an Iterator

....
let to_skip = self.sample_count % source.channels() as usize;
self.current_sources.push(source.skip(to_skip));
...

Although .skip() returns lazy wrapper, it does not call .next() right away.

Also since there is no pending_sources field anymore, current_sources can be renamed to sources

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

I wonder if users may care about those skipped samples.

at 44100 samples per second I hope not. If they do need such precision the current dynamic mixer wont work for them anyway.

BTW, since a Source is an Iterator

yeah thought about that, but afraid it might make it slower, cant base that on anything nor benchmarked it so thats probably a little silly of me. I should not be so arrogant to think I know better then the compiler 😓.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

This is a side effect of sum_current_sources that I tried to keep in the suggested version.

actually your right, that is an issue that thats missing. Kinda an issue when you keep adding sources and the mixer grows infinity.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 24, 2024

@dvdsk I have looked at the mixer because in the current version it is actually the interface to the stream, it is always added as output. So therefore to me #613 is related to stream initialization. I would like to return something simpler and let users to configure necessary functions. output stream builder can provide necessary conveniences.
See mix_multiple_sources.rs for motivation. This example actually builds sine_wave -> take_duration -> amplify -> mixer -> sink -> mixer -> cpal_callback. One mixer should suffice, but at the moment users do not have a choice.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

One mixer should suffice, but at the moment users do not have a choice.

I think that second mixer is only there because the user is using it with the Sink right? The Sink provides a ton of features for pausing/skipping sources etc. If you want those to work with your mixed sources you need a second mixer thats passed in to Sink.

While the OutputStream needs to be a mixer to combine the default output: silence (that S::zero_value()) with whatever the user wants whether that is through the Sink or by using the OutputStream directly.

There is an argument to be made that this is overkill for the Sink and performance could be improved by using something else for the Sink. But to make that argument we need benchmarks to support the wins, as it will make the codebase more complex since you can then no longer use OutputStreamHandle/OutputStream for Sink.

@PetrGlad
Copy link
Author

In mixer a user may be adding new sources indefinitely, so the sources collection will grow unbounded. Moreover, all the completed sources will have to be checked by the filter every time the sum is calculated. It is probably not a usual but situation, but still possible. One may decide not to worry about that, though.

I could not find agreements regarding stopped sources. E.g. cpal output stream callback keeps trying to .next() even after None is returned. I have looked a the std::iter::Iterator API documentation, and this is not a violation of the contract. Should other sinks do the same?

@PetrGlad
Copy link
Author

Output stream callback does not use Silence from mixer, it can only receive Nones and work fine with that. See for example CpalDeviceExt::new_output_stream_with_format

self.build_output_stream::<f32, _, _>(
    &config,
    move |data, _| {
        data.iter_mut()
            .for_each(|d| *d = samples.next().unwrap_or(0f32))
    },
    error_callback,
    None,
),

Removing unnecessary mixer is not strictly about performance, but also simplifying the setups and giving users a choice. To me it looks like OutputStreamHandle::play_once and play_raw are only useful for hello-world examples. I think such conveniences can be implemented on top of the underlying API. For example by a stream builder.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

I could not find agreements regarding stopped sources. E.g. cpal output stream callback keeps trying to .next() even after None is returned.

I dont know what happens when you pass in None. It might be a loud pop or something. But it might be nothing. I was not there when most of rodio was written so like you I am guessing at the reasons why things are the way they are.

Since I know little or nothing of cpal I am not really qualified to review/merge any code touching the interaction with cpal.

@PetrGlad
Copy link
Author

PetrGlad commented Sep 24, 2024

The question about source iterators was, do we expect a source to yield None for a wile and then again start producing samples? As I mentioned the stream callback does rely in this treating None as 0 value. But in other places yielding None from Source::next() is an end of stream assuming that source is now empty and can be discarded.

cpal only polls for new samples via callback, which in turn invokes .next() method on the final rodio source to get enough samples to fill the buffer. The callback is set up here. So this is not really about cpal but what rodio expects from its Source implementations.

@dvdsk
Copy link
Collaborator

dvdsk commented Sep 24, 2024

I could not find agreements regarding stopped sources.

I do not think its written down anywhere however a Source returning None should not be tried again. If you do retry them that will lead wrong behaviour like the Empty source never ending.

Note that the dynamicmixer (which is retried in the cpal callback) is not a Source and can therefore return None and then Some again

Edit: it is now documented in #58b61f6

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking Proposed change that would break the public API
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants