Skip to content

Commit

Permalink
feat: implement the job-retry subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Oct 20, 2023
1 parent 16a0efe commit 71a2344
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
22 changes: 17 additions & 5 deletions api/src/background_worker/worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use hub_core::{
thiserror, tokio,
tracing::{error, info},
};
use hub_core::{thiserror, tokio, tracing::error};
use sea_orm::{error::DbErr, ActiveModelTrait};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -103,7 +100,6 @@ where
if let Err(e) = job_tracking_am.update(db_conn).await {
error!("Error updating job tracking: {}", e);
}
info!("Successfully processed job {}", job.id);
},
Err(e) => {
let job_tracking_am =
Expand All @@ -129,6 +125,22 @@ where
}
}

/// This method is responsible for retrying failed jobs.
/// It fetches all failed jobs of a specific type from the database,
/// deserializes their payloads, and attempts to process them again.
/// If the job is processed successfully, its status is updated to "completed".
/// If the job fails again, an error is logged and the job is skipped.
/// The method returns an empty result if it finishes without panicking.
///
/// # Args
///
/// * `&self` - A reference to the Worker instance.
///
/// # Results
///
/// * `Result<(), WorkerError>` - An empty result indicating successful execution.
/// # Errors
/// `Err(WorkerError)`
pub async fn retry(&self) -> Result<(), WorkerError> {
let db_pool = self.db_pool.clone();
let conn = db_pool.get();
Expand Down
57 changes: 40 additions & 17 deletions api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use holaplex_hub_nfts::{
metrics::Metrics,
proto, Actions, AppState, Args, Services, Subcommand,
};
use hub_core::{clap, prelude::*, tokio, tracing::info};
use hub_core::{prelude::*, tokio};
use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server};
use redis::Client as RedisClient;

Expand All @@ -35,7 +35,7 @@ pub fn main() {

match command {
None => serve(common, port, db, hub_uploads, redis_url),
Some(Subcommand::RetryJobs) => retry_jobs(common),
Some(Subcommand::RetryJobs) => retry_jobs(common, redis_url, db, hub_uploads),
}
});
}
Expand Down Expand Up @@ -76,25 +76,13 @@ fn serve(
let metadata_json_upload_task_context =
MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone());

let job_queue = JobQueue::new(redis_client, connection.clone());
let job_queue = JobQueue::new(redis_client.clone(), connection.clone());
let worker = Worker::<MetadataJsonUploadContext, MetadataJsonUploadTask>::new(
job_queue.clone(),
connection.clone(),
metadata_json_upload_task_context,
);

let matches = clap::Command::new("hub-nfts")
.subcommand(clap::command!("jobs").subcommand(clap::command!("retry")))
.get_matches();

if let Some(("jobs", jobs_command)) = matches.subcommand() {
if let Some(("retry", _retry_command)) = jobs_command.subcommand() {
worker.retry().await?;

return Ok(());
}
}

let schema = build_schema();

let state = AppState::new(
Expand All @@ -106,6 +94,7 @@ fn serve(
polygon.clone(),
common.asset_proxy,
job_queue.clone(),
redis_client,
);

let cons = common.consumer_cfg.build::<Services>().await?;
Expand Down Expand Up @@ -140,6 +129,40 @@ fn serve(
})
}

fn retry_jobs(common: hub_core::Common) -> Result<()> {
common.rt.block_on(async move { todo!() })
fn retry_jobs(
common: hub_core::Common,
redis_url: String,
db: holaplex_hub_nfts::db::DbArgs,
hub_uploads: holaplex_hub_nfts::hub_uploads::HubUploadArgs,
) -> Result<()> {
common.rt.block_on(async move {
let connection = Connection::new(db)
.await
.context("failed to get database connection")?;
let redis_client = RedisClient::open(redis_url)?;
let hub_uploads = HubUploadClient::new(hub_uploads)?;

let producer = common
.producer_cfg
.clone()
.build::<proto::NftEvents>()
.await?;

let solana = Solana::new(producer.clone());
let polygon = Polygon::new(producer.clone());

let metadata_json_upload_task_context =
MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone());

let job_queue = JobQueue::new(redis_client, connection.clone());
let worker = Worker::<MetadataJsonUploadContext, MetadataJsonUploadTask>::new(
job_queue.clone(),
connection.clone(),
metadata_json_upload_task_context,
);

worker.retry().await?;

Ok(())
})
}

0 comments on commit 71a2344

Please sign in to comment.