Skip to content

Commit

Permalink
Remove all Pause, Play and Close events
Browse files Browse the repository at this point in the history
A follow up to [this comment](RustAudio#288 (comment)).
  • Loading branch information
mitchmindtree committed Jun 23, 2019
1 parent b1539c5 commit 26f7e99
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 121 deletions.
12 changes: 2 additions & 10 deletions src/alsa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use PlayStreamError;
use SupportedFormatsError;
use SampleFormat;
use SampleRate;
use StreamCloseCause;
use StreamData;
use StreamError;
use StreamEvent;
Expand Down Expand Up @@ -473,7 +472,7 @@ impl EventLoop {
let run_context = &mut *run_context;

'stream_loop: loop {
process_commands(run_context, callback);
process_commands(run_context);

reset_descriptors_with_pending_command_trigger(
&mut run_context.descriptors,
Expand Down Expand Up @@ -827,22 +826,16 @@ impl EventLoop {
}

// Process any pending `Command`s within the `RunContext`'s queue.
fn process_commands(
run_context: &mut RunContext,
callback: &mut dyn FnMut(StreamId, StreamEvent),
) {
fn process_commands(run_context: &mut RunContext) {
for command in run_context.commands.try_iter() {
match command {
Command::DestroyStream(stream_id) => {
run_context.streams.retain(|s| s.id != stream_id);
let event = StreamCloseCause::UserDestroyed.into();
callback(stream_id, event);
},
Command::PlayStream(stream_id) => {
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
callback(stream_id, StreamEvent::Play);
unsafe {
alsa::snd_pcm_pause(stream.channel, 0);
}
Expand All @@ -853,7 +846,6 @@ fn process_commands(
if let Some(stream) = run_context.streams.iter_mut()
.find(|stream| stream.can_pause && stream.id == stream_id)
{
callback(stream_id, StreamEvent::Pause);
unsafe {
alsa::snd_pcm_pause(stream.channel, 1);
}
Expand Down
47 changes: 8 additions & 39 deletions src/coreaudio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use PlayStreamError;
use SupportedFormatsError;
use SampleFormat;
use SampleRate;
use StreamCloseCause;
use StreamData;
use StreamEvent;
use SupportedFormat;
Expand Down Expand Up @@ -336,9 +335,7 @@ enum UserCallback {
Active(&'static mut (FnMut(StreamId, StreamEvent) + Send)),
// A queue of events that have occurred but that have not yet been emitted to the user as we
// don't yet have a callback to do so.
Inactive {
pending_events: Vec<(StreamId, StreamEvent<'static>)>
},
Inactive,
}

struct StreamInner {
Expand Down Expand Up @@ -440,7 +437,7 @@ impl EventLoop {
#[inline]
pub fn new() -> EventLoop {
EventLoop {
user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })),
user_callback: Arc::new(Mutex::new(UserCallback::Inactive)),
streams: Mutex::new(Vec::new()),
}
}
Expand All @@ -451,20 +448,10 @@ impl EventLoop {
{
{
let mut guard = self.user_callback.lock().unwrap();
let pending_events = match *guard {
UserCallback::Inactive { ref mut pending_events } => {
mem::replace(pending_events, vec![])
}
UserCallback::Active(_) => {
panic!("`EventLoop::run` was called when the event loop was already running");
}
};

let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback;
for (stream_id, event) in pending_events {
callback(stream_id, event);
if let UserCallback::Active(_) = *guard {
panic!("`EventLoop::run` was called when the event loop was already running");
}

let callback: &mut (FnMut(StreamId, StreamEvent) + Send) = &mut callback;
*guard = UserCallback::Active(unsafe { mem::transmute(callback) });
}

Expand All @@ -474,7 +461,7 @@ impl EventLoop {
}

// It is critical that we remove the callback before returning (currently not possible).
// *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] };
// *self.user_callback.lock().unwrap() = UserCallback::Inactive;
}

fn next_stream_id(&self) -> usize {
Expand Down Expand Up @@ -698,7 +685,7 @@ impl EventLoop {
let data_slice = slice::from_raw_parts(data as *const $SampleType, data_len);
let callback = match *user_callback {
UserCallback::Active(ref mut cb) => cb,
UserCallback::Inactive { .. } => return Ok(()),
UserCallback::Inactive => return Ok(()),
};
let unknown_type_buffer = UnknownTypeInputBuffer::$SampleFormat(::InputBuffer { buffer: data_slice });
let stream_data = StreamData::Input { buffer: unknown_type_buffer };
Expand Down Expand Up @@ -770,7 +757,7 @@ impl EventLoop {
let data_slice = slice::from_raw_parts_mut(data as *mut $SampleType, data_len);
let callback = match *user_callback {
UserCallback::Active(ref mut cb) => cb,
UserCallback::Inactive { .. } => {
UserCallback::Inactive => {
for sample in data_slice.iter_mut() {
*sample = $equilibrium;
}
Expand Down Expand Up @@ -802,33 +789,18 @@ impl EventLoop {
Ok(StreamId(stream_id))
}

fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) {
let mut guard = self.user_callback.lock().unwrap();
match *guard {
UserCallback::Active(ref mut callback) => callback(id, event),
UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)),
}
}

pub fn destroy_stream(&self, stream_id: StreamId) {
{
let mut streams = self.streams.lock().unwrap();
streams[stream_id.0] = None;
}
// Emit the `Close` event to the user.
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
self.emit_or_enqueue_event(stream_id, event);
}

pub fn play_stream(&self, stream_id: StreamId) -> Result<(), PlayStreamError> {
let mut streams = self.streams.lock().unwrap();
let stream = streams[stream_id.0].as_mut().unwrap();

if !stream.playing {
// Emit the `Play` event to the user. This should not block, as the stream should not
// yet be playing if this is being called.
self.emit_or_enqueue_event(stream_id, StreamEvent::Play);

if let Err(e) = stream.audio_unit.start() {
let description = format!("{}", std::error::Error::description(&e));
let err = BackendSpecificError { description };
Expand All @@ -850,9 +822,6 @@ impl EventLoop {
return Err(err.into());
}

// Emit the `Pause` event to the user.
self.emit_or_enqueue_event(stream_id, StreamEvent::Pause);

stream.playing = false;
}
Ok(())
Expand Down
61 changes: 2 additions & 59 deletions src/emscripten/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::mem;
use std::os::raw::c_void;
use std::slice::from_raw_parts;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use stdweb;
use stdweb::Reference;
use stdweb::unstable::TryInto;
Expand All @@ -16,7 +16,6 @@ use Format;
use PauseStreamError;
use PlayStreamError;
use SupportedFormatsError;
use StreamCloseCause;
use StreamData;
use StreamEvent;
use SupportedFormat;
Expand All @@ -33,22 +32,6 @@ use UnknownTypeOutputBuffer;

pub struct EventLoop {
streams: Mutex<Vec<Option<Reference>>>,
// The `EventLoop` requires a handle to the callbacks in order to be able to emit necessary
// events for `Play`, `Pause` and `Close`.
user_callback: Arc<Mutex<UserCallback>>
}

enum UserCallback {
// When `run` is called with a callback, that callback will be stored here.
//
// It is essential for the safety of the program that this callback is removed before `run`
// returns (not possible with the current CPAL API).
Active(&'static mut (dyn FnMut(StreamId, StreamEvent) + Send)),
// A queue of events that have occurred but that have not yet been emitted to the user as we
// don't yet have a callback to do so.
Inactive {
pending_events: Vec<(StreamId, StreamEvent<'static>)>
},
}

impl EventLoop {
Expand All @@ -57,38 +40,13 @@ impl EventLoop {
stdweb::initialize();
EventLoop {
streams: Mutex::new(Vec::new()),
user_callback: Arc::new(Mutex::new(UserCallback::Inactive { pending_events: vec![] })),
}
}

#[inline]
pub fn run<F>(&self, mut callback: F) -> !
pub fn run<F>(&self, callback: F) -> !
where F: FnMut(StreamId, StreamEvent) + Send,
{
// Retrieve and process any pending events.
//
// Then, set the callback ready to be shared between audio processing and the event loop
// handle.
{
let mut guard = self.user_callback.lock().unwrap();
let pending_events = match *guard {
UserCallback::Inactive { ref mut pending_events } => {
mem::replace(pending_events, vec![])
}
UserCallback::Active(_) => {
panic!("`EventLoop::run` was called when the event loop was already running");
}
};

let callback: &mut (dyn FnMut(StreamId, StreamEvent) + Send) = &mut callback;
for (stream_id, event) in pending_events {
callback(stream_id, event);
}

*guard = UserCallback::Active(unsafe { mem::transmute(callback) });
}


// The `run` function uses `set_timeout` to invoke a Rust callback repeatidely. The job
// of this callback is to fill the content of the audio buffers.

Expand Down Expand Up @@ -164,9 +122,6 @@ impl EventLoop {
set_timeout(|| callback_fn::<F>(user_data_ptr as *mut _), 10);

stdweb::event_loop();

// It is critical that we remove the callback before returning (currently not possible).
// *self.user_callback.lock().unwrap() = UserCallback::Inactive { pending_events: vec![] };
}

#[inline]
Expand All @@ -191,19 +146,9 @@ impl EventLoop {
Ok(StreamId(stream_id))
}

fn emit_or_enqueue_event(&self, id: StreamId, event: StreamEvent<'static>) {
let mut guard = self.user_callback.lock().unwrap();
match *guard {
UserCallback::Active(ref mut callback) => callback(id, event),
UserCallback::Inactive { ref mut pending_events } => pending_events.push((id, event)),
}
}

#[inline]
pub fn destroy_stream(&self, stream_id: StreamId) {
self.streams.lock().unwrap()[stream_id.0] = None;
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
self.emit_or_enqueue_event(stream_id, event);
}

#[inline]
Expand All @@ -213,7 +158,6 @@ impl EventLoop {
.get(stream_id.0)
.and_then(|v| v.as_ref())
.expect("invalid stream ID");
self.emit_or_enqueue_event(stream_id, StreamEvent::Play);
js!(@{stream}.resume());
Ok(())
}
Expand All @@ -226,7 +170,6 @@ impl EventLoop {
.and_then(|v| v.as_ref())
.expect("invalid stream ID");
js!(@{stream}.suspend());
self.emit_or_enqueue_event(stream_id, StreamEvent::Pause);
Ok(())
}
}
Expand Down
6 changes: 0 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,6 @@ pub enum StreamData<'a> {
pub enum StreamEvent<'a> {
/// Some data is ready to be processed.
Data(StreamData<'a>),
/// The stream has received a **Play** command.
Play,
/// The stream has received a **Pause** command.
///
/// No **Data** events should occur until a subsequent **Play** command is received.
Pause,
/// The stream was closed, either because the user destroyed it or because of an error.
///
/// The stream event callback will not be called again after this event occurs.
Expand Down
7 changes: 0 additions & 7 deletions src/wasapi/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use Format;
use PauseStreamError;
use PlayStreamError;
use SampleFormat;
use StreamCloseCause;
use StreamData;
use StreamError;
use StreamEvent;
Expand Down Expand Up @@ -758,8 +757,6 @@ fn process_commands(
run_context.streams.remove(p);
},
}
let event = StreamEvent::Close(StreamCloseCause::UserDestroyed);
callback(stream_id, event);
},
Command::PlayStream(stream_id) => {
match run_context.streams.iter().position(|s| s.id == stream_id) {
Expand All @@ -772,8 +769,6 @@ fn process_commands(
match stream_error_from_hresult(hresult) {
Ok(()) => {
run_context.streams[p].playing = true;
let event = StreamEvent::Play;
callback(stream_id, event);
}
Err(err) => {
let event = StreamEvent::Close(err.into());
Expand All @@ -797,8 +792,6 @@ fn process_commands(
match stream_error_from_hresult(hresult) {
Ok(()) => {
run_context.streams[p].playing = false;
let event = StreamEvent::Pause;
callback(stream_id, event);
}
Err(err) => {
let event = StreamEvent::Close(err.into());
Expand Down

0 comments on commit 26f7e99

Please sign in to comment.