Skip to content

Commit

Permalink
Make Request take a reference to Client
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Mar 3, 2023
1 parent 9b4ed78 commit 2df4d1d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
27 changes: 14 additions & 13 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl Client {
/// # }
/// ```
pub fn request(&self, subject: String, payload: Bytes) -> Request {
Request::new(self.clone(), subject, payload)
Request::new(self, subject, payload)
}

/// Sends the request with headers.
Expand All @@ -325,7 +325,7 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, Error> {
let message = Request::new(self.clone(), subject, payload)
let message = Request::new(self, subject, payload)
.headers(headers)
.await?;

Expand Down Expand Up @@ -455,17 +455,17 @@ impl Client {

/// Used for building and sending requests.
#[derive(Debug)]
pub struct Request {
client: Client,
pub struct Request<'a> {
client: &'a Client,
subject: String,
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl Request {
pub fn new(client: Client, subject: String, payload: Bytes) -> Request {
impl<'a> Request<'a> {
pub fn new(client: &'a Client, subject: String, payload: Bytes) -> Request<'a> {
Request {
client,
subject,
Expand All @@ -487,7 +487,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> Request {
pub fn payload(mut self, payload: Bytes) -> Request<'a> {
self.payload = Some(payload);
self
}
Expand All @@ -510,7 +510,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> Request {
pub fn headers(mut self, headers: HeaderMap) -> Request<'a> {
self.headers = Some(headers);
self
}
Expand All @@ -531,7 +531,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
pub fn timeout(mut self, timeout: Option<Duration>) -> Request<'a> {
self.timeout = Some(timeout);
self
}
Expand All @@ -550,7 +550,7 @@ impl Request {
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> Request {
pub fn inbox(mut self, inbox: String) -> Request<'a> {
self.inbox = Some(inbox);
self
}
Expand All @@ -561,6 +561,7 @@ impl Request {
let mut publish = self
.client
.publish(self.subject, self.payload.unwrap_or_else(Bytes::new));

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}
Expand Down Expand Up @@ -598,11 +599,11 @@ impl Request {
}
}

impl IntoFuture for Request {
impl<'a> IntoFuture for Request<'a> {
type Output = Result<Message, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, Error>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, Error>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.send())
self.send().boxed()
}
}
2 changes: 1 addition & 1 deletion async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ impl<'a, T: Sized + Serialize, V: DeserializeOwned> Request<'a, T, V> {
impl<'a, T: Sized + Serialize, V: DeserializeOwned + Send> IntoFuture for Request<'a, T, V> {
type Output = Result<Response<V>, Error>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send>>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send + 'a>>;

fn into_future(self) -> Self::IntoFuture {
serde_json::to_vec(&self.payload)
Expand Down

0 comments on commit 2df4d1d

Please sign in to comment.