Skip to content

Commit

Permalink
editoast: use pooling for amqp
Browse files Browse the repository at this point in the history
Signed-off-by: ElysaSrc <[email protected]>
  • Loading branch information
ElysaSrc committed Oct 11, 2024
1 parent 7a70d95 commit e66e534
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 74 deletions.
26 changes: 26 additions & 0 deletions editoast/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions editoast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ chrono.workspace = true
clap = { version = "4.5.19", features = ["derive", "env"] }
colored = "2.1.0"
dashmap = "6.1.0"
deadpool-lapin = "0.12.1"
derivative.workspace = true
diesel.workspace = true
diesel-async = { workspace = true }
Expand Down
40 changes: 40 additions & 0 deletions editoast/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4068,6 +4068,8 @@ components:
- $ref: '#/components/schemas/EditoastEditoastUrlErrorInvalidUrl'
- $ref: '#/components/schemas/EditoastElectricalProfilesErrorNotFound'
- $ref: '#/components/schemas/EditoastErrorConnectionDoesNotExist'
- $ref: '#/components/schemas/EditoastErrorCreatePoolLapin'
- $ref: '#/components/schemas/EditoastErrorDeadpoolLapin'
- $ref: '#/components/schemas/EditoastErrorLapin'
- $ref: '#/components/schemas/EditoastErrorResponseTimeout'
- $ref: '#/components/schemas/EditoastErrorSerialization'
Expand Down Expand Up @@ -4148,6 +4150,44 @@ components:
type: string
enum:
- editoast:coreclient:ConnectionDoesNotExist
EditoastErrorCreatePoolLapin:
type: object
required:
- type
- status
- message
properties:
context:
type: object
message:
type: string
status:
type: integer
enum:
- 500
type:
type: string
enum:
- editoast:coreclient:CreatePoolLapin
EditoastErrorDeadpoolLapin:
type: object
required:
- type
- status
- message
properties:
context:
type: object
message:
type: string
status:
type: integer
enum:
- 500
type:
type: string
enum:
- editoast:coreclient:DeadpoolLapin
EditoastErrorLapin:
type: object
required:
Expand Down
121 changes: 49 additions & 72 deletions editoast/src/core/mq_client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime};
use editoast_derive::EditoastError;
use futures_util::StreamExt;
use itertools::Itertools;
use lapin::{
options::{BasicConsumeOptions, BasicPublishOptions},
types::{ByteArray, FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties,
BasicProperties,
};
use serde::Serialize;
use serde_json::to_vec;
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;
use thiserror::Error;
use tokio::{
sync::RwLock,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

#[derive(Debug, Clone)]
pub struct RabbitMQClient {
connection: Arc<RwLock<Option<Connection>>>,
pub pool: Pool,
exchange: String,
timeout: u64,
hostname: String,
Expand Down Expand Up @@ -51,6 +49,12 @@ pub enum Error {
#[error("Connection does not exist")]
#[editoast_error(status = "500")]
ConnectionDoesNotExist,
#[error("Cannot create the pool")]
#[editoast_error(status = "500")]
CreatePoolLapin(CreatePoolError),
#[error("Cannot acquire connection from pool")]
#[editoast_error(status = "500")]
DeadpoolLapin(PoolError),
}

pub struct MQResponse {
Expand All @@ -64,61 +68,22 @@ impl RabbitMQClient {
.map(|name| name.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string());

let conn = Arc::new(RwLock::new(None));

tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
let cfg = Config {
url: Some(options.uri),
..Default::default()
};
let pool = cfg
.create_pool(Some(Runtime::Tokio1))
.map_err(Error::CreatePoolLapin)?;

Ok(RabbitMQClient {
connection: conn,
pool,
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
timeout: options.timeout,
hostname,
})
}

async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
let guard = connection.as_ref().read().await;
let conn = guard.as_ref();
let status = match conn {
None => return false,
Some(conn) => conn.status().state(),
};
match status {
lapin::ConnectionState::Initial => true,
lapin::ConnectionState::Connecting => true,
lapin::ConnectionState::Connected => true,
lapin::ConnectionState::Closing => true,
lapin::ConnectionState::Closed => false,
lapin::ConnectionState::Error => false,
}
}

async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
loop {
if Self::connection_ok(&connection).await {
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}

tracing::info!("Reconnecting to RabbitMQ");

// Connection should be re-established
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;

match new_connection {
Ok(new_connection) => {
*connection.write().await = Some(new_connection);
tracing::info!("Reconnected to RabbitMQ");
}
Err(e) => {
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
}
}

tokio::time::sleep(Duration::from_secs(2)).await;
}
}

#[allow(dead_code)]
pub async fn call<T>(
&self,
Expand All @@ -131,14 +96,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down Expand Up @@ -172,6 +131,12 @@ impl RabbitMQClient {
.await
.map_err(Error::Lapin)?;

// Explicitly close the channel
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

Ok(())
}

Expand All @@ -186,14 +151,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down Expand Up @@ -244,10 +203,20 @@ impl RabbitMQClient {
Duration::from_secs(override_timeout.unwrap_or(self.timeout)),
consumer.next(),
)
.await
.map_err(|_| Error::ResponseTimeout)?;
.await;

match response_delivery {
if response_delivery.is_err() {
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

return Err(Error::ResponseTimeout);
}

let response_delivery = response_delivery.unwrap();

let result = match response_delivery {
Some(Ok(delivery)) => {
let status = delivery
.properties
Expand All @@ -265,7 +234,15 @@ impl RabbitMQClient {
}
Some(Err(e)) => Err(e.into()),
None => panic!("Rabbitmq consumer was cancelled unexpectedly"),
}
};

// Explicitly close the channel
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

result
}
}

Expand Down
4 changes: 3 additions & 1 deletion front/public/locales/en/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
"Serialization": "Core: cannot serialize request",
"StatusParsing": "Core: cannot parse status",
"UnparsableErrorOutput": "Core returned an error in an unknown format",
"ConnectionDoesNotExist": "Core: message queue: connection not established"
"ConnectionDoesNotExist": "Core: message queue: connection not established",
"CreatePoolLapin": "Core: message queue: cannot create pool",
"DeadpoolLapin": "Core: message queue: pool error"
},
"DatabaseAccessError": "Database access fatal error",
"document": {
Expand Down
4 changes: 3 additions & 1 deletion front/public/locales/fr/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
"Serialization": "Core: impossible de sérialiser la requête",
"StatusParsing": "Core: impossible d'obtenir le status",
"UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu",
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie"
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie",
"CreatePoolLapin": "Core: file d'attente de messages: erreur de création de pool",
"DeadpoolLapin": "Core: file d'attente de messages: erreur de pool"
},
"document": {
"NotFound": "Document '{{document_key}}' non trouvé"
Expand Down

0 comments on commit e66e534

Please sign in to comment.