Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to use shared variable in async callback #425

Open
HuakunShen opened this issue Apr 27, 2024 · 4 comments
Open

Unable to use shared variable in async callback #425

HuakunShen opened this issue Apr 27, 2024 · 4 comments

Comments

@HuakunShen
Copy link

Using the regular version works

fn main() {
    let (tx, rx) = channel::<String>();
    let tx_clone = tx.clone();
    let callback = move |payload: Payload, socket: RawClient| {
        tx_clone.send("...".to_string());
    };
    let callback2 = move |payload: Payload, socket: RawClient| {
        tx.send("...".to_string());
    };
    let socket = ClientBuilder::new("")
        .on("evt1", callback)
        .on("evt2", callback2)
        .connect()
        .expect("Connection failed");
}

but if I switch to use the async version

#[tokio::main]
async fn main() {
    let (tx, rx) = channel::<String>();
    let tx_clone = tx.clone();
    let callback = |payload: Payload, socket: Client| {
        async move {
            tx_clone.send("...".to_string());
        }.boxed()
    };
    let callback2 = |payload: Payload, socket: Client| {
        async move {
            tx.send("...".to_string());
        }.boxed()
    };
    let socket = ClientBuilder::new("")
        .on("evt1", callback)
        .on("evt2", callback2)
        .connect()
        .await
        .expect("Connection failed");
}

Using the Async version I always get error:

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
this closure implements `FnOnce`, not `FnMut`

Couldn't figure out how to fix this. Also could not find examples on this.

All examples I could find are the simplest where callback doesn't need to access external variables.

@kitgxrl
Copy link

kitgxrl commented Apr 29, 2024

This can be fixed by an additional clone:

let tx_clone = tx.clone();
let callback = |payload: Payload, socket: Client| {
  let tx_clone = tx_clone.clone();
  async move {
    tx_clone.send("...".to_string());
  }.boxed()
};
let callback2 = |payload: Payload, socket: Client| {
  let tx = tx.clone();
  async move {
    tx.send("...".to_string());
  }.boxed()
};

this is untested but how I recall fixing this. It's not by any means ideal but #363 is likely related

@HuakunShen
Copy link
Author

HuakunShen commented Apr 30, 2024

Thanks! I figured this out.

But what I ended up doing is use on_any and channel to transfer all event handling to another loop outside the closures to avoid variables moving. It's much simpler.

Here is how I did it.

#[derive(Debug)]
pub struct EventMessage {
    pub event: String,
    pub payload: Payload,
}

let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
let (evt_tx, mut evt_rx) = tokio::sync::mpsc::channel::<EventMessage>(1);

let socket = ClientBuilder::new(SERVER_URL)
    .on(Event::Connect, |_, _| async move {}.boxed())
    .on_any(move |evt, payload, _| {
        let evt_tx = evt_tx.clone();
        async move {
            evt_tx
                .send(EventMessage {
                    event: evt.to_string(),
                    payload,
                })
                .await
                .unwrap();
        }
        .boxed()
    })
    .on(Event::Error, |err, _: Client| {
        async move {
            eprintln!("Error: {:#?}", err);
        }
        .boxed()
    })
    .connect()
    .await
    .expect("Connection failed");


loop {
    tokio::select! {
        _ = done_rx.recv() => {
            break;
        }
        Some(evt) = evt_rx.recv() => {
            // Handle event received from evt_rx
            match evt.event.as_str() {
                "evt1" => {...}
                "evt2" => {...}
            }
        }
        _ = tokio::signal::ctrl_c() => {
            break;
        }
    };
}

@HuakunShen
Copy link
Author

HuakunShen commented Apr 30, 2024

@1c3t3a
I wonder if it's possible to update the design to use channels for event handling rather than closures. The socket.connect().await can simply return a channel + a socket, then we can read events and payloads directly from the channel to avoid dealing with closures and cloning and move variables to heap.

@shenjackyuanjie
Copy link
Contributor

about this, I'm also thinking about another thing, how to share a "status data" between different callback function?
like in axum you can use a State to share state between callbacks
how can I do that over here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants