From d58e335f830024f6cce1e3abe2faa410f437b18a Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Sun, 14 Jan 2024 18:05:05 +0000 Subject: [PATCH 1/6] feat: categories --- Cargo.lock | 1 + platform/api/Cargo.toml | 1 + .../api/src/api/v1/gql/models/category.rs | 2 +- .../api/src/api/v1/gql/models/image_upload.rs | 4 +- .../api/src/api/v1/gql/subscription/file.rs | 3 +- .../api/src/api/v1/upload/profile_picture.rs | 6 +- platform/api/src/config.rs | 40 ++ platform/api/src/database/category.rs | 12 +- platform/api/src/database/file_type.rs | 4 + platform/api/src/database/mod.rs | 2 + platform/api/src/database/uploaded_file.rs | 8 +- .../api/src/database/uploaded_file_status.rs | 13 + platform/api/src/global.rs | 6 +- platform/api/src/igdb_cron.rs | 584 ++++++++++++++++++ platform/api/src/image_upload_callback.rs | 10 +- platform/api/src/lib.rs | 1 + platform/api/src/main.rs | 34 +- .../migrations/20230825170300_init.down.sql | 6 +- .../migrations/20230825170300_init.up.sql | 45 +- 19 files changed, 755 insertions(+), 27 deletions(-) create mode 100644 platform/api/src/database/uploaded_file_status.rs create mode 100644 platform/api/src/igdb_cron.rs diff --git a/Cargo.lock b/Cargo.lock index 0a4e214d..0ecb2f7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3668,6 +3668,7 @@ dependencies = [ "chrono", "common", "config", + "fred", "futures", "futures-util", "hmac", diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml index a48b2766..cb96a928 100644 --- a/platform/api/Cargo.toml +++ b/platform/api/Cargo.toml @@ -51,6 +51,7 @@ pin-project = "1.1" base64 = "0.21" postgres-from-row = "0.5" postgres-types = "0.2" +fred = { version = "8.0", features = ["enable-rustls", "sentinel-client", "dns"] } config = { workspace = true } pb = { workspace = true } diff --git a/platform/api/src/api/v1/gql/models/category.rs b/platform/api/src/api/v1/gql/models/category.rs index 1dabc809..3a8fb277 100644 --- a/platform/api/src/api/v1/gql/models/category.rs +++ b/platform/api/src/api/v1/gql/models/category.rs @@ -17,7 +17,7 @@ impl From for Category { Self { id: value.id.into(), name: value.name, - revision: value.revision, + revision: 1, updated_at: value.updated_at.into(), } } diff --git a/platform/api/src/api/v1/gql/models/image_upload.rs b/platform/api/src/api/v1/gql/models/image_upload.rs index 6023f197..0d0c61cf 100644 --- a/platform/api/src/api/v1/gql/models/image_upload.rs +++ b/platform/api/src/api/v1/gql/models/image_upload.rs @@ -6,7 +6,7 @@ use super::ulid::GqlUlid; use crate::api::v1::gql::error::{GqlError, Result}; use crate::api::v1::gql::ext::ContextExt; use crate::config::ImageUploaderConfig; -use crate::database::UploadedFile; +use crate::database::{UploadedFile, UploadedFileStatus}; use crate::global::ApiGlobal; #[derive(SimpleObject, Clone)] @@ -51,7 +51,7 @@ impl ImageUpload { impl ImageUpload { pub fn from_uploaded_file(uploaded_file: UploadedFile) -> Result> { - if uploaded_file.pending { + if uploaded_file.status != UploadedFileStatus::Completed { return Ok(None); } diff --git a/platform/api/src/api/v1/gql/subscription/file.rs b/platform/api/src/api/v1/gql/subscription/file.rs index 38b4fa2f..67bfd5a9 100644 --- a/platform/api/src/api/v1/gql/subscription/file.rs +++ b/platform/api/src/api/v1/gql/subscription/file.rs @@ -7,6 +7,7 @@ use crate::api::v1::gql::error::ext::{OptionExt, ResultExt}; use crate::api::v1::gql::error::{GqlError, Result}; use crate::api::v1::gql::ext::ContextExt; use crate::api::v1::gql::models::ulid::GqlUlid; +use crate::database::UploadedFileStatus; use 
crate::global::ApiGlobal; use crate::subscription::SubscriptionTopic; @@ -63,7 +64,7 @@ impl FileSubscription { .map_err_gql("failed to subscribe to file status")?; Ok(async_stream::stream!({ - if !file.pending { + if file.status != UploadedFileStatus::Queued { // When file isn't pending anymore, just yield once with the status from the db let status = if file.failed.is_some() { FileStatus::Failure diff --git a/platform/api/src/api/v1/upload/profile_picture.rs b/platform/api/src/api/v1/upload/profile_picture.rs index cca332a8..b4ce7131 100644 --- a/platform/api/src/api/v1/upload/profile_picture.rs +++ b/platform/api/src/api/v1/upload/profile_picture.rs @@ -17,7 +17,7 @@ use crate::api::auth::AuthData; use crate::api::error::ApiError; use crate::api::Body; use crate::config::{ApiConfig, ImageUploaderConfig}; -use crate::database::{FileType, RolePermission}; +use crate::database::{FileType, RolePermission, UploadedFileStatus}; use crate::global::ApiGlobal; fn create_task(file_id: Ulid, input_path: &str, config: &ImageUploaderConfig, owner_id: Ulid) -> image_processor::Task { @@ -197,7 +197,7 @@ impl UploadType for ProfilePicture { .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert image job"))?; - common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") + common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, path, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") .bind(file_id) // id .bind(auth.session.user_id) // owner_id .bind(auth.session.user_id) // uploader_id @@ -209,8 +209,8 @@ impl UploadType for ProfilePicture { })), })) // metadata .bind(file.len() as i64) // total_size - .bind(true) // pending .bind(&input_path) // path + .bind(UploadedFileStatus::Queued) // status .build() .execute(&tx) .await diff --git a/platform/api/src/config.rs b/platform/api/src/config.rs index c470e3b7..4b5b40fe 100644 --- a/platform/api/src/config.rs +++ b/platform/api/src/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; use std::path::PathBuf; +use std::time::Duration; use common::config::{S3BucketConfig, TlsConfig}; @@ -78,6 +79,12 @@ pub struct ImageUploaderConfig { /// Public endpoint for downloads pub public_endpoint: String, + + /// Igdb image callback subject + pub igdb_image_callback_subject: String, + + /// Igdb image task priority, higher number means higher priority + pub igdb_image_task_priority: i32, } impl Default for ImageUploaderConfig { @@ -87,6 +94,8 @@ impl Default for ImageUploaderConfig { profile_picture_callback_subject: "scuffle-platform-image_processor-profile_picture".to_string(), profile_picture_task_priority: 2, public_endpoint: "https://images.scuffle.tv/scuffle-image-processor-public".to_string(), + igdb_image_callback_subject: "image_processor.igdb_image".to_string(), + igdb_image_task_priority: 1, } } } @@ -138,3 +147,34 @@ pub struct VideoApiPlaybackKeypairConfig { /// Path to the playback private key for the video api pub private_key: PathBuf, } + +#[derive(Debug, Clone, PartialEq, config::Config, serde::Deserialize)] +#[serde(default)] +pub struct IgDbConfig { + /// The IGDB Client ID + pub client_id: String, + + /// The IGDB Client Secret + pub client_secret: String, + + /// Process Images + pub process_images: bool, + + /// Refresh Interval + pub refresh_interval: Duration, + + /// Igdb Cron Subject + pub igdb_cron_subject: String, +} + +impl Default for IgDbConfig 
{
+	fn default() -> Self {
+		Self {
+			client_id: "igdb_client_id".to_string(),
+			client_secret: "igdb_client_secret".to_string(),
+			process_images: false,
+			refresh_interval: Duration::from_secs(6 * 60 * 60), // 6 hours
+			igdb_cron_subject: "scuffle-platform-igdb_cron".to_string(),
+		}
+	}
+}
diff --git a/platform/api/src/database/category.rs b/platform/api/src/database/category.rs
index a2fdc1d0..c4205964 100644
--- a/platform/api/src/database/category.rs
+++ b/platform/api/src/database/category.rs
@@ -4,7 +4,17 @@ use ulid::Ulid;
 #[derive(Debug, Clone, Default, postgres_from_row::FromRow)]
 pub struct Category {
 	pub id: Ulid,
+	pub igdb_id: Option<i32>,
 	pub name: String,
-	pub revision: i32,
+	pub aliases: Vec<String>,
+	pub keywords: Vec<String>,
+	pub storyline: Option<String>,
+	pub summary: Option<String>,
+	pub over_18: bool,
+	pub cover_id: Option<Ulid>,
+	pub rating: f64,
+	pub artwork_ids: Vec<Ulid>,
+	pub igdb_similar_game_ids: Vec<i32>,
+	pub websites: Vec<String>,
 	pub updated_at: DateTime<Utc>,
 }
diff --git a/platform/api/src/database/file_type.rs b/platform/api/src/database/file_type.rs
index fc90e04b..17358abb 100644
--- a/platform/api/src/database/file_type.rs
+++ b/platform/api/src/database/file_type.rs
@@ -13,4 +13,8 @@ pub enum FileType {
 	RoleBadge,
 	#[postgres(name = "channel_role_badge")]
 	ChannelRoleBadge,
+	#[postgres(name = "category_cover")]
+	CategoryCover,
+	#[postgres(name = "category_artwork")]
+	CategoryArtwork,
 }
diff --git a/platform/api/src/database/mod.rs b/platform/api/src/database/mod.rs
index e359445e..511032a3 100644
--- a/platform/api/src/database/mod.rs
+++ b/platform/api/src/database/mod.rs
@@ -8,6 +8,7 @@ mod search_result;
 mod session;
 mod two_fa_request;
 mod uploaded_file;
+mod uploaded_file_status;
 mod user;
 
 pub use category::*;
@@ -20,4 +21,5 @@ pub use search_result::*;
 pub use session::*;
 pub use two_fa_request::*;
 pub use uploaded_file::*;
+pub use uploaded_file_status::*;
 pub use user::*;
diff --git a/platform/api/src/database/uploaded_file.rs b/platform/api/src/database/uploaded_file.rs
index 6534c448..eaf1a90f 100644
--- a/platform/api/src/database/uploaded_file.rs
+++ b/platform/api/src/database/uploaded_file.rs
@@ -1,20 +1,20 @@
 use common::database::protobuf;
 use ulid::Ulid;
 
-use super::FileType;
+use super::{FileType, UploadedFileStatus};
 
 #[derive(Debug, Clone, postgres_from_row::FromRow)]
 pub struct UploadedFile {
 	pub id: Ulid,
-	pub owner_id: Ulid,
-	pub uploader_id: Ulid,
+	pub owner_id: Option<Ulid>,
+	pub uploader_id: Option<Ulid>,
 	pub name: String,
 	#[from_row(rename = "type")]
 	pub ty: FileType,
 	#[from_row(from_fn = "protobuf")]
 	pub metadata: pb::scuffle::platform::internal::types::UploadedFileMetadata,
 	pub total_size: i64,
-	pub pending: bool,
+	pub status: UploadedFileStatus,
 	pub path: String,
 	pub updated_at: chrono::DateTime<chrono::Utc>,
 	pub failed: Option<String>,
diff --git a/platform/api/src/database/uploaded_file_status.rs b/platform/api/src/database/uploaded_file_status.rs
new file mode 100644
index 00000000..f5df3f3d
--- /dev/null
+++ b/platform/api/src/database/uploaded_file_status.rs
@@ -0,0 +1,13 @@
+use postgres_types::{ToSql, FromSql};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)]
+pub enum UploadedFileStatus {
+	#[postgres(name = "unqueued")]
+	Unqueued,
+	#[postgres(name = "queued")]
+	Queued,
+	#[postgres(name = "failed")]
+	Failed,
+	#[postgres(name = "completed")]
+	Completed,
+}
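A note on the new status enum: the `postgres-types` derives expect the Postgres type to carry the same name as the Rust type unless overridden, which is why patch 3 of this series adds `#[postgres(name = "uploaded_file_status")]` on top of this file. A minimal sketch of the corrected mapping and how it binds in a query (hypothetical `demo` helper; assumes `tokio-postgres` plus `postgres-types` with the derive feature enabled):

    use postgres_types::{FromSql, ToSql};

    // Without the container attribute, ToSql/FromSql would look for a Postgres
    // type literally named "UploadedFileStatus" instead of the
    // `uploaded_file_status` enum created in the migration further down.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, ToSql, FromSql)]
    #[postgres(name = "uploaded_file_status")]
    pub enum UploadedFileStatus {
        #[postgres(name = "unqueued")]
        Unqueued,
        #[postgres(name = "queued")]
        Queued,
        #[postgres(name = "failed")]
        Failed,
        #[postgres(name = "completed")]
        Completed,
    }

    // Hypothetical usage: the enum binds like any other parameter.
    async fn demo(client: &tokio_postgres::Client) -> Result<u64, tokio_postgres::Error> {
        client
            .execute(
                "UPDATE uploaded_files SET status = $1 WHERE status = $2",
                &[&UploadedFileStatus::Queued, &UploadedFileStatus::Unqueued],
            )
            .await
    }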
diff --git a/platform/api/src/global.rs b/platform/api/src/global.rs
index c8f9a24c..f8a7c5fe 100644
--- a/platform/api/src/global.rs
+++ b/platform/api/src/global.rs
@@ -1,6 +1,6 @@
 use common::dataloader::DataLoader;
 
-use crate::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig};
+use crate::config::{ApiConfig, IgDbConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig};
 use crate::dataloader::category::CategoryByIdLoader;
 use crate::dataloader::global_state::GlobalStateLoader;
 use crate::dataloader::role::RoleByIdLoader;
@@ -39,8 +39,10 @@ pub trait ApiGlobal:
 	+ common::global::GlobalConfigProvider<JwtConfig>
 	+ common::global::GlobalConfigProvider<ImageUploaderConfig>
 	+ common::global::GlobalConfigProvider<VideoApiConfig>
+	+ common::global::GlobalConfigProvider<IgDbConfig>
 	+ common::global::GlobalNats
 	+ common::global::GlobalDb
+	+ common::global::GlobalRedis
 	+ common::global::GlobalConfig
 	+ ApiState
 	+ Send
@@ -56,8 +58,10 @@ impl<T> ApiGlobal for T where
 	+ common::global::GlobalConfigProvider<JwtConfig>
 	+ common::global::GlobalConfigProvider<ImageUploaderConfig>
 	+ common::global::GlobalConfigProvider<VideoApiConfig>
+	+ common::global::GlobalConfigProvider<IgDbConfig>
 	+ common::global::GlobalNats
 	+ common::global::GlobalDb
+	+ common::global::GlobalRedis
 	+ common::global::GlobalConfig
 	+ ApiState
 	+ Send
diff --git a/platform/api/src/igdb_cron.rs b/platform/api/src/igdb_cron.rs
new file mode 100644
index 00000000..b98c3a10
--- /dev/null
+++ b/platform/api/src/igdb_cron.rs
@@ -0,0 +1,584 @@
+use std::collections::HashMap;
+use std::pin::pin;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Context;
+use async_nats::jetstream::consumer::pull::Config;
+use async_nats::jetstream::consumer::Consumer;
+use async_nats::jetstream::AckKind;
+use bytes::Bytes;
+use fred::interfaces::KeysInterface;
+use futures_util::StreamExt;
+use pb::scuffle::platform::internal::image_processor;
+use pb::scuffle::platform::internal::types::{uploaded_file_metadata, ImageFormat, UploadedFileMetadata};
+use postgres_from_row::FromRow;
+use tokio::select;
+use ulid::Ulid;
+
+use crate::config::{IgDbConfig, ImageUploaderConfig};
+use crate::database::{Category, FileType, UploadedFile, UploadedFileStatus};
+use crate::global::ApiGlobal;
+
+pub async fn run<G: ApiGlobal>(global: Arc<G>) -> anyhow::Result<()> {
+	let config = global.config::<IgDbConfig>();
+	let stream = global
+		.jetstream()
+		.get_or_create_stream(async_nats::jetstream::stream::Config {
+			name: config.igdb_cron_subject.clone(),
+			subjects: vec![config.igdb_cron_subject.clone()],
+			max_messages: 1,
+			discard: async_nats::jetstream::stream::DiscardPolicy::New,
+			retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
+			..Default::default()
+		})
+		.await
+		.context("create stream")?;
+
+	let consumer = stream
+		.get_or_create_consumer(
+			"igdb-cron",
+			async_nats::jetstream::consumer::pull::Config {
+				name: Some("igdb-cron".to_string()),
+				..Default::default()
+			},
+		)
+		.await
+		.context("create consumer")?;
+
+	select! {
+		e = cron(&global, config) => e.context("cron")?,
+		e = process(&global, consumer, config) => e.context("process")?,
+	}
+
+	Ok(())
+}
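The stream settings above are what make this cron safe to run on every API node: `max_messages: 1` plus `DiscardPolicy::New` and work-queue retention turn the stream into a single-slot mailbox, and because the tick is published over core NATS (`global.nats().publish(...)` in `cron` below) rather than as a JetStream publish, a tick that arrives while the slot is occupied is simply dropped instead of surfacing an error to the publisher. A minimal sketch of the single-slot pattern in isolation (hypothetical `demo-cron` subject; assumes a JetStream-enabled server and the `async_nats`/`bytes` crates already used here):

    async fn demo(nats: async_nats::Client, jetstream: async_nats::jetstream::Context) -> anyhow::Result<()> {
        use anyhow::Context as _;

        // Single-slot stream: at most one unprocessed tick exists at a time.
        jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: "demo-cron".to_string(),
                subjects: vec!["demo-cron".to_string()],
                max_messages: 1,
                discard: async_nats::jetstream::stream::DiscardPolicy::New,
                retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
                ..Default::default()
            })
            .await
            .context("create stream")?;

        // Core NATS publishes are fire-and-forget: the first tick is captured
        // by the stream, the second is discarded until a worker acks the first.
        nats.publish("demo-cron".to_string(), bytes::Bytes::new()).await?;
        nats.publish("demo-cron".to_string(), bytes::Bytes::new()).await?;

        Ok(())
    }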
+
+async fn cron<G: ApiGlobal>(global: &Arc<G>, config: &IgDbConfig) -> anyhow::Result<()> {
+	let mut timer = tokio::time::interval(Duration::from_secs(60));
+	loop {
+		global
+			.nats()
+			.publish(config.igdb_cron_subject.clone(), Bytes::new())
+			.await
+			.context("publish")?;
+		timer.tick().await;
+	}
+}
+
+async fn process<G: ApiGlobal>(global: &Arc<G>, consumer: Consumer<Config>, config: &IgDbConfig) -> anyhow::Result<()> {
+	let mut messages = consumer.messages().await.context("messages")?;
+
+	let duration = chrono::Duration::from_std(config.refresh_interval).context("duration")?;
+
+	'outer: while let Some(message) = messages.next().await {
+		let message = message.context("message")?;
+
+		let info = message.info().map_err(|e| anyhow::anyhow!("info: {e}"))?;
+
+		let time = chrono::DateTime::<chrono::Utc>::from(std::time::SystemTime::from(info.published));
+		let first_run = global
+			.redis()
+			.get::<Option<String>, _>("igdb_last_refresh")
+			.await
+			.context("redis get")?
+			.is_none();
+
+		if chrono::Utc::now() - time < duration && !first_run {
+			tracing::info!("Skipping IGDB refresh");
+			message
+				.ack_with(AckKind::Nak(Some(Duration::from_secs(300))))
+				.await
+				.map_err(|e| anyhow::anyhow!("ack: {}", e))?;
+			continue;
+		}
+
+		// Refresh IGDB
+		tracing::info!("Refreshing IGDB");
+
+		let refresh_igdb = refresh_igdb(global, config);
+		let mut refresh_igdb = pin!(refresh_igdb);
+
+		loop {
+			select! {
+				e = &mut refresh_igdb => {
+					if let Err(e) = e {
+						tracing::error!("igdb: {:#}", e);
+						message.ack_with(AckKind::Nak(Some(Duration::from_secs(300)))).await.map_err(|e| anyhow::anyhow!("ack: {e}"))?;
+						continue 'outer;
+					}
+
+					break;
+				}
+				_ = tokio::time::sleep(Duration::from_secs(15)) => {
+					tracing::debug!("igdb: refresh ack hold");
+					message.ack_with(AckKind::Progress).await.map_err(|e| anyhow::anyhow!("ack: {e}"))?;
+				}
+			}
+		}
+
+		global
+			.redis()
+			.set("igdb_last_refresh", chrono::Utc::now().to_rfc3339(), None, None, false)
+			.await
+			.context("redis set")?;
+
+		message
+			.ack_with(AckKind::Ack)
+			.await
+			.map_err(|e| anyhow::anyhow!("ack: {e}"))?;
+	}
+
+	Ok(())
+}
+
+#[derive(serde::Deserialize)]
+struct CountResponse {
+	count: usize,
+}
+
+#[derive(Default, serde::Deserialize)]
+#[serde(default)]
+struct Game {
+	id: i32,
+	age_ratings: Vec<AgeRating>,
+	alternative_names: Vec<IdName>,
+	artworks: Vec<Image>,
+	cover: Option<Image>,
+	genres: Vec<IdName>,
+	keywords: Vec<IdName>,
+	name: String,
+	rating: f64,
+	similar_games: Vec<i32>,
+	storyline: Option<String>,
+	summary: Option<String>,
+	themes: Vec<IdName>,
+	websites: Vec<Website>,
+}
+
+#[derive(Default, serde::Deserialize)]
+#[serde(default)]
+struct AgeRating {
+	id: i32,
+	category: i32,
+	rating: i32,
+}
+
+#[derive(Default, serde::Deserialize)]
+#[serde(default)]
+struct IdName {
+	id: i32,
+	name: String,
+}
+
+#[derive(Default, serde::Deserialize)]
+#[serde(default)]
+struct Image {
+	id: i32,
+	alpha_channel: bool,
+	animated: bool,
+	image_id: String,
+	height: i32,
+	width: i32,
+}
+
+#[derive(Default, serde::Deserialize)]
+#[serde(default)]
+struct Website {
+	id: i32,
+	category: i32,
+	url: String,
+}
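One detail worth spelling out on the response structs above: `#[serde(default)]` at the container level makes every missing field fall back to its `Default` value, which matches how IGDB omits empty fields from its responses entirely rather than sending nulls. A quick sketch of the effect (hypothetical, abridged response row; assumes `serde_json` is available alongside the structs above):

    fn demo() -> serde_json::Result<()> {
        // Most fields are absent, as for games with sparse IGDB metadata.
        let raw = r#"{"id": 1942, "name": "Example Game", "rating": 93.4}"#;
        let game: Game = serde_json::from_str(raw)?;

        assert!(game.cover.is_none()); // omitted Option -> None
        assert!(game.artworks.is_empty()); // omitted Vec -> empty
        assert!(game.summary.is_none());
        Ok(())
    }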
+
+async fn refresh_igdb<G: ApiGlobal>(global: &Arc<G>, config: &IgDbConfig) -> anyhow::Result<()> {
+	let access_token = global
+		.redis()
+		.get::<Option<String>, _>("igdb_access_token")
+		.await
+		.context("redis get")?;
+	let access_token = if let Some(access_token) = access_token {
+		access_token
+	} else {
+		let (access_token, ttl) = get_access_token(config).await.context("get access token")?;
+		global
+			.redis()
+			.set(
+				"igdb_access_token",
+				&access_token,
+				Some(fred::types::Expiration::EX((ttl / 2).max(1))),
+				None,
+				false,
+			)
+			.await
+			.context("redis set")?;
+		access_token
+	};
+
+	let client = reqwest::ClientBuilder::new()
+		.user_agent("scuffle/0.1.0")
+		.default_headers({
+			let mut headers = reqwest::header::HeaderMap::new();
+			headers.insert(
+				reqwest::header::AUTHORIZATION,
+				format!("Bearer {}", access_token).parse().unwrap(),
+			);
+			headers.insert(reqwest::header::ACCEPT, "application/json".parse().unwrap());
+			headers.insert("Client-ID", config.client_id.parse().unwrap());
+			headers
+		})
+		.build()
+		.context("build client")?;
+
+	// The API has a rate limit of 4 requests per second.
+	// Let's start by counting the number of games there are.
+	// Then we can divide that by 4 to get the number of seconds we need to wait.
+
+	let resp = client
+		.post("https://api.igdb.com/v4/games/count")
+		.body("where category = 0;") // Category 0 is for games (not dlc, etc)
+		.send()
+		.await
+		.context("count request")?;
+
+	resp.error_for_status_ref().context("count response")?;
+
+	let resp = resp.json::<CountResponse>().await.context("count json")?;
+
+	let total = resp.count;
+	let approx_seconds = (total as f64 / 500.0 * 1000.0 / 400.0).round() as i64;
+
+	tracing::info!("IGDB has {total} games, a refresh will take approximately {approx_seconds} seconds");
+
+	let mut timer = tokio::time::interval(Duration::from_millis(250));
+	timer.tick().await;
+
+	let mut offset = 0;
+
+	loop {
+		let resp = client.post("https://api.igdb.com/v4/games")
+			.body(format!("fields name, genres.name, alternative_names.name, summary, storyline, id, age_ratings.*, artworks.*, keywords.name, rating, similar_games, url, themes.name, websites.*, cover.*; where category = 0; offset {offset}; limit 500;"))
+			.send()
+			.await
+			.context("games request")?;
+
+		resp.error_for_status_ref().context("games response")?;
+
+		let resp = resp.json::<Vec<Game>>().await.context("games json")?;
+
+		if resp.is_empty() {
+			tracing::info!("igdb: done");
+			break;
+		}
+
+		struct InsertItem {
+			id: Ulid,
+			igdb_id: i32,
+			name: String,
+			alternative_names: Vec<String>,
+			keywords: Vec<String>,
+			storyline: Option<String>,
+			summary: Option<String>,
+			over_18: bool,
+			cover_id: Option<Ulid>,
+			rating: f64,
+			updated_at: chrono::DateTime<chrono::Utc>,
+			artwork_ids: Vec<Ulid>,
+			igdb_similar_game_ids: Vec<i32>,
+			websites: Vec<String>,
+		}
+
+		let mut client = global.db().get().await.context("get db connection")?;
+		let tx = client.transaction().await.context("start transaction")?;
+
+		let image_ids = resp
+			.iter()
+			.flat_map(|item| {
+				item.artworks
+					.iter()
+					.chain(item.cover.as_ref())
+					.map(|x| (Ulid::new(), x.image_id.as_str()))
+			})
+			.collect::<Vec<_>>();
+
+		#[derive(FromRow)]
+		struct ImageId {
+			image_id: String,
+			uploaded_file_id: Ulid,
+		}
+
+		common::database::query("INSERT INTO igdb_image (uploaded_file_id, image_id)")
+			.push_values(&image_ids, |mut sep, item| {
+				sep.push_bind(item.0);
+				sep.push_bind(item.1);
+			})
+			.push("ON CONFLICT (image_id) DO NOTHING;")
+			.build()
+			.execute(&tx)
+			.await
+			.context("insert igdb_image")?;
+
+		let image_ids = common::database::query("SELECT image_id, uploaded_file_id FROM igdb_image WHERE image_id = ANY($1)")
+			.bind(image_ids.iter().map(|x| x.1).collect::<Vec<_>>())
+			.build_query_as::<ImageId>()
+			.fetch_all(&tx)
+			.await
+			.context("select igdb_image")?;
+
+		let image_ids = image_ids
+			.into_iter()
+			.map(|row| (row.image_id, row.uploaded_file_id))
+			.collect::<HashMap<_, _>>();
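The insert-then-select sequence above is an idempotency trick: a fresh ULID is proposed for every image, `ON CONFLICT (image_id) DO NOTHING` silently drops the proposals that are already known, and the follow-up SELECT establishes the authoritative `image_id -> uploaded_file_id` mapping either way, so re-running the refresh never re-keys an image. The same pattern reduced to plain `tokio-postgres` rather than the project's query builder (a sketch, assuming the `igdb_image` table from the migration further down and UUID support enabled in the driver):

    use std::collections::HashMap;

    async fn resolve_image_ids(
        tx: &tokio_postgres::Transaction<'_>,
        candidates: &[(uuid::Uuid, String)], // (proposed uploaded_file_id, IGDB image_id)
    ) -> Result<HashMap<String, uuid::Uuid>, tokio_postgres::Error> {
        // Proposals that lose the conflict keep whatever id an earlier run stored.
        for (file_id, image_id) in candidates {
            tx.execute(
                "INSERT INTO igdb_image (uploaded_file_id, image_id) VALUES ($1, $2) ON CONFLICT (image_id) DO NOTHING",
                &[file_id, image_id],
            )
            .await?;
        }

        let keys: Vec<&String> = candidates.iter().map(|(_, id)| id).collect();
        let rows = tx
            .query("SELECT image_id, uploaded_file_id FROM igdb_image WHERE image_id = ANY($1)", &[&keys])
            .await?;

        Ok(rows
            .into_iter()
            .map(|r| (r.get("image_id"), r.get("uploaded_file_id")))
            .collect())
    }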
+
+		let uploaded_files = resp
+			.iter()
+			.flat_map(|item| {
+				item.cover
+					.as_ref()
+					.map(|cover| UploadedFile {
+						id: image_ids[&cover.image_id],
+						failed: None,
+						name: format!("igdb/cover/{}.jpg", cover.image_id),
+						owner_id: None,
+						path: format!("https://images.igdb.com/igdb/image/upload/t_cover_big/{}.jpg", cover.image_id),
+						status: UploadedFileStatus::Unqueued,
+						metadata: UploadedFileMetadata {
+							metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image {
+								versions: Vec::new(),
+							})),
+						},
+						total_size: 0,
+						ty: FileType::CategoryCover,
+						updated_at: chrono::Utc::now(),
+						uploader_id: None,
+					})
+					.into_iter()
+					.chain(item.artworks.iter().map(|artwork| UploadedFile {
+						id: image_ids[&artwork.image_id],
+						failed: None,
+						name: format!("igdb/artwork/{}.jpg", artwork.image_id),
+						owner_id: None,
+						path: format!("https://images.igdb.com/igdb/image/upload/t_1080p/{}.jpg", artwork.image_id),
+						status: UploadedFileStatus::Unqueued,
+						metadata: UploadedFileMetadata {
+							metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image {
+								versions: Vec::new(),
+							})),
+						},
+						total_size: 0,
+						ty: FileType::CategoryArtwork,
+						updated_at: chrono::Utc::now(),
+						uploader_id: None,
+					}))
+			})
+			.collect::<Vec<_>>();
+
+		common::database::query("INSERT INTO uploaded_files (id, name, type, metadata, total_size, path, status) ")
+			.push_values(&uploaded_files, |mut sep, item| {
+				sep.push_bind(item.id);
+				sep.push_bind(&item.name);
+				sep.push_bind(item.ty);
+				sep.push_bind(common::database::Protobuf(item.metadata.clone()));
+				sep.push_bind(item.total_size);
+				sep.push_bind(&item.path);
+				sep.push_bind(item.status);
+			})
+			.push("ON CONFLICT (id) DO NOTHING;")
+			.build()
+			.execute(&tx)
+			.await
+			.context("insert uploaded_files")?;
+
+		let uploaded_files = common::database::query("SELECT * FROM uploaded_files WHERE id = ANY($1)")
+			.bind(uploaded_files.iter().map(|x| x.id).collect::<Vec<_>>())
+			.build_query_as::<UploadedFile>()
+			.fetch_all(&tx)
+			.await
+			.context("select uploaded_files")?;
+
+		let resp = resp
+			.into_iter()
+			.map(|item| InsertItem {
+				id: Ulid::new(),
+				igdb_id: item.id,
+				name: item.name,
+				alternative_names: item.alternative_names.into_iter().map(|x| x.name).collect::<Vec<_>>(),
+				keywords: item
+					.keywords
+					.into_iter()
+					.chain(item.genres)
+					.chain(item.themes)
+					.map(|x| x.name.to_lowercase())
+					.collect::<Vec<_>>(),
+				storyline: item.storyline,
+				summary: item.summary,
+				over_18: item.age_ratings.into_iter().any(|x| x.category == 2 && x.rating == 5), // PEGI 18
+				cover_id: item.cover.map(|x| image_ids[&x.image_id]),
+				rating: item.rating,
+				updated_at: chrono::Utc::now(),
+				artwork_ids: item.artworks.into_iter().map(|x| image_ids[&x.image_id]).collect::<Vec<_>>(),
+				igdb_similar_game_ids: item.similar_games,
+				websites: item.websites.into_iter().map(|x| x.url).collect::<Vec<_>>(),
+			})
+			.collect::<Vec<_>>();
+
+		offset += resp.len();
+		let count = resp.len();
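The categories upsert that follows leans on the partial unique index `categories_igdb_id_idx` from the migration further down: the `ON CONFLICT (igdb_id) WHERE igdb_id IS NOT NULL` arbiter matches that index, so IGDB-backed rows are updated in place while manually created rows with a NULL `igdb_id` never participate in conflicts. A condensed sketch of that interplay (hypothetical `demo_categories` table; assumes a database providing `gen_random_uuid()`):

    async fn upsert_demo(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
        client
            .batch_execute(
                "CREATE TABLE demo_categories (id UUID PRIMARY KEY DEFAULT gen_random_uuid(), igdb_id INT4, name TEXT NOT NULL);
                 CREATE UNIQUE INDEX demo_categories_igdb_id_idx ON demo_categories (igdb_id) WHERE igdb_id IS NOT NULL;",
            )
            .await?;

        client
            .batch_execute(
                "-- Same igdb_id twice: the second insert hits the partial index and becomes an UPDATE.
                 INSERT INTO demo_categories (igdb_id, name) VALUES (1942, 'old name');
                 INSERT INTO demo_categories (igdb_id, name) VALUES (1942, 'new name')
                     ON CONFLICT (igdb_id) WHERE igdb_id IS NOT NULL DO UPDATE SET name = EXCLUDED.name;
                 -- Rows without an igdb_id can repeat freely:
                 INSERT INTO demo_categories (name) VALUES ('manual');
                 INSERT INTO demo_categories (name) VALUES ('manual');",
            )
            .await?;

        Ok(())
    }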
.push("name = EXCLUDED.name, ") + .push("aliases = EXCLUDED.aliases, ") + .push("keywords = EXCLUDED.keywords, ") + .push("storyline = EXCLUDED.storyline, ") + .push("summary = EXCLUDED.summary, ") + .push("rating = EXCLUDED.rating, ") + .push("updated_at = NOW(), ") + .push("igdb_similar_game_ids = EXCLUDED.igdb_similar_game_ids, ") + .push("websites = EXCLUDED.websites, ") + .push("artwork_ids = EXCLUDED.artwork_ids;") + .build().execute(&tx).await.context("insert categories")?; + + let categories = common::database::query("SELECT * FROM categories WHERE igdb_id = ANY($1)") + .bind(resp.iter().map(|x| x.igdb_id).collect::>()) + .build_query_as::() + .fetch_all(&tx) + .await + .context("select categories")?; + + if categories.len() != count { + tracing::warn!("igdb: categories count mismatch {} != {}", categories.len(), count); + } + + + let categories = categories + .into_iter() + .flat_map(|c| { + c.cover_id + .into_iter() + .chain(c.artwork_ids.into_iter()) + .map(move |id| (id, c.id)) + }) + .collect::>(); + + common::database::query("WITH updated(id, category) AS (") + .push_values(categories.iter().collect::>(), |mut sep, item| { + sep.push_bind(item.0); + sep.push_bind(item.1); + }) + .push( + ") UPDATE igdb_image SET category_id = updated.category FROM updated WHERE igdb_image.uploaded_file_id = updated.id;", + ) + .build().execute(&tx).await.context("update igdb_image")?; + + if config.process_images { + let unqueued = uploaded_files + .into_iter() + .filter(|x| x.status == UploadedFileStatus::Unqueued) + .collect::>(); + + if !unqueued.is_empty() { + let image_processor_config = global.config::(); + common::database::query("INSERT INTO image_jobs (id, priority, task) ") + .push_values(unqueued, |mut sep, item| { + sep.push_bind(item.id); + sep.push_bind(image_processor_config.igdb_image_task_priority); + sep.push_bind(common::database::Protobuf(create_task( + categories[&item.id], + item.id, + item.path.clone(), + image_processor_config, + ))); + }) + .push("ON CONFLICT (id) DO NOTHING;") + .build() + .execute(&tx).await.context("insert image_jobs")?; + } + } + + tx.commit().await.context("commit")?; + + tracing::debug!("igdb progress: {}/{}", offset, total); + + if count < 500 { + tracing::info!("igdb: done"); + break; + } + + timer.tick().await; + } + + Ok(()) +} + +async fn get_access_token(config: &IgDbConfig) -> anyhow::Result<(String, i64)> { + let client = reqwest::Client::new(); + let response = client + .post("https://id.twitch.tv/oauth2/token") + .form(&[ + ("client_id", config.client_id.as_ref()), + ("client_secret", config.client_secret.as_ref()), + ("grant_type", "client_credentials"), + ]) + .send() + .await + .context("request")?; + + response.error_for_status_ref().context("response")?; + + #[derive(serde::Deserialize)] + struct Response { + access_token: String, + expires_in: i64, + token_type: String, + } + + let response = response.json::().await.context("json")?; + + if response.token_type != "bearer" { + anyhow::bail!("invalid token type: {}", response.token_type); + } + + Ok((response.access_token, response.expires_in)) +} + +fn create_task( + category_id: ulid::Ulid, + id: ulid::Ulid, + path: String, + config: &ImageUploaderConfig, +) -> image_processor::Task { + image_processor::Task { + callback_subject: config.igdb_image_callback_subject.clone(), + upscale: false, + output_prefix: format!("categories/{category_id}/{id}"), + scales: vec![1], + limits: Some(image_processor::task::Limits { + max_processing_time_ms: 60000, + ..Default::default() + }), + 
formats: vec![ + ImageFormat::AvifStatic as i32, + ImageFormat::WebpStatic as i32, + ImageFormat::PngStatic as i32, + ], + input_path: path, + resize_method: image_processor::task::ResizeMethod::Fit as i32, + base_height: 1080, + base_width: 1920, + resize_algorithm: image_processor::task::ResizeAlgorithm::Lanczos3 as i32, + } +} diff --git a/platform/api/src/image_upload_callback.rs b/platform/api/src/image_upload_callback.rs index 9fa43de0..0fda3d41 100644 --- a/platform/api/src/image_upload_callback.rs +++ b/platform/api/src/image_upload_callback.rs @@ -73,7 +73,7 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { match job_result { processed_image::Result::Success(processed_image::Success { variants }) => { - let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET status = 'completed', metadata = $1, updated_at = NOW() WHERE id = $2 AND status = 'queued' RETURNING *") .bind(common::database::Protobuf(UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { versions: variants, @@ -118,13 +118,15 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { continue; } + let owner_id = uploaded_file.owner_id.ok_or_else(|| anyhow::anyhow!("uploaded file owner id is null"))?; + if user_updated { global .nats() .publish( - SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id), + SubscriptionTopic::UserProfilePicture(owner_id), pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(uploaded_file.owner_id.into()), + user_id: Some(owner_id.into()), profile_picture_id: Some(uploaded_file.id.into()), }.encode_to_vec().into(), ) @@ -133,7 +135,7 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { } }, processed_image::Result::Failure(processed_image::Failure { reason, friendly_message }) => { - let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET status = 'failed', failed = $1, updated_at = NOW() WHERE id = $2 AND status = 'queued' RETURNING *") .bind(reason.clone()) .bind(job_id.into_ulid()) .build_query_as() diff --git a/platform/api/src/lib.rs b/platform/api/src/lib.rs index 5e12c69e..4a4ff94a 100644 --- a/platform/api/src/lib.rs +++ b/platform/api/src/lib.rs @@ -4,6 +4,7 @@ pub mod database; pub mod dataloader; pub mod global; pub mod grpc; +pub mod igdb_cron; pub mod image_upload_callback; pub mod subscription; pub mod turnstile; diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs index 4d356a3c..0dd3c28a 100644 --- a/platform/api/src/main.rs +++ b/platform/api/src/main.rs @@ -3,13 +3,14 @@ use std::time::Duration; use anyhow::Context as _; use async_graphql::SDLExportOptions; -use binary_helper::global::{setup_database, setup_nats}; +use binary_helper::global::{setup_database, setup_nats, setup_redis}; use binary_helper::{bootstrap, grpc_health, grpc_server, impl_global_traits}; +use common::config::RedisConfig; use common::context::Context; use common::dataloader::DataLoader; use common::global::*; use common::grpc::TlsSettings; -use platform_api::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; +use 
platform_api::config::{ApiConfig, IgDbConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; use platform_api::dataloader::category::CategoryByIdLoader; use platform_api::dataloader::global_state::GlobalStateLoader; use platform_api::dataloader::role::RoleByIdLoader; @@ -21,7 +22,7 @@ use platform_api::video_api::{ load_playback_keypair_private_key, setup_video_events_client, setup_video_playback_session_client, setup_video_room_client, VideoEventsClient, VideoPlaybackSessionClient, VideoRoomClient, }; -use platform_api::{image_upload_callback, video_event_handler}; +use platform_api::{igdb_cron, image_upload_callback, video_event_handler}; use tokio::select; #[derive(Debug, Clone, Default, config::Config, serde::Deserialize)] @@ -46,6 +47,12 @@ struct ExtConfig { /// The video api config video_api: VideoApiConfig, + + /// The IGDB config + igdb: IgDbConfig, + + /// Redis Config + redis: RedisConfig, } impl binary_helper::config::ConfigExtention for ExtConfig { @@ -97,12 +104,21 @@ struct GlobalState { video_room_client: VideoRoomClient, video_playback_session_client: VideoPlaybackSessionClient, video_events_client: VideoEventsClient, + + redis: Arc, playback_private_key: Option>, } impl_global_traits!(GlobalState); +impl common::global::GlobalRedis for GlobalState { + #[inline(always)] + fn redis(&self) -> &Arc { + &self.redis + } +} + impl common::global::GlobalConfigProvider for GlobalState { #[inline(always)] fn provide_config(&self) -> &ApiConfig { @@ -138,6 +154,13 @@ impl common::global::GlobalConfigProvider for GlobalState { } } +impl common::global::GlobalConfigProvider for GlobalState { + #[inline(always)] + fn provide_config(&self) -> &IgDbConfig { + &self.config.extra.igdb + } +} + impl platform_api::global::ApiState for GlobalState { fn category_by_id_loader(&self) -> &DataLoader { &self.category_by_id_loader @@ -252,12 +275,15 @@ impl binary_helper::Global for GlobalState { .map(load_playback_keypair_private_key) .transpose()?; + let redis = setup_redis(&config.extra.redis).await?; + Ok(Self { ctx, config, nats, jetstream, db, + redis, category_by_id_loader, global_state_loader, role_by_id_loader, @@ -300,6 +326,7 @@ pub async fn main() { let video_event_handler = video_event_handler::run(global.clone()); let image_upload_callback = image_upload_callback::run(global.clone()); + let igdb_cron = igdb_cron::run(global.clone()); select! 
{ r = grpc_future => r.context("grpc server stopped unexpectedly")?, @@ -307,6 +334,7 @@ pub async fn main() { r = subscription_manager => r.context("subscription manager stopped unexpectedly")?, r = video_event_handler => r.context("video event handler stopped unexpectedly")?, r = image_upload_callback => r.context("image upload callback handler stopped unexpectedly")?, + r = igdb_cron => r.context("igdb cron stopped unexpectedly")?, } Ok(()) diff --git a/platform/migrations/20230825170300_init.down.sql b/platform/migrations/20230825170300_init.down.sql index 020ad6bc..fc3802ca 100644 --- a/platform/migrations/20230825170300_init.down.sql +++ b/platform/migrations/20230825170300_init.down.sql @@ -10,7 +10,11 @@ DROP TABLE IF EXISTS global_state CASCADE; DROP TABLE IF EXISTS roles CASCADE; DROP TABLE IF EXISTS channel_user CASCADE; DROP TABLE IF EXISTS uploaded_files CASCADE; -DROP TYPE IF EXISTS file_type CASCADE; +DROP TABLE IF EXISTS two_fa_requests CASCADE; +DROP TABLE IF EXISTS igdb_image CASCADE; -- Image Processor DROP TABLE IF EXISTS image_jobs CASCADE; + +DROP TYPE IF EXISTS file_type; +DROP TYPE IF EXISTS uploaded_file_status; diff --git a/platform/migrations/20230825170300_init.up.sql b/platform/migrations/20230825170300_init.up.sql index 996fa7fd..4c1f448a 100644 --- a/platform/migrations/20230825170300_init.up.sql +++ b/platform/migrations/20230825170300_init.up.sql @@ -1,4 +1,8 @@ -CREATE TYPE file_type AS ENUM ('custom_thumbnail', 'profile_picture', 'offline_banner', 'role_badge', 'channel_role_badge'); +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +CREATE TYPE file_type AS ENUM ('custom_thumbnail', 'profile_picture', 'offline_banner', 'role_badge', 'channel_role_badge', 'category_cover', 'category_artwork'); + +CREATE TYPE uploaded_file_status AS ENUM ('unqueued', 'queued', 'failed', 'completed'); CREATE TABLE users ( id UUID NOT NULL PRIMARY KEY, @@ -47,13 +51,13 @@ CREATE TABLE users ( CREATE TABLE uploaded_files ( id UUID NOT NULL PRIMARY KEY, - owner_id UUID NOT NULL, - uploader_id UUID NOT NULL, + owner_id UUID, + uploader_id UUID, name VARCHAR(256) NOT NULL, type file_type NOT NULL, metadata BYTES NOT NULL, total_size INT8 NOT NULL, - pending BOOLEAN NOT NULL DEFAULT TRUE, + status uploadedfilestatus NOT NULL, path VARCHAR(256) NOT NULL, failed TEXT, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() @@ -95,11 +99,32 @@ CREATE TABLE recordings ( CREATE TABLE categories ( id UUID NOT NULL PRIMARY KEY, - name VARCHAR(64) NOT NULL, - revision INT4 NOT NULL DEFAULT 0, + igdb_id INT4, + + -- Searchable + name TEXT NOT NULL, + aliases TEXT[] NOT NULL DEFAULT '{}'::TEXT[], + keywords TEXT[] NOT NULL DEFAULT '{}'::TEXT[], + + -- Non-searchable + storyline TEXT, + summary TEXT, + over_18 BOOLEAN NOT NULL DEFAULT FALSE, + cover_id UUID, + rating FLOAT8 NOT NULL DEFAULT 0.0, + artwork_ids UUID[] NOT NULL DEFAULT '{}'::UUID[], + igdb_similar_game_ids INT4[] NOT NULL DEFAULT '{}'::INT4[], + websites TEXT[] NOT NULL DEFAULT '{}'::TEXT[], + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +CREATE TABLE igdb_image ( + image_id VARCHAR(64) NOT NULL PRIMARY KEY, + uploaded_file_id UUID NOT NULL, + category_id UUID +); + CREATE TABLE chat_messages ( id UUID NOT NULL PRIMARY KEY, user_id UUID NOT NULL, @@ -206,6 +231,14 @@ CREATE INDEX recordings_start_at_idx ON recordings (start_at); -- We want to be able to search for categories by name (fuzzy search) CREATE INVERTED INDEX categories_name_idx ON categories (name gin_trgm_ops); +CREATE INVERTED INDEX categories_aliases_idx ON categories 
(aliases); +CREATE INVERTED INDEX categories_keywords_idx ON categories (keywords); + +-- We want to be able to search for categories by IGDB ID +CREATE UNIQUE INDEX categories_igdb_id_idx ON categories (igdb_id) WHERE igdb_id IS NOT NULL; + +-- We want to be able to search for categories by rating +CREATE INDEX categories_rating_idx ON categories (rating DESC); -- Image Upload Indexes From 0741d9056a1481d0154ae7ca1c3aa5495b4f8e4b Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Mon, 15 Jan 2024 07:46:23 +0000 Subject: [PATCH 2/6] fix: minor fixes --- platform/api/src/igdb_cron.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/platform/api/src/igdb_cron.rs b/platform/api/src/igdb_cron.rs index b98c3a10..59326859 100644 --- a/platform/api/src/igdb_cron.rs +++ b/platform/api/src/igdb_cron.rs @@ -61,6 +61,7 @@ async fn cron(global: &Arc, config: &IgDbConfig) -> anyhow::Res .publish(config.igdb_cron_subject.clone(), Bytes::new()) .await .context("publish")?; + tracing::debug!("igdb cron"); timer.tick().await; } } @@ -337,7 +338,7 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any failed: None, name: format!("igdb/cover/{}.jpg", cover.image_id), owner_id: None, - path: format!("https://images.igdb.com/igdb/image/upload/t_cover_big/{}.jpg", cover.image_id), + path: format!("https://images.igdb.com/igdb/image/upload/t_cover_big_2x/{}.jpg", cover.image_id), status: UploadedFileStatus::Unqueued, metadata: UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { @@ -355,7 +356,7 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any failed: None, name: format!("igdb/artwork/{}.jpg", artwork.image_id), owner_id: None, - path: format!("https://images.igdb.com/igdb/image/upload/t_1080p/{}.jpg", artwork.image_id), + path: format!("https://images.igdb.com/igdb/image/upload/t_1080p_2x/{}.jpg", artwork.image_id), status: UploadedFileStatus::Unqueued, metadata: UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { @@ -493,7 +494,7 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any if !unqueued.is_empty() { let image_processor_config = global.config::(); common::database::query("INSERT INTO image_jobs (id, priority, task) ") - .push_values(unqueued, |mut sep, item| { + .push_values(&unqueued, |mut sep, item| { sep.push_bind(item.id); sep.push_bind(image_processor_config.igdb_image_task_priority); sep.push_bind(common::database::Protobuf(create_task( @@ -506,6 +507,13 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any .push("ON CONFLICT (id) DO NOTHING;") .build() .execute(&tx).await.context("insert image_jobs")?; + + common::database::query("UPDATE uploaded_files SET status = 'queued' WHERE id = ANY($1)") + .bind(unqueued.iter().map(|x| x.id).collect::>()) + .build() + .execute(&tx) + .await + .context("update uploaded_files")?; } } From cb007795fb6edf89e9e029fd71b648d2200d3158 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Thu, 18 Jan 2024 19:15:32 +0000 Subject: [PATCH 3/6] feat: igdb image processor --- common/Cargo.toml | 2 +- common/src/config.rs | 2 +- ffmpeg/src/scalar.rs | 15 +- platform/api/Cargo.toml | 2 +- .../api/src/database/uploaded_file_status.rs | 3 +- platform/api/src/igdb_cron.rs | 184 ++++++++++-------- platform/api/src/main.rs | 2 +- platform/image_processor/Cargo.toml | 2 +- platform/image_processor/src/config.rs | 22 ++- platform/image_processor/src/global.rs | 1 + 
platform/image_processor/src/main.rs | 85 +++++--- .../image_processor/src/processor/error.rs | 20 +- .../src/processor/job/encoder/png.rs | 5 + .../image_processor/src/processor/job/mod.rs | 48 +++-- .../src/processor/job/resize.rs | 14 +- platform/image_processor/src/processor/mod.rs | 59 ++++-- .../image_processor/src/processor/utils.rs | 7 +- platform/image_processor/src/tests/global.rs | 5 + .../src/tests/processor/encoder.rs | 5 +- .../migrations/20230825170300_init.up.sql | 2 +- video/transcoder/Cargo.toml | 2 +- 21 files changed, 296 insertions(+), 191 deletions(-) diff --git a/common/Cargo.toml b/common/Cargo.toml index 3e2e1a9e..35764ea2 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -65,7 +65,7 @@ uuid = { version = "1.6", features = ["v4"], optional = true } ulid = { version = "1.1", features = ["uuid"], optional = true} aws-config = { version = "1.1", optional = true } -aws-sdk-s3 = { version = "1.12", optional = true } +aws-sdk-s3 = { version = "1.12", optional = true, features = ["behavior-version-latest"] } aws-credential-types = { version = "1.1", optional = true, features = ["hardcoded-credentials"] } aws-smithy-types = { version = "1.1", features = ["http-body-1-x"], optional = true } http-body = { version = "1.0.0", optional = true } diff --git a/common/src/config.rs b/common/src/config.rs index e33de0a5..afad301d 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -195,7 +195,7 @@ pub struct S3BucketConfig { impl Default for S3BucketConfig { fn default() -> Self { Self { - name: "scuffle-image-processor".to_owned(), + name: "scuffle".to_owned(), region: "us-east-1".to_owned(), endpoint: Some("http://localhost:9000".to_string()), credentials: S3CredentialsConfig::default(), diff --git a/ffmpeg/src/scalar.rs b/ffmpeg/src/scalar.rs index 85973b61..190cdd68 100644 --- a/ffmpeg/src/scalar.rs +++ b/ffmpeg/src/scalar.rs @@ -57,15 +57,12 @@ impl Scalar { frame_mut.height = height; frame_mut.format = pixel_format as i32; - // Safety: `av_image_alloc` is safe to call, and the pointer returned is valid. - av_image_alloc( - frame_mut.data.as_mut_ptr(), - frame_mut.linesize.as_mut_ptr(), - width, - height, - pixel_format, - 32, - ); + // Safety: `av_frame_get_buffer` is safe to call, and the pointer returned is + // valid. 
+ match av_frame_get_buffer(frame_mut, 32) { + 0 => {} + err => return Err(FfmpegError::Code(err.into())), + } } Ok(Self { diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml index cb96a928..bd4a59f6 100644 --- a/platform/api/Cargo.toml +++ b/platform/api/Cargo.toml @@ -43,7 +43,7 @@ thiserror = "1.0" anyhow = "1.0" multer = "3.0" aws-config = "1.1" -aws-sdk-s3 = "1.12" +aws-sdk-s3 = { version = "1.12", features = ["behavior-version-latest"] } http-body = "1.0" http-body-util = "0.1" hyper-util = "0.1" diff --git a/platform/api/src/database/uploaded_file_status.rs b/platform/api/src/database/uploaded_file_status.rs index f5df3f3d..a6c2628d 100644 --- a/platform/api/src/database/uploaded_file_status.rs +++ b/platform/api/src/database/uploaded_file_status.rs @@ -1,6 +1,7 @@ -use postgres_types::{ToSql, FromSql}; +use postgres_types::{FromSql, ToSql}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)] +#[postgres(name = "uploaded_file_status")] pub enum UploadedFileStatus { #[postgres(name = "unqueued")] Unqueued, diff --git a/platform/api/src/igdb_cron.rs b/platform/api/src/igdb_cron.rs index 59326859..65915f88 100644 --- a/platform/api/src/igdb_cron.rs +++ b/platform/api/src/igdb_cron.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use async_nats::jetstream::consumer::pull::Config; +use async_nats::jetstream::consumer::pull::{Config, MessagesErrorKind}; use async_nats::jetstream::consumer::Consumer; use async_nats::jetstream::AckKind; use bytes::Bytes; @@ -12,6 +12,7 @@ use fred::interfaces::KeysInterface; use futures_util::StreamExt; use pb::scuffle::platform::internal::image_processor; use pb::scuffle::platform::internal::types::{uploaded_file_metadata, ImageFormat, UploadedFileMetadata}; +use postgres_from_row::tokio_postgres::IsolationLevel; use postgres_from_row::FromRow; use tokio::select; use ulid::Ulid; @@ -56,13 +57,13 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { async fn cron(global: &Arc, config: &IgDbConfig) -> anyhow::Result<()> { let mut timer = tokio::time::interval(Duration::from_secs(60)); loop { + timer.tick().await; + tracing::debug!("igdb cron"); global .nats() .publish(config.igdb_cron_subject.clone(), Bytes::new()) .await .context("publish")?; - tracing::debug!("igdb cron"); - timer.tick().await; } } @@ -72,7 +73,15 @@ async fn process(global: &Arc, consumer: Consumer, conf let duration = chrono::Duration::from_std(config.refresh_interval).context("duration")?; 'outer: while let Some(message) = messages.next().await { - let message = message.context("message")?; + let message = match message { + Ok(message) => message, + Err(err) if matches!(err.kind(), MessagesErrorKind::MissingHeartbeat) => { + continue; + } + Err(err) => { + anyhow::bail!("message: {:#}", err); + } + }; let info = message.info().map_err(|e| anyhow::anyhow!("info: {e}"))?; @@ -316,7 +325,8 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any .await .context("insert igdb_image")?; - let image_ids = common::database::query("SELECT image_id, uploaded_file_id FROM igdb_image WHERE image_id = ANY($1)") + let image_ids = + common::database::query("SELECT image_id, uploaded_file_id FROM igdb_image WHERE image_id = ANY($1::TEXT[])") .bind(image_ids.iter().map(|x| x.1).collect::>()) .build_query_as::() .fetch_all(&tx) @@ -338,7 +348,10 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any failed: None, name: format!("igdb/cover/{}.jpg", cover.image_id), owner_id: None, - path: 
format!("https://images.igdb.com/igdb/image/upload/t_cover_big_2x/{}.jpg", cover.image_id), + path: format!( + "https://images.igdb.com/igdb/image/upload/t_cover_big_2x/{}.jpg", + cover.image_id + ), status: UploadedFileStatus::Unqueued, metadata: UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { @@ -356,7 +369,10 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any failed: None, name: format!("igdb/artwork/{}.jpg", artwork.image_id), owner_id: None, - path: format!("https://images.igdb.com/igdb/image/upload/t_1080p_2x/{}.jpg", artwork.image_id), + path: format!( + "https://images.igdb.com/igdb/image/upload/t_1080p_2x/{}.jpg", + artwork.image_id + ), status: UploadedFileStatus::Unqueued, metadata: UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { @@ -371,28 +387,22 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any }) .collect::>(); - common::database::query("INSERT INTO uploaded_files (id, name, type, metadata, total_size, path, status) ") - .push_values(&uploaded_files, |mut sep, item| { - sep.push_bind(item.id); - sep.push_bind(&item.name); - sep.push_bind(item.ty); - sep.push_bind(common::database::Protobuf(item.metadata.clone())); - sep.push_bind(item.total_size); - sep.push_bind(&item.path); - sep.push_bind(item.status); - }) - .push("ON CONFLICT (id) DO NOTHING;") - .build() - .execute(&tx) - .await - .context("insert uploaded_files")?; - - let uploaded_files = common::database::query("SELECT * FROM uploaded_files WHERE id = ANY($1)") - .bind(uploaded_files.iter().map(|x| x.id).collect::>()) - .build_query_as::() - .fetch_all(&tx) - .await - .context("select uploaded_files")?; + let uploaded_files_ids = + common::database::query("INSERT INTO uploaded_files (id, name, type, metadata, total_size, path, status) ") + .push_values(&uploaded_files, |mut sep, item| { + sep.push_bind(item.id); + sep.push_bind(&item.name); + sep.push_bind(item.ty); + sep.push_bind(common::database::Protobuf(item.metadata.clone())); + sep.push_bind(item.total_size); + sep.push_bind(&item.path); + sep.push_bind(item.status); + }) + .push("ON CONFLICT (id) DO NOTHING RETURNING id;") + .build_query_single_scalar::() + .fetch_all(&tx) + .await + .context("insert uploaded_files")?; let resp = resp .into_iter() @@ -423,7 +433,7 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any offset += resp.len(); let count = resp.len(); - common::database::query("INSERT INTO categories (id, igdb_id, name, aliases, keywords, storyline, summary, over_18, cover_id, rating, updated_at, artwork_ids, igdb_similar_game_ids, websites) ") + let categories = common::database::query("INSERT INTO categories (id, igdb_id, name, aliases, keywords, storyline, summary, over_18, cover_id, rating, updated_at, artwork_ids, igdb_similar_game_ids, websites) ") .push_values(&resp, |mut sep, item| { sep.push_bind(item.id); sep.push_bind(item.igdb_id); @@ -440,85 +450,91 @@ async fn refresh_igdb(global: &Arc, config: &IgDbConfig) -> any sep.push_bind(&item.igdb_similar_game_ids); sep.push_bind(&item.websites); }) - .push("ON CONFLICT (igdb_id) WHERE igdb_id IS NOT NULL DO UPDATE SET ") - .push("name = EXCLUDED.name, ") - .push("aliases = EXCLUDED.aliases, ") - .push("keywords = EXCLUDED.keywords, ") - .push("storyline = EXCLUDED.storyline, ") - .push("summary = EXCLUDED.summary, ") - .push("rating = EXCLUDED.rating, ") - .push("updated_at = NOW(), ") - .push("igdb_similar_game_ids = 
EXCLUDED.igdb_similar_game_ids, ") - .push("websites = EXCLUDED.websites, ") - .push("artwork_ids = EXCLUDED.artwork_ids;") - .build().execute(&tx).await.context("insert categories")?; - - let categories = common::database::query("SELECT * FROM categories WHERE igdb_id = ANY($1)") - .bind(resp.iter().map(|x| x.igdb_id).collect::>()) + .push("ON CONFLICT (igdb_id) WHERE igdb_id IS NOT NULL DO UPDATE SET ") + .push("name = EXCLUDED.name, ") + .push("aliases = EXCLUDED.aliases, ") + .push("keywords = EXCLUDED.keywords, ") + .push("storyline = EXCLUDED.storyline, ") + .push("summary = EXCLUDED.summary, ") + .push("rating = EXCLUDED.rating, ") + .push("updated_at = NOW(), ") + .push("igdb_similar_game_ids = EXCLUDED.igdb_similar_game_ids, ") + .push("websites = EXCLUDED.websites, ") + .push("artwork_ids = EXCLUDED.artwork_ids RETURNING *;") .build_query_as::() .fetch_all(&tx) .await - .context("select categories")?; + .context("insert categories")?; if categories.len() != count { tracing::warn!("igdb: categories count mismatch {} != {}", categories.len(), count); } - let categories = categories - .into_iter() - .flat_map(|c| { - c.cover_id .into_iter() - .chain(c.artwork_ids.into_iter()) - .map(move |id| (id, c.id)) - }) - .collect::>(); - - common::database::query("WITH updated(id, category) AS (") - .push_values(categories.iter().collect::>(), |mut sep, item| { - sep.push_bind(item.0); - sep.push_bind(item.1); - }) - .push( - ") UPDATE igdb_image SET category_id = updated.category FROM updated WHERE igdb_image.uploaded_file_id = updated.id;", - ) - .build().execute(&tx).await.context("update igdb_image")?; + .flat_map(|c| { + c.cover_id + .into_iter() + .chain(c.artwork_ids.into_iter()) + .map(move |id| (id, c.id)) + }) + .collect::>(); + + common::database::query("WITH updated(id, category) AS (") + .push_values(categories.iter().collect::>(), |mut sep, item| { + sep.push_bind(item.0).push_unseparated("::UUID"); + sep.push_bind(item.1).push_unseparated("::UUID"); + }) + .push( + ") UPDATE igdb_image SET category_id = updated.category FROM updated WHERE igdb_image.uploaded_file_id = updated.id;", + ) + .build() + .execute(&tx) + .await + .context("update igdb_image")?; + + tx.commit().await.context("commit")?; if config.process_images { - let unqueued = uploaded_files - .into_iter() - .filter(|x| x.status == UploadedFileStatus::Unqueued) - .collect::>(); + let image_processor_config = global.config::(); + + let tx = client + .build_transaction() + .isolation_level(IsolationLevel::ReadCommitted) + .start() + .await + .context("start transaction image_jobs")?; + + let unqueued = common::database::query( + "UPDATE uploaded_files SET status = 'queued' WHERE id = ANY($1::UUID[]) AND status = 'unqueued' RETURNING id, path;", + ) + .bind(uploaded_files_ids) + .build_query_scalar::<(Ulid, String)>() + .fetch_all(&tx) + .await + .context("update uploaded_files")?; if !unqueued.is_empty() { - let image_processor_config = global.config::(); common::database::query("INSERT INTO image_jobs (id, priority, task) ") - .push_values(&unqueued, |mut sep, item| { - sep.push_bind(item.id); - sep.push_bind(image_processor_config.igdb_image_task_priority); - sep.push_bind(common::database::Protobuf(create_task( - categories[&item.id], - item.id, - item.path.clone(), + .bind(image_processor_config.igdb_image_task_priority as i64) + .push_values(unqueued, |mut sep, (id, path)| { + sep.push_bind(id).push("$1").push_bind(common::database::Protobuf(create_task( + categories[&id], + id, + path, image_processor_config, 
))); }) .push("ON CONFLICT (id) DO NOTHING;") .build() - .execute(&tx).await.context("insert image_jobs")?; - - common::database::query("UPDATE uploaded_files SET status = 'queued' WHERE id = ANY($1)") - .bind(unqueued.iter().map(|x| x.id).collect::>()) - .build() .execute(&tx) .await - .context("update uploaded_files")?; + .context("insert image_jobs")?; + + tx.commit().await.context("commit image_jobs")?; } } - tx.commit().await.context("commit")?; - tracing::debug!("igdb progress: {}/{}", offset, total); if count < 500 { diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs index 0dd3c28a..22657422 100644 --- a/platform/api/src/main.rs +++ b/platform/api/src/main.rs @@ -104,7 +104,7 @@ struct GlobalState { video_room_client: VideoRoomClient, video_playback_session_client: VideoPlaybackSessionClient, video_events_client: VideoEventsClient, - + redis: Arc, playback_private_key: Option>, diff --git a/platform/image_processor/Cargo.toml b/platform/image_processor/Cargo.toml index db84f3ea..feb69bc7 100644 --- a/platform/image_processor/Cargo.toml +++ b/platform/image_processor/Cargo.toml @@ -14,7 +14,7 @@ ulid = { version = "1.1", features = ["uuid"] } postgres-from-row = "0.5" prost = "0.12" aws-config = "1.1" -aws-sdk-s3 = "1.12" +aws-sdk-s3 = { version = "1.12", features = ["behavior-version-latest"] } async-trait = "0.1" anyhow = "1.0" async-nats = "0.33" diff --git a/platform/image_processor/src/config.rs b/platform/image_processor/src/config.rs index 0aa48c16..ea68fe51 100644 --- a/platform/image_processor/src/config.rs +++ b/platform/image_processor/src/config.rs @@ -1,4 +1,4 @@ -use common::config::S3BucketConfig; +use common::config::{S3BucketConfig, S3CredentialsConfig}; use ulid::Ulid; #[derive(Debug, Clone, PartialEq, config::Config, serde::Deserialize)] @@ -23,8 +23,24 @@ pub struct ImageProcessorConfig { impl Default for ImageProcessorConfig { fn default() -> Self { Self { - source_bucket: S3BucketConfig::default(), - target_bucket: S3BucketConfig::default(), + source_bucket: S3BucketConfig { + name: "scuffle-image-processor".to_owned(), + endpoint: Some("http://localhost:9000".to_owned()), + region: "us-east-1".to_owned(), + credentials: S3CredentialsConfig { + access_key: Some("root".to_owned()), + secret_key: Some("scuffle123".to_owned()), + }, + }, + target_bucket: S3BucketConfig { + name: "scuffle-image-processor-public".to_owned(), + endpoint: Some("http://localhost:9000".to_owned()), + region: "us-east-1".to_owned(), + credentials: S3CredentialsConfig { + access_key: Some("root".to_owned()), + secret_key: Some("scuffle123".to_owned()), + }, + }, concurrency: num_cpus::get(), instance_id: Ulid::new(), allow_http: true, diff --git a/platform/image_processor/src/global.rs b/platform/image_processor/src/global.rs index 47c5d786..430ab049 100644 --- a/platform/image_processor/src/global.rs +++ b/platform/image_processor/src/global.rs @@ -5,6 +5,7 @@ use crate::config::ImageProcessorConfig; pub trait ImageProcessorState { fn s3_source_bucket(&self) -> &Bucket; fn s3_target_bucket(&self) -> &Bucket; + fn http_client(&self) -> &reqwest::Client; } pub trait ImageProcessorGlobal: diff --git a/platform/image_processor/src/main.rs b/platform/image_processor/src/main.rs index ad191f85..980d12a7 100644 --- a/platform/image_processor/src/main.rs +++ b/platform/image_processor/src/main.rs @@ -29,6 +29,7 @@ struct GlobalState { jetstream: async_nats::jetstream::Context, s3_source_bucket: common::s3::Bucket, s3_target_bucket: common::s3::Bucket, + http_client: reqwest::Client, 
diff --git a/platform/image_processor/Cargo.toml b/platform/image_processor/Cargo.toml
index db84f3ea..feb69bc7 100644
--- a/platform/image_processor/Cargo.toml
+++ b/platform/image_processor/Cargo.toml
@@ -14,7 +14,7 @@ ulid = { version = "1.1", features = ["uuid"] }
 postgres-from-row = "0.5"
 prost = "0.12"
 aws-config = "1.1"
-aws-sdk-s3 = "1.12"
+aws-sdk-s3 = { version = "1.12", features = ["behavior-version-latest"] }
 async-trait = "0.1"
 anyhow = "1.0"
 async-nats = "0.33"
diff --git a/platform/image_processor/src/config.rs b/platform/image_processor/src/config.rs
index 0aa48c16..ea68fe51 100644
--- a/platform/image_processor/src/config.rs
+++ b/platform/image_processor/src/config.rs
@@ -1,4 +1,4 @@
-use common::config::S3BucketConfig;
+use common::config::{S3BucketConfig, S3CredentialsConfig};
 use ulid::Ulid;
 
 #[derive(Debug, Clone, PartialEq, config::Config, serde::Deserialize)]
@@ -23,8 +23,24 @@ pub struct ImageProcessorConfig {
 impl Default for ImageProcessorConfig {
 	fn default() -> Self {
 		Self {
-			source_bucket: S3BucketConfig::default(),
-			target_bucket: S3BucketConfig::default(),
+			source_bucket: S3BucketConfig {
+				name: "scuffle-image-processor".to_owned(),
+				endpoint: Some("http://localhost:9000".to_owned()),
+				region: "us-east-1".to_owned(),
+				credentials: S3CredentialsConfig {
+					access_key: Some("root".to_owned()),
+					secret_key: Some("scuffle123".to_owned()),
+				},
+			},
+			target_bucket: S3BucketConfig {
+				name: "scuffle-image-processor-public".to_owned(),
+				endpoint: Some("http://localhost:9000".to_owned()),
+				region: "us-east-1".to_owned(),
+				credentials: S3CredentialsConfig {
+					access_key: Some("root".to_owned()),
+					secret_key: Some("scuffle123".to_owned()),
+				},
+			},
 			concurrency: num_cpus::get(),
 			instance_id: Ulid::new(),
 			allow_http: true,
diff --git a/platform/image_processor/src/global.rs b/platform/image_processor/src/global.rs
index 47c5d786..430ab049 100644
--- a/platform/image_processor/src/global.rs
+++ b/platform/image_processor/src/global.rs
@@ -5,6 +5,7 @@ use crate::config::ImageProcessorConfig;
 pub trait ImageProcessorState {
 	fn s3_source_bucket(&self) -> &Bucket;
 	fn s3_target_bucket(&self) -> &Bucket;
+	fn http_client(&self) -> &reqwest::Client;
 }
 
 pub trait ImageProcessorGlobal:
diff --git a/platform/image_processor/src/main.rs b/platform/image_processor/src/main.rs
index ad191f85..980d12a7 100644
--- a/platform/image_processor/src/main.rs
+++ b/platform/image_processor/src/main.rs
@@ -29,6 +29,7 @@ struct GlobalState {
 	jetstream: async_nats::jetstream::Context,
 	s3_source_bucket: common::s3::Bucket,
 	s3_target_bucket: common::s3::Bucket,
+	http_client: reqwest::Client,
 }
 
 impl_global_traits!(GlobalState);
@@ -50,6 +51,11 @@ impl platform_image_processor::global::ImageProcessorState for GlobalState {
 	fn s3_target_bucket(&self) -> &common::s3::Bucket {
 		&self.s3_target_bucket
 	}
+
+	#[inline(always)]
+	fn http_client(&self) -> &reqwest::Client {
+		&self.http_client
+	}
 }
 
 impl binary_helper::Global for GlobalState {
@@ -60,6 +66,10 @@ impl binary_helper::Global for GlobalState {
 
 		let (nats, jetstream) = setup_nats(&config.name, &config.nats).await?;
 
+		let http_client = reqwest::Client::builder()
+			.user_agent(concat!("scuffle-image-processor/", env!("CARGO_PKG_VERSION")))
+			.build()?;
+
 		Ok(Self {
 			ctx,
 			db,
@@ -68,40 +78,53 @@ impl binary_helper::Global for GlobalState {
 			config,
 			s3_source_bucket,
 			s3_target_bucket,
+			http_client,
 		})
 	}
 }
 
-#[tokio::main]
-pub async fn main() {
-	if let Err(err) = bootstrap::<AppConfig, GlobalState, _>(|global| async move {
-		let grpc_future = {
-			let mut server = grpc_server(&global.config.grpc)
-				.await
-				.context("failed to create grpc server")?;
-			let router = server.add_service(grpc_health::HealthServer::new(&global, |global, _| async move {
-				!global.db().is_closed() && global.nats().connection_state() == async_nats::connection::State::Connected
-			}));
-
-			let router = platform_image_processor::grpc::add_routes(&global, router);
-
-			router.serve_with_shutdown(global.config.grpc.bind_address, async {
-				global.ctx().done().await;
+pub fn main() {
+	tokio::runtime::Builder::new_multi_thread()
+		.enable_all()
+		.max_blocking_threads(
+			std::env::var("TOKIO_MAX_BLOCKING_THREADS")
+				.ok()
+				.and_then(|v| v.parse().ok())
+				.unwrap_or(2048),
+		)
+		.build()
+		.expect("failed to create tokio runtime")
+		.block_on(async {
+			if let Err(err) = bootstrap::<AppConfig, GlobalState, _>(|global| async move {
+				let grpc_future = {
+					let mut server = grpc_server(&global.config.grpc)
+						.await
+						.context("failed to create grpc server")?;
+					let router = server.add_service(grpc_health::HealthServer::new(&global, |global, _| async move {
+						!global.db().is_closed()
+							&& global.nats().connection_state() == async_nats::connection::State::Connected
+					}));
+
+					let router = platform_image_processor::grpc::add_routes(&global, router);
+
+					router.serve_with_shutdown(global.config.grpc.bind_address, async {
+						global.ctx().done().await;
+					})
+				};
+
+				let processor_future = platform_image_processor::processor::run(global.clone());
+
+				select! {
+					r = grpc_future => r.context("grpc server stopped unexpectedly")?,
+					r = processor_future => r.context("processor stopped unexpectedly")?,
+				}
+
+				Ok(())
 			})
-		};
-
-		let processor_future = platform_image_processor::processor::run(global.clone());
-
-		select! {
-			r = grpc_future => r.context("grpc server stopped unexpectedly")?,
-			r = processor_future => r.context("processor stopped unexpectedly")?,
-		}
-
-		Ok(())
-	})
-	.await
-	{
-		tracing::error!("{:#}", err);
-		std::process::exit(1);
-	}
+			.await
+			{
+				tracing::error!("{:#}", err);
+				std::process::exit(1);
+			}
+		})
 }
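Raising `max_blocking_threads` matters because the processor pushes CPU-heavy decode/resize work onto tokio's blocking pool, and the default cap would throttle it. As a rough illustration of the pattern (the helper and its body are hypothetical, not part of this patch):

use tokio::task::JoinError;

// CPU-bound work hops to the blocking pool, whose size the builder above
// caps via TOKIO_MAX_BLOCKING_THREADS (defaulting to 2048 here).
async fn decode_on_blocking_pool(bytes: Vec<u8>) -> Result<usize, JoinError> {
	tokio::task::spawn_blocking(move || {
		// stand-in for an expensive decode/resize
		bytes.iter().map(|b| *b as usize).sum()
	})
	.await
}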
diff --git a/platform/image_processor/src/processor/error.rs b/platform/image_processor/src/processor/error.rs
index dfca20c5..322848b1 100644
--- a/platform/image_processor/src/processor/error.rs
+++ b/platform/image_processor/src/processor/error.rs
@@ -33,25 +33,13 @@ pub enum ProcessorError {
 	#[error("invalid job state")]
 	InvalidJobState,
 
-	#[error("directory create: {0}")]
-	DirectoryCreate(std::io::Error),
-
-	#[error("file read: {0}")]
-	FileRead(std::io::Error),
-
-	#[error("working directory change: {0}")]
-	WorkingDirectoryChange(std::io::Error),
-
-	#[error("file create: {0}")]
-	FileCreate(std::io::Error),
-
 	#[error("download source from s3: {0}")]
 	S3Download(aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>),
 
 	#[error("download source from s3: {0}")]
 	S3DownloadStream(aws_sdk_s3::primitives::ByteStreamError),
 
-	#[error("upload target to s3: {0}")]
+	#[error("upload target to s3: {0:?}")]
 	S3Upload(aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::put_object::PutObjectError>),
 
 	#[error("publish to nats: {0}")]
@@ -96,6 +84,9 @@ pub enum ProcessorError {
 	#[error("http download disabled")]
 	HttpDownloadDisabled,
 
+	#[error("download timeout")]
+	DownloadTimeout,
+
 	#[error("http download: {0}")]
 	HttpDownload(#[from] reqwest::Error),
 }
@@ -105,9 +96,6 @@ impl ProcessorError {
 		let msg = match self {
 			ProcessorError::LostJob => Some("The job was lost"),
 			ProcessorError::InvalidJobState => Some("The job is in an invalid state"),
-			ProcessorError::DirectoryCreate(_) => Some("Failed to create directory"),
-			ProcessorError::FileRead(_) => Some("Failed to read file"),
-			ProcessorError::FileCreate(_) => Some("Failed to create file"),
 			ProcessorError::S3Download(_) => Some("Failed to download file"),
 			ProcessorError::S3Upload(_) => Some("Failed to upload file"),
 			ProcessorError::FileFormat(_) => Some("Failed to read file format"),
diff --git a/platform/image_processor/src/processor/job/encoder/png.rs b/platform/image_processor/src/processor/job/encoder/png.rs
index 666aba18..4dc8b424 100644
--- a/platform/image_processor/src/processor/job/encoder/png.rs
+++ b/platform/image_processor/src/processor/job/encoder/png.rs
@@ -47,6 +47,11 @@ impl Encoder for PngEncoder {
 		let mut encoder = png::Encoder::new(&mut result, frame.image.width() as u32, frame.image.height() as u32);
 
+		assert!(
+			frame.image.buf().as_bytes().len() == frame.image.width() * frame.image.height() * 4,
+			"image buffer size mismatch"
+		);
+
 		encoder.set_color(png::ColorType::Rgba);
 		encoder.set_depth(png::BitDepth::Eight);
 		encoder
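The new assert pins down the invariant the encoder relies on: an RGBA8 frame must hold exactly width * height * 4 bytes. A self-contained sketch of the same check around the png crate's API (the function name and error plumbing here are illustrative):

use std::io::Cursor;

// Encode an RGBA8 buffer, guarding the same size invariant as the patch.
fn encode_rgba(width: u32, height: u32, rgba: &[u8]) -> Result<Vec<u8>, png::EncodingError> {
	assert_eq!(rgba.len(), (width * height * 4) as usize, "image buffer size mismatch");

	let mut out = Vec::new();
	let mut encoder = png::Encoder::new(Cursor::new(&mut out), width, height);
	encoder.set_color(png::ColorType::Rgba);
	encoder.set_depth(png::BitDepth::Eight);
	encoder.write_header()?.write_image_data(rgba)?;
	Ok(out)
}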
diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs
index 575a144a..7f3803fb 100644
--- a/platform/image_processor/src/processor/job/mod.rs
+++ b/platform/image_processor/src/processor/job/mod.rs
@@ -1,15 +1,16 @@
 use std::borrow::Cow;
 use std::sync::Arc;
+use std::time::Duration;
 
 use aws_sdk_s3::types::ObjectCannedAcl;
 use bytes::Bytes;
+use common::prelude::FutureTimeout;
 use common::s3::PutObjectOptions;
 use common::task::AsyncTask;
 use file_format::FileFormat;
 use futures::FutureExt;
 use prost::Message;
 use tokio::select;
-use tokio::sync::SemaphorePermit;
 use tracing::Instrument;
 
 use self::decoder::DecoderBackend;
@@ -33,9 +34,12 @@ pub(crate) struct Job<'a, G: ImageProcessorGlobal> {
 	pub(crate) job: database::Job,
 }
 
-#[tracing::instrument(skip(global, _ticket, job), fields(job_id = %job.id.0), level = "info")]
-pub async fn handle_job<G: ImageProcessorGlobal>(global: &Arc<G>, _ticket: SemaphorePermit<'_>, job: database::Job) {
+#[tracing::instrument(skip(global, job), fields(job_id = %job.id), level = "info")]
+pub async fn handle_job<G: ImageProcessorGlobal>(global: &Arc<G>, job: database::Job) {
 	let job = Job { global, job };
+
+	tracing::info!("processing job");
+
 	if let Err(err) = job.process().in_current_span().await {
 		tracing::error!(err = %err, "job failed");
 	}
@@ -50,7 +54,11 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 
 		tracing::debug!("downloading {}", self.job.task.input_path);
 
-		Ok(reqwest::get(&self.job.task.input_path)
+		Ok(self
+			.global
+			.http_client()
+			.get(&self.job.task.input_path)
+			.send()
 			.await
 			.map_err(ProcessorError::HttpDownload)?
 			.error_for_status()
@@ -150,9 +158,9 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 		let input_data = {
 			let mut tries = 0;
 			loop {
-				match self.download_source().await {
-					Ok(data) => break data,
-					Err(e) => {
+				match self.download_source().timeout(Duration::from_secs(5)).await {
+					Ok(Ok(data)) => break data,
+					Ok(Err(e)) => {
 						if tries >= 60 {
 							return Err(e);
 						}
@@ -161,14 +169,20 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 						tracing::debug!(err = %e, "failed to download source, retrying");
 						tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 					}
+					Err(_) => {
+						if tries >= 60 {
+							return Err(ProcessorError::DownloadTimeout);
+						}
+
+						tries += 1;
+						tracing::debug!("download timed out, retrying");
+					}
 				}
 			}
 		};
 
 		let backend = DecoderBackend::from_format(FileFormat::from_bytes(&input_data))?;
 
-		let url_prefix = format!("{}/{}", self.job.task.output_prefix.trim_end_matches('/'), self.job.id);
-
 		let job_c = self.job.clone();
 
 		tracing::debug!("processing job");
@@ -184,8 +198,8 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 		})??;
 
 		for image in images.images.iter() {
+			let url = image.url(&self.job.task.output_prefix);
 			// image upload
-			let url = image.url(&url_prefix);
 			tracing::debug!("uploading result to {}/{}", self.global.config().target_bucket.name, url);
 			self.global
 				.s3_target_bucket()
@@ -214,13 +228,13 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 			variants: images
 				.images
 				.iter()
-				.map(|i| pb::scuffle::platform::internal::types::ProcessedImageVariant {
-					path: i.url(&url_prefix),
-					format: i.request.1.into(),
-					width: i.width as u32,
-					height: i.height as u32,
-					byte_size: i.data.len() as u32,
-					scale: i.request.0 as u32,
+				.map(|image| pb::scuffle::platform::internal::types::ProcessedImageVariant {
+					path: image.url(&self.job.task.output_prefix),
+					format: image.request.1.into(),
+					width: image.width as u32,
+					height: image.height as u32,
+					byte_size: image.data.len() as u32,
+					scale: image.request.0 as u32,
 				})
 				.collect(),
 		},
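`FutureTimeout` here is the project's own convenience trait: `download_source().timeout(...)` wraps the future so the outer `Ok`/`Err` reflects the 5-second deadline and the inner result the download itself. The same shape in plain tokio, with illustrative names that are not part of this patch:

use std::future::Future;
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Retry an operation up to `max_tries` times, with a 5s deadline per attempt.
async fn retry_with_deadline<F, Fut, T, E>(mut attempt: F, max_tries: u32) -> Result<T, &'static str>
where
	F: FnMut() -> Fut,
	Fut: Future<Output = Result<T, E>>,
{
	for _ in 0..max_tries {
		match timeout(Duration::from_secs(5), attempt()).await {
			Ok(Ok(value)) => return Ok(value),                 // downloaded
			Ok(Err(_)) => sleep(Duration::from_secs(1)).await, // transient error: back off
			Err(_) => {}                                       // deadline elapsed: retry at once
		}
	}
	Err("retries exhausted")
}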
diff --git a/platform/image_processor/src/processor/job/resize.rs b/platform/image_processor/src/processor/job/resize.rs
index 667bf520..57f4413e 100644
--- a/platform/image_processor/src/processor/job/resize.rs
+++ b/platform/image_processor/src/processor/job/resize.rs
@@ -2,6 +2,7 @@ use anyhow::Context;
 use fast_image_resize as fr;
 use imgref::Img;
 use pb::scuffle::platform::internal::image_processor::task::{ResizeAlgorithm, ResizeMethod};
+use rgb::{ComponentBytes, RGBA};
 
 use super::frame::Frame;
 use crate::processor::error::{ProcessorError, Result};
@@ -175,7 +176,18 @@ impl ImageResizer {
 		let height = dst_image.height().get() as usize;
 		let buffer = dst_image.into_vec();
 
-		let buffer = unsafe { std::mem::transmute::<Vec<u8>, Vec<RGBA<u8>>>(buffer) };
+		let buffer = unsafe {
+			let buf = buffer.into_boxed_slice();
+			let size = buf.len();
+			let ptr = Box::into_raw(buf) as *mut u8;
+
+			let new_size = size / 4;
+			assert!(new_size * 4 == size, "image buffer size mismatch");
+
+			Vec::from_raw_parts(ptr as *mut RGBA<u8>, new_size, new_size)
+		};
+
+		assert_eq!(buffer.as_bytes().len(), width * height * 4, "image buffer size mismatch");
 
 		Ok(Frame {
 			image: Img::new(buffer, width, height),
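The boxed-slice detour is what makes the `Vec::from_raw_parts` call sound: `into_boxed_slice` shrinks capacity to length, so both the new length and the new capacity can be derived as `size / 4`, unlike the old transmute, which silently reinterpreted length and capacity counted in bytes. Were a `bytemuck` dependency acceptable (with its `extern_crate_alloc` feature and rgb's bytemuck support enabled — an assumption, not something this patch adds), the cast could be written safely:

use rgb::RGBA;

// Safe equivalent of the unsafe block above; panics instead of asserting
// if the byte length is not a multiple of the RGBA<u8> size.
fn cast_rgba(buffer: Vec<u8>) -> Vec<RGBA<u8>> {
	bytemuck::cast_vec(buffer)
}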
diff --git a/platform/image_processor/src/processor/mod.rs b/platform/image_processor/src/processor/mod.rs
index 655906ef..24d8bf23 100644
--- a/platform/image_processor/src/processor/mod.rs
+++ b/platform/image_processor/src/processor/mod.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
 use futures::StreamExt;
@@ -6,7 +7,6 @@ use tokio::select;
 use self::error::Result;
 use crate::config::ImageProcessorConfig;
 use crate::global::ImageProcessorGlobal;
-use crate::processor::error::ProcessorError;
 use crate::processor::job::handle_job;
 
 pub(crate) mod error;
@@ -16,31 +16,60 @@ pub(crate) mod utils;
 
 pub async fn run(global: Arc<impl ImageProcessorGlobal>) -> Result<()> {
 	let config = global.config::<ImageProcessorConfig>();
 
-	let semaphore = tokio::sync::Semaphore::new(config.concurrency);
+	let concurrency = AtomicUsize::new(config.concurrency);
+
 	let mut done = global.ctx().done();
 
 	let mut futures = futures::stream::FuturesUnordered::new();
 
+	let make_job_query = {
+		let global = &global;
+		let concurrency = &concurrency;
+		move |wait: bool| async move {
+			if wait {
+				tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+			}
+
+			let concurrency = concurrency.load(std::sync::atomic::Ordering::Relaxed);
+
+			if concurrency == 0 {
+				tracing::debug!("concurrency limit reached, waiting for a slot");
+				None
+			} else {
+				tracing::debug!("querying for jobs: {concurrency}");
+				Some(utils::query_job(global, concurrency).await)
+			}
+		}
+	};
+
+	let mut job_query = Some(Box::pin(make_job_query(false)));
+
 	loop {
 		select! {
-			ticket_job = async {
-				let ticket = semaphore.acquire().await?;
-
-				if let Some(job) = utils::query_job(&global).await? {
-					Ok::<_, ProcessorError>(Some((ticket, job)))
+			Some(jobs) = async {
+				if let Some(job_query_fut) = &mut job_query {
+					let r = job_query_fut.await;
+					job_query = None;
+					r
 				} else {
-					tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-					Ok::<_, ProcessorError>(None)
+					None
 				}
-			} => {
-				let Some((ticket, job)) = ticket_job? else {
-					continue;
-				};
+			} => {
+				let jobs = jobs?;
+				tracing::debug!("got {} jobs", jobs.len());
+				job_query = Some(Box::pin(make_job_query(jobs.is_empty())));
 
-				futures.push(handle_job(&global, ticket, job));
+				for job in jobs {
+					concurrency.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+					futures.push(handle_job(&global, job));
+				}
+			},
+			Some(_) = futures.next() => {
+				concurrency.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+				if job_query.is_none() {
+					job_query = Some(Box::pin(make_job_query(true)));
+				}
 			},
-			Some(_) = futures.next() => {},
 			_ = &mut done => break,
 		}
 	}
diff --git a/platform/image_processor/src/processor/utils.rs b/platform/image_processor/src/processor/utils.rs
index 9d187e09..4648b458 100644
--- a/platform/image_processor/src/processor/utils.rs
+++ b/platform/image_processor/src/processor/utils.rs
@@ -7,7 +7,7 @@ use crate::database::Job;
 use crate::global::ImageProcessorGlobal;
 use crate::processor::error::Result;
 
-pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>) -> Result<Option<Job>> {
+pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>, limit: usize) -> Result<Vec<Job>> {
 	Ok(common::database::query(
 		"UPDATE image_jobs
 		SET claimed_by = $1,
@@ -18,14 +18,15 @@