From 9b4ed78e982b24661614f2e0993c8486a503b907 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 3 Mar 2023 18:54:16 +0200 Subject: [PATCH] Make `Publish` take a reference to `Client` --- async-nats/src/client.rs | 50 +++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index db67c85fb..9babd9367 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,6 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Error, Message, Subs use bytes::Bytes; use futures::future::TryFutureExt; use futures::stream::StreamExt; +use futures::FutureExt; use lazy_static::lazy_static; use regex::Regex; use std::error; @@ -38,18 +39,18 @@ lazy_static! { /// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions. pub struct PublishError(mpsc::error::SendError); -pub struct Publish { - sender: mpsc::Sender, +pub struct Publish<'a> { + client: &'a Client, subject: String, payload: Bytes, headers: Option, respond: Option, } -impl Publish { - pub fn new(sender: mpsc::Sender, subject: String, payload: Bytes) -> Publish { +impl<'a> Publish<'a> { + pub fn new(client: &Client, subject: String, payload: Bytes) -> Publish { Publish { - sender, + client, subject, payload, headers: None, @@ -57,41 +58,32 @@ impl Publish { } } - pub fn headers(mut self, headers: HeaderMap) -> Publish { + pub fn headers(mut self, headers: HeaderMap) -> Publish<'a> { self.headers = Some(headers); self } - pub fn reply(mut self, subject: String) -> Publish { + pub fn reply(mut self, subject: String) -> Publish<'a> { self.respond = Some(subject); self } } -impl IntoFuture for Publish { +impl<'a> IntoFuture for Publish<'a> { type Output = Result<(), PublishError>; - type IntoFuture = Pin> + Send>>; + type IntoFuture = Pin> + Send + 'a>>; fn into_future(self) -> Self::IntoFuture { - let sender = self.sender.clone(); - let subject = self.subject; - let payload = self.payload; - let respond = self.respond; - let headers = self.headers; - - Box::pin(async move { - sender - .send(Command::Publish { - subject, - payload, - respond, - headers, - }) - .map_err(PublishError) - .await?; - - Ok(()) - }) + self.client + .sender + .send(Command::Publish { + subject: self.subject, + payload: self.payload, + respond: self.respond, + headers: self.headers, + }) + .map_err(PublishError) + .boxed() } } @@ -213,7 +205,7 @@ impl Client { /// # } /// ``` pub fn publish(&self, subject: String, payload: Bytes) -> Publish { - Publish::new(self.sender.clone(), subject, payload) + Publish::new(self, subject, payload) } /// Publish a [Message] with headers to a given subject.