Skip to content

Commit

Permalink
Reapply "feat: do not reuse connection in worker, since we have our o…
Browse files Browse the repository at this point in the history
…wn rabbitmq server without connection limit"

This reverts commit ab72da5.
  • Loading branch information
jiegec committed Mar 7, 2024
1 parent ab72da5 commit e695ff5
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 43 deletions.
7 changes: 4 additions & 3 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{ensure_channel, Args};
use crate::Args;
use chrono::Local;
use common::{ensure_job_queue, Job, JobError, JobOk, JobResult, WorkerIdentifier};
use futures::StreamExt;
Expand All @@ -8,7 +8,7 @@ use lapin::{
BasicQosOptions,
},
types::FieldTable,
BasicProperties,
BasicProperties, ConnectionProperties,
};
use log::{error, info, warn};
use std::{
Expand Down Expand Up @@ -281,7 +281,8 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

let channel = ensure_channel(args).await?;
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let queue_name = format!("job-{}", &args.arch);
ensure_job_queue(&queue_name, &channel).await?;

Expand Down
7 changes: 4 additions & 3 deletions worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{ensure_channel, Args};
use crate::Args;
use common::{ensure_job_queue, WorkerHeartbeat, WorkerIdentifier};
use lapin::{options::BasicPublishOptions, BasicProperties};
use lapin::{options::BasicPublishOptions, BasicProperties, ConnectionProperties};
use log::{info, warn};
use std::time::Duration;

pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
let channel = ensure_channel(args).await?;
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let queue_name = "worker-heartbeat";
ensure_job_queue(queue_name, &channel).await?;

Expand Down
38 changes: 1 addition & 37 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,9 @@
use clap::Parser;
use lapin::{Channel, Connection, ConnectionProperties};
use once_cell::sync::Lazy;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
use std::path::PathBuf;

pub mod build;
pub mod heartbeat;

pub static CONNECTION: Lazy<Arc<Mutex<Option<Connection>>>> =
Lazy::new(|| Arc::new(Mutex::new(None)));

// try to reuse amqp channel
pub async fn ensure_channel(args: &Args) -> anyhow::Result<Channel> {
let mut lock = CONNECTION.lock().await;
let conn = match &*lock {
Some(conn) => {
if conn.status().connected() {
conn
} else {
// re-connect
*lock = None;

let conn =
lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
}
None => {
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
};

let channel = conn.create_channel().await?;
Ok(channel)
}

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
Expand Down

0 comments on commit e695ff5

Please sign in to comment.