Skip to content

Commit

Permalink
editoast: use pooling for amqp
Browse files Browse the repository at this point in the history
Signed-off-by: ElysaSrc <[email protected]>
  • Loading branch information
ElysaSrc committed Oct 11, 2024
1 parent 665e2b6 commit 01ccc2f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 68 deletions.
26 changes: 26 additions & 0 deletions editoast/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions editoast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ chrono.workspace = true
clap = { version = "4.5.19", features = ["derive", "env"] }
colored = "2.1.0"
dashmap = "6.1.0"
deadpool-lapin = "0.12.1"
derivative.workspace = true
diesel.workspace = true
diesel-async = { workspace = true }
Expand Down
87 changes: 19 additions & 68 deletions editoast/src/core/mq_client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime};
use editoast_derive::EditoastError;
use futures_util::StreamExt;
use itertools::Itertools;
use lapin::{
options::{BasicConsumeOptions, BasicPublishOptions},
types::{ByteArray, FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties,
BasicProperties,
};
use serde::Serialize;
use serde_json::to_vec;
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;
use thiserror::Error;
use tokio::{
sync::RwLock,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

#[derive(Debug, Clone)]
pub struct RabbitMQClient {
connection: Arc<RwLock<Option<Connection>>>,
pub pool: Pool,
exchange: String,
timeout: u64,
hostname: String,
Expand Down Expand Up @@ -51,6 +49,12 @@ pub enum Error {
#[error("Connection does not exist")]
#[editoast_error(status = "500")]
ConnectionDoesNotExist,
#[error("Cannot create the pool")]
#[editoast_error(status = "500")]
CeatePoolLapin(CreatePoolError),
#[error("Cannot acquire connection from pool")]
#[editoast_error(status = "500")]
DeadpoolLapin(PoolError),
}

pub struct MQResponse {
Expand All @@ -64,61 +68,20 @@ impl RabbitMQClient {
.map(|name| name.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string());

let conn = Arc::new(RwLock::new(None));

tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
let mut cfg = Config::default();
cfg.url = Some(options.uri);
let pool = cfg
.create_pool(Some(Runtime::Tokio1))
.map_err(Error::CeatePoolLapin)?;

Ok(RabbitMQClient {
connection: conn,
pool,
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
timeout: options.timeout,
hostname,
})
}

async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
let guard = connection.as_ref().read().await;
let conn = guard.as_ref();
let status = match conn {
None => return false,
Some(conn) => conn.status().state(),
};
match status {
lapin::ConnectionState::Initial => true,
lapin::ConnectionState::Connecting => true,
lapin::ConnectionState::Connected => true,
lapin::ConnectionState::Closing => true,
lapin::ConnectionState::Closed => false,
lapin::ConnectionState::Error => false,
}
}

async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
loop {
if Self::connection_ok(&connection).await {
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}

tracing::info!("Reconnecting to RabbitMQ");

// Connection should be re-established
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;

match new_connection {
Ok(new_connection) => {
*connection.write().await = Some(new_connection);
tracing::info!("Reconnected to RabbitMQ");
}
Err(e) => {
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
}
}

tokio::time::sleep(Duration::from_secs(2)).await;
}
}

#[allow(dead_code)]
pub async fn call<T>(
&self,
Expand All @@ -131,14 +94,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down Expand Up @@ -186,14 +143,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down

0 comments on commit 01ccc2f

Please sign in to comment.