Heyo! I've been trying kameo and so far I am absolutely loving the library, although I can't seem to wrap my brain around how to use the stream example with tokio-tungstenite WebSocket streams.

I'm trying to implement a client for a WebSocket system that both sends messages to the client and receives messages from it. I want to be able to forward the incoming messages from the stream to whoever created the actor, and vice versa.

I've tried to just pass the return value of tokio_tungstenite::connect_async to ActorRef::attach_stream, but that just resulted in errors I don't understand.

let (ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
let stream = Box::pin(ws_stream);
actor_ref.attach_stream(stream, "1st stream", "1st stream");

Does anyone have an idea? I am absolutely stumped.
Hi @bananasov, thanks for asking this! The errors returned are definitely quite obscure.

Essentially, in order to meet the requirements of attach_stream, your actor needs to implement Message<StreamMessage<T, S, F>>. StreamMessage is a type provided by kameo, and is essentially just a wrapper type around the stream items. S and F can be any types you want; they carry context to the actor when the stream starts and finishes.

Looking into the Stream impl of WebSocketStream, the Item type is Result<Message, Error>, which indicates your actor should have the following impl block:

use kameo::message::{Context, Message, StreamMessage};

impl
    Message<
        StreamMessage<
            Result<tungstenite::protocol::Message, tungstenite::error::Error>,
            &'static str,
            &'static str,
        >,
    > for MyActor
{
    type Reply = Result<(), tungstenite::error::Error>;

    async fn handle(
        &mut self,
        msg: StreamMessage<
            Result<tungstenite::protocol::Message, tungstenite::error::Error>,
            &'static str,
            &'static str,
        >,
        _ctx: Context<'_, Self, Self::Reply>,
    ) -> Self::Reply {
        match msg {
            StreamMessage::Next(ws_result) => {
                // Propagate stream errors as the handler's reply.
                let ws_msg = ws_result?;
                // handle the websocket message
            }
            StreamMessage::Started(started_msg) => { /* handle started */ }
            StreamMessage::Finished(finished_msg) => { /* handle finished */ }
        }
        Ok(())
    }
}

With this implementation, you should be able to call attach_stream.

Hopefully this clears it up and solves the errors you're having.
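
For anyone who wants to see how the three type parameters of StreamMessage map onto a handler without pulling in kameo or tungstenite, here is a rough std-only model. Everything in it is a stand-in: the StreamMessage enum only mirrors the three variants named above and is not kameo's actual definition, WsText and WsError are hypothetical placeholders for tungstenite's Message and Error, and drive is a made-up synchronous helper that plays the role attach_stream is described as playing. Stopping at the first error is a choice made for this sketch, not a claim about how kameo behaves.

```rust
// Std-only model of the shape described above. This enum only mirrors the
// three variants named in the reply; it is NOT kameo's actual definition.
#[derive(Debug, PartialEq)]
enum StreamMessage<T, S, F> {
    Next(T),
    Started(S),
    Finished(F),
}

// Hypothetical stand-ins for tungstenite's Message and Error types.
#[derive(Debug, PartialEq)]
struct WsText(String);
#[derive(Debug, PartialEq)]
struct WsError(String);

// Plays the role of the stream's Item type, Result<Message, Error>.
type WsItem = Result<WsText, WsError>;

// Hypothetical actor that just records what it has seen.
#[derive(Default)]
struct MyActor {
    log: Vec<String>,
}

impl MyActor {
    // Same structure as the handler in the reply, minus async and the context.
    fn handle(
        &mut self,
        msg: StreamMessage<WsItem, &'static str, &'static str>,
    ) -> Result<(), WsError> {
        match msg {
            StreamMessage::Started(tag) => self.log.push(format!("started: {tag}")),
            StreamMessage::Next(ws_result) => {
                // `?` turns a stream error into the handler's return value.
                let WsText(text) = ws_result?;
                self.log.push(format!("text: {text}"));
            }
            StreamMessage::Finished(tag) => self.log.push(format!("finished: {tag}")),
        }
        Ok(())
    }
}

// Hypothetical driver: wraps each item in a StreamMessage and delivers it.
// It stops at the first error, which is this sketch's choice only.
fn drive(
    actor: &mut MyActor,
    items: impl IntoIterator<Item = WsItem>,
    started: &'static str,
    finished: &'static str,
) -> Result<(), WsError> {
    actor.handle(StreamMessage::Started(started))?;
    for item in items {
        actor.handle(StreamMessage::Next(item))?;
    }
    actor.handle(StreamMessage::Finished(finished))
}

fn main() {
    let mut actor = MyActor::default();
    let items = vec![Ok(WsText("hello".into())), Ok(WsText("world".into()))];
    drive(&mut actor, items, "1st stream", "1st stream").unwrap();
    for line in &actor.log {
        println!("{line}");
    }
}
```

The point of the model is that T is whatever the stream yields (here a Result, so the handler has to unwrap it), while S and F are just the two extra values passed alongside the stream, which is why the question's "1st stream" strings end up as &'static str in the impl.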