Skip to content

Commit

Permalink
partially revert 7a3d0c9. achieve Send bound by asserting BuilderAren…
Browse files Browse the repository at this point in the history
…aImpl is Sync
  • Loading branch information
dwrensha committed Sep 17, 2024
1 parent 3ae30fd commit 06cc784
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 9 deletions.
27 changes: 22 additions & 5 deletions capnp-futures/src/write_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum Item<M>
where
M: AsOutputSegments,
{
Message(M, oneshot::Sender<()>),
Message(M, oneshot::Sender<M>),
Done(Result<(), Error>, oneshot::Sender<()>),
}
/// A handle that allows messages to be sent to a write queue.
Expand Down Expand Up @@ -78,11 +78,11 @@ where
while let Some(item) = rx.next().await {
match item {
Item::Message(m, returner) => {
let result = crate::serialize::write_message(&mut writer, m).await;
let result = crate::serialize::write_message(&mut writer, &m).await;
in_flight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
result?;
writer.flush().await?;
let _ = returner.send(());
let _ = returner.send(m);
}
Item::Done(r, finisher) => {
let _ = finisher.send(());
Expand All @@ -96,13 +96,30 @@ where
(sender, queue)
}

fn _assert_kinds() {
fn _assert_send<T: Send>(_x: T) {}
fn _assert_sync<T: Sync>() {}
fn _assert_write_queue_send<W: AsyncWrite + Unpin + Send, M: AsOutputSegments + Sync + Send>(
w: W,
) {
let (s, f) = write_queue::<W, M>(w);
_assert_send(s);
_assert_send(f);
}
fn _assert_write_queue_send_2<W: AsyncWrite + Unpin + Send>(w: W) {
let (s, f) = write_queue::<W, capnp::message::Builder<capnp::message::HeapAllocator>>(w);
_assert_send(s);
_assert_send(f);
}
}

impl<M> Sender<M>
where
M: AsOutputSegments,
{
/// Enqueues a message to be written. The returned future resolves once the write
/// Enqueues a message to be written. Returns the message once the write
/// has completed. Dropping the returned future does *not* cancel the write.
pub fn send(&mut self, message: M) -> impl Future<Output = Result<(), Error>> + Unpin {
pub fn send(&mut self, message: M) -> impl Future<Output = Result<M, Error>> + Unpin {
self.in_flight
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let (complete, oneshot) = oneshot::channel();
Expand Down
5 changes: 4 additions & 1 deletion capnp-rpc/src/twoparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl crate::OutgoingMessage for OutgoingMessage {
mut sender,
} = tmp;
let m = Rc::new(message);
(Promise::from_future(sender.send(m.clone())), m)
(
Promise::from_future(sender.send(m.clone()).map_ok(|_| ())),
m,
)
}

fn take(self: Box<Self>) -> ::capnp::message::Builder<::capnp::message::HeapAllocator> {
Expand Down
8 changes: 5 additions & 3 deletions capnp/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,18 @@ where
arena: BuilderArenaImpl<A>,
}

unsafe impl<A> Send for Builder<A> where A: Send + Allocator {}

fn _assert_kinds() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
fn _assert_reader<S: ReaderSegments + Send>() {
_assert_send::<Reader<S>>();
}
fn _assert_builder<A: Allocator + Send>() {
fn _assert_builder_send<A: Allocator + Send>() {
_assert_send::<Builder<A>>();
}
fn _assert_builder_sync<A: Allocator + Sync>() {
_assert_sync::<Builder<A>>();
}
}

impl<A> Builder<A>
Expand Down
5 changes: 5 additions & 0 deletions capnp/src/private/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ where
inner: BuilderArenaImplInner<A>,
}

// BuilderArenaImpl has no interior mutability. Adding these impls
// allows message::Builder<A> to be Send and/or Sync when appropriate.
unsafe impl<A> Send for BuilderArenaImpl<A> where A: Send + Allocator {}
unsafe impl<A> Sync for BuilderArenaImpl<A> where A: Sync + Allocator {}

impl<A> BuilderArenaImpl<A>
where
A: Allocator,
Expand Down

0 comments on commit 06cc784

Please sign in to comment.