diff --git a/capnp-futures/src/write_queue.rs b/capnp-futures/src/write_queue.rs index f52d8a3d7..1af8216fc 100644 --- a/capnp-futures/src/write_queue.rs +++ b/capnp-futures/src/write_queue.rs @@ -30,7 +30,7 @@ enum Item where M: AsOutputSegments, { - Message(M, oneshot::Sender<()>), + Message(M, oneshot::Sender), Done(Result<(), Error>, oneshot::Sender<()>), } /// A handle that allows messages to be sent to a write queue. @@ -78,11 +78,11 @@ where while let Some(item) = rx.next().await { match item { Item::Message(m, returner) => { - let result = crate::serialize::write_message(&mut writer, m).await; + let result = crate::serialize::write_message(&mut writer, &m).await; in_flight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); result?; writer.flush().await?; - let _ = returner.send(()); + let _ = returner.send(m); } Item::Done(r, finisher) => { let _ = finisher.send(()); @@ -96,13 +96,30 @@ where (sender, queue) } +fn _assert_kinds() { + fn _assert_send(_x: T) {} + fn _assert_sync() {} + fn _assert_write_queue_send( + w: W, + ) { + let (s, f) = write_queue::(w); + _assert_send(s); + _assert_send(f); + } + fn _assert_write_queue_send_2(w: W) { + let (s, f) = write_queue::>(w); + _assert_send(s); + _assert_send(f); + } +} + impl Sender where M: AsOutputSegments, { - /// Enqueues a message to be written. The returned future resolves once the write + /// Enqueues a message to be written. Returns the message once the write /// has completed. Dropping the returned future does *not* cancel the write. - pub fn send(&mut self, message: M) -> impl Future> + Unpin { + pub fn send(&mut self, message: M) -> impl Future> + Unpin { self.in_flight .fetch_add(1, std::sync::atomic::Ordering::SeqCst); let (complete, oneshot) = oneshot::channel(); diff --git a/capnp-rpc/src/twoparty.rs b/capnp-rpc/src/twoparty.rs index 6bf1419ca..4fcd54f7a 100644 --- a/capnp-rpc/src/twoparty.rs +++ b/capnp-rpc/src/twoparty.rs @@ -74,7 +74,10 @@ impl crate::OutgoingMessage for OutgoingMessage { mut sender, } = tmp; let m = Rc::new(message); - (Promise::from_future(sender.send(m.clone())), m) + ( + Promise::from_future(sender.send(m.clone()).map_ok(|_| ())), + m, + ) } fn take(self: Box) -> ::capnp::message::Builder<::capnp::message::HeapAllocator> { diff --git a/capnp/src/message.rs b/capnp/src/message.rs index 1a38f51dd..51f4574e7 100644 --- a/capnp/src/message.rs +++ b/capnp/src/message.rs @@ -408,16 +408,18 @@ where arena: BuilderArenaImpl, } -unsafe impl Send for Builder where A: Send + Allocator {} - fn _assert_kinds() { fn _assert_send() {} + fn _assert_sync() {} fn _assert_reader() { _assert_send::>(); } - fn _assert_builder() { + fn _assert_builder_send() { _assert_send::>(); } + fn _assert_builder_sync() { + _assert_sync::>(); + } } impl Builder diff --git a/capnp/src/private/arena.rs b/capnp/src/private/arena.rs index 76939f277..c339cdda2 100644 --- a/capnp/src/private/arena.rs +++ b/capnp/src/private/arena.rs @@ -247,6 +247,11 @@ where inner: BuilderArenaImplInner, } +// BuilderArenaImpl has no interior mutability. Adding these impls +// allows message::Builder to be Send and/or Sync when appropriate. +unsafe impl Send for BuilderArenaImpl where A: Send + Allocator {} +unsafe impl Sync for BuilderArenaImpl where A: Sync + Allocator {} + impl BuilderArenaImpl where A: Allocator,