Skip to content

Commit

Permalink
Make Publish take a reference to Client
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Mar 3, 2023
1 parent b85a4de commit 9b4ed78
Showing 1 changed file with 21 additions and 29 deletions.
50 changes: 21 additions & 29 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Error, Message, Subs
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::FutureExt;
use lazy_static::lazy_static;
use regex::Regex;
use std::error;
Expand All @@ -38,60 +39,51 @@ lazy_static! {
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
pub struct PublishError(mpsc::error::SendError<Command>);

pub struct Publish {
sender: mpsc::Sender<Command>,
pub struct Publish<'a> {
client: &'a Client,
subject: String,
payload: Bytes,
headers: Option<HeaderMap>,
respond: Option<String>,
}

impl Publish {
pub fn new(sender: mpsc::Sender<Command>, subject: String, payload: Bytes) -> Publish {
impl<'a> Publish<'a> {
pub fn new(client: &Client, subject: String, payload: Bytes) -> Publish {
Publish {
sender,
client,
subject,
payload,
headers: None,
respond: None,
}
}

pub fn headers(mut self, headers: HeaderMap) -> Publish {
pub fn headers(mut self, headers: HeaderMap) -> Publish<'a> {
self.headers = Some(headers);
self
}

pub fn reply(mut self, subject: String) -> Publish {
pub fn reply(mut self, subject: String) -> Publish<'a> {
self.respond = Some(subject);
self
}
}

impl IntoFuture for Publish {
impl<'a> IntoFuture for Publish<'a> {
type Output = Result<(), PublishError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
let sender = self.sender.clone();
let subject = self.subject;
let payload = self.payload;
let respond = self.respond;
let headers = self.headers;

Box::pin(async move {
sender
.send(Command::Publish {
subject,
payload,
respond,
headers,
})
.map_err(PublishError)
.await?;

Ok(())
})
self.client
.sender
.send(Command::Publish {
subject: self.subject,
payload: self.payload,
respond: self.respond,
headers: self.headers,
})
.map_err(PublishError)
.boxed()
}
}

Expand Down Expand Up @@ -213,7 +205,7 @@ impl Client {
/// # }
/// ```
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.sender.clone(), subject, payload)
Publish::new(self, subject, payload)
}

/// Publish a [Message] with headers to a given subject.
Expand Down

0 comments on commit 9b4ed78

Please sign in to comment.