diff --git a/api/src/background_worker/job_queue.rs b/api/src/background_worker/job_queue.rs
index bcac193..31c267e 100644
--- a/api/src/background_worker/job_queue.rs
+++ b/api/src/background_worker/job_queue.rs
@@ -70,7 +70,7 @@ impl JobQueue {
         let payload = serde_json::to_string(&job_to_enqueue)?;
 
         redis::cmd("LPUSH")
-            .arg("job_queue")
+            .arg(T::QUEUE)
            .arg(payload)
             .query_async(&mut conn)
             .await?;
@@ -94,7 +94,7 @@ impl JobQueue {
         let db_conn = self.db_pool.get();
 
         let res: Option<(String, String)> = redis::cmd("BRPOP")
-            .arg("job_queue")
+            .arg(T::QUEUE)
             .arg(0)
             .query_async(&mut conn)
             .await?;
diff --git a/api/src/background_worker/tasks/metadata_json_upload_task.rs b/api/src/background_worker/tasks/metadata_json_upload_task.rs
index f348c78..fd73361 100644
--- a/api/src/background_worker/tasks/metadata_json_upload_task.rs
+++ b/api/src/background_worker/tasks/metadata_json_upload_task.rs
@@ -666,8 +666,15 @@ impl Context {
 #[async_trait::async_trait]
 impl BackgroundTask for MetadataJsonUploadTask {
+    const QUEUE: &'static str = "job_queue";
+    const NAME: &'static str = "MetadataJsonUploadTask";
+
     fn name(&self) -> &'static str {
-        "MetadataJsonUploadTask"
+        Self::NAME
+    }
+
+    fn queue(&self) -> &'static str {
+        Self::QUEUE
     }
 
     fn payload(&self) -> Result<Value> {
diff --git a/api/src/background_worker/tasks/mod.rs b/api/src/background_worker/tasks/mod.rs
index 48f6b0d..87d56ea 100644
--- a/api/src/background_worker/tasks/mod.rs
+++ b/api/src/background_worker/tasks/mod.rs
@@ -36,6 +36,12 @@ pub enum BackgroundTaskError {
 #[async_trait::async_trait]
 pub trait BackgroundTask: Send + Sync + std::fmt::Debug {
+    /// The name of the task
+    const NAME: &'static str;
+
+    /// The queue of the task
+    const QUEUE: &'static str;
+
     /// Process the task
     /// # Arguments
     /// * `self` - The task
@@ -55,6 +61,7 @@ pub trait BackgroundTask: Send + Sync + std::fmt::Debug
     /// * `anyhow::Error` - Unable to serialize the payload
     fn payload(&self) -> Result<Value>;
     fn name(&self) -> &'static str;
+    fn queue(&self) -> &'static str;
 }
 
 pub use metadata_json_upload_task::{
diff --git a/api/src/background_worker/worker.rs b/api/src/background_worker/worker.rs
index 32cad80..2e3508f 100644
--- a/api/src/background_worker/worker.rs
+++ b/api/src/background_worker/worker.rs
@@ -1,11 +1,9 @@
-use hub_core::{
-    thiserror, tokio,
-    tracing::{error, info},
-};
+use hub_core::{thiserror, tokio, tracing::error};
 use sea_orm::{error::DbErr, ActiveModelTrait};
 use serde::{Deserialize, Serialize};
 
 use super::{
+    job::Job,
     job_queue::{JobQueue, JobQueueError},
     tasks::BackgroundTask,
 };
@@ -68,6 +66,8 @@
             tokio::spawn({
                 let db_pool = db_pool.clone();
                 let context = context.clone();
+                let job_queue = job_queue.clone();
+
                 async move {
                     let db_conn = db_pool.get();
                     let db_pool_process = db_pool.clone();
@@ -100,14 +100,20 @@
                         if let Err(e) = job_tracking_am.update(db_conn).await {
                             error!("Error updating job tracking: {}", e);
                         }
-                        info!("Successfully processed job {}", job.id);
                     },
                     Err(e) => {
                         let job_tracking_am = job_trackings::Entity::update_status(model, "failed");
 
                         if let Err(e) = job_tracking_am.update(db_conn).await {
-                            error!("Error updating job tracking: {}", e);
+                            error!("Error updating job tracking after failure: {}", e);
                         }
+
+                        let requeue_result = job_queue.enqueue(job.task).await;
+
+                        if let Err(e) = requeue_result {
+                            error!("Error requeueing job {}: {}", job.id, e);
+                        }
+
                         error!("Error processing job {}: {}", job.id, e);
                     },
                 }
@@ -118,4 +124,63 @@ where
             });
         }
     }
+
+    /// This method is responsible for retrying failed jobs.
+    /// It fetches all failed jobs of this worker's task type from the database,
+    /// deserializes their payloads, and attempts to process them again.
+    /// If a job is processed successfully, its status is updated to "completed".
+    /// If a job fails again, the error is logged and the job is skipped, so it
+    /// keeps its "failed" status and is picked up by the next retry run.
+    ///
+    /// # Arguments
+    ///
+    /// * `&self` - A reference to the Worker instance.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<(), WorkerError>` - `Ok(())` once all failed jobs have been handled.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Err(WorkerError)` if the failed jobs cannot be loaded from the database.
+    pub async fn retry(&self) -> Result<(), WorkerError> {
+        let db_pool = self.db_pool.clone();
+        let conn = db_pool.get();
+
+        let failed_jobs = job_trackings::Entity::filter_failed_for_job_type(T::NAME.to_string())
+            .all(conn)
+            .await?;
+
+        for failed_job in failed_jobs {
+            let task_payload_result: Result<T, serde_json::Error> =
+                serde_json::from_value(failed_job.clone().payload);
+
+            match task_payload_result {
+                Ok(task_payload) => {
+                    let job = Job::new(failed_job.id, task_payload);
+
+                    let task_results = job
+                        .task
+                        .process(db_pool.clone(), self.context.clone())
+                        .await;
+
+                    if let Err(e) = task_results {
+                        error!("Error retrying job: {}", e);
+                        continue;
+                    }
+
+                    let job_tracking_am =
+                        job_trackings::Entity::update_status(failed_job, "completed");
+
+                    if let Err(e) = job_tracking_am.update(conn).await {
+                        error!("Error updating job tracking: {}", e);
+                    }
+                },
+                Err(e) => {
+                    error!("Error deserializing job: {}", e);
+                    continue;
+                },
+            }
+        }
+
+        Ok(())
+    }
 }
diff --git a/api/src/entities/job_trackings.rs b/api/src/entities/job_trackings.rs
index ce3bea0..35db8bd 100644
--- a/api/src/entities/job_trackings.rs
+++ b/api/src/entities/job_trackings.rs
@@ -1,5 +1,5 @@
 use hub_core::chrono;
-use sea_orm::{entity::prelude::*, Set};
+use sea_orm::{entity::prelude::*, QueryOrder, Set};
 use serde_json::Value as Json;
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
@@ -49,4 +49,14 @@ impl Entity {
 
         active_model
     }
+
+    pub fn filter_failed_for_job_type(job_type: String) -> Select<Self> {
+        Self::find()
+            .filter(
+                Column::Status
+                    .eq("failed")
+                    .and(Column::JobType.eq(job_type)),
+            )
+            .order_by_asc(Column::CreatedAt)
+    }
 }
diff --git a/api/src/lib.rs b/api/src/lib.rs
index a3a4de4..d5ab6e3 100644
--- a/api/src/lib.rs
+++ b/api/src/lib.rs
@@ -120,6 +120,14 @@ pub struct Args {
 
     #[arg(long, env)]
     pub redis_url: String,
+
+    #[command(subcommand)]
+    pub command: Option<Subcommand>,
+}
+
+#[derive(Debug, clap::Subcommand)]
+pub enum Subcommand {
+    RetryJobs,
 }
 
 #[derive(Debug, Clone, Copy)]
diff --git a/api/src/main.rs b/api/src/main.rs
index e7743ac..6a4d828 100644
--- a/api/src/main.rs
+++ b/api/src/main.rs
@@ -13,7 +13,7 @@ use holaplex_hub_nfts::{
     handlers::{graphql_handler, health, metrics_handler, playground},
     hub_uploads::HubUploadClient,
     metrics::Metrics,
-    proto, Actions, AppState, Args, Services,
+    proto, Actions, AppState, Args, Services, Subcommand,
 };
 use hub_core::{prelude::*, tokio};
 use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server};
@@ -30,88 +30,139 @@ pub fn main() {
             db,
             hub_uploads,
             redis_url,
+            command,
         } = args;
 
-        common.rt.block_on(async move {
-            let connection = Connection::new(db)
-                .await
-                .context("failed to get database connection")?;
-
-            let schema = build_schema();
-
-            let producer = common
-                .producer_cfg
-                .clone()
-                .build::<proto::NftEvents>()
-                .await?;
-            let credits = common.credits_cfg.build::<Actions>().await?;
-            let hub_uploads = HubUploadClient::new(hub_uploads)?;
-
-            let metrics = Metrics::new()?;
-
-            let event_processor = events::Processor::new(
-                connection.clone(),
-                credits.clone(),
-                producer.clone(),
-                metrics.clone(),
-            );
-
-            let solana = Solana::new(producer.clone());
-            let polygon = Polygon::new(producer.clone());
-
-            let redis_client = RedisClient::open(redis_url)?;
-
-            let metadata_json_upload_task_context =
-                MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone());
-
-            let job_queue = JobQueue::new(redis_client.clone(), connection.clone());
-            let worker = Worker::<MetadataJsonUploadTask>::new(
-                job_queue.clone(),
-                connection.clone(),
-                metadata_json_upload_task_context,
-            );
-
-            let state = AppState::new(
-                schema,
-                connection.clone(),
-                producer.clone(),
-                credits.clone(),
-                solana.clone(),
-                polygon.clone(),
-                common.asset_proxy,
-                job_queue.clone(),
-                redis_client,
-            );
-
-            let cons = common.consumer_cfg.build::<Services>().await?;
-
-            tokio::spawn(async move { worker.start().await });
-
-            tokio::spawn(async move {
-                cons.consume(
-                    |b| {
-                        b.with_jitter()
-                            .with_min_delay(Duration::from_millis(500))
-                            .with_max_delay(Duration::from_secs(90))
-                    },
-                    |e| async move { event_processor.process(e).await },
-                )
-                .await
-            });
-
-            Server::new(TcpListener::bind(format!("0.0.0.0:{port}")))
-                .run(
-                    Route::new()
-                        .at(
-                            "/graphql",
-                            post(graphql_handler).with(AddData::new(state.clone())),
-                        )
-                        .at("/playground", get(playground))
-                        .at("/health", get(health))
-                        .at("/metrics", get(metrics_handler).with(AddData::new(metrics))),
-                )
-                .await
-                .context("failed to build graphql server")
-        })
+        match command {
+            None => serve(common, port, db, hub_uploads, redis_url),
+            Some(Subcommand::RetryJobs) => retry_jobs(common, redis_url, db, hub_uploads),
+        }
     });
 }
+
+fn serve(
+    common: hub_core::Common,
+    port: u16,
+    db: holaplex_hub_nfts::db::DbArgs,
+    hub_uploads: holaplex_hub_nfts::hub_uploads::HubUploadArgs,
+    redis_url: String,
+) -> Result<()> {
+    common.rt.block_on(async move {
+        let connection = Connection::new(db)
+            .await
+            .context("failed to get database connection")?;
+        let hub_uploads = HubUploadClient::new(hub_uploads)?;
+        let credits = common.credits_cfg.build::<Actions>().await?;
+        let metrics = Metrics::new()?;
+
+        let producer = common
+            .producer_cfg
+            .clone()
+            .build::<proto::NftEvents>()
+            .await?;
+
+        let event_processor = events::Processor::new(
+            connection.clone(),
+            credits.clone(),
+            producer.clone(),
+            metrics.clone(),
+        );
+
+        let solana = Solana::new(producer.clone());
+        let polygon = Polygon::new(producer.clone());
+
+        let redis_client = RedisClient::open(redis_url)?;
+
+        let metadata_json_upload_task_context =
+            MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone());
+
+        let job_queue = JobQueue::new(redis_client.clone(), connection.clone());
+        let worker = Worker::<MetadataJsonUploadTask>::new(
+            job_queue.clone(),
+            connection.clone(),
+            metadata_json_upload_task_context,
+        );
+
+        let schema = build_schema();
+
+        let state = AppState::new(
+            schema,
+            connection.clone(),
+            producer.clone(),
+            credits.clone(),
+            solana.clone(),
+            polygon.clone(),
+            common.asset_proxy,
+            job_queue.clone(),
+            redis_client,
+        );
+
+        let cons = common.consumer_cfg.build::<Services>().await?;
+
+        tokio::spawn(async move { worker.start().await });
+
+        tokio::spawn(async move {
+            cons.consume(
+                |b| {
+                    b.with_jitter()
+                        .with_min_delay(Duration::from_millis(500))
+                        .with_max_delay(Duration::from_secs(90))
+                },
+                |e| async move { event_processor.process(e).await },
+            )
+            .await
+        });
+
+        Server::new(TcpListener::bind(format!("0.0.0.0:{port}")))
+            .run(
+                Route::new()
+                    .at(
+                        "/graphql",
+                        post(graphql_handler).with(AddData::new(state.clone())),
+                    )
+                    .at("/playground", get(playground))
+                    .at("/health", get(health))
+                    .at("/metrics", get(metrics_handler).with(AddData::new(metrics))),
+            )
+            .await
+            .context("failed to build graphql server")
+    })
+}
+
+fn retry_jobs(
+    common: hub_core::Common,
+    redis_url: String,
+    db: holaplex_hub_nfts::db::DbArgs,
+    hub_uploads: holaplex_hub_nfts::hub_uploads::HubUploadArgs,
+) -> Result<()> {
+    common.rt.block_on(async move {
+        let connection = Connection::new(db)
+            .await
+            .context("failed to get database connection")?;
+        let redis_client = RedisClient::open(redis_url)?;
+        let hub_uploads = HubUploadClient::new(hub_uploads)?;
+
+        let producer = common
+            .producer_cfg
+            .clone()
+            .build::<proto::NftEvents>()
+            .await?;
+
+        let solana = Solana::new(producer.clone());
+        let polygon = Polygon::new(producer.clone());
+
+        let metadata_json_upload_task_context =
+            MetadataJsonUploadContext::new(hub_uploads, solana.clone(), polygon.clone());
+
+        let job_queue = JobQueue::new(redis_client, connection.clone());
+        let worker = Worker::<MetadataJsonUploadTask>::new(
+            job_queue.clone(),
+            connection.clone(),
+            metadata_json_upload_task_context,
+        );
+
+        worker.retry().await?;
+
+        Ok(())
+    })
+}
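
Notes (not part of the patch):

With `T::QUEUE` replacing the hard-coded `"job_queue"` key, each task type now
pushes to and pops from its own Redis list, so a worker for one task type can no
longer consume another task's payloads. A minimal standalone sketch of those
list semantics, assuming the `redis` crate with the `tokio-comp` feature; the
queue name and payload are illustrative, not values from this codebase:

use redis::Client;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = Client::open("redis://127.0.0.1/")?;
    let mut conn = client.get_async_connection().await?;

    // Producer side (JobQueue::enqueue): LPUSH the serialized job onto the
    // task-specific list named by T::QUEUE.
    redis::cmd("LPUSH")
        .arg("metadata_json_upload_queue") // illustrative stand-in for T::QUEUE
        .arg(r#"{"id":1,"task":{}}"#)
        .query_async::<_, ()>(&mut conn)
        .await?;

    // Worker side (JobQueue::dequeue): BRPOP blocks on the same list; the 0
    // timeout means "wait indefinitely". The reply is (list_name, payload).
    let res: Option<(String, String)> = redis::cmd("BRPOP")
        .arg("metadata_json_upload_queue")
        .arg(0)
        .query_async(&mut conn)
        .await?;

    println!("{res:?}");
    Ok(())
}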
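
The `Option<Subcommand>` wiring in lib.rs keeps the old behavior when the
binary is started without a subcommand and only runs the retry path on demand.
A self-contained sketch of the same clap pattern (binary and field names are
illustrative, and `#[arg(env)]` needs clap's `env` feature); with clap's
default kebab-casing, the `RetryJobs` variant is invoked as `retry-jobs`:

use clap::Parser;

#[derive(Debug, clap::Parser)]
struct Args {
    #[arg(long, env)]
    redis_url: String,

    // Optional subcommand: absent means "run the server as before".
    #[command(subcommand)]
    command: Option<Subcommand>,
}

#[derive(Debug, clap::Subcommand)]
enum Subcommand {
    RetryJobs,
}

fn main() {
    // e.g. `app --redis-url redis://127.0.0.1/ retry-jobs`
    match Args::parse().command {
        None => println!("no subcommand: start the GraphQL server"),
        Some(Subcommand::RetryJobs) => println!("retry failed jobs once, then exit"),
    }
}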
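
Because `filter_failed_for_job_type` returns a `Select` instead of executing
the query, callers can keep composing it before awaiting. A hedged fragment
showing one possible extension (the batch cap is hypothetical and not part of
this PR; it assumes the crate's `job_trackings` module is in scope, and
`QuerySelect` supplies `.limit`):

use sea_orm::{DatabaseConnection, DbErr, QuerySelect};

// Hypothetical helper: fetch at most `cap` failed jobs per retry pass instead
// of draining the whole backlog at once.
async fn failed_batch(
    conn: &DatabaseConnection,
    job_type: String,
    cap: u64,
) -> Result<Vec<job_trackings::Model>, DbErr> {
    job_trackings::Entity::filter_failed_for_job_type(job_type)
        .limit(cap)
        .all(conn)
        .await
}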