Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WinRT Futures should update their inner Waker #342

Closed
adumbidiot opened this issue Oct 18, 2020 · 11 comments · Fixed by #3142
Closed

WinRT Futures should update their inner Waker #342

adumbidiot opened this issue Oct 18, 2020 · 11 comments · Fixed by #3142
Labels
enhancement New feature or request

Comments

@adumbidiot
Copy link
Contributor

Opening this issue as a continuation of #322. While it is now possible to call poll on winrt futures more than once, consecutive poll calls will not update the Waker. As a result, these futures will hang forever if a future is transferred between tasks after being polled. A simple example of this behavior:

mod bindings {
    winrt::include_bindings!();
}

use bindings::*;
use crate::test_component::TestRunner;
use futures::FutureExt;

fn main() {
    let mut tokio_rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .build()
        .unwrap();
    
    tokio_rt.block_on(async {
        let fut_1 = TestRunner::create_async_action(1000).unwrap();
        let fut_2 = TestRunner::create_async_action(2000).unwrap();
        
        let mut fut_1_fuse = fut_1.fuse();
        let mut fut_2_fuse = fut_2.fuse();
        
        let incomplete_future = futures::select! {
            res_1 = fut_1_fuse => fut_2_fuse,
            res_2 = fut_2_fuse => fut_1_fuse,            
        };
        
        println!("1 future complete, finishing other on different task...");
        
        // Work-around !Send future, this example still works with Send winrt futures and tokio::spawn.
        let local = tokio::task::LocalSet::new();
        
        local.spawn_local(async move {
            incomplete_future.await.unwrap();
        });
        
        local.await;
        
        println!("Both futures complete!");
    });
    
    println!("Done!");
}

This example will hang indefinitely and never complete. Unfortunately, I don't this its possible to fix this while maintaining the Future impl on IAsyncxxx as some extra memory is needed to store the shared reference to the Waker to update if needed.

In my opinion, IntoFuture is probably the best way forward, however it is currently unstable. Implementing this trait will allow await-ing on that value by implicity calling into_future, just as the IntoIterator trait allows iterating over a value that doesn't explicitly implement Iterator. In the meantime, maybe we could remove the Future impl from IAsyncxxx, replacing it with a method to get a wrapper struct implementing Future?

@kennykerr kennykerr added the enhancement New feature or request label Oct 19, 2020
@kennykerr
Copy link
Collaborator

Thanks for the repro, Nathaniel! That's super helpful. A wrapper may be the way to go, but I really don't enjoy all the verbiage Rust requires with all the unwrapping so I'll think about generating the projection to possibly return the wrapper directly so that it doesn't require another method call.

@adumbidiot
Copy link
Contributor Author

I was thinking that some of the verbiage could be avoided by making the conversion method not return a result, like this:

// Work-around being unable to implement new functions on foreign types
trait IAsyncActionExt {
    fn to_future(self) -> IAsyncActionFuture;
}

impl IAsyncActionExt for IAsyncAction {
    fn to_future(self) -> IAsyncActionFuture {
        IAsyncActionFuture {
            inner: self,
            shared_waker: Arc::new(Mutex::new(None)),
        }
    }
}

struct IAsyncActionFuture {
    inner: IAsyncAction,
    shared_waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for IAsyncActionFuture {
    type Output = winrt::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.inner.status()? == AsyncStatus::Started {
            let mut shared_waker = self.shared_waker.lock().unwrap();
            let shared_waker_is_none = shared_waker.is_none();
            *shared_waker = Some(cx.waker().clone());

            if shared_waker_is_none {
                let shared_waker = self.shared_waker.clone();
                self.inner.set_completed(AsyncActionCompletedHandler::new(
                    move |_sender, _args| {
                        // The waker will always be some here
                        shared_waker.lock().unwrap().take().unwrap().wake();
                        Ok(())
                    },
                ))?;
            }

            Poll::Pending
        } else {
            Poll::Ready(self.inner.get_results())
        }
    }
}

fn main() {
    let mut tokio_rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .build()
        .unwrap();

    tokio_rt.block_on(async {
        let fut_1 = TestRunner::create_async_action(1000).unwrap().to_future();
        let fut_2 = TestRunner::create_async_action(2000).unwrap().to_future();

        let mut fut_1_fuse = fut_1.fuse();
        let mut fut_2_fuse = fut_2.fuse();

        let incomplete_future = futures::select! {
            res_1 = fut_1_fuse => fut_2_fuse,
            res_2 = fut_2_fuse => fut_1_fuse,
        };

        println!("1 future complete, finishing other on background task...");

        // Work-around !Send future, this example still works with Send winrt futures and tokio::spawn.
        let local = tokio::task::LocalSet::new();

        local.spawn_local(async move {
            incomplete_future.await.unwrap();
        });

        local.await;

        println!("Both futures complete!");
    });

    println!("Done!");
}

The only job of the conversion method is to essentially create a wrapper around an IAsyncxxx and a shared waker implementation, which is infallible. As a result, I think the only increased verbosity here is in the extra to_future method call, which we can partially hide when IntoFuture stabilizes. In this case though, i think the into_future verbosity is needed in order to call fuse on the created futures as FutureExt is only implemented for futures.

@goffrie
Copy link
Contributor

goffrie commented Sep 14, 2022

IntoFuture is stabilizing now, so this seems worth doing. I would also suggest implementing Drop for the wrapper future and having it call Cancel() on the IAsyncInfo, since that's consistent with semantics of other Rust futures.

@kennykerr
Copy link
Collaborator

Thanks for the reminder. Yes, now that IntoFuture is in 1.64 I'd love to see whether it can help to improve the integration of WinRT async with Rust async/await.

@kennykerr
Copy link
Collaborator

Closing this old issue - doesn't seem to be much demand for such API support. Let me know if you have thoughts to the contrary. 😊

@goffrie
Copy link
Contributor

goffrie commented Jul 2, 2024

This is still an issue and is a footgun for anyone who wants to use async APIs.

I might take a crack at implementing a wrapper using IntoFuture, but not this week.

@adumbidiot
Copy link
Contributor Author

I also would also like to note that the issue isn't specifically the code I sent above, but the fact that the waker isn't updated. This is a violation of the futures contract. Updates to async executors in the future may break the current code in strange ways, such as permanently hanging like shown in the code above.

If fixing this is off the table, would it be possible to at least gate the Future impl behind a feature flag? I don't want to accidentally use it when working with this crate.

@kennykerr
Copy link
Collaborator

#3142 removes the Future implementations. We can revisit in future... 😊

@kennykerr
Copy link
Collaborator

Reopening as there's been some renewed interest in futures support. Here's a sketch of how this can be implemented correctly to accommodate both Rust futures and WinRT async.

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use windows::{core::*, Foundation::*, System::Threading::*};

// The actual async types can implement `IntoFuture` and return this `Future` with some extra state needed to
// keep things moving along. This would just need to be a bit more generic to handle the four kinds of async
// types.
struct AsyncFuture {
    // Represents the async execution and provides the virtual methods for setting up a `Completed` handler and
    // calling `GetResults` when execution is completed.
    inner: IAsyncAction,

    // Provides the `Status` virtual method and saves repeated calls to `QueryInterface` during polling.
    status: IAsyncInfo,

    // A shared waker is needed to keep the `Completed` handler updated.
    waker: Option<Arc<Mutex<Waker>>>,
}

impl AsyncFuture {
    fn new(inner: IAsyncAction) -> Self {
        Self {
            status: inner.cast().unwrap(),
            inner,
            waker: None,
        }
    }
}

unsafe impl Send for AsyncFuture {}
unsafe impl Sync for AsyncFuture {}

impl Future for AsyncFuture {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // A status of `Started` just means async execution is still in flight. Since WinRT async is always
        // "hot start", if its not `Started` then its ready for us to call `GetResults` so we can skip all of
        // the remaining set up.
        if self.status.Status()? != AsyncStatus::Started {
            return Poll::Ready(self.inner.GetResults());
        }

        if let Some(shared_waker) = &self.waker {
            // We have a shared waker which means we're either getting polled again or been transfered to
            // another another execution context. Either way, we need to update the shared waker to make sure
            //  we've got the "current" waker.
            let mut guard = shared_waker.lock().unwrap();
            guard.clone_from(cx.waker());

            // It may be possible that the `Completed` handler acquired the lock and signaled the old waker
            // before we managed to acquire the lock to update it with the current waker. We check the status
            // again here just in case this happens.
            if self.status.Status()? != AsyncStatus::Started {
                return Poll::Ready(self.inner.GetResults());
            }
        } else {
            // If we don't have a saved waker it means this is the first time we're getting polled and should
            // create the shared waker and set up a `Completed` handler.
            let shared_waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(shared_waker.clone());

            // Note that the handler can only be set once, which is why we need a shared waker in the first
            // place. On the other hand, the handler will get called even if async execution has already
            // completed, so we can just return `Pending` after setting the Completed handler.
            self.inner
                .SetCompleted(&AsyncActionCompletedHandler::new(move |_, _| {
                    shared_waker.lock().unwrap().wake_by_ref();
                    Ok(())
                }))?;
        };

        Poll::Pending
    }
}

async fn async_test() -> Result<()> {
    let object = ThreadPool::RunAsync(&WorkItemHandler::new(|_| {
        println!("work");
        Ok(())
    }))?;

    AsyncFuture::new(object).await
}

fn main() -> Result<()> {
    futures::executor::block_on(async_test())
}

@goffrie
Copy link
Contributor

goffrie commented Sep 1, 2024

This is now fixed by #3213, right?

@kennykerr
Copy link
Collaborator

Yes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants