diff --git a/Cargo.lock b/Cargo.lock index 0a4e214d..0ecb2f7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3668,6 +3668,7 @@ dependencies = [ "chrono", "common", "config", + "fred", "futures", "futures-util", "hmac", diff --git a/common/Cargo.toml b/common/Cargo.toml index 3e2e1a9e..35764ea2 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -65,7 +65,7 @@ uuid = { version = "1.6", features = ["v4"], optional = true } ulid = { version = "1.1", features = ["uuid"], optional = true} aws-config = { version = "1.1", optional = true } -aws-sdk-s3 = { version = "1.12", optional = true } +aws-sdk-s3 = { version = "1.12", optional = true, features = ["behavior-version-latest"] } aws-credential-types = { version = "1.1", optional = true, features = ["hardcoded-credentials"] } aws-smithy-types = { version = "1.1", features = ["http-body-1-x"], optional = true } http-body = { version = "1.0.0", optional = true } diff --git a/common/src/config.rs b/common/src/config.rs index e33de0a5..afad301d 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -195,7 +195,7 @@ pub struct S3BucketConfig { impl Default for S3BucketConfig { fn default() -> Self { Self { - name: "scuffle-image-processor".to_owned(), + name: "scuffle".to_owned(), region: "us-east-1".to_owned(), endpoint: Some("http://localhost:9000".to_string()), credentials: S3CredentialsConfig::default(), diff --git a/ffmpeg/src/scalar.rs b/ffmpeg/src/scalar.rs index 85973b61..190cdd68 100644 --- a/ffmpeg/src/scalar.rs +++ b/ffmpeg/src/scalar.rs @@ -57,15 +57,12 @@ impl Scalar { frame_mut.height = height; frame_mut.format = pixel_format as i32; - // Safety: `av_image_alloc` is safe to call, and the pointer returned is valid. - av_image_alloc( - frame_mut.data.as_mut_ptr(), - frame_mut.linesize.as_mut_ptr(), - width, - height, - pixel_format, - 32, - ); + // Safety: `av_frame_get_buffer` is safe to call; it returns 0 on success and + // a negative error code on failure, which is handled below.
+ match av_frame_get_buffer(frame_mut, 32) { + 0 => {} + err => return Err(FfmpegError::Code(err.into())), + } } Ok(Self { diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml index a48b2766..bd4a59f6 100644 --- a/platform/api/Cargo.toml +++ b/platform/api/Cargo.toml @@ -43,7 +43,7 @@ thiserror = "1.0" anyhow = "1.0" multer = "3.0" aws-config = "1.1" -aws-sdk-s3 = "1.12" +aws-sdk-s3 = { version = "1.12", features = ["behavior-version-latest"] } http-body = "1.0" http-body-util = "0.1" hyper-util = "0.1" @@ -51,6 +51,7 @@ pin-project = "1.1" base64 = "0.21" postgres-from-row = "0.5" postgres-types = "0.2" +fred = { version = "8.0", features = ["enable-rustls", "sentinel-client", "dns"] } config = { workspace = true } pb = { workspace = true } diff --git a/platform/api/src/api/v1/gql/models/category.rs b/platform/api/src/api/v1/gql/models/category.rs index 1dabc809..3a8fb277 100644 --- a/platform/api/src/api/v1/gql/models/category.rs +++ b/platform/api/src/api/v1/gql/models/category.rs @@ -17,7 +17,7 @@ impl From<database::Category> for Category { Self { id: value.id.into(), name: value.name, - revision: value.revision, + revision: 1, updated_at: value.updated_at.into(), } } diff --git a/platform/api/src/api/v1/gql/models/image_upload.rs b/platform/api/src/api/v1/gql/models/image_upload.rs index 6023f197..0d0c61cf 100644 --- a/platform/api/src/api/v1/gql/models/image_upload.rs +++ b/platform/api/src/api/v1/gql/models/image_upload.rs @@ -6,7 +6,7 @@ use super::ulid::GqlUlid; use crate::api::v1::gql::error::{GqlError, Result}; use crate::api::v1::gql::ext::ContextExt; use crate::config::ImageUploaderConfig; -use crate::database::UploadedFile; +use crate::database::{UploadedFile, UploadedFileStatus}; use crate::global::ApiGlobal; #[derive(SimpleObject, Clone)] @@ -51,7 +51,7 @@ impl ImageUpload { impl ImageUpload { pub fn from_uploaded_file(uploaded_file: UploadedFile) -> Result<Option<Self>> { - if uploaded_file.pending { + if uploaded_file.status != UploadedFileStatus::Completed { return Ok(None); } diff --git a/platform/api/src/api/v1/gql/subscription/file.rs b/platform/api/src/api/v1/gql/subscription/file.rs index 38b4fa2f..67bfd5a9 100644 --- a/platform/api/src/api/v1/gql/subscription/file.rs +++ b/platform/api/src/api/v1/gql/subscription/file.rs @@ -7,6 +7,7 @@ use crate::api::v1::gql::error::ext::{OptionExt, ResultExt}; use crate::api::v1::gql::error::{GqlError, Result}; use crate::api::v1::gql::ext::ContextExt; use crate::api::v1::gql::models::ulid::GqlUlid; +use crate::database::UploadedFileStatus; use crate::global::ApiGlobal; use crate::subscription::SubscriptionTopic; @@ -63,7 +64,7 @@ impl FileSubscription { .map_err_gql("failed to subscribe to file status")?; Ok(async_stream::stream!({ - if !file.pending { + if file.status != UploadedFileStatus::Queued { // When file isn't pending anymore, just yield once with the status from the db let status = if file.failed.is_some() { FileStatus::Failure diff --git a/platform/api/src/api/v1/upload/profile_picture.rs b/platform/api/src/api/v1/upload/profile_picture.rs index cca332a8..e0e3dd7d 100644 --- a/platform/api/src/api/v1/upload/profile_picture.rs +++ b/platform/api/src/api/v1/upload/profile_picture.rs @@ -17,7 +17,7 @@ use crate::api::auth::AuthData; use crate::api::error::ApiError; use crate::api::Body; use crate::config::{ApiConfig, ImageUploaderConfig}; -use crate::database::{FileType, RolePermission}; +use crate::database::{FileType, RolePermission, UploadedFileStatus}; use crate::global::ApiGlobal; fn create_task(file_id: Ulid, input_path: &str, config: 
&ImageUploaderConfig, owner_id: Ulid) -> image_processor::Task { @@ -33,7 +33,7 @@ fn create_task(file_id: Ulid, input_path: &str, config: &ImageUploaderConfig, ow ImageFormat::Webp as i32, ImageFormat::Avif as i32, ], - callback_subject: config.profile_picture_callback_subject.clone(), + callback_subject: config.callback_subject.clone(), limits: Some(image_processor::task::Limits { max_input_duration_ms: 10 * 1000, // 10 seconds max_input_frame_count: 300, @@ -197,7 +197,7 @@ impl UploadType for ProfilePicture { .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert image job"))?; - common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") + common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, path, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") .bind(file_id) // id .bind(auth.session.user_id) // owner_id .bind(auth.session.user_id) // uploader_id @@ -209,8 +209,8 @@ impl UploadType for ProfilePicture { })), })) // metadata .bind(file.len() as i64) // total_size - .bind(true) // pending .bind(&input_path) // path + .bind(UploadedFileStatus::Queued) // status .build() .execute(&tx) .await diff --git a/platform/api/src/config.rs b/platform/api/src/config.rs index c470e3b7..bdeb65a7 100644 --- a/platform/api/src/config.rs +++ b/platform/api/src/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; use std::path::PathBuf; +use std::time::Duration; use common::config::{S3BucketConfig, TlsConfig}; @@ -70,23 +71,27 @@ pub struct ImageUploaderConfig { /// The S3 Bucket which contains the source images pub bucket: S3BucketConfig, + /// Public endpoint for downloads + pub public_endpoint: String, + /// Profile picture callback subject (can't contain `.`) - pub profile_picture_callback_subject: String, + pub callback_subject: String, /// Profile picture task priority, higher number means higher priority pub profile_picture_task_priority: i64, - /// Public endpoint for downloads - pub public_endpoint: String, + /// Igdb image task priority, higher number means higher priority + pub igdb_image_task_priority: i32, } impl Default for ImageUploaderConfig { fn default() -> Self { Self { bucket: S3BucketConfig::default(), - profile_picture_callback_subject: "scuffle-platform-image_processor-profile_picture".to_string(), - profile_picture_task_priority: 2, + callback_subject: "scuffle-platform-image_processor-callback".to_string(), public_endpoint: "https://images.scuffle.tv/scuffle-image-processor-public".to_string(), + profile_picture_task_priority: 2, + igdb_image_task_priority: 1, } } } @@ -138,3 +143,34 @@ pub struct VideoApiPlaybackKeypairConfig { /// Path to the playback private key for the video api pub private_key: PathBuf, } + +#[derive(Debug, Clone, PartialEq, config::Config, serde::Deserialize)] +#[serde(default)] +pub struct IgDbConfig { + /// The IGDB Client ID + pub client_id: String, + + /// The IGDB Client Secret + pub client_secret: String, + + /// Process Images + pub process_images: bool, + + /// Refresh Interval + pub refresh_interval: Duration, + + /// Igdb Cron Subject + pub igdb_cron_subject: String, +} + +impl Default for IgDbConfig { + fn default() -> Self { + Self { + client_id: "igdb_client_id".to_string(), + client_secret: "igdb_client_secret".to_string(), + process_images: false, + refresh_interval: Duration::from_secs(6 * 60 * 60), // 6 hours + igdb_cron_subject: 
"scuffle-platform-igdb_cron".to_string(), + } + } +} diff --git a/platform/api/src/database/category.rs b/platform/api/src/database/category.rs index a2fdc1d0..c4205964 100644 --- a/platform/api/src/database/category.rs +++ b/platform/api/src/database/category.rs @@ -4,7 +4,17 @@ use ulid::Ulid; #[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Category { pub id: Ulid, + pub igdb_id: Option, pub name: String, - pub revision: i32, + pub aliases: Vec, + pub keywords: Vec, + pub storyline: Option, + pub summary: Option, + pub over_18: bool, + pub cover_id: Option, + pub rating: f64, + pub artwork_ids: Vec, + pub igdb_similar_game_ids: Vec, + pub websites: Vec, pub updated_at: DateTime, } diff --git a/platform/api/src/database/file_type.rs b/platform/api/src/database/file_type.rs index fc90e04b..8daf1dc9 100644 --- a/platform/api/src/database/file_type.rs +++ b/platform/api/src/database/file_type.rs @@ -3,14 +3,10 @@ use postgres_types::{FromSql, ToSql}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)] #[postgres(name = "file_type")] pub enum FileType { - #[postgres(name = "custom_thumbnail")] - CustomThumbnail, #[postgres(name = "profile_picture")] ProfilePicture, - #[postgres(name = "offline_banner")] - OfflineBanner, - #[postgres(name = "role_badge")] - RoleBadge, - #[postgres(name = "channel_role_badge")] - ChannelRoleBadge, + #[postgres(name = "category_cover")] + CategoryCover, + #[postgres(name = "category_artwork")] + CategoryArtwork, } diff --git a/platform/api/src/database/mod.rs b/platform/api/src/database/mod.rs index e359445e..511032a3 100644 --- a/platform/api/src/database/mod.rs +++ b/platform/api/src/database/mod.rs @@ -8,6 +8,7 @@ mod search_result; mod session; mod two_fa_request; mod uploaded_file; +mod uploaded_file_status; mod user; pub use category::*; @@ -20,4 +21,5 @@ pub use search_result::*; pub use session::*; pub use two_fa_request::*; pub use uploaded_file::*; +pub use uploaded_file_status::*; pub use user::*; diff --git a/platform/api/src/database/uploaded_file.rs b/platform/api/src/database/uploaded_file.rs index 6534c448..eaf1a90f 100644 --- a/platform/api/src/database/uploaded_file.rs +++ b/platform/api/src/database/uploaded_file.rs @@ -1,20 +1,20 @@ use common::database::protobuf; use ulid::Ulid; -use super::FileType; +use super::{FileType, UploadedFileStatus}; #[derive(Debug, Clone, postgres_from_row::FromRow)] pub struct UploadedFile { pub id: Ulid, - pub owner_id: Ulid, - pub uploader_id: Ulid, + pub owner_id: Option, + pub uploader_id: Option, pub name: String, #[from_row(rename = "type")] pub ty: FileType, #[from_row(from_fn = "protobuf")] pub metadata: pb::scuffle::platform::internal::types::UploadedFileMetadata, pub total_size: i64, - pub pending: bool, + pub status: UploadedFileStatus, pub path: String, pub updated_at: chrono::DateTime, pub failed: Option, diff --git a/platform/api/src/database/uploaded_file_status.rs b/platform/api/src/database/uploaded_file_status.rs new file mode 100644 index 00000000..a6c2628d --- /dev/null +++ b/platform/api/src/database/uploaded_file_status.rs @@ -0,0 +1,14 @@ +use postgres_types::{FromSql, ToSql}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)] +#[postgres(name = "uploaded_file_status")] +pub enum UploadedFileStatus { + #[postgres(name = "unqueued")] + Unqueued, + #[postgres(name = "queued")] + Queued, + #[postgres(name = "failed")] + Failed, + #[postgres(name = "completed")] + Completed, +} diff --git a/platform/api/src/global.rs 
b/platform/api/src/global.rs index c8f9a24c..f8a7c5fe 100644 --- a/platform/api/src/global.rs +++ b/platform/api/src/global.rs @@ -1,6 +1,6 @@ use common::dataloader::DataLoader; -use crate::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; +use crate::config::{ApiConfig, IgDbConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; use crate::dataloader::category::CategoryByIdLoader; use crate::dataloader::global_state::GlobalStateLoader; use crate::dataloader::role::RoleByIdLoader; @@ -39,8 +39,10 @@ pub trait ApiGlobal: + common::global::GlobalConfigProvider + common::global::GlobalConfigProvider + common::global::GlobalConfigProvider + + common::global::GlobalConfigProvider<IgDbConfig> + common::global::GlobalNats + common::global::GlobalDb + + common::global::GlobalRedis + common::global::GlobalConfig + ApiState + Send @@ -56,8 +58,10 @@ impl<T> ApiGlobal for T where + common::global::GlobalConfigProvider + common::global::GlobalConfigProvider + common::global::GlobalConfigProvider + + common::global::GlobalConfigProvider<IgDbConfig> + common::global::GlobalNats + common::global::GlobalDb + + common::global::GlobalRedis + common::global::GlobalConfig + ApiState + Send diff --git a/platform/api/src/igdb_cron.rs b/platform/api/src/igdb_cron.rs new file mode 100644 index 00000000..f8941695 --- /dev/null +++ b/platform/api/src/igdb_cron.rs @@ -0,0 +1,608 @@ +use std::collections::HashMap; +use std::pin::pin; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use async_nats::jetstream::consumer::pull::{Config, MessagesErrorKind}; +use async_nats::jetstream::consumer::Consumer; +use async_nats::jetstream::AckKind; +use bytes::Bytes; +use fred::interfaces::KeysInterface; +use futures_util::StreamExt; +use pb::scuffle::platform::internal::image_processor; +use pb::scuffle::platform::internal::types::{uploaded_file_metadata, ImageFormat, UploadedFileMetadata}; +use postgres_from_row::tokio_postgres::IsolationLevel; +use postgres_from_row::FromRow; +use tokio::select; +use ulid::Ulid; + +use crate::config::{IgDbConfig, ImageUploaderConfig}; +use crate::database::{Category, FileType, UploadedFile, UploadedFileStatus}; +use crate::global::ApiGlobal; + +pub async fn run<G: ApiGlobal>(global: Arc<G>) -> anyhow::Result<()> { + let config = global.config::<IgDbConfig>(); + let stream = global + .jetstream() + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: config.igdb_cron_subject.clone(), + subjects: vec![config.igdb_cron_subject.clone()], + max_messages: 1, + discard: async_nats::jetstream::stream::DiscardPolicy::New, + retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue, + ..Default::default() + }) + .await + .context("create stream")?; + + let consumer = stream + .get_or_create_consumer( + "igdb-cron", + async_nats::jetstream::consumer::pull::Config { + name: Some("igdb-cron".to_string()), + ..Default::default() + }, + ) + .await + .context("create consumer")?; + + select! 
{ + e = cron(&global, config) => e.context("cron")?, + e = process(&global, consumer, config) => e.context("process")?, + } + Ok(()) +} + +async fn cron<G: ApiGlobal>(global: &Arc<G>, config: &IgDbConfig) -> anyhow::Result<()> { + let mut timer = tokio::time::interval(Duration::from_secs(60)); + loop { + timer.tick().await; + tracing::debug!("igdb cron"); + global + .nats() + .publish(config.igdb_cron_subject.clone(), Bytes::new()) + .await + .context("publish")?; + } +} + +async fn process<G: ApiGlobal>(global: &Arc<G>, consumer: Consumer<Config>, config: &IgDbConfig) -> anyhow::Result<()> { + let mut messages = consumer.messages().await.context("messages")?; + + let duration = chrono::Duration::from_std(config.refresh_interval).context("duration")?; + + 'outer: while let Some(message) = messages.next().await { + let message = match message { + Ok(message) => message, + Err(err) if matches!(err.kind(), MessagesErrorKind::MissingHeartbeat) => { + continue; + } + Err(err) => { + anyhow::bail!("message: {:#}", err); + } + }; + + let info = message.info().map_err(|e| anyhow::anyhow!("info: {e}"))?; + + let time = chrono::DateTime::<chrono::Utc>::from(std::time::SystemTime::from(info.published)); + let first_run = global + .redis() + .get::<Option<String>, _>("igdb_last_refresh") + .await + .context("redis get")? + .is_none(); + + if chrono::Utc::now() - time < duration && !first_run { + tracing::info!("Skipping IGDB refresh"); + message + .ack_with(AckKind::Nak(Some(Duration::from_secs(300)))) + .await + .map_err(|e| anyhow::anyhow!("ack: {}", e))?; + continue; + } + + // Refresh IGDB + tracing::info!("Refreshing IGDB"); + + let refresh_igdb = refresh_igdb(global, config); + let mut refresh_igdb = pin!(refresh_igdb); + + loop { + select! { + e = &mut refresh_igdb => { + if let Err(e) = e { + tracing::error!("igdb: {:#}", e); + message.ack_with(AckKind::Nak(Some(Duration::from_secs(300)))).await.map_err(|e| anyhow::anyhow!("ack: {e}"))?; + continue 'outer; + } + + break; + } + _ = tokio::time::sleep(Duration::from_secs(15)) => { + tracing::debug!("igdb: refresh ack hold"); + message.ack_with(AckKind::Progress).await.map_err(|e| anyhow::anyhow!("ack: {e}"))?; + } + } + } + + global + .redis() + .set("igdb_last_refresh", chrono::Utc::now().to_rfc3339(), None, None, false) + .await + .context("redis set")?; + message + .ack_with(AckKind::Ack) + .await + .map_err(|e| anyhow::anyhow!("ack: {e}"))?; + } + + Ok(()) +} + +#[derive(serde::Deserialize)] +struct CountResponse { + count: usize, +} + +#[derive(Default, serde::Deserialize)] +#[serde(default)] +struct Game { + id: i32, + age_ratings: Vec<AgeRating>, + alternative_names: Vec<IdName>, + artworks: Vec<Image>, + cover: Option<Image>, + genres: Vec<IdName>, + keywords: Vec<IdName>, + name: String, + rating: f64, + similar_games: Vec<i32>, + storyline: Option<String>, + summary: Option<String>, + themes: Vec<IdName>, + websites: Vec<Website>, +} + +#[derive(Default, serde::Deserialize)] +#[serde(default)] +struct AgeRating { + id: i32, + category: i32, + rating: i32, +} + +#[derive(Default, serde::Deserialize)] +#[serde(default)] +struct IdName { + id: i32, + name: String, +} + +#[derive(Default, serde::Deserialize)] +#[serde(default)] +struct Image { + id: i32, + alpha_channel: bool, + animated: bool, + image_id: String, + height: i32, + width: i32, +} + +#[derive(Default, serde::Deserialize)] +#[serde(default)] +struct Website { + id: i32, + category: i32, + url: String, +} + +async fn refresh_igdb<G: ApiGlobal>(global: &Arc<G>, config: &IgDbConfig) -> anyhow::Result<()> { + let access_token = global + .redis() + .get::<Option<String>, _>("igdb_access_token") + .await + .context("redis get")?; + let access_token = if let 
Some(access_token) = access_token { + access_token + } else { + let (access_token, ttl) = get_access_token(config).await.context("get access token")?; + global + .redis() + .set( + "igdb_access_token", + &access_token, + Some(fred::types::Expiration::EX((ttl / 2).max(1))), + None, + false, + ) + .await + .context("redis set")?; + access_token + }; + + let client = reqwest::ClientBuilder::new() + .user_agent("scuffle/0.1.0") + .default_headers({ + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::AUTHORIZATION, + format!("Bearer {}", access_token).parse().unwrap(), + ); + headers.insert(reqwest::header::ACCEPT, "application/json".parse().unwrap()); + headers.insert("Client-ID", config.client_id.parse().unwrap()); + headers + }) + .build() + .context("build client")?; + + // The API has a rate limit of 4 requests per second. + // Let's start by counting the number of games there are; from that we can + // estimate how many seconds a full refresh will take. + + let resp = client + .post("https://api.igdb.com/v4/games/count") + .body("where category = 0;") // Category 0 is for games (not dlc, etc) + .send() + .await + .context("count request")?; + + resp.error_for_status_ref().context("count response")?; + + let resp = resp.json::<CountResponse>().await.context("count json")?; + + let total = resp.count; + let approx_seconds = (total as f64 / 500.0 * 1000.0 / 400.0).round() as i64; + + tracing::info!("IGDB has {total} games, a refresh will take approximately {approx_seconds} seconds"); + + let mut timer = tokio::time::interval(Duration::from_millis(250)); + timer.tick().await; + + let mut offset = 0; + + loop { + let resp = client.post("https://api.igdb.com/v4/games") + .body(format!("fields name, genres.name, alternative_names.name, summary, storyline, id, age_ratings.*, artworks.*, keywords.name, rating, similar_games, url, themes.name, websites.*, cover.*; where category = 0; offset {offset}; limit 500;")) + .send() + .await + .context("games request")?; + + resp.error_for_status_ref().context("games response")?; + + let resp = resp.json::<Vec<Game>>().await.context("games json")?; + + if resp.is_empty() { + tracing::info!("igdb: done"); + break; + } + + struct InsertItem { + id: Ulid, + igdb_id: i32, + name: String, + alternative_names: Vec<String>, + keywords: Vec<String>, + storyline: Option<String>, + summary: Option<String>, + over_18: bool, + cover_id: Option<Ulid>, + rating: f64, + updated_at: chrono::DateTime<chrono::Utc>, + artwork_ids: Vec<Ulid>, + igdb_similar_game_ids: Vec<i32>, + websites: Vec<String>, + } + + let mut client = global.db().get().await.context("get db connection")?; + let tx = client.transaction().await.context("start transaction")?; + + let image_ids = resp + .iter() + .flat_map(|item| { + item.artworks + .iter() + .chain(item.cover.as_ref()) + .map(|x| (Ulid::new(), x.image_id.as_str())) + }) + .collect::<Vec<_>>(); + + #[derive(FromRow)] + struct ImageId { + image_id: String, + uploaded_file_id: Ulid, + } + + common::database::query("INSERT INTO igdb_image (uploaded_file_id, image_id)") + .push_values(&image_ids, |mut sep, item| { + sep.push_bind(item.0); + sep.push_bind(item.1); + }) + .push("ON CONFLICT (image_id) DO NOTHING;") + .build() + .execute(&tx) + .await + .context("insert igdb_image")?; + + let image_ids = + common::database::query("SELECT image_id, uploaded_file_id FROM igdb_image WHERE image_id = ANY($1::TEXT[])") + .bind(image_ids.iter().map(|x| x.1).collect::<Vec<_>>()) + .build_query_as::<ImageId>() + .fetch_all(&tx) + .await + .context("select igdb_image")?; + + let image_ids = image_ids + .into_iter() + .map(|row| 
(row.image_id, row.uploaded_file_id)) + .collect::<HashMap<_, _>>(); + + let uploaded_files = resp + .iter() + .flat_map(|item| { + item.cover + .as_ref() + .map(|cover| UploadedFile { + id: image_ids[&cover.image_id], + failed: None, + name: format!("igdb/cover/{}.jpg", cover.image_id), + owner_id: None, + path: format!( + "https://images.igdb.com/igdb/image/upload/t_cover_big_2x/{}.jpg", + cover.image_id + ), + status: UploadedFileStatus::Unqueued, + metadata: UploadedFileMetadata { + metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { + versions: Vec::new(), + })), + }, + total_size: 0, + ty: FileType::CategoryCover, + updated_at: chrono::Utc::now(), + uploader_id: None, + }) + .into_iter() + .chain(item.artworks.iter().map(|artwork| UploadedFile { + id: image_ids[&artwork.image_id], + failed: None, + name: format!("igdb/artwork/{}.jpg", artwork.image_id), + owner_id: None, + path: format!( + "https://images.igdb.com/igdb/image/upload/t_1080p_2x/{}.jpg", + artwork.image_id + ), + status: UploadedFileStatus::Unqueued, + metadata: UploadedFileMetadata { + metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { + versions: Vec::new(), + })), + }, + total_size: 0, + ty: FileType::CategoryArtwork, + updated_at: chrono::Utc::now(), + uploader_id: None, + })) + }) + .collect::<Vec<_>>(); + + let uploaded_files_ids = + common::database::query("INSERT INTO uploaded_files (id, name, type, metadata, total_size, path, status) ") + .push_values(&uploaded_files, |mut sep, item| { + sep.push_bind(item.id); + sep.push_bind(&item.name); + sep.push_bind(item.ty); + sep.push_bind(common::database::Protobuf(item.metadata.clone())); + sep.push_bind(item.total_size); + sep.push_bind(&item.path); + sep.push_bind(item.status); + }) + .push("ON CONFLICT (id) DO NOTHING RETURNING id;") + .build_query_single_scalar::<Ulid>() + .fetch_all(&tx) + .await + .context("insert uploaded_files")?; + + let resp = resp + .into_iter() + .map(|item| InsertItem { + id: Ulid::new(), + igdb_id: item.id, + name: item.name, + alternative_names: item.alternative_names.into_iter().map(|x| x.name).collect::<Vec<_>>(), + keywords: item + .keywords + .into_iter() + .chain(item.genres) + .chain(item.themes) + .map(|x| x.name.to_lowercase()) + .collect::<Vec<_>>(), + storyline: item.storyline, + summary: item.summary, + over_18: item.age_ratings.into_iter().any(|x| x.category == 2 && x.rating == 5), // PEGI 18 + cover_id: item.cover.map(|x| image_ids[&x.image_id]), + rating: item.rating, + updated_at: chrono::Utc::now(), + artwork_ids: item.artworks.into_iter().map(|x| image_ids[&x.image_id]).collect::<Vec<_>>(), + igdb_similar_game_ids: item.similar_games, + websites: item.websites.into_iter().map(|x| x.url).collect::<Vec<_>>(), + }) + .collect::<Vec<_>>(); + + offset += resp.len(); + let count = resp.len(); + + let categories = common::database::query("INSERT INTO categories (id, igdb_id, name, aliases, keywords, storyline, summary, over_18, cover_id, rating, updated_at, artwork_ids, igdb_similar_game_ids, websites) ") + .push_values(&resp, |mut sep, item| { + sep.push_bind(item.id); + sep.push_bind(item.igdb_id); + sep.push_bind(&item.name); + sep.push_bind(&item.alternative_names); + sep.push_bind(&item.keywords); + sep.push_bind(&item.storyline); + sep.push_bind(&item.summary); + sep.push_bind(item.over_18); + sep.push_bind(item.cover_id); + sep.push_bind(item.rating); + sep.push_bind(item.updated_at); + sep.push_bind(&item.artwork_ids); + sep.push_bind(&item.igdb_similar_game_ids); + sep.push_bind(&item.websites); + }) + 
.push("ON CONFLICT (igdb_id) WHERE igdb_id IS NOT NULL DO UPDATE SET ") + .push("name = EXCLUDED.name, ") + .push("aliases = EXCLUDED.aliases, ") + .push("keywords = EXCLUDED.keywords, ") + .push("storyline = EXCLUDED.storyline, ") + .push("summary = EXCLUDED.summary, ") + .push("rating = EXCLUDED.rating, ") + .push("updated_at = NOW(), ") + .push("igdb_similar_game_ids = EXCLUDED.igdb_similar_game_ids, ") + .push("websites = EXCLUDED.websites, ") + .push("artwork_ids = EXCLUDED.artwork_ids RETURNING *;") + .build_query_as::() + .fetch_all(&tx) + .await + .context("insert categories")?; + + if categories.len() != count { + tracing::warn!("igdb: categories count mismatch {} != {}", categories.len(), count); + } + + let categories = categories + .into_iter() + .flat_map(|c| { + c.cover_id + .into_iter() + .chain(c.artwork_ids.into_iter()) + .map(move |id| (id, c.id)) + }) + .collect::>(); + + common::database::query("WITH updated(id, category) AS (") + .push_values(categories.iter().collect::>(), |mut sep, item| { + sep.push_bind(item.0).push_unseparated("::UUID"); + sep.push_bind(item.1).push_unseparated("::UUID"); + }) + .push( + ") UPDATE igdb_image SET category_id = updated.category FROM updated WHERE igdb_image.uploaded_file_id = updated.id;", + ) + .build() + .execute(&tx) + .await + .context("update igdb_image")?; + + tx.commit().await.context("commit")?; + + if config.process_images { + let image_processor_config = global.config::(); + + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::ReadCommitted) + .start() + .await + .context("start transaction image_jobs")?; + + let unqueued = common::database::query( + "UPDATE uploaded_files SET status = 'queued' WHERE id = ANY($1::UUID[]) AND status = 'unqueued' RETURNING id, path;", + ) + .bind(uploaded_files_ids) + .build_query_scalar::<(Ulid, String)>() + .fetch_all(&tx) + .await + .context("update uploaded_files")?; + + if !unqueued.is_empty() { + common::database::query("INSERT INTO image_jobs (id, priority, task) ") + .bind(image_processor_config.igdb_image_task_priority as i64) + .push_values(unqueued, |mut sep, (id, path)| { + sep.push_bind(id).push("$1").push_bind(common::database::Protobuf(create_task( + categories[&id], + id, + path, + image_processor_config, + ))); + }) + .push("ON CONFLICT (id) DO NOTHING;") + .build() + .execute(&tx) + .await + .context("insert image_jobs")?; + + tx.commit().await.context("commit image_jobs")?; + } + } + + tracing::debug!("igdb progress: {}/{}", offset, total); + + if count < 500 { + tracing::info!("igdb: done"); + break; + } + + timer.tick().await; + } + + Ok(()) +} + +async fn get_access_token(config: &IgDbConfig) -> anyhow::Result<(String, i64)> { + let client = reqwest::Client::new(); + let response = client + .post("https://id.twitch.tv/oauth2/token") + .form(&[ + ("client_id", config.client_id.as_ref()), + ("client_secret", config.client_secret.as_ref()), + ("grant_type", "client_credentials"), + ]) + .send() + .await + .context("request")?; + + response.error_for_status_ref().context("response")?; + + #[derive(serde::Deserialize)] + struct Response { + access_token: String, + expires_in: i64, + token_type: String, + } + + let response = response.json::().await.context("json")?; + + if response.token_type != "bearer" { + anyhow::bail!("invalid token type: {}", response.token_type); + } + + Ok((response.access_token, response.expires_in)) +} + +fn create_task( + category_id: ulid::Ulid, + id: ulid::Ulid, + path: String, + config: &ImageUploaderConfig, +) -> 
image_processor::Task { + image_processor::Task { + callback_subject: config.callback_subject.clone(), + upscale: false, + output_prefix: format!("categories/{category_id}/{id}"), + scales: vec![1], + limits: Some(image_processor::task::Limits { + max_processing_time_ms: 60000, + ..Default::default() + }), + formats: vec![ + ImageFormat::AvifStatic as i32, + ImageFormat::WebpStatic as i32, + ImageFormat::PngStatic as i32, + ], + input_path: path, + resize_method: image_processor::task::ResizeMethod::Fit as i32, + base_height: 1080, + base_width: 1920, + resize_algorithm: image_processor::task::ResizeAlgorithm::Lanczos3 as i32, + } +} diff --git a/platform/api/src/image_processor_callback.rs b/platform/api/src/image_processor_callback.rs new file mode 100644 index 00000000..762e4a45 --- /dev/null +++ b/platform/api/src/image_processor_callback.rs @@ -0,0 +1,262 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use async_nats::jetstream::consumer::pull::MessagesErrorKind; +use async_nats::jetstream::stream::RetentionPolicy; +use async_nats::jetstream::AckKind; +use futures_util::StreamExt; +use pb::ext::UlidExt; +use pb::scuffle::platform::internal::events::{processed_image, ProcessedImage}; +use pb::scuffle::platform::internal::types::{uploaded_file_metadata, UploadedFileMetadata}; +use prost::Message; +use tokio::select; + +use crate::config::ImageUploaderConfig; +use crate::database::{FileType, UploadedFile, UploadedFileStatus}; +use crate::global::ApiGlobal; +use crate::subscription::SubscriptionTopic; + +pub async fn run<G: ApiGlobal>(global: Arc<G>) -> anyhow::Result<()> { + let config = global.config::<ImageUploaderConfig>(); + + // NATS stream names can't contain dots, so replace them + let stream_name = config.callback_subject.replace('.', "-"); + + let stream = global + .jetstream() + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: stream_name.clone(), + subjects: vec![config.callback_subject.clone()], + max_consumers: 1, + retention: RetentionPolicy::WorkQueue, + ..Default::default() + }) + .await + .context("stream")?; + + let consumer = stream + .get_or_create_consumer( + &stream_name, + async_nats::jetstream::consumer::pull::Config { + name: Some(stream_name.clone()), + ..Default::default() + }, + ) + .await + .context("consumer")?; + + let mut messages = consumer.messages().await.context("messages")?; + + loop { + select! 
{ + _ = global.ctx().done() => break, + message = messages.next() => { + handle_message(&global, message).await?; + }, + } + } + + Ok(()) +} + +async fn handle_message<G: ApiGlobal>( + global: &Arc<G>, + message: Option<Result<async_nats::jetstream::Message, async_nats::error::Error<MessagesErrorKind>>>, +) -> anyhow::Result<()> { + let message = match message { + Some(Ok(message)) => message, + Some(Err(err)) if matches!(err.kind(), MessagesErrorKind::MissingHeartbeat) => { + tracing::warn!("missing heartbeat"); + return Ok(()); + } + Some(Err(err)) => { + anyhow::bail!("message: {:#}", err) + } + None => { + anyhow::bail!("stream closed"); + } + }; + + let (job_id, job_result) = match ProcessedImage::decode(message.payload.as_ref()) { + Ok(ProcessedImage { + job_id, + result: Some(result), + }) => (job_id, result), + err => { + if let Err(err) = err { + tracing::warn!(error = %err, "failed to decode image upload job result"); + } else { + tracing::warn!("malformed image upload job result"); + } + message + .ack() + .await + .map_err(|err| anyhow::anyhow!(err)) + .context("failed to ack")?; + return Ok(()); + } + }; + tracing::trace!("received image upload job result: {:?}", job_result); + + let mut client = global.db().get().await.context("failed to get db connection")?; + let tx = client.transaction().await.context("failed to start transaction")?; + + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET status = $1, failed = $2, metadata = $3, updated_at = NOW() WHERE id = $4 AND status = 'queued' RETURNING *") + .bind(if matches!(job_result, processed_image::Result::Success(_)) { + UploadedFileStatus::Completed + } else { + UploadedFileStatus::Failed + }) + .bind(match &job_result { + processed_image::Result::Success(_) => None, + processed_image::Result::Failure(processed_image::Failure { reason, .. }) => { + Some(reason) + } + }) + .bind(common::database::Protobuf(UploadedFileMetadata { + metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { + versions: match &job_result { + processed_image::Result::Success(processed_image::Success { variants }) => variants.clone(), + processed_image::Result::Failure(_) => Vec::new(), + }, + })), + })) + .bind(job_id.into_ulid()) + .build_query_as() + .fetch_optional(&tx) + .await + .context("failed to get uploaded file")? { + Some(uploaded_file) => uploaded_file, + None => { + tracing::warn!("uploaded file not found"); + message.ack().await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; + return Ok(()); + } + }; + + match job_result { + processed_image::Result::Success(_) => { + global + .nats() + .publish( + SubscriptionTopic::UploadedFileStatus(uploaded_file.id), + pb::scuffle::platform::internal::events::UploadedFileStatus { + file_id: Some(uploaded_file.id.into()), + status: Some( + pb::scuffle::platform::internal::events::uploaded_file_status::Status::Success( + pb::scuffle::platform::internal::events::uploaded_file_status::Success {}, + ), + ), + } + .encode_to_vec() + .into(), + ) + .await + .context("failed to publish file update event")?; + + let updated = match uploaded_file.ty { + FileType::ProfilePicture => { + let owner_id = uploaded_file + .owner_id + .ok_or_else(|| anyhow::anyhow!("uploaded file owner id is null"))?; + + if common::database::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $3") + .bind(uploaded_file.id) + .bind(owner_id) + .bind(uploaded_file.id) + .build() + .execute(&tx) + .await + .context("failed to update user")? 
== 1 { + Some(( + SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id.unwrap()), + pb::scuffle::platform::internal::events::UserProfilePicture { + user_id: Some(uploaded_file.owner_id.unwrap().into()), + profile_picture_id: Some(uploaded_file.id.into()), + } + .encode_to_vec() + .into(), + )) + } else { + None + } + } + FileType::CategoryCover => None, + FileType::CategoryArtwork => None, + }; + + if let Some((topic, payload)) = updated { + global + .nats() + .publish(topic, payload) + .await + .context("failed to publish image upload update event")?; + } + } + processed_image::Result::Failure(processed_image::Failure { + reason, + friendly_message, + }) => { + global + .nats() + .publish( + SubscriptionTopic::UploadedFileStatus(uploaded_file.id), + pb::scuffle::platform::internal::events::UploadedFileStatus { + file_id: Some(uploaded_file.id.into()), + status: Some( + pb::scuffle::platform::internal::events::uploaded_file_status::Status::Failure( + pb::scuffle::platform::internal::events::uploaded_file_status::Failure { + reason, + friendly_message, + }, + ), + ), + } + .encode_to_vec() + .into(), + ) + .await + .context("failed to publish file update event")?; + + match uploaded_file.ty { + FileType::ProfilePicture => { + let owner_id = uploaded_file + .owner_id + .ok_or_else(|| anyhow::anyhow!("uploaded file owner id is null"))?; + + common::database::query( + "UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2", + ) + .bind(owner_id) + .bind(uploaded_file.id) + .build() + .execute(&tx) + .await + .context("failed to update user")?; + } + FileType::CategoryCover => {} + FileType::CategoryArtwork => {} + } + } + } + + if let Err(err) = tx.commit().await.context("failed to commit transaction") { + tracing::warn!(error = %err, "failed to commit transaction"); + message + .ack_with(AckKind::Nak(Some(Duration::from_secs(5)))) + .await + .map_err(|err| anyhow::anyhow!(err)) + .context("failed to ack")?; + return Ok(()); + } + + message + .ack() + .await + .map_err(|err| anyhow::anyhow!(err)) + .context("failed to ack")?; + + tracing::debug!("processed image upload job result"); + Ok(()) +} diff --git a/platform/api/src/image_upload_callback.rs b/platform/api/src/image_upload_callback.rs deleted file mode 100644 index 9fa43de0..00000000 --- a/platform/api/src/image_upload_callback.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use anyhow::Context; -use async_nats::jetstream::AckKind; -use futures_util::StreamExt; -use pb::ext::UlidExt; -use pb::scuffle::platform::internal::events::{processed_image, ProcessedImage}; -use pb::scuffle::platform::internal::types::{uploaded_file_metadata, UploadedFileMetadata}; -use prost::Message; -use tokio::select; - -use crate::config::ImageUploaderConfig; -use crate::database::UploadedFile; -use crate::global::ApiGlobal; -use crate::subscription::SubscriptionTopic; - -const PROFILE_PICTURE_CONSUMER_NAME: &str = "profile-picture-consumer"; - -pub async fn run<G: ApiGlobal>(global: Arc<G>) -> anyhow::Result<()> { - let config = global.config::<ImageUploaderConfig>(); - - let profile_picture_stream = global - .jetstream() - .get_or_create_stream(async_nats::jetstream::stream::Config { - name: config.profile_picture_callback_subject.clone(), - subjects: vec![config.profile_picture_callback_subject.clone()], - max_consumers: 1, - ..Default::default() - }) - .await - .context("failed to create profile picture stream")?; - - let profile_picture_consumer = profile_picture_stream - 
.get_or_create_consumer( - PROFILE_PICTURE_CONSUMER_NAME, - async_nats::jetstream::consumer::pull::Config { - name: Some(PROFILE_PICTURE_CONSUMER_NAME.into()), - durable_name: Some(PROFILE_PICTURE_CONSUMER_NAME.into()), - filter_subject: config.profile_picture_callback_subject.clone(), - ..Default::default() - }, - ) - .await - .context("failed to create profile picture consumer")?; - - let mut profile_picture_consumer = profile_picture_consumer - .messages() - .await - .context("failed to get profile picture consumer messages")?; - - loop { - select! { - _ = global.ctx().done() => break, - message = profile_picture_consumer.next() => { - let message = message.ok_or_else(|| anyhow::anyhow!("profile picture consumer closed"))?.context("failed to get profile picture consumer message")?; - let (job_id, job_result) = match ProcessedImage::decode(message.payload.as_ref()) { - Ok(ProcessedImage { job_id, result: Some(result) }) => (job_id, result), - err => { - if let Err(err) = err { - tracing::warn!(error = %err, "failed to decode profile picture job result"); - } else { - tracing::warn!("malformed profile picture job result"); - } - message.ack().await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - continue; - }, - }; - tracing::debug!("received profile picture job result: {:?}", job_result); - - let mut client = global.db().get().await.context("failed to get db connection")?; - let tx = client.transaction().await.context("failed to start transaction")?; - - match job_result { - processed_image::Result::Success(processed_image::Success { variants }) => { - let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") - .bind(common::database::Protobuf(UploadedFileMetadata { - metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { - versions: variants, - })), - })) - .bind(job_id.into_ulid()) - .build_query_as() - .fetch_optional(&tx) - .await - .context("failed to get uploaded file")? { - Some(uploaded_file) => uploaded_file, - None => { - tracing::warn!("uploaded file not found"); - message.ack().await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - continue; - } - }; - - global - .nats() - .publish( - SubscriptionTopic::UploadedFileStatus(uploaded_file.id), - pb::scuffle::platform::internal::events::UploadedFileStatus { - file_id: Some(uploaded_file.id.into()), - status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Success(pb::scuffle::platform::internal::events::uploaded_file_status::Success {})), - }.encode_to_vec().into(), - ) - .await - .context("failed to publish file update event")?; - - let user_updated = common::database::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $1") - .bind(uploaded_file.id) - .bind(uploaded_file.owner_id) - .build() - .execute(&tx) - .await - .context("failed to update user")? 
== 1; - - if let Err(err) = tx.commit().await.context("failed to commit transaction") { - tracing::warn!(error = %err, "failed to commit transaction"); - message.ack_with(AckKind::Nak(Some(Duration::from_secs(5)))).await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - continue; - } - - if user_updated { - global - .nats() - .publish( - SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id), - pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(uploaded_file.owner_id.into()), - profile_picture_id: Some(uploaded_file.id.into()), - }.encode_to_vec().into(), - ) - .await - .context("failed to publish profile picture update event")?; - } - }, - processed_image::Result::Failure(processed_image::Failure { reason, friendly_message }) => { - let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") - .bind(reason.clone()) - .bind(job_id.into_ulid()) - .build_query_as() - .fetch_optional(&tx) - .await - .context("failed to get uploaded file")? { - Some(uploaded_file) => uploaded_file, - None => { - tracing::warn!("uploaded file not found"); - message.ack().await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - continue; - } - }; - - global - .nats() - .publish( - SubscriptionTopic::UploadedFileStatus(uploaded_file.id), - pb::scuffle::platform::internal::events::UploadedFileStatus { - file_id: Some(uploaded_file.id.into()), - status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Failure(pb::scuffle::platform::internal::events::uploaded_file_status::Failure { - reason, - friendly_message, - })), - }.encode_to_vec().into(), - ) - .await - .context("failed to publish file update event")?; - - common::database::query("UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2") - .bind(uploaded_file.owner_id) - .bind(uploaded_file.id) - .build() - .execute(&tx) - .await - .context("failed to update user")?; - - if let Err(err) = tx.commit().await.context("failed to commit transaction") { - tracing::warn!(error = %err, "failed to commit transaction"); - message.ack_with(AckKind::Nak(Some(Duration::from_secs(5)))).await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - continue; - } - }, - } - - message.ack().await.map_err(|err| anyhow::anyhow!(err)).context("failed to ack")?; - }, - } - } - - Ok(()) -} diff --git a/platform/api/src/lib.rs b/platform/api/src/lib.rs index 5e12c69e..bcb18296 100644 --- a/platform/api/src/lib.rs +++ b/platform/api/src/lib.rs @@ -4,7 +4,8 @@ pub mod database; pub mod dataloader; pub mod global; pub mod grpc; -pub mod image_upload_callback; +pub mod igdb_cron; +pub mod image_processor_callback; pub mod subscription; pub mod turnstile; pub mod video_api; diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs index 4d356a3c..36ef8515 100644 --- a/platform/api/src/main.rs +++ b/platform/api/src/main.rs @@ -3,13 +3,14 @@ use std::time::Duration; use anyhow::Context as _; use async_graphql::SDLExportOptions; -use binary_helper::global::{setup_database, setup_nats}; +use binary_helper::global::{setup_database, setup_nats, setup_redis}; use binary_helper::{bootstrap, grpc_health, grpc_server, impl_global_traits}; +use common::config::RedisConfig; use common::context::Context; use common::dataloader::DataLoader; use common::global::*; use common::grpc::TlsSettings; -use 
platform_api::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; +use platform_api::config::{ApiConfig, IgDbConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; use platform_api::dataloader::category::CategoryByIdLoader; use platform_api::dataloader::global_state::GlobalStateLoader; use platform_api::dataloader::role::RoleByIdLoader; @@ -21,7 +22,7 @@ use platform_api::video_api::{ load_playback_keypair_private_key, setup_video_events_client, setup_video_playback_session_client, setup_video_room_client, VideoEventsClient, VideoPlaybackSessionClient, VideoRoomClient, }; -use platform_api::{image_upload_callback, video_event_handler}; +use platform_api::{igdb_cron, image_processor_callback, video_event_handler}; use tokio::select; #[derive(Debug, Clone, Default, config::Config, serde::Deserialize)] @@ -46,6 +47,12 @@ struct ExtConfig { /// The video api config video_api: VideoApiConfig, + + /// The IGDB config + igdb: IgDbConfig, + + /// Redis Config + redis: RedisConfig, } impl binary_helper::config::ConfigExtention for ExtConfig { @@ -98,11 +105,20 @@ struct GlobalState { video_playback_session_client: VideoPlaybackSessionClient, video_events_client: VideoEventsClient, + redis: Arc, + playback_private_key: Option>, } impl_global_traits!(GlobalState); +impl common::global::GlobalRedis for GlobalState { + #[inline(always)] + fn redis(&self) -> &Arc { + &self.redis + } +} + impl common::global::GlobalConfigProvider<ApiConfig> for GlobalState { #[inline(always)] fn provide_config(&self) -> &ApiConfig { @@ -138,6 +154,13 @@ impl common::global::GlobalConfigProvider for GlobalState { } } +impl common::global::GlobalConfigProvider<IgDbConfig> for GlobalState { + #[inline(always)] + fn provide_config(&self) -> &IgDbConfig { + &self.config.extra.igdb + } +} + impl platform_api::global::ApiState for GlobalState { fn category_by_id_loader(&self) -> &DataLoader<CategoryByIdLoader> { &self.category_by_id_loader @@ -252,12 +275,15 @@ impl binary_helper::Global for GlobalState { .map(load_playback_keypair_private_key) .transpose()?; + let redis = setup_redis(&config.extra.redis).await?; + Ok(Self { ctx, config, nats, jetstream, db, + redis, category_by_id_loader, global_state_loader, role_by_id_loader, @@ -294,19 +320,18 @@ pub async fn main() { }; let api_future = platform_api::api::run(global.clone()); - let subscription_manager = global.subscription_manager.run(global.ctx.clone(), global.nats.clone()); - let video_event_handler = video_event_handler::run(global.clone()); - - let image_upload_callback = image_upload_callback::run(global.clone()); + let image_processor_callback = image_processor_callback::run(global.clone()); + let igdb_cron = igdb_cron::run(global.clone()); select! 
{ r = grpc_future => r.context("grpc server stopped unexpectedly")?, r = api_future => r.context("api server stopped unexpectedly")?, r = subscription_manager => r.context("subscription manager stopped unexpectedly")?, r = video_event_handler => r.context("video event handler stopped unexpectedly")?, - r = image_upload_callback => r.context("image upload callback handler stopped unexpectedly")?, + r = image_processor_callback => r.context("image processor callback handler stopped unexpectedly")?, + r = igdb_cron => r.context("igdb cron stopped unexpectedly")?, } Ok(()) diff --git a/platform/image_processor/Cargo.toml b/platform/image_processor/Cargo.toml index db84f3ea..feb69bc7 100644 --- a/platform/image_processor/Cargo.toml +++ b/platform/image_processor/Cargo.toml @@ -14,7 +14,7 @@ ulid = { version = "1.1", features = ["uuid"] } postgres-from-row = "0.5" prost = "0.12" aws-config = "1.1" -aws-sdk-s3 = "1.12" +aws-sdk-s3 = { version = "1.12", features = ["behavior-version-latest"] } async-trait = "0.1" anyhow = "1.0" async-nats = "0.33" diff --git a/platform/image_processor/src/config.rs b/platform/image_processor/src/config.rs index 0aa48c16..ea68fe51 100644 --- a/platform/image_processor/src/config.rs +++ b/platform/image_processor/src/config.rs @@ -1,4 +1,4 @@ -use common::config::S3BucketConfig; +use common::config::{S3BucketConfig, S3CredentialsConfig}; use ulid::Ulid; #[derive(Debug, Clone, PartialEq, config::Config, serde::Deserialize)] @@ -23,8 +23,24 @@ pub struct ImageProcessorConfig { impl Default for ImageProcessorConfig { fn default() -> Self { Self { - source_bucket: S3BucketConfig::default(), - target_bucket: S3BucketConfig::default(), + source_bucket: S3BucketConfig { + name: "scuffle-image-processor".to_owned(), + endpoint: Some("http://localhost:9000".to_owned()), + region: "us-east-1".to_owned(), + credentials: S3CredentialsConfig { + access_key: Some("root".to_owned()), + secret_key: Some("scuffle123".to_owned()), + }, + }, + target_bucket: S3BucketConfig { + name: "scuffle-image-processor-public".to_owned(), + endpoint: Some("http://localhost:9000".to_owned()), + region: "us-east-1".to_owned(), + credentials: S3CredentialsConfig { + access_key: Some("root".to_owned()), + secret_key: Some("scuffle123".to_owned()), + }, + }, concurrency: num_cpus::get(), instance_id: Ulid::new(), allow_http: true, diff --git a/platform/image_processor/src/global.rs b/platform/image_processor/src/global.rs index 47c5d786..430ab049 100644 --- a/platform/image_processor/src/global.rs +++ b/platform/image_processor/src/global.rs @@ -5,6 +5,7 @@ use crate::config::ImageProcessorConfig; pub trait ImageProcessorState { fn s3_source_bucket(&self) -> &Bucket; fn s3_target_bucket(&self) -> &Bucket; + fn http_client(&self) -> &reqwest::Client; } pub trait ImageProcessorGlobal: diff --git a/platform/image_processor/src/main.rs b/platform/image_processor/src/main.rs index ad191f85..980d12a7 100644 --- a/platform/image_processor/src/main.rs +++ b/platform/image_processor/src/main.rs @@ -29,6 +29,7 @@ struct GlobalState { jetstream: async_nats::jetstream::Context, s3_source_bucket: common::s3::Bucket, s3_target_bucket: common::s3::Bucket, + http_client: reqwest::Client, } impl_global_traits!(GlobalState); @@ -50,6 +51,11 @@ impl platform_image_processor::global::ImageProcessorState for GlobalState { fn s3_target_bucket(&self) -> &common::s3::Bucket { &self.s3_target_bucket } + + #[inline(always)] + fn http_client(&self) -> &reqwest::Client { + &self.http_client + } } impl binary_helper::Global for 
GlobalState { @@ -60,6 +66,10 @@ impl binary_helper::Global for GlobalState { let (nats, jetstream) = setup_nats(&config.name, &config.nats).await?; + let http_client = reqwest::Client::builder() + .user_agent(concat!("scuffle-image-processor/", env!("CARGO_PKG_VERSION"))) + .build()?; + Ok(Self { ctx, db, @@ -68,40 +78,53 @@ impl binary_helper::Global for GlobalState { config, s3_source_bucket, s3_target_bucket, + http_client, }) } } -#[tokio::main] -pub async fn main() { - if let Err(err) = bootstrap::(|global| async move { - let grpc_future = { - let mut server = grpc_server(&global.config.grpc) - .await - .context("failed to create grpc server")?; - let router = server.add_service(grpc_health::HealthServer::new(&global, |global, _| async move { - !global.db().is_closed() && global.nats().connection_state() == async_nats::connection::State::Connected - })); - - let router = platform_image_processor::grpc::add_routes(&global, router); - - router.serve_with_shutdown(global.config.grpc.bind_address, async { - global.ctx().done().await; +pub fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .max_blocking_threads( + std::env::var("TOKIO_MAX_BLOCKING_THREADS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(2048), + ) + .build() + .expect("failed to create tokio runtime") + .block_on(async { + if let Err(err) = bootstrap::(|global| async move { + let grpc_future = { + let mut server = grpc_server(&global.config.grpc) + .await + .context("failed to create grpc server")?; + let router = server.add_service(grpc_health::HealthServer::new(&global, |global, _| async move { + !global.db().is_closed() + && global.nats().connection_state() == async_nats::connection::State::Connected + })); + + let router = platform_image_processor::grpc::add_routes(&global, router); + + router.serve_with_shutdown(global.config.grpc.bind_address, async { + global.ctx().done().await; + }) + }; + + let processor_future = platform_image_processor::processor::run(global.clone()); + + select! { + r = grpc_future => r.context("grpc server stopped unexpectedly")?, + r = processor_future => r.context("processor stopped unexpectedly")?, + } + + Ok(()) }) - }; - - let processor_future = platform_image_processor::processor::run(global.clone()); - - select! 
{ - r = grpc_future => r.context("grpc server stopped unexpectedly")?, - r = processor_future => r.context("processor stopped unexpectedly")?, - } - - Ok(()) - }) - .await - { - tracing::error!("{:#}", err); - std::process::exit(1); - } + .await + { + tracing::error!("{:#}", err); + std::process::exit(1); + } + }) } diff --git a/platform/image_processor/src/processor/error.rs b/platform/image_processor/src/processor/error.rs index dfca20c5..322848b1 100644 --- a/platform/image_processor/src/processor/error.rs +++ b/platform/image_processor/src/processor/error.rs @@ -33,25 +33,13 @@ pub enum ProcessorError { #[error("invalid job state")] InvalidJobState, - #[error("directory create: {0}")] - DirectoryCreate(std::io::Error), - - #[error("file read: {0}")] - FileRead(std::io::Error), - - #[error("working directory change: {0}")] - WorkingDirectoryChange(std::io::Error), - - #[error("file create: {0}")] - FileCreate(std::io::Error), - #[error("download source from s3: {0}")] S3Download(aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>), #[error("download source from s3: {0}")] S3DownloadStream(aws_sdk_s3::primitives::ByteStreamError), - #[error("upload target to s3: {0}")] + #[error("upload target to s3: {0:?}")] S3Upload(aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::put_object::PutObjectError>), #[error("publish to nats: {0}")] @@ -96,6 +84,9 @@ pub enum ProcessorError { #[error("http download disabled")] HttpDownloadDisabled, + #[error("download timeout")] + DownloadTimeout, + #[error("http download: {0}")] HttpDownload(#[from] reqwest::Error), } @@ -105,9 +96,6 @@ impl ProcessorError { let msg = match self { ProcessorError::LostJob => Some("The job was lost"), ProcessorError::InvalidJobState => Some("The job is in an invalid state"), - ProcessorError::DirectoryCreate(_) => Some("Failed to create directory"), - ProcessorError::FileRead(_) => Some("Failed to read file"), - ProcessorError::FileCreate(_) => Some("Failed to create file"), ProcessorError::S3Download(_) => Some("Failed to download file"), ProcessorError::S3Upload(_) => Some("Failed to upload file"), ProcessorError::FileFormat(_) => Some("Failed to read file format"), diff --git a/platform/image_processor/src/processor/job/encoder/png.rs b/platform/image_processor/src/processor/job/encoder/png.rs index 666aba18..4dc8b424 100644 --- a/platform/image_processor/src/processor/job/encoder/png.rs +++ b/platform/image_processor/src/processor/job/encoder/png.rs @@ -47,6 +47,11 @@ impl Encoder for PngEncoder { let mut encoder = png::Encoder::new(&mut result, frame.image.width() as u32, frame.image.height() as u32); + assert!( + frame.image.buf().as_bytes().len() == frame.image.width() * frame.image.height() * 4, + "image buffer size mismatch" + ); + encoder.set_color(png::ColorType::Rgba); encoder.set_depth(png::BitDepth::Eight); encoder diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs index 575a144a..7f3803fb 100644 --- a/platform/image_processor/src/processor/job/mod.rs +++ b/platform/image_processor/src/processor/job/mod.rs @@ -1,15 +1,16 @@ use std::borrow::Cow; use std::sync::Arc; +use std::time::Duration; use aws_sdk_s3::types::ObjectCannedAcl; use bytes::Bytes; +use common::prelude::FutureTimeout; use common::s3::PutObjectOptions; use common::task::AsyncTask; use file_format::FileFormat; use futures::FutureExt; use prost::Message; use tokio::select; -use tokio::sync::SemaphorePermit; use tracing::Instrument; use self::decoder::DecoderBackend; @@ -33,9 +34,12 @@ pub(crate) struct Job<'a, G: ImageProcessorGlobal> { pub(crate) job: 
+		assert!(
+			frame.image.buf().as_bytes().len() == frame.image.width() * frame.image.height() * 4,
+			"image buffer size mismatch"
+		);
+
 		encoder.set_color(png::ColorType::Rgba);
 		encoder.set_depth(png::BitDepth::Eight);
 		encoder
diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs
index 575a144a..7f3803fb 100644
--- a/platform/image_processor/src/processor/job/mod.rs
+++ b/platform/image_processor/src/processor/job/mod.rs
@@ -1,15 +1,16 @@
 use std::borrow::Cow;
 use std::sync::Arc;
+use std::time::Duration;
 
 use aws_sdk_s3::types::ObjectCannedAcl;
 use bytes::Bytes;
+use common::prelude::FutureTimeout;
 use common::s3::PutObjectOptions;
 use common::task::AsyncTask;
 use file_format::FileFormat;
 use futures::FutureExt;
 use prost::Message;
 use tokio::select;
-use tokio::sync::SemaphorePermit;
 use tracing::Instrument;
 
 use self::decoder::DecoderBackend;
@@ -33,9 +34,12 @@ pub(crate) struct Job<'a, G: ImageProcessorGlobal> {
 	pub(crate) job: database::Job,
 }
 
-#[tracing::instrument(skip(global, _ticket, job), fields(job_id = %job.id.0), level = "info")]
-pub async fn handle_job(global: &Arc<impl ImageProcessorGlobal>, _ticket: SemaphorePermit<'_>, job: database::Job) {
+#[tracing::instrument(skip(global, job), fields(job_id = %job.id), level = "info")]
+pub async fn handle_job(global: &Arc<impl ImageProcessorGlobal>, job: database::Job) {
 	let job = Job { global, job };
+
+	tracing::info!("processing job");
+
 	if let Err(err) = job.process().in_current_span().await {
 		tracing::error!(err = %err, "job failed");
 	}
@@ -50,7 +54,11 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 
 		tracing::debug!("downloading {}", self.job.task.input_path);
 
-		Ok(reqwest::get(&self.job.task.input_path)
+		Ok(self
+			.global
+			.http_client()
+			.get(&self.job.task.input_path)
+			.send()
 			.await
 			.map_err(ProcessorError::HttpDownload)?
 			.error_for_status()
@@ -150,9 +158,9 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 		let input_data = {
 			let mut tries = 0;
 			loop {
-				match self.download_source().await {
-					Ok(data) => break data,
-					Err(e) => {
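+				// Every attempt is now bounded to five seconds by FutureTimeout,
+				// so the match sees a nested result: Ok(Ok(data)) on success,
+				// Ok(Err(e)) for HTTP failures, and Err(_) for a timeout. Both
+				// failure paths retry up to 60 times before the job is failed.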
+				match self.download_source().timeout(Duration::from_secs(5)).await {
+					Ok(Ok(data)) => break data,
+					Ok(Err(e)) => {
 						if tries >= 60 {
 							return Err(e);
 						}
@@ -161,14 +169,20 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 
 						tracing::debug!(err = %e, "failed to download source, retrying");
 						tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 					}
+					Err(_) => {
+						if tries >= 60 {
+							return Err(ProcessorError::DownloadTimeout);
+						}
+
+						tries += 1;
+						tracing::debug!("download timed out, retrying");
+					}
 				}
 			}
 		};
 
 		let backend = DecoderBackend::from_format(FileFormat::from_bytes(&input_data))?;
 
-		let url_prefix = format!("{}/{}", self.job.task.output_prefix.trim_end_matches('/'), self.job.id);
-
 		let job_c = self.job.clone();
 
 		tracing::debug!("processing job");
@@ -184,8 +198,8 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 		})??;
 
 		for image in images.images.iter() {
+			let url = image.url(&self.job.task.output_prefix);
 			// image upload
-			let url = image.url(&url_prefix);
 			tracing::debug!("uploading result to {}/{}", self.global.config().target_bucket.name, url);
 			self.global
 				.s3_target_bucket()
@@ -214,13 +228,13 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 			variants: images
 				.images
 				.iter()
-				.map(|i| pb::scuffle::platform::internal::types::ProcessedImageVariant {
-					path: i.url(&url_prefix),
-					format: i.request.1.into(),
-					width: i.width as u32,
-					height: i.height as u32,
-					byte_size: i.data.len() as u32,
-					scale: i.request.0 as u32,
+				.map(|image| pb::scuffle::platform::internal::types::ProcessedImageVariant {
+					path: image.url(&self.job.task.output_prefix),
+					format: image.request.1.into(),
+					width: image.width as u32,
+					height: image.height as u32,
+					byte_size: image.data.len() as u32,
+					scale: image.request.0 as u32,
 				})
 				.collect(),
 		},
diff --git a/platform/image_processor/src/processor/job/resize.rs b/platform/image_processor/src/processor/job/resize.rs
index 667bf520..57f4413e 100644
--- a/platform/image_processor/src/processor/job/resize.rs
+++ b/platform/image_processor/src/processor/job/resize.rs
@@ -2,6 +2,7 @@ use anyhow::Context;
 use fast_image_resize as fr;
 use imgref::Img;
 use pb::scuffle::platform::internal::image_processor::task::{ResizeAlgorithm, ResizeMethod};
+use rgb::{ComponentBytes, RGBA};
 
 use super::frame::Frame;
 use crate::processor::error::{ProcessorError, Result};
@@ -175,7 +176,18 @@ impl ImageResizer {
 		let height = dst_image.height().get() as usize;
 		let buffer = dst_image.into_vec();
 
-		let buffer = unsafe { std::mem::transmute::<Vec<u8>, Vec<rgb::RGBA<u8>>>(buffer) };
+		// Rebuild the Vec<u8> as Vec<RGBA<u8>> without transmute: transmuting a
+		// Vec across element sizes leaves len and capacity in the wrong units.
+		// into_boxed_slice forces capacity == len, so reconstructing with len / 4
+		// is sound for the 4-byte, align-1 RGBA<u8> element.
+		let buffer = unsafe {
+			let buf = buffer.into_boxed_slice();
+			let size = buf.len();
+			let ptr = Box::into_raw(buf) as *mut u8;
+
+			let new_size = size / 4;
+			assert!(new_size * 4 == size, "image buffer size mismatch");
+
+			Vec::from_raw_parts(ptr as *mut RGBA<u8>, new_size, new_size)
+		};
+
+		assert_eq!(buffer.as_bytes().len(), width * height * 4, "image buffer size mismatch");
 
 		Ok(Frame {
 			image: Img::new(buffer, width, height),
diff --git a/platform/image_processor/src/processor/mod.rs b/platform/image_processor/src/processor/mod.rs
index 655906ef..24d8bf23 100644
--- a/platform/image_processor/src/processor/mod.rs
+++ b/platform/image_processor/src/processor/mod.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
 use futures::StreamExt;
@@ -6,7 +7,6 @@ use tokio::select;
 use self::error::Result;
 use crate::config::ImageProcessorConfig;
 use crate::global::ImageProcessorGlobal;
-use crate::processor::error::ProcessorError;
 use crate::processor::job::handle_job;
 
 pub(crate) mod error;
@@ -16,31 +16,60 @@ pub(crate) mod utils;
 
 pub async fn run(global: Arc<impl ImageProcessorGlobal>) -> Result<()> {
 	let config = global.config::<ImageProcessorConfig>();
 
-	let semaphore = tokio::sync::Semaphore::new(config.concurrency);
+	let concurrency = AtomicUsize::new(config.concurrency);
 
 	let mut done = global.ctx().done();
 
 	let mut futures = futures::stream::FuturesUnordered::new();
 
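+	// The semaphore is replaced with an atomic count of free job slots:
+	// query_job claims up to `concurrency` jobs in one round trip, each job
+	// pushed onto `futures` takes a slot (fetch_sub) and each completed job
+	// returns it (fetch_add). Relaxed ordering suffices because the counter is
+	// only touched from this one task. `make_job_query` builds the next poll
+	// future; `wait = true` adds a one-second backoff before polling again.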
+	let make_job_query = {
+		let global = &global;
+		let concurrency = &concurrency;
+		move |wait: bool| async move {
+			if wait {
+				tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+			}
+
+			let concurrency = concurrency.load(std::sync::atomic::Ordering::Relaxed);
+
+			if concurrency == 0 {
+				tracing::debug!("concurrency limit reached, waiting for a slot");
+				None
+			} else {
+				tracing::debug!("querying for jobs: {concurrency}");
+				Some(utils::query_job(global, concurrency).await)
+			}
+		}
+	};
+
+	let mut job_query = Some(Box::pin(make_job_query(false)));
+
 	loop {
 		select! {
-			ticket_job = async {
-				let ticket = semaphore.acquire().await?;
-
-				if let Some(job) = utils::query_job(&global).await? {
-					Ok::<_, ProcessorError>(Some((ticket, job)))
+			Some(jobs) = async {
+				if let Some(job_query_fut) = &mut job_query {
+					let r = job_query_fut.await;
+					job_query = None;
+					r
 				} else {
-					tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-					Ok::<_, ProcessorError>(None)
+					None
 				}
-			} => {
-				let Some((ticket, job)) = ticket_job? else {
-					continue;
-				};
+			} => {
+				let jobs = jobs?;
+				tracing::debug!("got {} jobs", jobs.len());
+				job_query = Some(Box::pin(make_job_query(jobs.is_empty())));
 
-				futures.push(handle_job(&global, ticket, job));
+				for job in jobs {
+					concurrency.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+					futures.push(handle_job(&global, job));
+				}
+			},
+			Some(_) = futures.next() => {
+				concurrency.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+				if job_query.is_none() {
+					job_query = Some(Box::pin(make_job_query(true)));
+				}
 			},
-			Some(_) = futures.next() => {},
 			_ = &mut done => break,
 		}
 	}
diff --git a/platform/image_processor/src/processor/utils.rs b/platform/image_processor/src/processor/utils.rs
index 9d187e09..4648b458 100644
--- a/platform/image_processor/src/processor/utils.rs
+++ b/platform/image_processor/src/processor/utils.rs
@@ -7,7 +7,7 @@ use crate::database::Job;
 use crate::global::ImageProcessorGlobal;
 use crate::processor::error::Result;
 
-pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>) -> Result<Option<Job>> {
+pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>, limit: usize) -> Result<Vec<Job>> {
 	Ok(common::database::query(
 		"UPDATE image_jobs
 		SET claimed_by = $1,
@@ -18,14 +18,15 @@ pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>) -> Result