Skip to content

Commit

Permalink
Merge pull request #254 from holaplex/espi/auto-retry-failed-metadata…
Browse files Browse the repository at this point in the history
…-json-jobs

Job Retry
  • Loading branch information
kespinola authored Oct 20, 2023
2 parents 52ff551 + 71a2344 commit 14ece9f
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 92 deletions.
4 changes: 2 additions & 2 deletions api/src/background_worker/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl JobQueue {
let payload = serde_json::to_string(&job_to_enqueue)?;

redis::cmd("LPUSH")
.arg("job_queue")
.arg(T::QUEUE)
.arg(payload)
.query_async(&mut conn)
.await?;
Expand All @@ -94,7 +94,7 @@ impl JobQueue {
let db_conn = self.db_pool.get();

let res: Option<(String, String)> = redis::cmd("BRPOP")
.arg("job_queue")
.arg(T::QUEUE)
.arg(0)
.query_async(&mut conn)
.await?;
Expand Down
9 changes: 8 additions & 1 deletion api/src/background_worker/tasks/metadata_json_upload_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,15 @@ impl Context {

#[async_trait::async_trait]
impl BackgroundTask<Context> for MetadataJsonUploadTask {
const QUEUE: &'static str = "job_queue";
const NAME: &'static str = "MetadataJsonUploadTask";

fn name(&self) -> &'static str {
"MetadataJsonUploadTask"
Self::NAME
}

fn queue(&self) -> &'static str {
Self::QUEUE
}

fn payload(&self) -> Result<Value> {
Expand Down
7 changes: 7 additions & 0 deletions api/src/background_worker/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ pub enum BackgroundTaskError {

#[async_trait::async_trait]
pub trait BackgroundTask<C: Clone + Send + Sync>: Send + Sync + std::fmt::Debug {
/// The name of the task
const NAME: &'static str;

/// The queue of the task
const QUEUE: &'static str;

/// Process the task
/// # Arguments
/// * `self` - The task
Expand All @@ -55,6 +61,7 @@ pub trait BackgroundTask<C: Clone + Send + Sync>: Send + Sync + std::fmt::Debug
/// * `anyhow::Error` - Unable to serialize the payload
fn payload(&self) -> Result<Json>;
fn name(&self) -> &'static str;
fn queue(&self) -> &'static str;
}

pub use metadata_json_upload_task::{
Expand Down
77 changes: 71 additions & 6 deletions api/src/background_worker/worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use hub_core::{
thiserror, tokio,
tracing::{error, info},
};
use hub_core::{thiserror, tokio, tracing::error};
use sea_orm::{error::DbErr, ActiveModelTrait};
use serde::{Deserialize, Serialize};

use super::{
job::Job,
job_queue::{JobQueue, JobQueueError},
tasks::BackgroundTask,
};
Expand Down Expand Up @@ -68,6 +66,8 @@ where
tokio::spawn({
let db_pool = db_pool.clone();
let context = context.clone();
let job_queue = job_queue.clone();

async move {
let db_conn = db_pool.get();
let db_pool_process = db_pool.clone();
Expand Down Expand Up @@ -100,14 +100,20 @@ where
if let Err(e) = job_tracking_am.update(db_conn).await {
error!("Error updating job tracking: {}", e);
}
info!("Successfully processed job {}", job.id);
},
Err(e) => {
let job_tracking_am =
job_trackings::Entity::update_status(model, "failed");
if let Err(e) = job_tracking_am.update(db_conn).await {
error!("Error updating job tracking: {}", e);
error!("Error updating job tracking after failure: {}", e);
}

let requeue_result = job_queue.enqueue(job.task).await;

if let Err(e) = requeue_result {
error!("Error requeueing job {}: {}", job.id, e);
}

error!("Error processing job {}: {}", job.id, e);
},
}
Expand All @@ -118,4 +124,63 @@ where
});
}
}

/// This method is responsible for retrying failed jobs.
/// It fetches all failed jobs of a specific type from the database,
/// deserializes their payloads, and attempts to process them again.
/// If the job is processed successfully, its status is updated to "completed".
/// If the job fails again, an error is logged and the job is skipped.
/// The method returns an empty result if it finishes without panicking.
///
/// # Args
///
/// * `&self` - A reference to the Worker instance.
///
/// # Results
///
/// * `Result<(), WorkerError>` - An empty result indicating successful execution.
/// # Errors
/// `Err(WorkerError)`
pub async fn retry(&self) -> Result<(), WorkerError> {
let db_pool = self.db_pool.clone();
let conn = db_pool.get();

let failed_jobs = job_trackings::Entity::filter_failed_for_job_type(T::NAME.to_string())
.all(conn)
.await?;

for failed_job in failed_jobs {
let task_payload_result: Result<T, _> =
serde_json::from_value(failed_job.clone().payload);

match task_payload_result {
Ok(task_payload) => {
let job = Job::new(failed_job.id, task_payload);

let task_results = job
.task
.process(db_pool.clone(), self.context.clone())
.await;

if let Err(e) = task_results {
error!("Error retrying job: {}", e);
continue;
}

let job_tracking_am =
job_trackings::Entity::update_status(failed_job, "completed");

if let Err(e) = job_tracking_am.update(conn).await {
error!("Error updating job tracking: {}", e);
}
},
Err(e) => {
error!("Error deserializing job: {}", e);
continue;
},
}
}

Ok(())
}
}
12 changes: 11 additions & 1 deletion api/src/entities/job_trackings.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hub_core::chrono;
use sea_orm::{entity::prelude::*, Set};
use sea_orm::{entity::prelude::*, QueryOrder, Set};
use serde_json::Value as Json;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
Expand Down Expand Up @@ -49,4 +49,14 @@ impl Entity {

active_model
}

pub fn filter_failed_for_job_type(job_type: String) -> Select<Self> {
Self::find()
.filter(
Column::Status
.eq("failed")
.and(Column::JobType.eq(job_type)),
)
.order_by_asc(Column::CreatedAt)
}
}
8 changes: 8 additions & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ pub struct Args {

#[arg(long, env)]
pub redis_url: String,

#[command(subcommand)]
pub command: Option<Subcommand>,
}

#[derive(Debug, clap::Subcommand)]
pub enum Subcommand {
RetryJobs,
}

#[derive(Debug, Clone, Copy)]
Expand Down
Loading

0 comments on commit 14ece9f

Please sign in to comment.