From a62e76d9451b759a53fba14c3b57356dcbf67145 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Sat, 4 May 2024 01:57:21 +0000 Subject: [PATCH] feat: image processor redesign --- Cargo.lock | 159 ++++++++++------ Cargo.toml | 1 + ffmpeg/Cargo.toml | 2 - ffmpeg/src/decoder.rs | 9 - ffmpeg/src/encoder.rs | 15 -- ffmpeg/src/filter_graph.rs | 18 -- ffmpeg/src/io/internal.rs | 6 - ffmpeg/src/io/output.rs | 21 --- ffmpeg/src/packet.rs | 3 - ffmpeg/src/scalar.rs | 3 - foundations/examples/src/http-server.rs | 20 ++- foundations/src/http/server/mod.rs | 16 +- foundations/src/http/server/stream/mod.rs | 2 +- foundations/src/http/server/stream/quic.rs | 5 +- foundations/src/http/server/stream/tcp.rs | 6 +- foundations/src/http/server/stream/tls.rs | 5 +- foundations/src/telemetry/server.rs | 5 +- image_processor/Cargo.toml | 17 +- image_processor/build.rs | 6 - image_processor/proto/Cargo.toml | 23 +++ image_processor/proto/build.rs | 19 ++ .../scuffle/image_processor/events.proto | 20 +++ .../{src/pb.rs => proto/src/lib.rs} | 0 image_processor/src/config.rs | 170 +++++++++++------- image_processor/src/database.rs | 122 ++++++++++++- image_processor/src/disk/http.rs | 167 +++++++++++++++++ image_processor/src/disk/local.rs | 79 ++++++++ image_processor/src/disk/memory.rs | 151 ++++++++++++++++ image_processor/src/disk/mod.rs | 130 ++++++++++++++ image_processor/src/disk/public_http.rs | 96 ++++++++++ image_processor/src/disk/s3.rs | 134 ++++++++++++++ image_processor/src/event_queue/http.rs | 93 ++++++++++ image_processor/src/event_queue/mod.rs | 65 +++++++ image_processor/src/event_queue/nats.rs | 65 +++++++ image_processor/src/event_queue/redis.rs | 57 ++++++ image_processor/src/global.rs | 99 ++++++++-- image_processor/src/lib.rs | 9 - image_processor/src/main.rs | 81 +++------ video/transcoder/Cargo.toml | 2 +- 39 files changed, 1588 insertions(+), 313 deletions(-) delete mode 100644 image_processor/build.rs create mode 100644 image_processor/proto/Cargo.toml create mode 100644 image_processor/proto/build.rs create mode 100644 image_processor/proto/scuffle/image_processor/events.proto rename image_processor/{src/pb.rs => proto/src/lib.rs} (100%) create mode 100644 image_processor/src/disk/http.rs create mode 100644 image_processor/src/disk/local.rs create mode 100644 image_processor/src/disk/memory.rs create mode 100644 image_processor/src/disk/mod.rs create mode 100644 image_processor/src/disk/public_http.rs create mode 100644 image_processor/src/disk/s3.rs create mode 100644 image_processor/src/event_queue/http.rs create mode 100644 image_processor/src/event_queue/mod.rs create mode 100644 image_processor/src/event_queue/nats.rs create mode 100644 image_processor/src/event_queue/redis.rs delete mode 100644 image_processor/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 7b9863a8..a4215e43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -995,7 +995,7 @@ dependencies = [ "aws-smithy-types", "bytes", "deadpool-postgres", - "fred", + "fred 8.0.6", "futures-util", "http-body 1.0.0", "hyper 1.3.1", @@ -1118,6 +1118,7 @@ dependencies = [ "ahash 0.8.11", "base64 0.13.1", "bitvec", + "chrono", "hex", "indexmap 2.2.6", "js-sys", @@ -1380,12 +1381,9 @@ dependencies = [ [[package]] name = "cookie-factory" -version = "0.3.3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" -dependencies = [ - "futures", -] +checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" 
[[package]] name = "core-foundation" @@ -2183,7 +2181,7 @@ dependencies = [ "log", "parking_lot", "rand", - "redis-protocol", + "redis-protocol 4.1.0", "rustls 0.22.4", "rustls-native-certs 0.7.0", "rustls-webpki 0.102.3", @@ -2198,6 +2196,32 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "fred" +version = "9.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915e065b377f6e16d5c01eae96bf31eeaf81e1e300b76f938761b3c21307cad8" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "bytes-utils", + "crossbeam-queue", + "float-cmp", + "futures", + "log", + "parking_lot", + "rand", + "redis-protocol 5.0.1", + "semver 1.0.22", + "socket2 0.5.7", + "tokio", + "tokio-stream", + "tokio-util", + "url", + "urlencoding", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -4217,7 +4241,7 @@ dependencies = [ "bitmask-enum", "bytes", "chrono", - "fred", + "fred 8.0.6", "futures", "futures-util", "hmac", @@ -4256,46 +4280,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "platform-image-processor" -version = "0.0.1" -dependencies = [ - "anyhow", - "async-nats", - "async-trait", - "aws-config", - "aws-sdk-s3", - "byteorder", - "bytes", - "chrono", - "fast_image_resize", - "file-format", - "futures", - "gifski", - "imgref", - "libavif-sys", - "libwebp-sys2", - "mongodb", - "num_cpus", - "png", - "postgres-from-row", - "prost 0.12.4", - "reqwest", - "rgb", - "scopeguard", - "scuffle-ffmpeg", - "scuffle-utils", - "serde", - "serde_json", - "sha2", - "thiserror", - "tokio", - "tonic", - "tonic-build", - "tracing", - "ulid", -] - [[package]] name = "platforms" version = "3.4.0" @@ -4806,6 +4790,20 @@ dependencies = [ "nom", ] +[[package]] +name = "redis-protocol" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65deb7c9501fbb2b6f812a30d59c0253779480853545153a51d8e9e444ddc99f" +dependencies = [ + "bytes", + "bytes-utils", + "cookie-factory", + "crc16", + "log", + "nom", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -5296,7 +5294,6 @@ dependencies = [ "crossbeam-channel", "ffmpeg-sys-next", "libc", - "scuffle-utils", "tokio", "tracing", ] @@ -5367,6 +5364,64 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "scuffle-image-processor" +version = "0.0.1" +dependencies = [ + "anyhow", + "async-nats", + "async-trait", + "aws-config", + "aws-sdk-s3", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bson", + "byteorder", + "bytes", + "chrono", + "fast_image_resize", + "file-format", + "fred 9.0.3", + "futures", + "gifski", + "http 1.1.0", + "imgref", + "libavif-sys", + "libwebp-sys2", + "mongodb", + "num_cpus", + "png", + "postgres-from-row", + "prost 0.12.4", + "reqwest", + "rgb", + "scopeguard", + "scuffle-ffmpeg", + "scuffle-foundations", + "scuffle-image-processor-proto", + "serde", + "serde_json", + "sha2", + "thiserror", + "tokio", + "tonic", + "tonic-build", + "tracing", + "ulid", + "url", +] + +[[package]] +name = "scuffle-image-processor-proto" +version = "0.0.0" +dependencies = [ + "prost 0.12.4", + "prost-build", + "serde", + "tonic", + "tonic-build", +] + [[package]] name = "scuffle-utils" version = "0.1.0" @@ -5377,7 +5432,7 @@ dependencies = [ "deadpool-postgres", "dotenvy", "fnv", - "fred", + "fred 8.0.6", "futures", "futures-channel", "futures-util", @@ -6861,7 +6916,7 @@ dependencies = [ "bytes", "chrono", "dotenvy", - "fred", + "fred 8.0.6", "futures", "futures-util", "hex", @@ -6902,7 +6957,7 @@ dependencies = [ "binary-helper", "chrono", "clap", - "fred", + "fred 
8.0.6", "futures", "futures-util", "pb", diff --git a/Cargo.toml b/Cargo.toml index 0ecc4c35..369a9949 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "platform/api", "image_processor", + "image_processor/proto", "video/edge", "video/ingest", "video/transcoder", diff --git a/ffmpeg/Cargo.toml b/ffmpeg/Cargo.toml index c8b980d7..7fd53411 100644 --- a/ffmpeg/Cargo.toml +++ b/ffmpeg/Cargo.toml @@ -11,11 +11,9 @@ bytes = { optional = true, version = "1" } tokio = { optional = true, version = "1" } crossbeam-channel = { optional = true, version = "0.5" } tracing = { optional = true, version = "0.1" } -scuffle-utils = { path = "../utils", version = "*", optional = true, features = ["task"]} [features] default = [] -task-abort = ["dep:scuffle-utils"] channel = ["dep:bytes"] tokio-channel = ["channel", "dep:tokio"] crossbeam-channel = ["channel", "dep:crossbeam-channel"] diff --git a/ffmpeg/src/decoder.rs b/ffmpeg/src/decoder.rs index 3e58adff..f33e1b7c 100644 --- a/ffmpeg/src/decoder.rs +++ b/ffmpeg/src/decoder.rs @@ -152,9 +152,6 @@ impl GenericDecoder { } pub fn send_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `packet` is a valid pointer, and `self.decoder` is a valid pointer. let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), packet.as_ptr()) }; @@ -165,9 +162,6 @@ impl GenericDecoder { } pub fn send_eof(&mut self) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `self.decoder` is a valid pointer. let ret = unsafe { avcodec_send_packet(self.decoder.as_mut_ptr(), std::ptr::null()) }; @@ -178,9 +172,6 @@ impl GenericDecoder { } pub fn receive_frame(&mut self) -> Result, FfmpegError> { - #[cfg(feature = "task-abort")] - let _guard = scuffle_utils::task::AbortGuard::new(); - let mut frame = Frame::new()?; // Safety: `frame` is a valid pointer, and `self.decoder` is a valid pointer. diff --git a/ffmpeg/src/encoder.rs b/ffmpeg/src/encoder.rs index 238eae47..7b699c01 100644 --- a/ffmpeg/src/encoder.rs +++ b/ffmpeg/src/encoder.rs @@ -426,9 +426,6 @@ impl Encoder { outgoing_time_base: AVRational, settings: impl Into, ) -> Result { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if codec.as_ptr().is_null() { return Err(FfmpegError::NoEncoder); } @@ -489,9 +486,6 @@ impl Encoder { } pub fn send_eof(&mut self) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `self.encoder` is a valid pointer. let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), std::ptr::null()) }; if ret == 0 { @@ -502,9 +496,6 @@ impl Encoder { } pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `self.encoder` and `frame` are valid pointers. 
let ret = unsafe { avcodec_send_frame(self.encoder.as_mut_ptr(), frame.as_ptr()) }; if ret == 0 { @@ -515,9 +506,6 @@ impl Encoder { } pub fn receive_packet(&mut self) -> Result, FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let mut packet = Packet::new()?; const AVERROR_EAGAIN: i32 = AVERROR(EAGAIN); @@ -631,9 +619,6 @@ impl MuxerEncoder { } pub fn send_eof(&mut self) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - self.encoder.send_eof()?; self.handle_packets()?; diff --git a/ffmpeg/src/filter_graph.rs b/ffmpeg/src/filter_graph.rs index 65d7f116..0bf180e7 100644 --- a/ffmpeg/src/filter_graph.rs +++ b/ffmpeg/src/filter_graph.rs @@ -14,18 +14,12 @@ unsafe impl Send for FilterGraph {} impl FilterGraph { pub fn new() -> Result { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: the pointer returned from avfilter_graph_alloc is valid unsafe { Self::wrap(avfilter_graph_alloc()) } } /// Safety: `ptr` must be a valid pointer to an `AVFilterGraph`. unsafe fn wrap(ptr: *mut AVFilterGraph) -> Result { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - Ok(Self( SmartPtr::wrap_non_null(ptr, |ptr| unsafe { avfilter_graph_free(ptr) }).ok_or(FfmpegError::Alloc)?, )) @@ -40,9 +34,6 @@ impl FilterGraph { } pub fn add(&mut self, filter: Filter, name: &str, args: &str) -> Result, FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let name = CString::new(name).expect("failed to convert name to CString"); let args = CString::new(args).expect("failed to convert args to CString"); @@ -238,9 +229,6 @@ unsafe impl Send for FilterContextSource<'_> {} impl FilterContextSource<'_> { pub fn send_frame(&mut self, frame: &Frame) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `frame` is a valid pointer, and `self.0` is a valid pointer. unsafe { match av_buffersrc_write_frame(self.0, frame.as_ptr()) { @@ -251,9 +239,6 @@ impl FilterContextSource<'_> { } pub fn send_eof(&mut self, pts: Option) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `self.0` is a valid pointer. unsafe { match if let Some(pts) = pts { @@ -275,9 +260,6 @@ unsafe impl Send for FilterContextSink<'_> {} impl FilterContextSink<'_> { pub fn receive_frame(&mut self) -> Result, FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let mut frame = Frame::new()?; // Safety: `frame` is a valid pointer, and `self.0` is a valid pointer. 
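The deletions above, and the matching ones in the ffmpeg I/O modules that follow, all remove the same pattern: a `task-abort` feature that placed a `scuffle_utils::task::AbortGuard` at the top of every FFI-adjacent method, along with the optional `scuffle-utils` dependency that provided it. The guard's real implementation is not part of this patch; the sketch below is a hypothetical reconstruction of the general drop-guard shape, assuming it escalated a panic that would otherwise unwind across FFmpeg's C frames into a process abort.

```rust
// Hypothetical reconstruction of the removed guard; the actual
// `scuffle_utils::task::AbortGuard` may differ. This only illustrates the
// RAII shape the deleted `let _guard = AbortGuard::new();` lines relied on.
pub struct AbortGuard;

impl AbortGuard {
    pub fn new() -> Self {
        AbortGuard
    }
}

impl Drop for AbortGuard {
    fn drop(&mut self) {
        // Being dropped while panicking means we are unwinding through a
        // scope that wraps raw FFmpeg calls; abort rather than unwind into C.
        if std::thread::panicking() {
            std::process::abort();
        }
    }
}
```

With the feature gone, methods such as `GenericDecoder::send_packet` rely solely on the `// Safety:` invariants documented at each `unsafe` block.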
diff --git a/ffmpeg/src/io/internal.rs b/ffmpeg/src/io/internal.rs index 33b08ad1..b9cedc2c 100644 --- a/ffmpeg/src/io/internal.rs +++ b/ffmpeg/src/io/internal.rs @@ -112,9 +112,6 @@ impl Default for InnerOptions { impl Inner { pub fn new(data: T, options: InnerOptions) -> Result { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: av_malloc is safe to call let buffer = unsafe { SmartPtr::wrap_non_null(av_malloc(options.buffer_size), |ptr| { @@ -227,9 +224,6 @@ impl Inner<()> { } pub fn open_output(path: &str) -> Result { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let path = std::ffi::CString::new(path).expect("Failed to convert path to CString"); // Safety: avformat_alloc_output_context2 is safe to call diff --git a/ffmpeg/src/io/output.rs b/ffmpeg/src/io/output.rs index c16e06a7..4eaad989 100644 --- a/ffmpeg/src/io/output.rs +++ b/ffmpeg/src/io/output.rs @@ -149,9 +149,6 @@ impl Output { } pub fn add_stream(&mut self, codec: Option<*const AVCodec>) -> Option> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `avformat_new_stream` is safe to call. let stream = unsafe { avformat_new_stream(self.as_mut_ptr(), codec.unwrap_or_else(std::ptr::null)) }; if stream.is_null() { @@ -167,9 +164,6 @@ impl Output { } pub fn copy_stream<'a>(&'a mut self, stream: &Stream<'_>) -> Option> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let codec_param = stream.codec_parameters()?; // Safety: `avformat_new_stream` is safe to call. @@ -195,9 +189,6 @@ impl Output { } pub fn write_header(&mut self) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if self.witten_header { return Err(FfmpegError::Arguments("header already written")); } @@ -216,9 +207,6 @@ impl Output { } pub fn write_header_with_options(&mut self, options: &mut Dictionary) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if self.witten_header { return Err(FfmpegError::Arguments("header already written")); } @@ -237,9 +225,6 @@ impl Output { } pub fn write_trailer(&mut self) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if !self.witten_header { return Err(FfmpegError::Arguments("header not written")); } @@ -254,9 +239,6 @@ impl Output { } pub fn write_interleaved_packet(&mut self, mut packet: Packet) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if !self.witten_header { return Err(FfmpegError::Arguments("header not written")); } @@ -272,9 +254,6 @@ impl Output { } pub fn write_packet(&mut self, packet: &Packet) -> Result<(), FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - if !self.witten_header { return Err(FfmpegError::Arguments("header not written")); } diff --git a/ffmpeg/src/packet.rs b/ffmpeg/src/packet.rs index dcc0c6a7..b2ad2137 100644 --- a/ffmpeg/src/packet.rs +++ b/ffmpeg/src/packet.rs @@ -17,9 +17,6 @@ impl<'a> Packets<'a> { } pub fn receive(&mut self) -> Result, FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - let mut packet = Packet::new()?; // Safety: av_read_frame is safe to call, 'packet' 
is a valid pointer diff --git a/ffmpeg/src/scalar.rs b/ffmpeg/src/scalar.rs index feade65c..34bae5f7 100644 --- a/ffmpeg/src/scalar.rs +++ b/ffmpeg/src/scalar.rs @@ -87,9 +87,6 @@ impl Scalar { } pub fn process<'a>(&'a mut self, frame: &Frame) -> Result<&'a VideoFrame, FfmpegError> { - #[cfg(feature = "task-abort")] - let _abort_guard = scuffle_utils::task::AbortGuard::new(); - // Safety: `frame` is a valid pointer, and `self.ptr` is a valid pointer. let ret = unsafe { sws_scale( diff --git a/foundations/examples/src/http-server.rs b/foundations/examples/src/http-server.rs index ac650465..bf209253 100644 --- a/foundations/examples/src/http-server.rs +++ b/foundations/examples/src/http-server.rs @@ -9,7 +9,7 @@ use scuffle_foundations::bootstrap::{bootstrap, Bootstrap, RuntimeSettings}; use scuffle_foundations::http::server::stream::{IncomingConnection, MakeService, ServiceHandler, SocketKind}; use scuffle_foundations::http::server::Server; use scuffle_foundations::settings::auto_settings; -use scuffle_foundations::settings::cli::{Matches, clap}; +use scuffle_foundations::settings::cli::{clap, Matches}; use scuffle_foundations::telemetry::settings::TelemetrySettings; use tokio::signal::unix::SignalKind; @@ -37,20 +37,22 @@ impl Bootstrap for HttpServerSettings { fn additional_args() -> Vec { vec![ - clap::Arg::new("tls-cert") - .long("tls-cert") - .value_name("FILE"), - clap::Arg::new("tls-key") - .long("tls-key") - .value_name("FILE"), + clap::Arg::new("tls-cert").long("tls-cert").value_name("FILE"), + clap::Arg::new("tls-key").long("tls-key").value_name("FILE"), ] } } #[bootstrap] async fn main(settings: Matches) { - let tls_cert = settings.args.get_one::("tls-cert").or(settings.settings.tls_cert.as_ref()); - let tls_key = settings.args.get_one::("tls-key").or(settings.settings.tls_key.as_ref()); + let tls_cert = settings + .args + .get_one::("tls-cert") + .or(settings.settings.tls_cert.as_ref()); + let tls_key = settings + .args + .get_one::("tls-key") + .or(settings.settings.tls_key.as_ref()); let Some((tls_cert, tls_key)) = tls_cert.zip(tls_key) else { panic!("TLS certificate and key are required"); diff --git a/foundations/src/http/server/mod.rs b/foundations/src/http/server/mod.rs index 6eadd26e..b1c3b028 100644 --- a/foundations/src/http/server/mod.rs +++ b/foundations/src/http/server/mod.rs @@ -5,7 +5,6 @@ mod builder; pub mod stream; pub use axum; - use hyper_util::rt::TokioExecutor; #[cfg(not(feature = "runtime"))] use tokio::spawn; @@ -145,7 +144,8 @@ impl Server { let make_service = self.make_service.clone(); let backend = TlsBackend::new(tcp_listener, acceptor.clone(), self.http1_2.clone(), &ctx); let span = tracing::info_span!("tls", addr = %self.bind, worker = i); - self.backends.push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); + self.backends + .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); } } else if self.insecure_bind.is_none() { self.insecure_bind = Some(self.bind); @@ -162,7 +162,8 @@ impl Server { let make_service = self.make_service.clone(); let backend = TcpBackend::new(tcp_listener, self.http1_2.clone(), &ctx); let span = tracing::info_span!("tcp", addr = %addr, worker = i); - self.backends.push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); + self.backends + .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); } } @@ -179,7 +180,8 @@ impl Server { let make_service = self.make_service.clone(); let backend = QuicBackend::new(endpoint, quic.h3.clone(), &ctx); 
let span = tracing::info_span!("quic", addr = %self.bind, worker = i); - self.backends.push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); + self.backends + .push(AbortOnDrop::new(spawn(backend.serve(make_service).instrument(span)))); } } @@ -202,7 +204,11 @@ impl Server { binds.push(format!("https+quic://{}", self.bind)); } - tracing::info!(worker_count = self.worker_count, "listening on {binds}", binds = binds.join(", ")); + tracing::info!( + worker_count = self.worker_count, + "listening on {binds}", + binds = binds.join(", ") + ); Ok(()) } diff --git a/foundations/src/http/server/stream/mod.rs b/foundations/src/http/server/stream/mod.rs index c7f24729..4e3aa743 100644 --- a/foundations/src/http/server/stream/mod.rs +++ b/foundations/src/http/server/stream/mod.rs @@ -6,9 +6,9 @@ pub mod tls; use std::convert::Infallible; +pub use axum::body::Body; pub use axum::extract::Request; pub use axum::response::{IntoResponse, Response}; -pub use axum::body::Body; use super::Error; diff --git a/foundations/src/http/server/stream/quic.rs b/foundations/src/http/server/stream/quic.rs index 5c74247e..122f014d 100644 --- a/foundations/src/http/server/stream/quic.rs +++ b/foundations/src/http/server/stream/quic.rs @@ -254,7 +254,6 @@ async fn serve_request(service: &impl ServiceHandler, request: Request, mut stre tracing::trace!(?parts, "sending response"); send.send_response(Response::from_parts(parts, ())).await?; - let mut body = std::pin::pin!(body); tracing::trace!("sending response body"); @@ -274,10 +273,10 @@ async fn serve_request(service: &impl ServiceHandler, request: Request, mut stre } None => { send.finish().await?; - }, + } } } - + tracing::trace!("response body finished"); Ok(()) diff --git a/foundations/src/http/server/stream/tcp.rs b/foundations/src/http/server/stream/tcp.rs index cd2c5c14..37698707 100644 --- a/foundations/src/http/server/stream/tcp.rs +++ b/foundations/src/http/server/stream/tcp.rs @@ -151,7 +151,8 @@ impl Connection { let resp = service.on_request(req.map(Body::new)).await.into_response(); drop(ctx); Ok::<_, Infallible>(resp) - }.instrument(span.clone()) + } + .instrument(span.clone()) }) }; @@ -165,11 +166,10 @@ impl Connection { } }; - if let Err(err) = r { self.service.on_error(err.into()).await; } - + self.service.on_close().await; tracing::trace!("connection closed"); } diff --git a/foundations/src/http/server/stream/tls.rs b/foundations/src/http/server/stream/tls.rs index ad7188df..a300a345 100644 --- a/foundations/src/http/server/stream/tls.rs +++ b/foundations/src/http/server/stream/tls.rs @@ -174,7 +174,8 @@ impl Connection { let resp = service.on_request(req.map(Body::new)).await.into_response(); drop(ctx); Ok::<_, Infallible>(resp) - }.instrument(span.clone()) + } + .instrument(span.clone()) }) }; @@ -191,7 +192,7 @@ impl Connection { if let Err(err) = r { self.service.on_error(err.into()).await; } - + self.service.on_close().await; tracing::trace!("connection closed"); } diff --git a/foundations/src/telemetry/server.rs b/foundations/src/telemetry/server.rs index 03ac023e..567bc212 100644 --- a/foundations/src/telemetry/server.rs +++ b/foundations/src/telemetry/server.rs @@ -302,10 +302,7 @@ pub async fn init(settings: ServerSettings) -> anyhow::Result<()> { router = router.fallback(axum::routing::any(not_found)); - let mut server = settings - .builder - .build(router) - .context("failed to build server")?; + let mut server = settings.builder.build(router).context("failed to build server")?; 
server.start_and_wait().await.context("failed to start server")?; diff --git a/image_processor/Cargo.toml b/image_processor/Cargo.toml index ff8e4819..ab7e5de4 100644 --- a/image_processor/Cargo.toml +++ b/image_processor/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "platform-image-processor" +name = "scuffle-image-processor" version = "0.0.1" edition = "2021" authors = ["Scuffle "] @@ -37,11 +37,20 @@ bytes = "1.0" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } fast_image_resize = "3.0.4" chrono = "0.4" +url = { version = "2", features = ["serde"] } +http = "1" -scuffle-utils = { version = "*", path = "../utils", features = ["all"] } -scuffle-ffmpeg = { version = "*", path = "../ffmpeg", features = ["task-abort", "tracing"] } +scuffle-foundations = { version = "*", path = "../foundations" } +scuffle-ffmpeg = { version = "*", path = "../ffmpeg", features = ["tracing"] } -mongodb = { version = "2.0", features = ["tokio-runtime"] } +scuffle-image-processor-proto = { version = "*", path = "./proto" } + +mongodb = { version = "2", features = ["tokio-runtime", "bson-chrono-0_4"] } +bson = { version = "2", features = ["chrono-0_4"] } + +aws-smithy-types = "1" +aws-smithy-runtime-api = "1" +fred = "9.0.3" [build-dependencies] tonic-build = "0.11" diff --git a/image_processor/build.rs b/image_processor/build.rs deleted file mode 100644 index c41c6e69..00000000 --- a/image_processor/build.rs +++ /dev/null @@ -1,6 +0,0 @@ -fn main() -> Result<(), Box> { - tonic_build::configure() - .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") - .compile(&["proto/scuffle/image_processor/service.proto"], &["proto/"])?; - Ok(()) -} diff --git a/image_processor/proto/Cargo.toml b/image_processor/proto/Cargo.toml new file mode 100644 index 00000000..1c923bc3 --- /dev/null +++ b/image_processor/proto/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "scuffle-image-processor-proto" +version = "0.0.0" +edition = "2021" +authors = ["Scuffle "] +description = "Scuffle Image Processor Protocol Buffers" +license = "MIT OR Apache-2.0" + +[dependencies] +prost = "0.12.4" +tonic = "0.11.0" +serde = { version = "1", optional = true, features = ["derive"] } + +[build-dependencies] +prost-build = "0.12.4" +tonic-build = "0.11.0" + +[features] +server = [] +client = [] +serde = [ "dep:serde" ] + +default = ["server", "client", "serde"] diff --git a/image_processor/proto/build.rs b/image_processor/proto/build.rs new file mode 100644 index 00000000..6755bd88 --- /dev/null +++ b/image_processor/proto/build.rs @@ -0,0 +1,19 @@ +fn main() -> Result<(), Box> { + let config = tonic_build::configure() + .build_server(cfg!(feature = "server")) + .build_client(cfg!(feature = "client")); + + #[cfg(feature = "serde")] + let config = config.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]"); + + config.compile( + &[ + "scuffle/image_processor/service.proto", + "scuffle/image_processor/types.proto", + "scuffle/image_processor/events.proto", + ], + &["./"], + )?; + + Ok(()) +} diff --git a/image_processor/proto/scuffle/image_processor/events.proto b/image_processor/proto/scuffle/image_processor/events.proto new file mode 100644 index 00000000..eea968e7 --- /dev/null +++ b/image_processor/proto/scuffle/image_processor/events.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package scuffle.image_processor; + +message EventCallback { + message Success {} + + message Fail {} + + message Cancel {} + + message Start {} + + oneof event { + Success success = 1; + Fail fail = 2; + 
Cancel cancel = 3; + Start start = 4; + } +} diff --git a/image_processor/src/pb.rs b/image_processor/proto/src/lib.rs similarity index 100% rename from image_processor/src/pb.rs rename to image_processor/proto/src/lib.rs diff --git a/image_processor/src/config.rs b/image_processor/src/config.rs index 41e66f86..3f40e656 100644 --- a/image_processor/src/config.rs +++ b/image_processor/src/config.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; -#[derive(Debug, Clone, PartialEq, serde::Deserialize)] +use scuffle_foundations::{bootstrap::RuntimeSettings, settings::auto_settings, telemetry::settings::TelemetrySettings}; +use url::Url; + +#[auto_settings] #[serde(default)] pub struct ImageProcessorConfig { /// MongoDB database configuration @@ -8,37 +11,27 @@ pub struct ImageProcessorConfig { /// The disk configurations for the image processor pub disks: Vec, /// The event queues for the image processor - pub event_queues: Vec, + pub event_queues: Vec, /// Concurrency limit, defaults to number of CPUs + /// 0 means all CPUs + #[settings(default = 0)] pub concurrency: usize, + + /// Telemetry configuration + pub telemetry: TelemetrySettings, + /// Runtime configuration + pub runtime: RuntimeSettings, } -#[derive(Debug, Clone, PartialEq, serde::Deserialize)] +#[auto_settings] +#[serde(default)] pub struct DatabaseConfig { + #[settings(default = "mongodb://localhost:27017".into())] pub uri: String, } -impl Default for DatabaseConfig { - fn default() -> Self { - Self { - uri: "mongodb://localhost:27017".to_string(), - } - } -} - -impl Default for ImageProcessorConfig { - fn default() -> Self { - Self { - database: DatabaseConfig::default(), - disks: vec![], - event_queues: vec![], - concurrency: num_cpus::get(), - } - } -} - -#[derive(Debug, Clone, PartialEq, serde::Deserialize)] -#[serde(tag = "kind")] +#[auto_settings(impl_default = false)] +#[serde(tag = "kind", rename_all = "kebab-case")] pub enum DiskConfig { /// Local disk Local(LocalDiskConfig), @@ -52,138 +45,193 @@ pub enum DiskConfig { PublicHttp(PublicHttpDiskConfig), } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] +#[auto_settings] pub struct LocalDiskConfig { /// The name of the disk pub name: String, /// The path to the local disk pub path: std::path::PathBuf, /// The disk mode + #[serde(default)] pub mode: DiskMode, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] +#[auto_settings] pub struct S3DiskConfig { /// The name of the disk pub name: String, /// The S3 bucket name pub bucket: String, - /// The S3 region - pub region: String, /// The S3 access key pub access_key: String, /// The S3 secret key pub secret_key: String, + /// The S3 region + #[serde(default = "default_region")] + pub region: String, /// The S3 endpoint + #[serde(default)] pub endpoint: Option, /// The S3 bucket prefix path + #[serde(default)] pub path: Option, /// Use path style + #[serde(default)] pub path_style: bool, /// The disk mode + #[serde(default)] pub mode: DiskMode, /// The maximum number of concurrent connections + #[serde(default)] pub max_connections: Option, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] +fn default_region() -> String { + "us-east-1".into() +} + +#[auto_settings] pub struct MemoryDiskConfig { /// The name of the disk pub name: String, /// The maximum capacity of the memory disk + #[serde(default)] pub capacity: Option, /// Global, shared memory disk for all tasks otherwise each task gets its /// own memory disk + 
#[serde(default = "default_true")] pub global: bool, /// The disk mode + #[serde(default)] pub mode: DiskMode, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] +fn default_true() -> bool { + true +} + +#[auto_settings(impl_default = false)] pub struct HttpDiskConfig { /// The name of the disk pub name: String, /// The base URL for the HTTP disk - pub base_url: String, + pub url: Url, /// The timeout for the HTTP disk + #[serde(default = "default_timeout")] pub timeout: Option, /// Allow insecure TLS + #[serde(default)] pub allow_insecure: bool, /// The disk mode + #[serde(default)] pub mode: DiskMode, /// The maximum number of concurrent connections + #[serde(default)] pub max_connections: Option, /// Additional headers for the HTTP disk + #[serde(default)] pub headers: HashMap, - /// Write Method - pub write_method: String, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] +fn default_timeout() -> Option { + Some(std::time::Duration::from_secs(30)) +} + +#[auto_settings] +#[serde(rename_all = "kebab-case")] +#[derive(Copy, PartialEq, Eq, Hash)] pub enum DiskMode { /// Read only Read, - #[default] + #[settings(default)] /// Read and write ReadWrite, /// Write only Write, } - -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] -/// Public http disks do not have a name because they will be invoked if the input path is a URL -/// that starts with 'http' or 'https'. Public http disks can only be read-only. -/// If you do not have a public http disk, the image processor will not be able to download images using HTTP. +/// Public http disks do not have a name because they will be invoked if the +/// input path is a URL that starts with 'http' or 'https'. Public http disks +/// can only be read-only. If you do not have a public http disk, the image +/// processor will not be able to download images using HTTP. 
+#[auto_settings] pub struct PublicHttpDiskConfig { /// The timeout for the HTTP disk + #[serde(default = "default_timeout")] pub timeout: Option, /// Allow insecure TLS + #[serde(default)] pub allow_insecure: bool, /// The maximum number of concurrent connections + #[serde(default)] pub max_connections: Option, /// Additional headers for the HTTP disk + #[serde(default)] pub headers: HashMap, /// Whitelist of allowed domains or IPs can be subnets or CIDR ranges /// IPs are compared after resolving the domain name + #[serde(default)] pub whitelist: Vec, /// Blacklist of disallowed domains or IPs can be subnets or CIDR ranges /// IPs are compared after resolving the domain name + #[serde(default)] pub blacklist: Vec, } -#[derive(Debug, Clone, PartialEq, serde::Deserialize)] -pub enum EventQueue { - Nats(NatsEventQueue), - Http(HttpEventQueue), - Redis(RedisEventQueue), +#[auto_settings(impl_default = false)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum EventQueueConfig { + Nats(NatsEventQueueConfig), + Http(HttpEventQueueConfig), + Redis(RedisEventQueueConfig), } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] -pub struct NatsEventQueue { +#[auto_settings(impl_default = false)] +pub struct NatsEventQueueConfig { + /// The name of the event queue pub name: String, + /// The Nats URL + /// For example: nats://localhost:4222 + pub url: String, + /// Allow Protobuf messages + #[serde(default)] + pub allow_protobuf: bool, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] -pub struct HttpEventQueue { +#[auto_settings(impl_default = false)] +pub struct HttpEventQueueConfig { + /// The name of the event queue pub name: String, - pub url: String, + /// The base URL for the HTTP event queue + pub url: Url, + /// The timeout for the HTTP event queue + /// Default is 30 seconds + #[serde(default = "default_timeout")] pub timeout: Option, + /// Allow insecure TLS (if the url is https, do not verify the certificate) + #[serde(default)] pub allow_insecure: bool, - pub method: String, + /// Additional headers for the HTTP event queue + /// Can be used to set the authorization header + /// Default is empty + #[serde(default)] pub headers: HashMap, + /// The maximum number of concurrent connections + /// Default is None + #[serde(default)] + pub max_connections: Option, + /// Allow Protobuf messages + #[serde(default)] + pub allow_protobuf: bool, } -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize)] -#[serde(default)] -pub struct RedisEventQueue { +#[auto_settings(impl_default = false)] +pub struct RedisEventQueueConfig { + /// The name of the event queue pub name: String, + /// The Redis URL, for example: redis://localhost:6379 pub url: String, + /// Allow Protobuf messages + #[serde(default)] + pub allow_protobuf: bool, } diff --git a/image_processor/src/database.rs b/image_processor/src/database.rs index 3b1d3f13..9f1a354b 100644 --- a/image_processor/src/database.rs +++ b/image_processor/src/database.rs @@ -1,7 +1,20 @@ -use mongodb::bson::oid::ObjectId; -use ulid::Ulid; +use std::sync::Arc; -use crate::pb::Task; +use bson::Bson; +use mongodb::{bson::oid::ObjectId, Database, IndexModel}; +use scuffle_image_processor_proto::Task; +use serde::{Deserialize, Serializer}; + +use crate::global::Global; + +fn serialize_protobuf(value: &T, serializer: S) -> Result { + serializer.serialize_bytes(&value.encode_to_vec()) +} + +fn deserialize_protobuf<'de, T: prost::Message + Default, D: 
serde::Deserializer<'de>>(deserializer: D) -> Result { + let bytes = Vec::::deserialize(deserializer)?; + T::decode(bytes.as_slice()).map_err(serde::de::Error::custom) +} #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] pub struct Job { @@ -9,6 +22,109 @@ pub struct Job { pub id: ObjectId, pub priority: i32, pub hold_until: Option>, + #[serde(serialize_with = "serialize_protobuf", deserialize_with = "deserialize_protobuf")] pub task: Task, pub claimed_by_id: Option, } + +impl Job { + fn collection(database: &Database) -> mongodb::Collection { + database.collection("jobs") + } + + pub async fn setup_collection(database: &Database) -> Result<(), mongodb::error::Error> { + let collection = Self::collection(database); + + collection.create_index( + IndexModel::builder() + .keys(bson::doc! { + "hold_until": 1, + "priority": -1, + }) + .build(), + None, + ).await?; + + Ok(()) + } + + pub async fn new(global: &Arc, task: Task, priority: i32) -> Result { + let job = Job { + id: ObjectId::new(), + priority, + hold_until: None, + task, + claimed_by_id: None, + }; + + Self::collection(global.database()).insert_one(&job, None).await?; + + Ok(job) + } + + pub async fn fetch(global: &Arc) -> Result, mongodb::error::Error> { + // Find with the highest priority and no hold_until or hold_until in the past + Self::collection(global.database()) + .find_one_and_update( + bson::doc! { + "$or": [ + bson::doc! { + "hold_until": Bson::Null, + }, + bson::doc! { + "hold_until": { + "$lt": chrono::Utc::now(), + }, + }, + ], + }, + bson::doc! { + "$set": { + "claimed_by_id": global.worker_id(), + "hold_until": chrono::Utc::now() + chrono::Duration::seconds(60), + }, + }, + Some( + mongodb::options::FindOneAndUpdateOptions::builder() + .sort(bson::doc! { + "priority": -1, + }) + .build(), + ), + ) + .await + } + + pub async fn refresh(&self, global: &Arc) -> Result { + let success = Self::collection(global.database()) + .update_one( + bson::doc! { + "_id": self.id.clone(), + "claimed_by_id": global.worker_id(), + }, + bson::doc! { + "$set": { + "hold_until": chrono::Utc::now() + chrono::Duration::seconds(60), + }, + }, + None, + ) + .await?; + + Ok(success.modified_count == 1) + } + + pub async fn complete(&self, global: &Arc) -> Result { + let success = Self::collection(global.database()) + .delete_one( + bson::doc! 
{ + "_id": self.id.clone(), + "claimed_by_id": global.worker_id(), + }, + None, + ) + .await?; + + Ok(success.deleted_count == 1) + } +} diff --git a/image_processor/src/disk/http.rs b/image_processor/src/disk/http.rs new file mode 100644 index 00000000..cd313e5b --- /dev/null +++ b/image_processor/src/disk/http.rs @@ -0,0 +1,167 @@ +use bytes::Bytes; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::Method; +use url::Url; + +use super::{Disk, DiskError, DiskWriteOptions}; +use crate::config::{DiskMode, HttpDiskConfig}; + +#[derive(Debug)] +pub struct HttpDisk { + name: String, + base_url: Url, + mode: DiskMode, + semaphore: Option, + client: reqwest::Client, +} + +#[derive(Debug, thiserror::Error)] +pub enum HttpDiskError { + #[error("invalid path")] + InvalidPath(#[from] url::ParseError), + #[error("reqwest: {0}")] + Reqwest(#[from] reqwest::Error), + #[error("invalid header name")] + InvalidHeaderName(#[from] reqwest::header::InvalidHeaderName), + #[error("invalid header value")] + InvalidHeaderValue(#[from] reqwest::header::InvalidHeaderValue), +} + +impl HttpDisk { + #[tracing::instrument(skip(config), name = "HttpDisk::new", fields(name = %config.name), err)] + pub async fn new(config: &HttpDiskConfig) -> Result { + tracing::debug!("setting up http disk"); + Ok(Self { + name: config.name.clone(), + base_url: config.url.clone(), + mode: config.mode, + semaphore: config.max_connections.map(|max| tokio::sync::Semaphore::new(max)), + client: { + let mut builder = reqwest::Client::builder(); + + if let Some(timeout) = config.timeout { + builder = builder.timeout(timeout); + } + + if config.allow_insecure { + builder = builder.danger_accept_invalid_certs(true); + } + + let mut headers = HeaderMap::new(); + + for (key, value) in &config.headers { + headers.insert(key.parse::()?, value.parse::()?); + } + + builder = builder.default_headers(headers); + + builder.build().map_err(HttpDiskError::Reqwest)? + }, + }) + } +} + +impl Disk for HttpDisk { + fn name(&self) -> &str { + &self.name + } + + #[tracing::instrument(skip(self), name = "HttpDisk::read", fields(name = %self.name), err)] + async fn read(&self, path: &str) -> Result { + tracing::debug!("reading file"); + + if self.mode == DiskMode::Write { + return Err(DiskError::ReadOnly); + } + + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let url = self.base_url.join(path).map_err(HttpDiskError::InvalidPath)?; + + let response = self.client.get(url).send().await.map_err(HttpDiskError::Reqwest)?; + + let response = response.error_for_status().map_err(HttpDiskError::Reqwest)?; + + Ok(response.bytes().await.map_err(HttpDiskError::Reqwest)?) 
+ } + + #[tracing::instrument(skip(self, data), name = "HttpDisk::write", fields(name = %self.name, size = data.len()), err)] + async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DiskError> { + tracing::debug!("writing file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let url = self.base_url.join(path).map_err(HttpDiskError::InvalidPath)?; + + let mut request = self + .client + .request(Method::POST, url) + .body(data) + .build() + .map_err(HttpDiskError::Reqwest)?; + + if let Some(options) = options { + if let Some(cache_control) = &options.cache_control { + request.headers_mut().insert( + reqwest::header::CACHE_CONTROL, + cache_control.parse().map_err(HttpDiskError::InvalidHeaderValue)?, + ); + } + + if let Some(content_type) = &options.content_type { + request.headers_mut().insert( + reqwest::header::CONTENT_TYPE, + content_type.parse().map_err(HttpDiskError::InvalidHeaderValue)?, + ); + } + + if let Some(acl) = &options.acl { + request.headers_mut().insert( + reqwest::header::HeaderName::from_static("x-amz-acl"), + acl.parse().map_err(HttpDiskError::InvalidHeaderValue)?, + ); + } + } + + let resp = self.client.execute(request).await.map_err(HttpDiskError::Reqwest)?; + + resp.error_for_status().map_err(HttpDiskError::Reqwest)?; + + Ok(()) + } + + #[tracing::instrument(skip(self), name = "HttpDisk::delete", fields(name = %self.name), err)] + async fn delete(&self, path: &str) -> Result<(), DiskError> { + tracing::debug!("deleting file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let url = self.base_url.join(path).map_err(HttpDiskError::InvalidPath)?; + + let response = self.client.delete(url).send().await.map_err(HttpDiskError::Reqwest)?; + + response.error_for_status().map_err(HttpDiskError::Reqwest)?; + + Ok(()) + } +} diff --git a/image_processor/src/disk/local.rs b/image_processor/src/disk/local.rs new file mode 100644 index 00000000..3e9be4fe --- /dev/null +++ b/image_processor/src/disk/local.rs @@ -0,0 +1,79 @@ +use std::path::PathBuf; + +use bytes::Bytes; + +use super::{Disk, DiskError, DiskWriteOptions}; +use crate::config::{DiskMode, LocalDiskConfig}; + +#[derive(Debug)] +pub struct LocalDisk { + name: String, + mode: DiskMode, + path: PathBuf, +} + +#[derive(Debug, thiserror::Error)] +pub enum LocalDiskError { + #[error("io: {0}")] + Io(#[from] std::io::Error), +} + +impl LocalDisk { + #[tracing::instrument(skip(config), name = "LocalDisk::new", fields(name = %config.name), err)] + pub async fn new(config: &LocalDiskConfig) -> Result { + tracing::debug!("setting up local disk"); + + if !config.path.exists() { + tokio::fs::create_dir_all(&config.path).await.map_err(LocalDiskError::Io)?; + } + + Ok(Self { + name: config.name.clone(), + mode: config.mode, + path: config.path.clone(), + }) + } +} + +impl Disk for LocalDisk { + fn name(&self) -> &str { + &self.name + } + + #[tracing::instrument(skip(self), name = "LocalDisk::read", err)] + async fn read(&self, path: &str) -> Result { + tracing::debug!("reading file"); + + if self.mode == DiskMode::Write { + return Err(DiskError::ReadOnly); + } + + let path = self.path.join(path); + Ok(tokio::fs::read(path).await.map_err(LocalDiskError::Io)?.into()) + } + + #[tracing::instrument(skip(self, data), 
name = "LocalDisk::write", err, fields(size = data.len()))] + async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DiskError> { + tracing::debug!("writing file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + let path = self.path.join(path); + Ok(tokio::fs::write(path, data).await.map_err(LocalDiskError::Io)?) + } + + #[tracing::instrument(skip(self), name = "LocalDisk::delete", err)] + async fn delete(&self, path: &str) -> Result<(), DiskError> { + tracing::debug!("deleting file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + let path = self.path.join(path); + tokio::fs::remove_file(path).await.map_err(LocalDiskError::Io)?; + Ok(()) + } +} diff --git a/image_processor/src/disk/memory.rs b/image_processor/src/disk/memory.rs new file mode 100644 index 00000000..426278cc --- /dev/null +++ b/image_processor/src/disk/memory.rs @@ -0,0 +1,151 @@ +use std::collections::HashMap; + +use bytes::Bytes; +use tokio::sync::RwLock; + +use super::{Disk, DiskError, DiskWriteOptions}; +use crate::config::{DiskMode, MemoryDiskConfig}; + +#[derive(Debug)] +struct FileHolder { + remaining_capacity: usize, + files: HashMap, +} + +impl FileHolder { + fn get(&self, path: &str) -> Option<&MemoryFile> { + self.files.get(path) + } + + fn insert(&mut self, path: String, file: MemoryFile) -> Result, MemoryDiskError> { + if file.data.len() > self.remaining_capacity { + return Err(MemoryDiskError::NoSpaceLeft); + } + + self.remaining_capacity -= file.data.len(); + self.files + .insert(path, file) + .map(|file| { + self.remaining_capacity += file.data.len(); + Ok(file) + }) + .transpose() + } + + fn remove(&mut self, path: &str) -> Option { + let file = self.files.remove(path)?; + self.remaining_capacity += file.data.len(); + Some(file) + } +} + +#[derive(Debug)] +pub struct MemoryDisk { + name: String, + mode: DiskMode, + files: RwLock, + global: bool, +} + +#[derive(Debug, Clone)] +pub struct MemoryFile { + data: Bytes, + _options: DiskWriteOptions, +} + +#[derive(Debug, Clone, thiserror::Error)] +pub enum MemoryDiskError { + #[error("no space left on disk")] + NoSpaceLeft, +} + +impl MemoryDisk { + #[tracing::instrument(skip(config), name = "MemoryDisk::new", fields(name = %config.name), err)] + pub async fn new(config: &MemoryDiskConfig) -> Result { + tracing::debug!("setting up memory disk"); + Ok(Self { + name: config.name.clone(), + mode: config.mode, + global: config.global, + files: RwLock::new(FileHolder { + remaining_capacity: config.capacity.unwrap_or(usize::MAX), + files: HashMap::new(), + }), + }) + } +} + +impl Disk for MemoryDisk { + fn name(&self) -> &str { + &self.name + } + + #[tracing::instrument(skip(self), name = "MemoryDisk::read", err)] + async fn read(&self, path: &str) -> Result { + tracing::debug!("reading file"); + + if self.mode == DiskMode::Write { + return Err(DiskError::ReadOnly); + } + + Ok(self + .files + .read() + .await + .get(path) + .map(|file| file.data.clone()) + .ok_or(DiskError::NotFound)?) 
+ } + + #[tracing::instrument(skip(self, data), name = "MemoryDisk::write", err, fields(size = data.len()))] + async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DiskError> { + tracing::debug!("writing file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + let mut files = self.files.write().await; + + files.insert( + path.to_owned(), + MemoryFile { + data, + _options: options.unwrap_or_default(), + }, + )?; + + Ok(()) + } + + #[tracing::instrument(skip(self), name = "MemoryDisk::delete", err)] + async fn delete(&self, path: &str) -> Result<(), DiskError> { + tracing::debug!("deleting file"); + + if self.mode == DiskMode::Read { + return Err(DiskError::WriteOnly); + } + + self.files.write().await.remove(path).ok_or(DiskError::NotFound)?; + Ok(()) + } + + fn scoped(&self) -> Option + where + Self: Sized, + { + if self.global { + return None; + } + + Some(Self { + name: self.name.clone(), + mode: self.mode, + global: false, + files: RwLock::new(FileHolder { + remaining_capacity: 0, + files: HashMap::new(), + }), + }) + } +} diff --git a/image_processor/src/disk/mod.rs b/image_processor/src/disk/mod.rs new file mode 100644 index 00000000..435bd6c4 --- /dev/null +++ b/image_processor/src/disk/mod.rs @@ -0,0 +1,130 @@ +use bytes::Bytes; + +use self::http::{HttpDisk, HttpDiskError}; +use self::local::{LocalDisk, LocalDiskError}; +use self::memory::{MemoryDisk, MemoryDiskError}; +use self::public_http::{PublicHttpDisk, PublicHttpDiskError}; +use self::s3::{S3Disk, S3DiskError}; +use crate::config::DiskConfig; + +pub mod http; +pub mod local; +pub mod memory; +pub mod public_http; +pub mod s3; + +#[derive(Debug, thiserror::Error)] +pub enum DiskError { + #[error("http: {0}")] + Http(#[from] HttpDiskError), + #[error("local: {0}")] + Local(#[from] LocalDiskError), + #[error("s3: {0}")] + S3(#[from] S3DiskError), + #[error("memory: {0}")] + Memory(#[from] MemoryDiskError), + #[error("public http: {0}")] + PublicHttp(#[from] PublicHttpDiskError), + #[error("not found")] + NotFound, + #[error("read only")] + ReadOnly, + #[error("write only")] + WriteOnly, +} + +#[derive(Debug, Clone, Default)] +pub struct DiskWriteOptions { + pub cache_control: Option, + pub content_type: Option, + pub acl: Option, + pub content_disposition: Option, +} + +pub trait Disk { + /// Get the name of the disk + fn name(&self) -> &str; + + /// Read data from a disk + fn read(&self, path: &str) -> impl std::future::Future> + Send; + + /// Write data to a disk + fn write( + &self, + path: &str, + data: Bytes, + options: Option, + ) -> impl std::future::Future> + Send; + + /// Delete data from a disk + fn delete(&self, path: &str) -> impl std::future::Future> + Send; + + /// Can be scoped to a specific request + fn scoped(&self) -> Option + where + Self: Sized, + { + None + } +} + +#[derive(Debug)] +pub enum AnyDisk { + Local(LocalDisk), + S3(S3Disk), + Memory(MemoryDisk), + Http(HttpDisk), + PublicHttp(PublicHttpDisk), +} + +impl Disk for AnyDisk { + fn name(&self) -> &str { + match self { + AnyDisk::Local(disk) => disk.name(), + AnyDisk::S3(disk) => disk.name(), + AnyDisk::Memory(disk) => disk.name(), + AnyDisk::Http(disk) => disk.name(), + AnyDisk::PublicHttp(disk) => disk.name(), + } + } + + async fn read(&self, path: &str) -> Result { + match self { + AnyDisk::Local(disk) => disk.read(path).await, + AnyDisk::S3(disk) => disk.read(path).await, + AnyDisk::Memory(disk) => disk.read(path).await, + AnyDisk::Http(disk) => disk.read(path).await, + 
AnyDisk::PublicHttp(disk) => disk.read(path).await, + } + } + + async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DiskError> { + match self { + AnyDisk::Local(disk) => disk.write(path, data, options).await, + AnyDisk::S3(disk) => disk.write(path, data, options).await, + AnyDisk::Memory(disk) => disk.write(path, data, options).await, + AnyDisk::Http(disk) => disk.write(path, data, options).await, + AnyDisk::PublicHttp(disk) => disk.write(path, data, options).await, + } + } + + async fn delete(&self, path: &str) -> Result<(), DiskError> { + match self { + AnyDisk::Local(disk) => disk.delete(path).await, + AnyDisk::S3(disk) => disk.delete(path).await, + AnyDisk::Memory(disk) => disk.delete(path).await, + AnyDisk::Http(disk) => disk.delete(path).await, + AnyDisk::PublicHttp(disk) => disk.delete(path).await, + } + } +} + +pub async fn build_disk(config: &DiskConfig) -> Result { + match config { + DiskConfig::Local(local) => Ok(AnyDisk::Local(LocalDisk::new(local).await?)), + DiskConfig::S3(s3) => Ok(AnyDisk::S3(S3Disk::new(s3).await?)), + DiskConfig::Memory(memory) => Ok(AnyDisk::Memory(MemoryDisk::new(memory).await?)), + DiskConfig::Http(http) => Ok(AnyDisk::Http(HttpDisk::new(http).await?)), + DiskConfig::PublicHttp(public_http) => Ok(AnyDisk::PublicHttp(PublicHttpDisk::new(public_http).await?)), + } +} diff --git a/image_processor/src/disk/public_http.rs b/image_processor/src/disk/public_http.rs new file mode 100644 index 00000000..7f09f2bc --- /dev/null +++ b/image_processor/src/disk/public_http.rs @@ -0,0 +1,96 @@ +use bytes::Bytes; +use http::{HeaderName, HeaderValue}; + +use super::{Disk, DiskError, DiskWriteOptions}; +use crate::config::PublicHttpDiskConfig; + +pub const PUBLIC_HTTP_DISK_NAME: &str = "__public_http"; + +#[derive(Debug)] +pub struct PublicHttpDisk { + client: reqwest::Client, + semaphore: Option, +} + +#[derive(Debug, thiserror::Error)] +pub enum PublicHttpDiskError { + #[error("reqwest: {0}")] + Reqwest(#[from] reqwest::Error), + #[error("invalid header name")] + InvalidHeaderName(#[from] reqwest::header::InvalidHeaderName), + #[error("invalid header value")] + InvalidHeaderValue(#[from] reqwest::header::InvalidHeaderValue), + #[error("unsupported: {0}")] + Unsupported(&'static str), +} + +impl PublicHttpDisk { + #[tracing::instrument(skip(config), name = "PublicHttpDisk::new", err)] + pub async fn new(config: &PublicHttpDiskConfig) -> Result { + tracing::debug!("setting up public http disk"); + if !config.blacklist.is_empty() || !config.whitelist.is_empty() { + tracing::error!("blacklist and whitelist are not supported for public http disk"); + return Err(PublicHttpDiskError::Unsupported("blacklist and whitelist")); + } + + Ok(Self { + client: { + let mut builder = reqwest::Client::builder(); + + if let Some(timeout) = config.timeout { + builder = builder.timeout(timeout); + } + + if config.allow_insecure { + builder = builder.danger_accept_invalid_certs(true); + } + + let mut headers = reqwest::header::HeaderMap::new(); + + for (key, value) in &config.headers { + headers.insert(key.parse::()?, value.parse::()?); + } + + builder = builder.default_headers(headers); + + builder.build().map_err(|e| PublicHttpDiskError::Reqwest(e))? 
+ }, + semaphore: config.max_connections.map(|max| tokio::sync::Semaphore::new(max)), + }) + } +} + +impl Disk for PublicHttpDisk { + fn name(&self) -> &str { + PUBLIC_HTTP_DISK_NAME + } + + #[tracing::instrument(skip(self), name = "PublicHttpDisk::read", err)] + async fn read(&self, path: &str) -> Result { + tracing::debug!("reading file"); + + let _permit = if let Some(semaphore) = &self.semaphore { + Some(semaphore.acquire().await) + } else { + None + }; + + let response = self.client.get(path).send().await.map_err(PublicHttpDiskError::Reqwest)?; + + let response = response.error_for_status().map_err(PublicHttpDiskError::Reqwest)?; + + Ok(response.bytes().await.map_err(PublicHttpDiskError::Reqwest)?) + } + + #[tracing::instrument(skip(self, data), name = "PublicHttpDisk::write", fields(size = data.len()), err)] + async fn write(&self, path: &str, data: Bytes, options: Option) -> Result<(), DiskError> { + tracing::error!("writing is not supported for public http disk"); + Err(DiskError::ReadOnly) + } + + #[tracing::instrument(skip(self), name = "PublicHttpDisk::delete", err)] + async fn delete(&self, path: &str) -> Result<(), DiskError> { + tracing::error!("deleting is not supported for public http disk"); + Err(DiskError::ReadOnly) + } +} diff --git a/image_processor/src/disk/s3.rs b/image_processor/src/disk/s3.rs new file mode 100644 index 00000000..0d09bbf4 --- /dev/null +++ b/image_processor/src/disk/s3.rs @@ -0,0 +1,134 @@ +use aws_config::{AppName, Region, SdkConfig}; +use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider}; +use aws_sdk_s3::operation::delete_object::DeleteObjectError; +use aws_sdk_s3::operation::get_object::GetObjectError; +use aws_sdk_s3::operation::put_object::PutObjectError; +use aws_smithy_runtime_api::client::orchestrator::HttpResponse; +use aws_smithy_runtime_api::client::result::SdkError; +use bytes::Bytes; +use scuffle_foundations::service_info; + +use super::{Disk, DiskError, DiskWriteOptions}; +use crate::config::{DiskMode, S3DiskConfig}; + +#[derive(Debug)] +pub struct S3Disk { + name: String, + mode: DiskMode, + client: aws_sdk_s3::Client, + bucket: String, +} + +#[derive(Debug, thiserror::Error)] +pub enum S3DiskError { + #[error("s3: {0}")] + S3Error(#[from] aws_sdk_s3::Error), + #[error("byte stream: {0}")] + ByteStreamError(#[from] aws_smithy_types::byte_stream::error::Error), + #[error("read: {0}")] + ReadError(#[from] SdkError), + #[error("write: {0}")] + WriteError(#[from] SdkError), + #[error("delete: {0}")] + DeleteError(#[from] SdkError), +} + +impl S3Disk { + #[tracing::instrument(skip(config), name = "S3Disk::new", fields(name = %config.name), err)] + pub async fn new(config: &S3DiskConfig) -> Result { + tracing::debug!("setting up s3 disk"); + Ok(Self { + name: config.name.clone(), + mode: config.mode, + client: aws_sdk_s3::Client::new(&{ + let mut builder = SdkConfig::builder(); + + builder.set_app_name(Some(AppName::new(service_info!().name).unwrap())); + + builder.set_region(Some(Region::new(config.region.clone()))); + + builder.set_credentials_provider(Some(SharedCredentialsProvider::new(Credentials::new( + config.access_key.clone(), + config.secret_key.clone(), + None, + None, + "ConfiguredCredentialsProvider", + )))); + + builder.build() + }), + bucket: config.bucket.clone(), + }) + } +} + +impl Disk for S3Disk { + fn name(&self) -> &str { + &self.name + } + + #[tracing::instrument(skip(self), name = "S3Disk::read", err)] + async fn read(&self, path: &str) -> Result { + if self.mode == DiskMode::Write { + return 
diff --git a/image_processor/src/event_queue/http.rs b/image_processor/src/event_queue/http.rs
new file mode 100644
index 00000000..1a995438
--- /dev/null
+++ b/image_processor/src/event_queue/http.rs
@@ -0,0 +1,93 @@
+use prost::Message;
+use scuffle_image_processor_proto::EventCallback;
+use url::Url;
+
+use super::{EventQueue, EventQueueError, PROTOBUF_CONTENT_TYPE};
+use crate::config::HttpEventQueueConfig;
+
+#[derive(Debug)]
+pub struct HttpEventQueue {
+	name: String,
+	url: Url,
+	client: reqwest::Client,
+	semaphore: Option<tokio::sync::Semaphore>,
+	allow_protobuf: bool,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum HttpEventQueueError {
+	#[error("reqwest: {0}")]
+	Reqwest(#[from] reqwest::Error),
+	#[error("invalid header name")]
+	InvalidHeaderName(#[from] reqwest::header::InvalidHeaderName),
+	#[error("invalid header value")]
+	InvalidHeaderValue(#[from] reqwest::header::InvalidHeaderValue),
+}
+
+impl HttpEventQueue {
+	#[tracing::instrument(skip(config), name = "HttpEventQueue::new", fields(name = %config.name), err)]
+	pub async fn new(config: &HttpEventQueueConfig) -> Result<Self, HttpEventQueueError> {
+		tracing::debug!("setting up http event queue");
+		Ok(Self {
+			name: config.name.clone(),
+			client: {
+				let mut builder = reqwest::Client::builder();
+
+				if let Some(timeout) = config.timeout {
+					builder = builder.timeout(timeout);
+				}
+
+				if config.allow_insecure {
+					builder = builder.danger_accept_invalid_certs(true);
+				}
+
+				let mut headers = reqwest::header::HeaderMap::new();
+
+				for (key, value) in &config.headers {
+					headers.insert(
+						key.parse::<reqwest::header::HeaderName>()?,
+						value.parse::<reqwest::header::HeaderValue>()?,
+					);
+				}
+
+				builder = builder.default_headers(headers);
+
+				builder.build().map_err(HttpEventQueueError::Reqwest)?
+			},
+			url: config.url.clone(),
+			allow_protobuf: config.allow_protobuf,
+			semaphore: config.max_connections.map(tokio::sync::Semaphore::new),
+		})
+	}
+}
+
+impl EventQueue for HttpEventQueue {
+	fn name(&self) -> &str {
+		&self.name
+	}
+
+	#[tracing::instrument(skip(self), name = "HttpEventQueue::publish", fields(name = %self.name))]
+	async fn publish(&self, topic: &str, data: EventCallback) -> Result<(), EventQueueError> {
+		let _permit = if let Some(semaphore) = &self.semaphore {
+			Some(semaphore.acquire().await)
+		} else {
+			None
+		};
+
+		let mut req = self.client.post(self.url.clone()).header("X-Topic", topic);
+
+		if self.allow_protobuf {
+			req = req.header("Content-Type", PROTOBUF_CONTENT_TYPE).body(data.encode_to_vec());
+		} else {
+			req = req.json(&data);
+		}
+
+		req.send()
+			.await
+			.map_err(HttpEventQueueError::Reqwest)?
+			.error_for_status()
+			.map_err(HttpEventQueueError::Reqwest)?;
+
+		Ok(())
+	}
+}
diff --git a/image_processor/src/event_queue/mod.rs b/image_processor/src/event_queue/mod.rs
new file mode 100644
index 00000000..bf4765e9
--- /dev/null
+++ b/image_processor/src/event_queue/mod.rs
@@ -0,0 +1,65 @@
+use scuffle_image_processor_proto::EventCallback;
+
+use self::http::{HttpEventQueue, HttpEventQueueError};
+use self::nats::{NatsEventQueue, NatsEventQueueError};
+use self::redis::{RedisEventQueue, RedisEventQueueError};
+use crate::config::EventQueueConfig;
+
+pub mod http;
+pub mod nats;
+pub mod redis;
+
+#[derive(Debug, thiserror::Error)]
+pub enum EventQueueError {
+	#[error("nats: {0}")]
+	Nats(#[from] NatsEventQueueError),
+	#[error("http: {0}")]
+	Http(#[from] HttpEventQueueError),
+	#[error("redis: {0}")]
+	Redis(#[from] RedisEventQueueError),
+}
+
+const PROTOBUF_CONTENT_TYPE: &str = "application/protobuf; proto=scuffle.image_processor.EventCallback";
+
+pub trait EventQueue {
+	fn name(&self) -> &str;
+
+	fn publish(
+		&self,
+		topic: &str,
+		data: EventCallback,
+	) -> impl std::future::Future<Output = Result<(), EventQueueError>> + Send;
+}
+
+#[derive(Debug)]
+pub enum AnyEventQueue {
+	Nats(NatsEventQueue),
+	Http(HttpEventQueue),
+	Redis(RedisEventQueue),
+}
+
+impl EventQueue for AnyEventQueue {
+	fn name(&self) -> &str {
+		match self {
+			AnyEventQueue::Nats(queue) => queue.name(),
+			AnyEventQueue::Http(queue) => queue.name(),
+			AnyEventQueue::Redis(queue) => queue.name(),
+		}
+	}
+
+	async fn publish(&self, topic: &str, data: EventCallback) -> Result<(), EventQueueError> {
+		match self {
+			AnyEventQueue::Nats(queue) => queue.publish(topic, data).await,
+			AnyEventQueue::Http(queue) => queue.publish(topic, data).await,
+			AnyEventQueue::Redis(queue) => queue.publish(topic, data).await,
+		}
+	}
+}
+
+pub async fn build_event_queue(config: &EventQueueConfig) -> Result<AnyEventQueue, EventQueueError> {
+	match config {
+		EventQueueConfig::Nats(nats) => Ok(AnyEventQueue::Nats(NatsEventQueue::new(nats).await?)),
+		EventQueueConfig::Redis(redis) => Ok(AnyEventQueue::Redis(RedisEventQueue::new(redis).await?)),
+		EventQueueConfig::Http(http) => Ok(AnyEventQueue::Http(HttpEventQueue::new(http).await?)),
+	}
+}
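Illustrative sketch (not part of the patch): the event queues mirror the disk design, with one trait, one `Any*` enum, and one config-driven constructor. Publishing is backend-agnostic, and since `EventCallback` is a prost-generated message it implements `Default`, which is enough to exercise the dispatch path (the topic string here is made up; real topics would come from the job request):

    use scuffle_image_processor_proto::EventCallback;

    use crate::config::EventQueueConfig;
    use crate::event_queue::{build_event_queue, EventQueue, EventQueueError};

    async fn notify(config: &EventQueueConfig) -> Result<(), EventQueueError> {
        // Works identically whether `config` selects NATS, Redis, or HTTP.
        let queue = build_event_queue(config).await?;
        queue.publish("image_processor.results", EventCallback::default()).await
    }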
diff --git a/image_processor/src/event_queue/nats.rs b/image_processor/src/event_queue/nats.rs
new file mode 100644
index 00000000..0d1754e7
--- /dev/null
+++ b/image_processor/src/event_queue/nats.rs
@@ -0,0 +1,65 @@
+use prost::Message;
+use scuffle_image_processor_proto::EventCallback;
+
+use super::{EventQueue, EventQueueError, PROTOBUF_CONTENT_TYPE};
+use crate::config::NatsEventQueueConfig;
+
+#[derive(Debug)]
+pub struct NatsEventQueue {
+	name: String,
+	allow_protobuf: bool,
+	nats: async_nats::Client,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum NatsEventQueueError {
+	#[error("connect: {0}")]
+	Connect(#[from] async_nats::ConnectError),
+	#[error("encode json: {0}")]
+	EncodeJson(#[from] serde_json::Error),
+	#[error("publish: {0}")]
+	Publish(#[from] async_nats::PublishError),
+}
+
+impl NatsEventQueue {
+	#[tracing::instrument(skip(config), name = "NatsEventQueue::new", fields(name = %config.name), err)]
+	pub async fn new(config: &NatsEventQueueConfig) -> Result<Self, NatsEventQueueError> {
+		tracing::debug!("setting up nats event queue");
+		let nats = async_nats::connect(&config.url).await?;
+
+		Ok(Self {
+			name: config.name.clone(),
+			allow_protobuf: config.allow_protobuf,
+			nats,
+		})
+	}
+}
+
+impl EventQueue for NatsEventQueue {
+	fn name(&self) -> &str {
+		&self.name
+	}
+
+	#[tracing::instrument(skip(self), name = "NatsEventQueue::publish", err)]
+	async fn publish(&self, topic: &str, data: EventCallback) -> Result<(), EventQueueError> {
+		let mut header_map = async_nats::HeaderMap::new();
+
+		let payload = if self.allow_protobuf {
+			header_map.insert("Content-Type", PROTOBUF_CONTENT_TYPE);
+			data.encode_to_vec()
+		} else {
+			header_map.insert("Content-Type", "application/json");
+			serde_json::to_string(&data)
+				.map_err(NatsEventQueueError::EncodeJson)?
+				.into_bytes()
+		}
+		.into();
+
+		self.nats
+			.publish_with_headers(topic.to_owned(), header_map, payload)
+			.await
+			.map_err(NatsEventQueueError::Publish)?;
+
+		Ok(())
+	}
+}
diff --git a/image_processor/src/event_queue/redis.rs b/image_processor/src/event_queue/redis.rs
new file mode 100644
index 00000000..560ad6e5
--- /dev/null
+++ b/image_processor/src/event_queue/redis.rs
@@ -0,0 +1,57 @@
+use fred::interfaces::PubsubInterface;
+use fred::types::RedisConfig;
+use prost::Message;
+use scuffle_image_processor_proto::EventCallback;
+
+use super::{EventQueue, EventQueueError};
+use crate::config::RedisEventQueueConfig;
+
+#[derive(Debug)]
+pub struct RedisEventQueue {
+	client: fred::clients::RedisClient,
+	name: String,
+	allow_protobuf: bool,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum RedisEventQueueError {
+	#[error("redis: {0}")]
+	Redis(#[from] fred::error::RedisError),
+	#[error("json encode: {0}")]
+	JsonEncode(#[from] serde_json::Error),
+}
+
+impl RedisEventQueue {
+	#[tracing::instrument(skip(config), name = "RedisEventQueue::new", fields(name = %config.name), err)]
+	pub async fn new(config: &RedisEventQueueConfig) -> Result<Self, RedisEventQueueError> {
+		Ok(Self {
+			client: fred::clients::RedisClient::new(RedisConfig::from_url(&config.url)?, None, None, None),
+			name: config.name.clone(),
+			allow_protobuf: config.allow_protobuf,
+		})
+	}
+}
+
+impl EventQueue for RedisEventQueue {
+	fn name(&self) -> &str {
+		&self.name
+	}
+
+	#[tracing::instrument(skip(self), name = "RedisEventQueue::publish", err)]
+	async fn publish(&self, topic: &str, data: EventCallback) -> Result<(), EventQueueError> {
+		let payload = if self.allow_protobuf {
+			data.encode_to_vec()
+		} else {
+			serde_json::to_string(&data)
+				.map_err(RedisEventQueueError::JsonEncode)?
+				.into_bytes()
+		};
+
+		self.client
+			.publish::<(), _, _>(topic, payload)
+			.await
+			.map_err(RedisEventQueueError::Redis)?;
+
+		Ok(())
+	}
+}
diff --git a/image_processor/src/global.rs b/image_processor/src/global.rs
index a4e458e6..92f7e451 100644
--- a/image_processor/src/global.rs
+++ b/image_processor/src/global.rs
@@ -1,29 +1,98 @@
-use scuffle_utils::context::Context;
+use std::collections::HashMap;
+
+use anyhow::Context;
+use bson::oid::ObjectId;
+use scuffle_foundations::BootstrapResult;
 
 use crate::config::ImageProcessorConfig;
+use crate::database::Job;
+use crate::disk::public_http::PUBLIC_HTTP_DISK_NAME;
+use crate::disk::{build_disk, AnyDisk, Disk};
+use crate::event_queue::{build_event_queue, AnyEventQueue, EventQueue};
 
-pub struct ImageProcessorGlobalImpl {
-	ctx: Context,
+pub struct Global {
+	worker_id: ObjectId,
 	config: ImageProcessorConfig,
-	http_client: reqwest::Client,
+	client: mongodb::Client,
+	database: mongodb::Database,
+	disks: HashMap<String, AnyDisk>,
+	event_queues: HashMap<String, AnyEventQueue>,
 }
 
-pub trait ImageProcessorGlobal: Send + Sync + 'static {
-	fn ctx(&self) -> &Context;
-	fn config(&self) -> &ImageProcessorConfig;
-	fn http_client(&self) -> &reqwest::Client;
-}
+impl Global {
+	pub async fn new(config: ImageProcessorConfig) -> BootstrapResult<Self> {
+		tracing::debug!("setting up mongo client");
+
+		let client = mongodb::Client::with_uri_str(&config.database.uri).await.context("mongodb")?;
+		let Some(database) = client.default_database() else {
+			anyhow::bail!("no default database")
+		};
+
+		tracing::debug!("setting up job collection");
+
+		Job::setup_collection(&database).await.context("setup job collection")?;
+
+		tracing::debug!("setting up disks and event queues");
+
+		let mut disks = HashMap::new();
+
+		for disk in &config.disks {
+			let disk = build_disk(disk).await.context("disk")?;
+			let name = disk.name().to_string();
+			if disks.insert(name.clone(), disk).is_some() {
+				anyhow::bail!("duplicate disk name: {name}");
+			}
+		}
+
+		if config.disks.is_empty() {
+			tracing::warn!("no disks configured");
+		}
+
+		let mut event_queues = HashMap::new();
 
-impl ImageProcessorGlobal for ImageProcessorGlobalImpl {
-	fn ctx(&self) -> &Context {
-		&self.ctx
+		for event_queue in &config.event_queues {
+			let event_queue = build_event_queue(event_queue).await.context("event queue")?;
+			let name = event_queue.name().to_string();
+			if event_queues.insert(name.clone(), event_queue).is_some() {
+				anyhow::bail!("duplicate event queue name: {name}");
+			}
+		}
+
+		if config.event_queues.is_empty() {
+			tracing::warn!("no event queues configured");
+		}
+
+		Ok(Self {
+			worker_id: ObjectId::new(),
+			config,
+			client,
+			database,
+			disks,
+			event_queues,
+		})
 	}
 
-	fn config(&self) -> &ImageProcessorConfig {
+	pub fn worker_id(&self) -> ObjectId {
+		self.worker_id
+	}
+
+	pub fn config(&self) -> &ImageProcessorConfig {
 		&self.config
 	}
 
-	fn http_client(&self) -> &reqwest::Client {
-		&self.http_client
+	pub fn disk(&self, name: &str) -> Option<&AnyDisk> {
+		self.disks.get(name)
+	}
+
+	pub fn event_queue(&self, name: &str) -> Option<&AnyEventQueue> {
+		self.event_queues.get(name)
+	}
+
+	pub fn public_http_disk(&self) -> Option<&AnyDisk> {
+		self.disk(PUBLIC_HTTP_DISK_NAME)
+	}
+
+	pub fn database(&self) -> &mongodb::Database {
+		&self.database
 	}
 }
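Illustrative sketch (not part of the patch): `Global` acts as a name-indexed registry, so a job handler resolves its targets through the accessors above. One plausible lookup pattern, assuming the handler receives disk and queue names from the job payload (`disk_name`/`queue_name` are hypothetical parameters, not fields of the actual `Job` document):

    use anyhow::Context;

    use crate::disk::{AnyDisk, Disk};
    use crate::event_queue::{AnyEventQueue, EventQueue};
    use crate::global::Global;

    fn resolve<'a>(
        global: &'a Global,
        disk_name: &str,
        queue_name: &str,
    ) -> anyhow::Result<(&'a AnyDisk, &'a AnyEventQueue)> {
        // Lookups fail fast with a readable error instead of panicking later.
        let disk = global.disk(disk_name).context("unknown disk")?;
        let queue = global.event_queue(queue_name).context("unknown event queue")?;

        tracing::debug!(disk = disk.name(), queue = queue.name(), "resolved job targets");
        Ok((disk, queue))
    }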
diff --git a/image_processor/src/lib.rs b/image_processor/src/lib.rs
deleted file mode 100644
index c7d8ee31..00000000
--- a/image_processor/src/lib.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-pub mod config;
-pub mod database;
-pub mod global;
-pub mod grpc;
-pub mod pb;
-pub mod processor;
-
-#[cfg(test)]
-mod tests;
diff --git a/image_processor/src/main.rs b/image_processor/src/main.rs
index 95c38824..dc43c184 100644
--- a/image_processor/src/main.rs
+++ b/image_processor/src/main.rs
@@ -1,69 +1,34 @@
-#![allow(dead_code)]
-
 use std::sync::Arc;
 
-use anyhow::Context as _;
-use binary_helper::global::{setup_database, setup_nats, GlobalCtx, GlobalDb, GlobalNats};
-use binary_helper::{bootstrap, grpc_health, grpc_server, impl_global_traits};
-use platform_image_processor::config::ImageProcessorConfig;
-use scuffle_utils::context::Context;
-use tokio::select;
+use scuffle_foundations::bootstrap::{bootstrap, Bootstrap};
+use scuffle_foundations::settings::cli::Matches;
+use scuffle_foundations::BootstrapResult;
 
-#[derive(Debug, Clone, Default, config::Config, serde::Deserialize)]
-#[serde(default)]
-struct ExtConfig {
-	image_processor: ImageProcessorConfig,
-}
+use self::config::ImageProcessorConfig;
 
-impl binary_helper::config::ConfigExtention for ExtConfig {
-	const APP_NAME: &'static str = "scuffle-image-processor";
-}
+impl Bootstrap for ImageProcessorConfig {
+	type Settings = Self;
 
-// TODO: We don't need grpc and nats
-type AppConfig = binary_helper::config::AppConfig<ExtConfig>;
+	fn runtime_mode(&self) -> scuffle_foundations::bootstrap::RuntimeSettings {
+		self.runtime.clone()
+	}
 
-pub fn main() {
-	tokio::runtime::Builder::new_multi_thread()
-		.enable_all()
-		.max_blocking_threads(
-			std::env::var("TOKIO_MAX_BLOCKING_THREADS")
-				.ok()
-				.and_then(|v| v.parse().ok())
-				.unwrap_or(2048),
-		)
-		.build()
-		.expect("failed to create tokio runtime")
-		.block_on(async {
-			if let Err(err) = bootstrap::<AppConfig, _, _>(|global| async move {
-				let grpc_future = {
-					let mut server = grpc_server(&global.config.grpc)
-						.await
-						.context("failed to create grpc server")?;
-					let router = server.add_service(grpc_health::HealthServer::new(&global, |global, _| async move {
-						!global.db().is_closed()
-							&& global.nats().connection_state() == async_nats::connection::State::Connected
-					}));
-
-					let router = platform_image_processor::grpc::add_routes(&global, router);
+	fn telemetry_config(&self) -> Option<scuffle_foundations::telemetry::settings::TelemetrySettings> {
+		Some(self.telemetry.clone())
+	}
+}
 
-					router.serve_with_shutdown(global.config.grpc.bind_address, async {
-						global.ctx().done().await;
-					})
-				};
+mod config;
+mod database;
+mod disk;
+mod event_queue;
+mod global;
 
-				let processor_future = platform_image_processor::processor::run(global.clone());
+#[bootstrap]
+async fn main(cfg: Matches<ImageProcessorConfig>) -> BootstrapResult<()> {
+	tracing::info!("starting image processor");
 
-				select! {
-					r = grpc_future => r.context("grpc server stopped unexpectedly")?,
-					r = processor_future => r.context("processor stopped unexpectedly")?,
-				}
+	let global = Arc::new(global::Global::new(cfg.settings).await?);
 
-				Ok(())
-			})
-			.await
-			{
-				tracing::error!("{:#}", err);
-				std::process::exit(1);
-			}
-		})
+	Ok(())
 }
diff --git a/video/transcoder/Cargo.toml b/video/transcoder/Cargo.toml
index 2143401d..b8502c78 100644
--- a/video/transcoder/Cargo.toml
+++ b/video/transcoder/Cargo.toml
@@ -41,7 +41,7 @@ config = { workspace = true }
 pb = { workspace = true }
 video-common = { workspace = true }
 binary-helper = { workspace = true }
-scuffle-ffmpeg = { workspace = true, features = ["tokio-channel", "tracing", "task-abort"] }
+scuffle-ffmpeg = { workspace = true, features = ["tokio-channel", "tracing"] }
 
 [dev-dependencies]
 dotenvy = "0.15"