Skip to content

Commit

Permalink
feat(image-processor): input metadata on success event
Browse files Browse the repository at this point in the history
  • Loading branch information
lennartkloock committed May 25, 2024
1 parent 8a57bf8 commit fac6d5d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 31 deletions.
1 change: 1 addition & 0 deletions image-processor/proto/scuffle/image_processor/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ message EventCallback {
message Success {
string drive = 1;
repeated OutputFile files = 2;
InputFileMetadata input_metadata = 3;
}

message Fail {
Expand Down
10 changes: 10 additions & 0 deletions image-processor/proto/scuffle/image_processor/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ message OutputFile {
OutputFormat format = 9;
}

// Returned after the image is processed.
message InputFileMetadata {
// The width of the input image.
uint32 width = 1;
// The height of the input image.
uint32 height = 2;
// The frame count of the input image.
uint32 frame_count = 3;
}

message Output {
// The drive path to store the output image.
// This is a prefix and the processor will append the suffix to this path to determine the final path.
Expand Down
40 changes: 18 additions & 22 deletions image-processor/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use scuffle_image_processor_proto::{event_callback, EventCallback, EventQueue as EventTopic, OutputFile};
use scuffle_image_processor_proto::{event_callback, EventCallback, EventQueue as EventTopic};

use crate::database::Job;
use crate::event_queue::EventQueue;
Expand Down Expand Up @@ -29,42 +29,38 @@ pub async fn on_event(global: &Arc<Global>, job: &Job, event_topic: &EventTopic,
}
}

fn start_event(_: &Job) -> event_callback::Event {
event_callback::Event::Start(event_callback::Start {})
}

fn success_event(_: &Job, drive: String, files: Vec<OutputFile>) -> event_callback::Event {
event_callback::Event::Success(event_callback::Success { drive, files })
}

fn fail_event(_: &Job, err: JobError) -> event_callback::Event {
event_callback::Event::Fail(event_callback::Fail { error: Some(err.into()) })
}

fn cancel_event(_: &Job) -> event_callback::Event {
event_callback::Event::Cancel(event_callback::Cancel {})
}

pub async fn on_start(global: &Arc<Global>, job: &Job) {
if let Some(on_start) = &job.task.events.as_ref().and_then(|events| events.on_start.as_ref()) {
on_event(global, job, on_start, start_event(job)).await;
on_event(global, job, on_start, event_callback::Event::Start(event_callback::Start {})).await;
}
}

pub async fn on_success(global: &Arc<Global>, job: &Job, drive: String, files: Vec<OutputFile>) {
pub async fn on_success(global: &Arc<Global>, job: &Job, success: event_callback::Success) {
if let Some(on_success) = &job.task.events.as_ref().and_then(|events| events.on_success.as_ref()) {
on_event(global, job, on_success, success_event(job, drive, files)).await;
on_event(global, job, on_success, event_callback::Event::Success(success)).await;
}
}

pub async fn on_failure(global: &Arc<Global>, job: &Job, err: JobError) {
if let Some(on_failure) = &job.task.events.as_ref().and_then(|events| events.on_failure.as_ref()) {
on_event(global, job, on_failure, fail_event(job, err)).await;
on_event(
global,
job,
on_failure,
event_callback::Event::Fail(event_callback::Fail { error: Some(err.into()) }),
)
.await;
}
}

pub async fn on_cancel(global: &Arc<Global>, job: &Job) {
if let Some(on_cancel) = &job.task.events.as_ref().and_then(|events| events.on_cancel.as_ref()) {
on_event(global, job, on_cancel, cancel_event(job)).await;
on_event(
global,
job,
on_cancel,
event_callback::Event::Cancel(event_callback::Cancel {}),
)
.await;
}
}
29 changes: 22 additions & 7 deletions image-processor/src/worker/process/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ use std::sync::Arc;

use bytes::Bytes;
use file_format::FileFormat;
use scuffle_image_processor_proto::{animation_config, Output, OutputFormat, OutputFormatOptions, Task};
use scuffle_image_processor_proto::{animation_config, InputFileMetadata, Output, OutputFormat, OutputFormatOptions, Task};
use tokio::sync::OwnedSemaphorePermit;

use super::decoder::{AnyDecoder, Decoder, DecoderFrontend, DecoderInfo, LoopCount};
use super::encoder::{AnyEncoder, Encoder, EncoderBackend, EncoderSettings};
use super::encoder::{AnyEncoder, Encoder, EncoderBackend, EncoderError, EncoderSettings};
use super::resize::{ImageResizer, ResizeOutputTarget};
use super::JobError;

pub struct JobOutput {
pub input: InputFileMetadata,
pub output: Vec<OutputImage>,
}

pub struct OutputImage {
pub format: OutputFormat,
pub format_name: Option<String>,
pub format_idx: usize,
Expand Down Expand Up @@ -54,7 +59,7 @@ impl Drop for CancelToken {
}
}

pub async fn spawn(task: Task, input: Bytes, permit: Arc<OwnedSemaphorePermit>) -> Result<Vec<JobOutput>, JobError> {
pub async fn spawn(task: Task, input: Bytes, permit: Arc<OwnedSemaphorePermit>) -> Result<JobOutput, JobError> {
let cancel_token = CancelToken::new();
let _cancel_guard = cancel_token.clone();

Expand Down Expand Up @@ -351,14 +356,15 @@ impl<'a> BlockingTask<'a> {
Ok(true)
}

pub fn finish(self) -> Result<Vec<JobOutput>, JobError> {
self.static_encoders
pub fn finish(self) -> Result<JobOutput, JobError> {
let output = self
.static_encoders
.into_iter()
.chain(self.anim_encoders)
.flat_map(|(f_idx, encoders)| {
encoders.into_iter().map(move |(output, encoder)| {
let info = encoder.info();
Ok(JobOutput {
Ok(OutputImage {
format: info.format,
format_name: info.name.clone(),
format_idx: f_idx,
Expand All @@ -372,6 +378,15 @@ impl<'a> BlockingTask<'a> {
})
})
})
.collect()
.collect::<Result<_, EncoderError>>()?;

Ok(JobOutput {
input: InputFileMetadata {
width: self.decoder_info.width as u32,
height: self.decoder_info.height as u32,
frame_count: self.decoder_info.frame_count as u32,
},
output,
})
}
}
9 changes: 7 additions & 2 deletions image-processor/src/worker/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bson::oid::ObjectId;
use scuffle_foundations::context::Context;
use scuffle_image_processor_proto::{event_callback, ErrorCode, OutputFile, OutputFormat};

use self::blocking::JobOutput;
pub use self::decoder::DecoderFrontend;
use self::resize::ResizeError;
use crate::database::Job;
Expand Down Expand Up @@ -157,7 +158,7 @@ impl ProcessJob {
match result {
Ok(success) => {
tracing::info!("job completed");
crate::events::on_success(&global, &self.job, success.drive, success.files).await;
crate::events::on_success(&global, &self.job, success).await;
}
Err(err) => {
tracing::error!("failed to process job: {err}");
Expand All @@ -180,7 +181,10 @@ impl ProcessJob {

let job = self.job.clone();

let output_results = blocking::spawn(job.task.clone(), input, self.permit.clone()).await?;
let JobOutput {
output: output_results,
input: input_metadata,
} = blocking::spawn(job.task.clone(), input, self.permit.clone()).await?;

let is_animated = output_results.iter().any(|r| r.frame_count > 1);

Expand Down Expand Up @@ -237,6 +241,7 @@ impl ProcessJob {

Ok(event_callback::Success {
drive: output_drive_path.drive.clone(),
input_metadata: Some(input_metadata),
files,
})
}
Expand Down

0 comments on commit fac6d5d

Please sign in to comment.