Skip to content

Commit

Permalink
Revert "feat: do not reuse connection in worker, since we have our ow…
Browse files Browse the repository at this point in the history
…n rabbitmq server without connection limit"

This reverts commit ad51e39.
  • Loading branch information
jiegec committed Mar 7, 2024
1 parent ad51e39 commit ab72da5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
7 changes: 3 additions & 4 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::Args;
use crate::{ensure_channel, Args};
use chrono::Local;
use common::{ensure_job_queue, Job, JobError, JobOk, JobResult, WorkerIdentifier};
use futures::StreamExt;
Expand All @@ -8,7 +8,7 @@ use lapin::{
BasicQosOptions,
},
types::FieldTable,
BasicProperties, ConnectionProperties,
BasicProperties,
};
use log::{error, info, warn};
use std::{
Expand Down Expand Up @@ -281,8 +281,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let channel = ensure_channel(args).await?;
let queue_name = format!("job-{}", &args.arch);
ensure_job_queue(&queue_name, &channel).await?;

Expand Down
7 changes: 3 additions & 4 deletions worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::Args;
use crate::{ensure_channel, Args};
use common::{ensure_job_queue, WorkerHeartbeat, WorkerIdentifier};
use lapin::{options::BasicPublishOptions, BasicProperties, ConnectionProperties};
use lapin::{options::BasicPublishOptions, BasicProperties};
use log::{info, warn};
use std::time::Duration;

pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let channel = ensure_channel(args).await?;
let queue_name = "worker-heartbeat";
ensure_job_queue(queue_name, &channel).await?;

Expand Down
38 changes: 37 additions & 1 deletion worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,45 @@
use clap::Parser;
use std::path::PathBuf;
use lapin::{Channel, Connection, ConnectionProperties};
use once_cell::sync::Lazy;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Mutex;

pub mod build;
pub mod heartbeat;

pub static CONNECTION: Lazy<Arc<Mutex<Option<Connection>>>> =
Lazy::new(|| Arc::new(Mutex::new(None)));

// try to reuse amqp channel
pub async fn ensure_channel(args: &Args) -> anyhow::Result<Channel> {
let mut lock = CONNECTION.lock().await;
let conn = match &*lock {
Some(conn) => {
if conn.status().connected() {
conn
} else {
// re-connect
*lock = None;

let conn =
lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
}
None => {
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
};

let channel = conn.create_channel().await?;
Ok(channel)
}

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
Expand Down

0 comments on commit ab72da5

Please sign in to comment.