Skip to content

Commit

Permalink
feat: do not reuse connection in worker, since we have our own rabbit…
Browse files Browse the repository at this point in the history
…mq server without connection limit
  • Loading branch information
jiegec committed Mar 6, 2024
1 parent 4664cda commit ad51e39
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 43 deletions.
7 changes: 4 additions & 3 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{ensure_channel, Args};
use crate::Args;
use chrono::Local;
use common::{ensure_job_queue, Job, JobError, JobOk, JobResult, WorkerIdentifier};
use futures::StreamExt;
Expand All @@ -8,7 +8,7 @@ use lapin::{
BasicQosOptions,
},
types::FieldTable,
BasicProperties,
BasicProperties, ConnectionProperties,
};
use log::{error, info, warn};
use std::{
Expand Down Expand Up @@ -281,7 +281,8 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
let mut tree_path = args.ciel_path.clone();
tree_path.push("TREE");

let channel = ensure_channel(args).await?;
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let queue_name = format!("job-{}", &args.arch);
ensure_job_queue(&queue_name, &channel).await?;

Expand Down
7 changes: 4 additions & 3 deletions worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{ensure_channel, Args};
use crate::Args;
use common::{ensure_job_queue, WorkerHeartbeat, WorkerIdentifier};
use lapin::{options::BasicPublishOptions, BasicProperties};
use lapin::{options::BasicPublishOptions, BasicProperties, ConnectionProperties};
use log::{info, warn};
use std::time::Duration;

pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
let channel = ensure_channel(args).await?;
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let queue_name = "worker-heartbeat";
ensure_job_queue(queue_name, &channel).await?;

Expand Down
38 changes: 1 addition & 37 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,9 @@
use clap::Parser;
use lapin::{Channel, Connection, ConnectionProperties};
use once_cell::sync::Lazy;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
use std::path::PathBuf;

pub mod build;
pub mod heartbeat;

pub static CONNECTION: Lazy<Arc<Mutex<Option<Connection>>>> =
Lazy::new(|| Arc::new(Mutex::new(None)));

// try to reuse amqp channel
pub async fn ensure_channel(args: &Args) -> anyhow::Result<Channel> {
let mut lock = CONNECTION.lock().await;
let conn = match &*lock {
Some(conn) => {
if conn.status().connected() {
conn
} else {
// re-connect
*lock = None;

let conn =
lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
}
None => {
let conn = lapin::Connection::connect(&args.amqp_addr, ConnectionProperties::default())
.await?;
*lock = Some(conn);
lock.as_ref().unwrap()
}
};

let channel = conn.create_channel().await?;
Ok(channel)
}

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
Expand Down

0 comments on commit ad51e39

Please sign in to comment.