Skip to content

Commit

Permalink
wip: Remove the Bin extractor and have Data implement FromMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
kelnos committed Mar 9, 2024
1 parent 96f1690 commit 576baa5
Show file tree
Hide file tree
Showing 18 changed files with 44 additions and 62 deletions.
2 changes: 1 addition & 1 deletion e2e/socketioxide/socketioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {
// keep this handler async to test async message handlers
socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| async move {
|ack: AckSender, Data::<PayloadValue>(data)| async move {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
2 changes: 1 addition & 1 deletion examples/angular-todomvc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

s.on(
"update-store",
|s: SocketRef, Data::<Vec<Todo>>(new_todos), State(Todos(todos))| {
|s: SocketRef, State(Todos(todos)), Data::<Vec<Todo>>(new_todos)| {
info!("Received update-store event: {:?}", new_todos);

let mut todos = todos.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/axum-echo-tls/axum_echo-tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {

socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| {
|ack: AckSender, Data::<PayloadValue>(data)| {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
2 changes: 1 addition & 1 deletion examples/axum-echo/axum_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {

socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| {
|ack: AckSender, Data::<PayloadValue>(data)| {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
8 changes: 4 additions & 4 deletions examples/basic-crud-application/src/handlers/todo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Todos {
}
}

pub fn create(s: SocketRef, Data(data): Data<PartialTodo>, ack: AckSender, todos: State<Todos>) {
pub fn create(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(data): Data<PartialTodo>) {
let id = Uuid::new_v4();
let todo = Todo { id, inner: data };

Expand All @@ -53,12 +53,12 @@ pub fn create(s: SocketRef, Data(data): Data<PartialTodo>, ack: AckSender, todos
s.broadcast().emit("todo:created", todo).ok();
}

pub async fn read(Data(id): Data<Uuid>, ack: AckSender, todos: State<Todos>) {
pub async fn read(ack: AckSender, todos: State<Todos>, Data(id): Data<Uuid>) {
let todo = todos.get(&id).ok_or(Error::NotFound);
ack.send(todo).ok();
}

pub async fn update(s: SocketRef, Data(data): Data<Todo>, ack: AckSender, todos: State<Todos>) {
pub async fn update(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(data): Data<Todo>) {
let res = todos
.get_mut(&data.id)
.ok_or(Error::NotFound)
Expand All @@ -70,7 +70,7 @@ pub async fn update(s: SocketRef, Data(data): Data<Todo>, ack: AckSender, todos:
ack.send(res).ok();
}

pub async fn delete(s: SocketRef, Data(id): Data<Uuid>, ack: AckSender, todos: State<Todos>) {
pub async fn delete(s: SocketRef, ack: AckSender, todos: State<Todos>, Data(id): Data<Uuid>) {
let res = todos.remove(&id).ok_or(Error::NotFound).map(|_| {
s.broadcast().emit("todo:deleted", id).ok();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

s.on(
"add user",
|s: SocketRef, Data::<String>(username), user_cnt: State<UserCnt>| {
|s: SocketRef, user_cnt: State<UserCnt>, Data::<String>(username)| {
if s.extensions.get::<Username>().is_some() {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/hyper-echo/hyper_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {

socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| {
|ack: AckSender, Data::<PayloadValue>(data)| {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
4 changes: 2 additions & 2 deletions examples/loco-rooms-chat/src/channels/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn on_connect(socket: SocketRef) {

socket.on(
"join",
|socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
|socket: SocketRef, store: State<state::MessageStore>, Data::<String>(room)| async move {
tracing::info!("Received join: {:?}", room);
let _ = socket.leave_all();
let _ = socket.join(room.clone());
Expand All @@ -29,7 +29,7 @@ pub async fn on_connect(socket: SocketRef) {

socket.on(
"message",
|socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
|socket: SocketRef, store: State<state::MessageStore>, Data::<MessageIn>(data)| async move {
tracing::info!("Received message: {:?}", data);

let response = state::Message {
Expand Down
2 changes: 1 addition & 1 deletion examples/private-messaging/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn on_connection(

s.on(
"private message",
|s: SocketRef, Data(PrivateMessageReq { to, content }), State(Messages(msg))| {
|s: SocketRef, State(Messages(msg)), Data(PrivateMessageReq { to, content })| {
let user_id = s.extensions.get::<Session>().unwrap().user_id;
let message = Message {
from: user_id,
Expand Down
4 changes: 2 additions & 2 deletions examples/react-rooms-chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn on_connect(socket: SocketRef) {

socket.on(
"join",
|socket: SocketRef, Data::<String>(room), store: State<state::MessageStore>| async move {
|socket: SocketRef, store: State<state::MessageStore>, Data::<String>(room)| async move {
info!("Received join: {:?}", room);
let _ = socket.leave_all();
let _ = socket.join(room.clone());
Expand All @@ -38,7 +38,7 @@ async fn on_connect(socket: SocketRef) {

socket.on(
"message",
|socket: SocketRef, Data::<MessageIn>(data), store: State<state::MessageStore>| async move {
|socket: SocketRef, store: State<state::MessageStore>, Data::<MessageIn>(data)| async move {
info!("Received message: {:?}", data);

let response = state::Message {
Expand Down
2 changes: 1 addition & 1 deletion examples/salvo-echo/salvo_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {

socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| {
|ack: AckSender, Data::<PayloadValue>(data)| {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
2 changes: 1 addition & 1 deletion examples/viz-echo/viz_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn on_connect(socket: SocketRef, Data(data): Data<PayloadValue>) {

socket.on(
"message-with-ack",
|Data::<PayloadValue>(data), ack: AckSender| {
|ack: AckSender, Data::<PayloadValue>(data)| {
info!("Received event: {:?}", data);
ack.send(data).ok();
},
Expand Down
35 changes: 11 additions & 24 deletions socketioxide/src/handler/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
//! and [`DisconnectHandler`](super::DisconnectHandler).
//!
//! They can be used to extract data from the context of the handler and get specific params. Here are some examples of extractors:
//! * [`Data`]: extracts and deserialize to json any data, if a deserialization error occurs the handler won't be called:
//! * [`Data`]: extracts and deserialize to json any data. Because it consumes the event it should be the last argument. If a
//! deserialization error occurs the handler won't be called:
//! - for [`ConnectHandler`](super::ConnectHandler): extracts and deserialize to json the auth data
//! - for [`MessageHandler`](super::MessageHandler): extracts and deserialize to json the message data
//! * [`TryData`]: extracts and deserialize to json any data but with a `Result` type in case of error:
//! * [`TryData`]: extracts and deserialize to json any data. Because it consumes the event it should be the last argument. In case of
//! error, a `Result` type is returned;
//! - for [`ConnectHandler`](super::ConnectHandler): extracts and deserialize to json the auth data
//! - for [`MessageHandler`](super::MessageHandler): extracts and deserialize to json the message data
//! * [`SocketRef`]: extracts a reference to the [`Socket`]
//! * [`Bin`]: extract a binary payload for a given message. Because it consumes the event it should be the last argument
//! * [`AckSender`]: Can be used to send an ack response to the current message event
//! * [`ProtocolVersion`](crate::ProtocolVersion): extracts the protocol version
//! * [`TransportType`](crate::TransportType): extracts the transport type
Expand Down Expand Up @@ -129,19 +130,19 @@ where
.map(Data)
}
}
impl<T, A> FromMessageParts<A> for Data<T>
impl<T, A> FromMessage<A> for Data<T>
where
T: DeserializeOwned,
A: Adapter,
{
type Error = serde_json::Error;
fn from_message_parts(
_: &Arc<Socket<A>>,
v: &mut PayloadValue,
_: &Option<i64>,
fn from_message(
_: Arc<Socket<A>>,
mut v: PayloadValue,
_: Option<i64>,
) -> Result<Self, Self::Error> {
upwrap_array(v);
v.clone().into_data::<T>().map(Data)
upwrap_array(&mut v);
v.into_data::<T>().map(Data)
}
}

Expand Down Expand Up @@ -234,20 +235,6 @@ impl<A: Adapter> SocketRef<A> {
}
}

/// An Extractor that returns the binary data of the message.
/// If there is no binary data, it will contain an empty vec.
pub struct Bin(pub Vec<Vec<u8>>);
impl<A: Adapter> FromMessage<A> for Bin {
type Error = Infallible;
fn from_message(
_: Arc<Socket<A>>,
mut v: PayloadValue,
_: Option<i64>,
) -> Result<Self, Infallible> {
Ok(Bin(v.extract_binary_payloads()))
}
}

/// An Extractor to send an ack response corresponding to the current event.
/// If the client sent a normal message without expecting an ack, the ack callback will do nothing.
#[derive(Debug)]
Expand Down
12 changes: 5 additions & 7 deletions socketioxide/src/handler/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@
//! ack.send("ack data").ok();
//! });
//!
//! // `Bin` extractor must be the last argument because it consumes the rest of the packet
//! s.on("binary_event", |s: SocketRef, TryData::<String>(data), Bin(bin)| {
//! println!("Socket received event with data: {:?} and binary data: {:?}", data, bin);
//! s.on("binary_event", |s: SocketRef, TryData::<bytes::Bytes>(data)| {
//! println!("Socket received event with binary data: {:?}", data);
//! })
//! });
//! ```
Expand All @@ -47,9 +46,8 @@
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! println!("Socket received event with data: {}", data);
//! });
//! // `Bin` extractor must be the last argument because it consumes the rest of the packet
//! s.on("/binary_event", move |s: SocketRef, TryData::<String>(data), Bin(bin)| async move {
//! println!("Socket received event with data: {:?} and binary data: {:?}", data, bin);
//! s.on("/binary_event", move |s: SocketRef, TryData::<bytes::Bytes>(data)| async move {
//! println!("Socket received event with binary data: {:?}", data);
//! })
//! });
//! ```
Expand All @@ -60,7 +58,7 @@
//! # use serde_json::Error;
//! # use socketioxide::extract::*;
//! // async named event handler
//! async fn on_event(s: SocketRef, Data(data): Data<PayloadValue>, ack: AckSender) {
//! async fn on_event(s: SocketRef, ack: AckSender, Data(data): Data<PayloadValue>) {
//! tokio::time::sleep(std::time::Duration::from_secs(1)).await;
//! ack.send("Here is my acknowledgment!").ok();
//! }
Expand Down
5 changes: 2 additions & 3 deletions socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,8 @@ impl<A: Adapter> SocketIo<A> {
/// let (_, io) = SocketIo::new_svc();
/// io.ns("/", |socket: SocketRef| {
/// // Register an async handler for the "test" event and extract the data as a `MyData` struct
/// // Extract the binary payload as a `Vec<Vec<u8>>` with the Bin extractor.
/// // It should be the last extractor because it consumes the request
/// socket.on("test", |socket: SocketRef, Data::<MyData>(data), ack: AckSender| async move {
/// // `Data` should be the last extractor because it consumes the request
/// socket.on("test", |socket: SocketRef, ack: AckSender, Data::<MyData>(data)| async move {
/// println!("Received a test message {:?}", data);
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// ack.send(data).ok(); // The data received is sent back to the client through the ack
Expand Down
3 changes: 1 addition & 2 deletions socketioxide/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@
//! - for [`ConnectHandler`](handler::ConnectHandler): extracts and deserialize to json the auth data
//! - for [`MessageHandler`](handler::MessageHandler): extracts and deserialize to json the message data
//! * [`SocketRef`](extract::SocketRef): extracts a reference to the [`Socket`](socket::Socket)
//! * [`Bin`](extract::Bin): extract a binary payload for a given message. Because it consumes the event it should be the last argument
//! * [`AckSender`](extract::AckSender): Can be used to send an ack response to the current message event
//! * [`ProtocolVersion`]: extracts the protocol version of the socket
//! * [`TransportType`]: extracts the transport type of the socket
Expand All @@ -161,7 +160,7 @@
//! ### Extractor order
//! Extractors are run in the order of their declaration in the handler signature. If an extractor returns an error, the handler won't be called and a `tracing::error!` call will be emitted if the `tracing` feature is enabled.
//!
//! For the [`MessageHandler`](handler::MessageHandler), some extractors require to _consume_ the event and therefore only implement the [`FromMessage`](handler::FromMessage) trait, like the [`Bin`](extract::Bin) extractor, therefore they should be the last argument.
//! For the [`MessageHandler`](handler::MessageHandler), some extractors require to _consume_ the event and therefore only implement the [`FromMessage`](handler::FromMessage) trait, like the [`Data`](extract::Data) extractor, therefore they should be the last argument.
//!
//! Note that any extractors that implement the [`FromMessageParts`](handler::FromMessageParts) also implement by default the [`FromMessage`](handler::FromMessage) trait.
//!
Expand Down
12 changes: 6 additions & 6 deletions socketioxide/src/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,17 +304,17 @@ impl<A: Adapter> ConfOperators<'_, A> {
/// # use socketioxide::{PayloadValue, SocketIo, extract::*};
/// let (_, io) = SocketIo::new_svc();
/// io.ns("/", |socket: SocketRef| {
/// socket.on("test", |socket: SocketRef, Data::<PayloadValue>(data), Bin(bin)| async move {
/// // Emit a test message to the client
/// socket.emit("test", data).ok();
///
/// let bins: Vec<_> = bin
/// .clone()
/// socket.on("test", |socket: SocketRef, Data::<PayloadValue>(data)| async move {
/// // Get all the binary payloads in the data
/// let bins: Vec<_> = data.get_binary_payloads()
/// .into_iter()
/// .enumerate()
/// .map(|(i, bin)| PayloadValue::Binary(i, bin))
/// .collect();
///
/// // Emit a test message to the client
/// socket.emit("test", data).ok();
///
/// // Emit a test message with multiple arguments to the client
/// let mut message: Vec<PayloadValue> = vec!["world".into(), "hello".into(), 1.into()];
/// message.extend(bins.clone());
Expand Down
5 changes: 2 additions & 3 deletions socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ impl<A: Adapter> Socket<A> {
/// let (_, io) = SocketIo::new_svc();
/// io.ns("/", |socket: SocketRef| {
/// // Register an async handler for the "test" event and extract the data as a `MyData` struct
/// // Extract the binary payload as a `Vec<Vec<u8>>` with the Bin extractor.
/// // It should be the last extractor because it consumes the request
/// socket.on("test", |socket: SocketRef, Data::<MyData>(data), ack: AckSender| async move {
/// // `Data` should be the last extractor because it consumes the request
/// socket.on("test", |socket: SocketRef, ack: AckSender, Data::<MyData>(data)| async move {
/// println!("Received a test message {:?}", data);
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// ack.send(data).ok(); // The data received is sent back to the client through the ack
Expand Down

0 comments on commit 576baa5

Please sign in to comment.