From 68336dbbf219b8d99d680537fa3d744c093c9c14 Mon Sep 17 00:00:00 2001
From: Troy Benson
Date: Tue, 16 Jan 2024 03:44:33 +0000
Subject: [PATCH 1/9] feat: remove sqlx

Remove SQLx in favor of tokio-postgres. SQLx has had many issues, from
the poor design of its query engine to problems with transactions and
deserialization. The idea of compile-time queries is good, but the SQLx
implementation is subpar and most features are not supported.

---
 Cargo.lock | 619 ++++++------
 Cargo.toml | 3 +
 binary-helper/Cargo.toml | 1 -
 binary-helper/src/global.rs | 29 +-
 common/Cargo.toml | 10 +-
 common/src/database/mod.rs | 20 +-
 common/src/database/non_null.rs | 74 ---
 common/src/database/protobuf.rs | 112 +---
 common/src/database/query_builder.rs | 433 ++++++++++++
 common/src/database/ulid.rs | 74 ---
 common/src/dataloader/mod.rs | 4 +-
 common/src/global.rs | 3 +-
 platform/api/Cargo.toml | 4 +-
 platform/api/src/api/auth.rs | 4 +-
 platform/api/src/api/error.rs | 8 +-
 platform/api/src/api/jwt.rs | 4 +-
 platform/api/src/api/request_context.rs | 2 +-
 platform/api/src/api/v1/gql/error.rs | 14 +-
 .../api/src/api/v1/gql/models/category.rs | 2 +-
 platform/api/src/api/v1/gql/models/channel.rs | 29 +-
 .../api/src/api/v1/gql/models/chat_message.rs | 6 +-
 .../api/src/api/v1/gql/models/image_upload.rs | 4 +-
 platform/api/src/api/v1/gql/models/role.rs | 4 +-
 platform/api/src/api/v1/gql/models/user.rs | 6 +-
 platform/api/src/api/v1/gql/mutations/auth.rs | 74 ++-
 .../api/src/api/v1/gql/mutations/channel.rs | 11 +-
 platform/api/src/api/v1/gql/mutations/chat.rs | 10 +-
 platform/api/src/api/v1/gql/mutations/user.rs | 59 +-
 .../src/api/v1/gql/mutations/user/two_fa.rs | 21 +-
 .../api/src/api/v1/gql/queries/category.rs | 7 +-
 platform/api/src/api/v1/gql/queries/mod.rs | 15 +-
 platform/api/src/api/v1/gql/queries/user.rs | 29 +-
 .../src/api/v1/gql/subscription/channel.rs | 9 +-
 .../api/src/api/v1/gql/subscription/chat.rs | 9 +-
 .../api/src/api/v1/gql/subscription/file.rs | 2 +-
 .../api/src/api/v1/gql/subscription/user.rs | 11 +-
 .../api/src/api/v1/upload/profile_picture.rs | 33 +-
 platform/api/src/database/category.rs | 4 +-
 platform/api/src/database/channel.rs | 37 +-
 platform/api/src/database/chat_message.rs | 10 +-
 platform/api/src/database/file_type.rs | 14 +-
 platform/api/src/database/global_state.rs | 4 +-
 platform/api/src/database/role.rs | 30 +-
 platform/api/src/database/search_result.rs | 4 +-
 platform/api/src/database/session.rs | 4 +-
 platform/api/src/database/two_fa_request.rs | 43 +-
 platform/api/src/database/uploaded_file.rs | 10 +-
 platform/api/src/database/user.rs | 6 +-
 platform/api/src/dataloader/category.rs | 14 +-
 platform/api/src/dataloader/global_state.rs | 9 +-
 platform/api/src/dataloader/role.rs | 12 +-
 platform/api/src/dataloader/session.rs | 12 +-
 platform/api/src/dataloader/uploaded_file.rs | 11 +-
 platform/api/src/dataloader/user.rs | 19 +-
 platform/api/src/image_upload_callback.rs | 43 +-
 platform/api/src/main.rs | 2 +-
 platform/api/src/video_event_handler.rs | 31 +-
 platform/image_processor/Cargo.toml | 2 +-
 platform/image_processor/src/database.rs | 8 +-
 platform/image_processor/src/main.rs | 2 +-
 .../image_processor/src/processor/error.rs | 7 +-
 .../image_processor/src/processor/job/mod.rs | 8 +-
 .../image_processor/src/processor/utils.rs | 29 +-
 platform/image_processor/src/tests/global.rs | 6 +-
 video/api/Cargo.toml | 4 +-
 video/api/src/api/access_token/create.rs | 26 +-
 video/api/src/api/access_token/delete.rs | 45 +-
 video/api/src/api/access_token/get.rs | 11 +-
video/api/src/api/events/ack.rs | 2 +- video/api/src/api/events/fetch.rs | 4 +- video/api/src/api/playback_key_pair/create.rs | 18 +- video/api/src/api/playback_key_pair/delete.rs | 24 +- video/api/src/api/playback_key_pair/get.rs | 11 +- video/api/src/api/playback_key_pair/modify.rs | 26 +- video/api/src/api/playback_session/count.rs | 20 +- video/api/src/api/playback_session/get.rs | 14 +- video/api/src/api/playback_session/revoke.rs | 54 +- video/api/src/api/recording/delete.rs | 169 ++--- video/api/src/api/recording/get.rs | 14 +- video/api/src/api/recording/modify.rs | 41 +- video/api/src/api/recording_config/create.rs | 42 +- video/api/src/api/recording_config/delete.rs | 45 +- video/api/src/api/recording_config/get.rs | 8 +- video/api/src/api/recording_config/modify.rs | 46 +- video/api/src/api/room/create.rs | 99 +-- video/api/src/api/room/delete.rs | 45 +- video/api/src/api/room/disconnect.rs | 6 +- video/api/src/api/room/get.rs | 17 +- video/api/src/api/room/modify.rs | 51 +- video/api/src/api/room/reset_key.rs | 26 +- video/api/src/api/s3_bucket/create.rs | 21 +- video/api/src/api/s3_bucket/delete.rs | 45 +- video/api/src/api/s3_bucket/get.rs | 8 +- video/api/src/api/s3_bucket/modify.rs | 23 +- .../api/src/api/transcoding_config/create.rs | 16 +- .../api/src/api/transcoding_config/delete.rs | 45 +- video/api/src/api/transcoding_config/get.rs | 8 +- .../api/src/api/transcoding_config/modify.rs | 23 +- video/api/src/api/utils/auth.rs | 2 +- video/api/src/api/utils/get.rs | 18 +- video/api/src/api/utils/ratelimit.rs | 2 +- video/api/src/api/utils/tags.rs | 47 +- video/api/src/dataloaders/access_token.rs | 39 +- video/api/src/dataloaders/recording_state.rs | 36 +- video/api/src/dataloaders/room.rs | 35 +- video/api/src/main.rs | 2 +- video/api/src/tests/api/access_token.rs | 38 +- video/api/src/tests/api/events.rs | 2 +- video/api/src/tests/api/playback_key_pair.rs | 68 +- video/api/src/tests/api/playback_session.rs | 127 ++-- video/api/src/tests/api/recording.rs | 138 ++-- video/api/src/tests/api/recording_config.rs | 111 ++-- video/api/src/tests/api/room.rs | 207 +++--- video/api/src/tests/api/s3_bucket.rs | 56 +- video/api/src/tests/api/transcoding_config.rs | 60 +- video/api/src/tests/api/utils.rs | 140 ++-- video/api/src/tests/global.rs | 21 +- video/api/src/tests/utils.rs | 36 +- video/cli/Cargo.toml | 1 - video/cli/src/invoker/direct.rs | 86 +-- video/common/Cargo.toml | 5 +- video/common/src/database/access_token.rs | 17 +- video/common/src/database/organization.rs | 8 +- .../common/src/database/playback_key_pair.rs | 12 +- video/common/src/database/playback_session.rs | 21 +- .../src/database/playback_session_browser.rs | 8 +- .../src/database/playback_session_device.rs | 8 +- .../src/database/playback_session_platform.rs | 8 +- video/common/src/database/recording.rs | 18 +- video/common/src/database/recording_config.rs | 19 +- .../src/database/recording_rendition.rs | 5 +- .../database/recording_rendition_segment.rs | 5 +- .../src/database/recording_thumbnail.rs | 5 +- video/common/src/database/rendition.rs | 22 +- video/common/src/database/room.rs | 52 +- video/common/src/database/room_status.rs | 12 +- video/common/src/database/s3_bucket.rs | 10 +- .../src/database/session_token_revoke.rs | 5 +- .../common/src/database/transcoding_config.rs | 12 +- video/common/src/database/visibility.rs | 10 +- video/edge/Cargo.toml | 3 +- video/edge/src/edge/error.rs | 4 +- video/edge/src/edge/stream/mod.rs | 126 ++-- video/edge/src/edge/stream/playlist.rs | 105 +-- 
video/edge/src/edge/stream/tokens.rs | 34 +- video/edge/src/main.rs | 2 +- video/ingest/Cargo.toml | 3 +- video/ingest/src/ingest/connection.rs | 55 +- video/ingest/src/ingest/update.rs | 14 +- video/ingest/src/main.rs | 2 +- video/ingest/src/tests/global.rs | 21 +- video/ingest/src/tests/ingest.rs | 61 +- video/transcoder/Cargo.toml | 2 - video/transcoder/src/main.rs | 2 +- video/transcoder/src/tests/global.rs | 21 +- video/transcoder/src/tests/transcoder/mod.rs | 70 +- .../src/transcoder/job/recording.rs | 40 +- .../src/transcoder/job/sql_operations.rs | 74 +-- .../src/transcoder/job/task/generic.rs | 13 +- .../src/transcoder/job/task/recording.rs | 27 +- 160 files changed, 2832 insertions(+), 2578 deletions(-) delete mode 100644 common/src/database/non_null.rs create mode 100644 common/src/database/query_builder.rs delete mode 100644 common/src/database/ulid.rs diff --git a/Cargo.lock b/Cargo.lock index 435fd017..1110860a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -71,12 +70,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" - [[package]] name = "amf0" version = "0.0.1" @@ -105,9 +98,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd2405b3ac1faab2990b74d728624cd9fd115651fcecc7c2d8daf01376275ba" +checksum = "628a8f9bd1e24b4e0db2b4bc2d000b001e7dd032d54afa60a68836aeec5aa54a" dependencies = [ "anstyle", "anstyle-parse", @@ -213,7 +206,7 @@ checksum = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a" [[package]] name = "async-graphql" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "async-graphql-derive", "async-graphql-parser", @@ -250,7 +243,7 @@ dependencies = [ [[package]] name = "async-graphql-derive" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "Inflector", "async-graphql-parser", @@ -266,7 +259,7 @@ dependencies = [ [[package]] name = "async-graphql-parser" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" dependencies = [ "async-graphql-value", "pest", @@ -277,7 +270,7 @@ dependencies = [ [[package]] name = "async-graphql-value" version = "7.0.0" -source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#1e83b73f2a909b7707050c0fea822419c2424c6a" +source = "git+https://github.com/ScuffleTV/async-graphql.git?branch=troy/union-generics#548beefa0b891aefda4a2f23505afccf3a8a565d" 
dependencies = [ "bytes", "indexmap 2.1.0", @@ -352,25 +345,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "atoi" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" -dependencies = [ - "num-traits", -] - -[[package]] -name = "atomic-write-file" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edcdbedc2236483ab103a53415653d6b4442ea6141baf1ffa85df29635e88436" -dependencies = [ - "nix", - "rand", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -705,7 +679,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -912,7 +886,6 @@ dependencies = [ "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", - "sqlx", "thiserror", "tokio", "tonic", @@ -955,12 +928,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" -dependencies = [ - "serde", -] +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "bitmask-enum" @@ -1203,6 +1173,7 @@ dependencies = [ "bytes", "config", "const_format", + "deadpool-postgres", "dotenvy", "fnv", "fred", @@ -1218,14 +1189,15 @@ dependencies = [ "path-tree", "pin-project", "portpicker", + "postgres-from-row", + "postgres-types", "prost", "serde", "serde_json", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", + "tokio-postgres", "tokio-util", "tonic", "tonic-build", @@ -1351,21 +1323,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc16" version = "0.4.0" @@ -1541,6 +1498,39 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa" +dependencies = [ + "deadpool", + "tokio", + "tokio-postgres", + "tracing", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +dependencies = [ + "tokio", +] + [[package]] name = "default-net" version = "0.21.0" @@ -1677,9 +1667,6 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -dependencies = 
[ - "serde", -] [[package]] name = "elliptic-curve" @@ -1759,23 +1746,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "etcetera" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" -dependencies = [ - "cfg-if", - "home", - "windows-sys 0.48.0", -] - -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "exp_golomb" version = "0.0.1" @@ -1791,7 +1761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279d3efcc55e19917fff7ab3ddd6c14afb6a90881a0078465196fe2f99d08c56" dependencies = [ "bit_field", - "flume 0.10.14", + "flume", "half", "lebe", "miniz_oxide", @@ -1800,6 +1770,12 @@ dependencies = [ "zune-inflate", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fallible_collections" version = "0.4.9" @@ -1836,9 +1812,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fdeflate" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209098dd6dfc4445aa6111f0e98653ac323eaa4dfd212c9ca3931bf9955c31bd" +checksum = "4f9bfee30e4dedf0ab8b422f03af778d9612b63f502710fc500a334ebe2de645" dependencies = [ "simd-adler32", ] @@ -1958,17 +1934,6 @@ dependencies = [ "spin 0.9.8", ] -[[package]] -name = "flume" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" -dependencies = [ - "futures-core", - "futures-sink", - "spin 0.9.8", -] - [[package]] name = "flv" version = "0.0.1" @@ -2003,7 +1968,7 @@ dependencies = [ [[package]] name = "fred" version = "8.0.0" -source = "git+https://github.com/aembke/fred.rs.git?branch=feat/unix-sockets#32c380e1ab384196422e2d0716a02cba5e9891a4" +source = "git+https://github.com/aembke/fred.rs.git?branch=feat/unix-sockets#073a6d2eec4bf5ce1995df464fe26a76f16c5030" dependencies = [ "arc-swap", "async-trait", @@ -2073,17 +2038,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot", -] - [[package]] name = "futures-io" version = "0.3.30" @@ -2271,9 +2225,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b553656127a00601c8ae5590fcfdc118e4083a7924b6cf4ffc1ea4b99dc429d7" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -2290,9 +2244,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991910e35c615d8cab86b5ab04be67e6ad24d2bf5f4f11fdbbed26da999bbeab" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" dependencies = [ "bytes", "fnv", @@ -2374,34 +2328,18 @@ name = "hashbrown" version = "0.14.3" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" -dependencies = [ - "ahash 0.8.7", - "allocator-api2", -] - -[[package]] -name = "hashlink" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" -dependencies = [ - "hashbrown 0.14.3", -] [[package]] name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" [[package]] name = "hex" @@ -2531,7 +2469,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "httparse", @@ -2554,7 +2492,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.1", + "h2 0.4.2", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -2762,15 +2700,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "ipnetwork" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" -dependencies = [ - "serde", -] - [[package]] name = "itertools" version = "0.10.5" @@ -2926,17 +2855,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libsqlite3-sys" -version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4e226dcd58b4be396f7bd3c20da8fdee2911400705297ba7d2d7cc2c30f716" -dependencies = [ - "cc", - "pkg-config", - "vcpkg", -] - [[package]] name = "libwebp-sys2" version = "0.1.9" @@ -2958,9 +2876,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -3222,17 +3140,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.1", - "cfg-if", - "libc", -] - [[package]] name = "nkeys" version = "0.3.2" @@ -3642,6 +3549,24 @@ dependencies = [ "indexmap 2.1.0", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -3707,9 +3632,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "platform-api" @@ -3747,6 +3672,8 @@ dependencies = [ "path-tree", "pb", "pin-project", + "postgres-from-row", + "postgres-types", "prost", "rand", "reqwest", @@ -3755,8 +3682,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", @@ -3796,6 +3721,7 @@ dependencies = [ "num_cpus", "pb", "png", + "postgres-from-row", "prost", "reqwest", "rgb", @@ -3803,7 +3729,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", "thiserror", "tokio", "tonic", @@ -3839,6 +3764,71 @@ dependencies = [ "rand", ] +[[package]] +name = "postgres-derive" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83145eba741b050ef981a9a1838c843fa7665e154383325aa8b440ae703180a2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "postgres-from-row" +version = "0.5.2" +source = "git+https://github.com/ScuffleTV/postgres-from-row.git?branch=troy/from_fn#7c7bf7553a5a7dc2b21ccd777434e426eff4625f" +dependencies = [ + "postgres-from-row-derive", + "tokio-postgres", +] + +[[package]] +name = "postgres-from-row-derive" +version = "0.5.2" +source = "git+https://github.com/ScuffleTV/postgres-from-row.git?branch=troy/from_fn#7c7bf7553a5a7dc2b21ccd777434e426eff4625f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" +dependencies = [ + "base64 0.21.7", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" +dependencies = [ + "bytes", + "chrono", + "fallible-iterator", + "postgres-derive", + "postgres-protocol", + "serde", + "serde_json", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -4079,9 +4069,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" dependencies = [ "either", "rayon-core", @@ -4089,9 +4079,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -4181,7 +4171,7 @@ dependencies = [ "encoding_rs", "futures-core", 
"futures-util", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -4367,7 +4357,7 @@ version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno", "libc", "linux-raw-sys", @@ -4812,6 +4802,12 @@ dependencies = [ "quote", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -4872,227 +4868,6 @@ dependencies = [ "der 0.7.8", ] -[[package]] -name = "sqlformat" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" -dependencies = [ - "itertools 0.12.0", - "nom", - "unicode_categories", -] - -[[package]] -name = "sqlx" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dba03c279da73694ef99763320dea58b51095dfe87d001b1d4b5fe78ba8763cf" -dependencies = [ - "sqlx-core", - "sqlx-macros", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", -] - -[[package]] -name = "sqlx-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d84b0a3c3739e220d94b3239fd69fb1f74bc36e16643423bd99de3b43c21bfbd" -dependencies = [ - "ahash 0.8.7", - "atoi", - "byteorder", - "bytes", - "chrono", - "crc", - "crossbeam-queue", - "dotenvy", - "either", - "event-listener", - "futures-channel", - "futures-core", - "futures-intrusive", - "futures-io", - "futures-util", - "hashlink", - "hex", - "indexmap 2.1.0", - "ipnetwork", - "log", - "memchr", - "once_cell", - "paste", - "percent-encoding", - "rustls 0.21.10", - "rustls-pemfile 1.0.4", - "serde", - "serde_json", - "sha2", - "smallvec", - "sqlformat", - "thiserror", - "tokio", - "tokio-stream", - "tracing", - "url", - "uuid", - "webpki-roots", -] - -[[package]] -name = "sqlx-macros" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89961c00dc4d7dffb7aee214964b065072bff69e36ddb9e2c107541f75e4f2a5" -dependencies = [ - "proc-macro2", - "quote", - "sqlx-core", - "sqlx-macros-core", - "syn 1.0.109", -] - -[[package]] -name = "sqlx-macros-core" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0bd4519486723648186a08785143599760f7cc81c52334a55d6a83ea1e20841" -dependencies = [ - "atomic-write-file", - "dotenvy", - "either", - "heck", - "hex", - "once_cell", - "proc-macro2", - "quote", - "serde", - "serde_json", - "sha2", - "sqlx-core", - "sqlx-mysql", - "sqlx-postgres", - "sqlx-sqlite", - "syn 1.0.109", - "tempfile", - "tokio", - "url", -] - -[[package]] -name = "sqlx-mysql" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e37195395df71fd068f6e2082247891bc11e3289624bbc776a0cdfa1ca7f1ea4" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.1", - "byteorder", - "bytes", - "chrono", - "crc", - "digest", - "dotenvy", - "either", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "generic-array", - "hex", - "hkdf", - "hmac", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "percent-encoding", - "rand", - "rsa", - "serde", - "sha1", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", 
- "uuid", - "whoami", -] - -[[package]] -name = "sqlx-postgres" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6ac0ac3b7ccd10cc96c7ab29791a7dd236bd94021f31eec7ba3d46a74aa1c24" -dependencies = [ - "atoi", - "base64 0.21.7", - "bitflags 2.4.1", - "byteorder", - "chrono", - "crc", - "dotenvy", - "etcetera", - "futures-channel", - "futures-core", - "futures-io", - "futures-util", - "hex", - "hkdf", - "hmac", - "home", - "ipnetwork", - "itoa", - "log", - "md-5", - "memchr", - "once_cell", - "rand", - "serde", - "serde_json", - "sha1", - "sha2", - "smallvec", - "sqlx-core", - "stringprep", - "thiserror", - "tracing", - "uuid", - "whoami", -] - -[[package]] -name = "sqlx-sqlite" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "210976b7d948c7ba9fced8ca835b11cbb2d677c59c79de41ac0d397e14547490" -dependencies = [ - "atoi", - "chrono", - "flume 0.11.0", - "futures-channel", - "futures-core", - "futures-executor", - "futures-intrusive", - "futures-util", - "libsqlite3-sys", - "log", - "percent-encoding", - "serde", - "sqlx-core", - "tracing", - "url", - "urlencoding", - "uuid", -] - [[package]] name = "static_assertions_next" version = "1.1.2" @@ -5350,6 +5125,32 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-postgres" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d340244b32d920260ae7448cb72b6e238bddc3d4f7603394e7dd46ed8e48f5b8" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-retry" version = "0.3.0" @@ -5486,7 +5287,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", - "h2 0.3.23", + "h2 0.3.24", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -5783,6 +5584,8 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93" dependencies = [ + "bytes", + "postgres-types", "rand", "serde", "uuid", @@ -5790,9 +5593,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -5821,12 +5624,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" -[[package]] -name = "unicode_categories" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" - [[package]] name = "unsafe-libyaml" version = "0.2.10" @@ -5941,15 +5738,13 @@ dependencies = [ "itertools 0.12.0", "jwt-next", "pb", + "postgres-from-row", "prost", "rand", "rand_chacha", "serde", "serde_json", "sha2", - "sqlx", - "sqlx-core", - "sqlx-postgres", "tokio", "tokio-stream", "tonic", @@ -5981,7 +5776,6 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "sqlx", "tokio", "tonic", "ulid", @@ -6001,11 +5795,12 @@ dependencies = [ "futures", "futures-util", "pb", + "postgres-from-row", + 
"postgres-types", "prost", "serde", - "sqlx", - "sqlx-postgres", "tokio", + "tokio-postgres", "tracing", "ulid", "uuid", @@ -6034,14 +5829,13 @@ dependencies = [ "itertools 0.12.0", "jwt-next", "pb", + "postgres-from-row", "prost", "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "thiserror", "tokio", "tokio-rustls 0.25.0", @@ -6081,14 +5875,13 @@ dependencies = [ "mp4", "pb", "portpicker", + "postgres-from-row", "prost", "rtmp", "rustls 0.22.2", "rustls-pemfile 2.0.0", "serde", "serde_json", - "sqlx", - "sqlx-postgres", "tokio", "tokio-rustls 0.25.0", "tokio-stream", @@ -6169,8 +5962,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlx", - "sqlx-postgres", "tempfile", "thiserror", "tokio", @@ -6320,6 +6111,10 @@ name = "whoami" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] [[package]] name = "widestring" diff --git a/Cargo.toml b/Cargo.toml index 73b79843..fce228c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,11 +57,14 @@ video-ingest = { path = "video/ingest" } video-transcoder = { path = "video/transcoder" } binary-helper = { path = "binary-helper" } ffmpeg = { path = "ffmpeg" } +database = { path = "database" } # These patches are pending PRs to the upstream crates # TODO: Remove these once the PRs are merged [patch.crates-io] +# https://github.com/remkop22/postgres-from-row/pull/9 +postgres-from-row = { git = "https://github.com/ScuffleTV/postgres-from-row.git", branch = "troy/from_fn" } # https://github.com/async-graphql/async-graphql/pull/1424 async-graphql = { git = "https://github.com/ScuffleTV/async-graphql.git", branch = "troy/union-generics" } # https://github.com/aembke/fred.rs/pull/199 diff --git a/binary-helper/Cargo.toml b/binary-helper/Cargo.toml index ff15379f..993ac6c3 100644 --- a/binary-helper/Cargo.toml +++ b/binary-helper/Cargo.toml @@ -10,7 +10,6 @@ tokio = { version = "1.35", features = ["full"] } serde = { version = "1.0.1", features = ["derive"] } async-nats = "0.33" ulid = "1.1" -sqlx = { version = "0.7", features = ["postgres"] } async-trait = "0.1" tonic = { version = "0.10", features = ["tls"] } anyhow = "1.0" diff --git a/binary-helper/src/global.rs b/binary-helper/src/global.rs index de7436f8..18d1c673 100644 --- a/binary-helper/src/global.rs +++ b/binary-helper/src/global.rs @@ -1,16 +1,16 @@ use std::io; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use anyhow::Context as _; use async_nats::ServerAddr; use common::config::{DatabaseConfig, NatsConfig, RedisConfig}; +use common::database::deadpool_postgres::{ManagerConfig, PoolConfig, RecyclingMethod, Runtime}; +use common::database::tokio_postgres::NoTls; +use common::database::Pool; use fred::interfaces::ClientLike; use fred::types::ServerConfig; use rustls::RootCertStore; -use sqlx::postgres::PgConnectOptions; -use sqlx::ConnectOptions; #[macro_export] macro_rules! impl_global_traits { @@ -36,7 +36,7 @@ macro_rules! 
impl_global_traits {
 		impl common::global::GlobalDb for $struct {
 			#[inline(always)]
-			fn db(&self) -> &Arc<sqlx::PgPool> {
+			fn db(&self) -> &Arc<common::database::Pool> {
 				&self.db
 			}
 		}
@@ -89,16 +89,19 @@ pub async fn setup_nats(
 	Ok((nats, jetstream))
 }

-pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<sqlx::PgPool>> {
+pub async fn setup_database(config: &DatabaseConfig) -> anyhow::Result<Arc<Pool>> {
 	Ok(Arc::new(
-		sqlx::PgPool::connect_with(
-			PgConnectOptions::from_str(&config.uri)
-				.context("failed to parse database uri")?
-				.disable_statement_logging()
-				.to_owned(),
-		)
-		.await
-		.context("failed to connect to database")?,
+		Pool::builder(common::database::deadpool_postgres::Manager::from_config(
+			config.uri.parse().context("invalid database uri")?,
+			NoTls,
+			ManagerConfig {
+				recycling_method: RecyclingMethod::Fast,
+			},
+		))
+		.config(PoolConfig::default())
+		.runtime(Runtime::Tokio1)
+		.build()
+		.context("failed to create database pool")?,
 	))
 }
diff --git a/common/Cargo.toml b/common/Cargo.toml
index c05ad4aa..3e2e1a9e 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -12,11 +12,11 @@ context = ["dep:tokio", "dep:tokio-util"]
 prelude = ["dep:tokio"]
 signal = ["tokio/signal", "tokio/process"]
 macros = []
-database = ["dep:sqlx", "dep:sqlx-postgres", "dep:prost", "dep:uuid", "dep:ulid"]
+database = ["dep:tokio-postgres", "dep:postgres-types", "dep:deadpool-postgres", "dep:postgres-from-row", "dep:prost", "ulid/postgres"]
 dataloader = ["dep:fnv", "dep:futures-util", "dep:futures-channel"]
 config = ["dep:config", "dep:serde", "logging"]
 ratelimiter = ["dep:fred"]
-global = ["context", "dep:fred", "dep:sqlx", "dep:async-nats"]
+global = ["context", "dep:fred", "database", "dep:async-nats"]
 http = ["dep:hyper", "dep:serde_json", "dep:bytes", "dep:http-body-util", "dep:pin-project", "dep:path-tree"]
 task = ["dep:tokio", "dep:thiserror"]
 s3 = ["dep:aws-sdk-s3", "dep:aws-credential-types", "dep:aws-config", "dep:aws-smithy-types", "dep:http-body"]
@@ -56,8 +56,10 @@ futures-channel = { version = "0.3", optional = true }

 const_format = { version = "0.2" }

-sqlx = { version = "0.7", features = ["postgres", "json", "chrono", "uuid"], optional = true }
-sqlx-postgres = { version = "0.7", optional = true }
+tokio-postgres = { version = "0.7", optional = true }
+postgres-types = { version = "0.2", optional = true, features = ["with-serde_json-1", "with-chrono-0_4", "derive"] }
+deadpool-postgres = { version = "0.12", optional = true }
+postgres-from-row = { version = "0.5", optional = true }
 prost = { version = "0.12", optional = true }
 uuid = { version = "1.6", features = ["v4"], optional = true }
 ulid = { version = "1.1", features = ["uuid"], optional = true}
diff --git a/common/src/database/mod.rs b/common/src/database/mod.rs
index e853438d..73b6e2b8 100644
--- a/common/src/database/mod.rs
+++ b/common/src/database/mod.rs
@@ -1,7 +1,19 @@
-mod non_null;
 mod protobuf;
-mod ulid;
+mod query_builder;

-pub use non_null::*;
+pub use deadpool_postgres::Pool;
+pub use postgres_from_row::FromRow;
+pub use postgres_types::Json;
 pub use protobuf::*;
-pub use ulid::*;
+pub use query_builder::*;
+pub use {deadpool_postgres, postgres_from_row, postgres_types, tokio_postgres};
+
+#[inline]
+pub fn json<T>(row: Json<T>) -> T {
+	row.0
+}
+
+#[inline]
+pub fn non_null_vec<T>(vec: Vec<Option<T>>) -> Vec<T> {
+	vec.into_iter().filter_map(|x| x).collect()
+}
diff --git a/common/src/database/non_null.rs b/common/src/database/non_null.rs
deleted file mode 100644
index 5c254007..00000000
--- a/common/src/database/non_null.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-#[derive(Default)]
-pub struct PgNonNullVec<T>(Vec<T>);
-
-impl<'r, T> sqlx::Decode<'r, sqlx::Postgres> for PgNonNullVec<T>
-where
-	Vec<Option<T>>: sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres>,
-{
-	fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
-		let vec: Vec<Option<T>> = sqlx::Decode::decode(value)?;
-		Ok(PgNonNullVec(vec.into_iter().flatten().collect()))
-	}
-}
-
-impl<T: Clone> Clone for PgNonNullVec<T> {
-	fn clone(&self) -> Self {
-		PgNonNullVec(self.0.clone())
-	}
-}
-
-impl<T: std::fmt::Debug> std::fmt::Debug for PgNonNullVec<T> {
-	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-		self.0.fmt(f)
-	}
-}
-
-impl<'r, T> sqlx::Type<sqlx::Postgres> for PgNonNullVec<T>
-where
-	Vec<T>: sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres>,
-{
-	fn type_info() -> sqlx::postgres::PgTypeInfo {
-		<Vec<T> as sqlx::Type<sqlx::Postgres>>::type_info()
-	}
-}
-
-impl<T> std::ops::Deref for PgNonNullVec<T> {
-	type Target = Vec<T>;
-
-	fn deref(&self) -> &Self::Target {
-		&self.0
-	}
-}
-
-impl<T> std::ops::DerefMut for PgNonNullVec<T> {
-	fn deref_mut(&mut self) -> &mut Self::Target {
-		&mut self.0
-	}
-}
-
-impl<T> std::iter::FromIterator<T> for PgNonNullVec<T> {
-	fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
-		PgNonNullVec(iter.into_iter().collect())
-	}
-}
-
-impl<T> std::iter::IntoIterator for PgNonNullVec<T> {
-	type IntoIter = std::vec::IntoIter<T>;
-	type Item = T;
-
-	fn into_iter(self) -> Self::IntoIter {
-		self.0.into_iter()
-	}
-}
-
-impl<T> std::iter::Extend<T> for PgNonNullVec<T> {
-	fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
-		self.0.extend(iter)
-	}
-}
-
-impl<T> PgNonNullVec<T> {
-	pub fn into_inner(self) -> Vec<T> {
-		self.0
-	}
-}
diff --git a/common/src/database/protobuf.rs b/common/src/database/protobuf.rs
index 8e94af3a..c815a209 100644
--- a/common/src/database/protobuf.rs
+++ b/common/src/database/protobuf.rs
@@ -1,101 +1,49 @@
-#[repr(transparent)]
-pub struct Protobuf<T>(pub T);
-
-impl<T> Clone for Protobuf<T>
-where
-	T: Clone,
-{
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-
-pub trait TraitProtobuf<T> {
-	fn into_inner(self) -> T;
-}
-
-pub trait TraitProtobufVec<T> {
-	fn into_vec(self) -> Vec<T>;
-}
-
-impl<T> TraitProtobuf<T> for Protobuf<T> {
-	fn into_inner(self) -> T {
-		self.0
-	}
-}
-
-impl<T> TraitProtobufVec<T> for Vec<Protobuf<T>> {
-	fn into_vec(self) -> Vec<T> {
-		self.into_iter().map(|a| a.into_inner()).collect()
-	}
-}
-
-impl<T: std::fmt::Debug> std::fmt::Debug for Protobuf<T> {
-	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-		self.0.fmt(f)
-	}
-}
+use bytes::BytesMut;
+use postgres_types::{accepts, to_sql_checked, IsNull};

-impl<T: Default> Default for Protobuf<T> {
-	fn default() -> Self {
-		Self(T::default())
-	}
-}
+#[derive(Debug, Clone)]
+pub struct Protobuf<T>(pub T);

-impl<T> sqlx::Type<sqlx::Postgres> for Protobuf<T> {
-	fn type_info() -> sqlx::postgres::PgTypeInfo {
-		<Vec<u8> as sqlx::Type<sqlx::Postgres>>::type_info()
-	}
-}
+impl<T: prost::Message + Default> postgres_types::FromSql<'_> for Protobuf<T> {
+	accepts!(BYTEA);

-impl<T> sqlx::postgres::PgHasArrayType for Protobuf<T> {
-	fn array_type_info() -> sqlx::postgres::PgTypeInfo {
-		<Vec<u8> as sqlx::postgres::PgHasArrayType>::array_type_info()
+	fn from_sql(_ty: &postgres_types::Type, raw: &[u8]) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+		Ok(Self(T::decode(raw)?))
 	}
 }

-impl<T: prost::Message> sqlx::Encode<'_, sqlx::Postgres> for Protobuf<T> {
-	fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull {
-		<Vec<u8> as sqlx::Encode<sqlx::Postgres>>::encode_by_ref(&self.0.encode_to_vec(), buf)
-	}
-}
+impl<T: prost::Message> postgres_types::ToSql for Protobuf<T> {
+	to_sql_checked!();

-impl<T: prost::Message + Default> sqlx::Decode<'_, sqlx::Postgres> for Protobuf<T> {
-	fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result<Self, sqlx::error::BoxDynError> {
-		let bytes = <Vec<u8> as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
-		let inner = T::decode(bytes.as_slice())?;
-		Ok(Self(inner))
+	fn to_sql(
+		&self,
+		ty:
&postgres_types::Type,
+		w: &mut BytesMut,
+	) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
+		<&[u8] as postgres_types::ToSql>::to_sql(&&*self.0.encode_to_vec(), ty, w)
 	}
-}

-impl<T> AsRef<T> for Protobuf<T> {
-	fn as_ref(&self) -> &T {
-		&self.0
+	fn accepts(ty: &postgres_types::Type) -> bool {
+		<&[u8] as postgres_types::ToSql>::accepts(ty)
 	}
 }

-impl<T> AsMut<T> for Protobuf<T> {
-	fn as_mut(&mut self) -> &mut T {
-		&mut self.0
-	}
+#[inline]
+pub fn protobuf<T>(row: Protobuf<T>) -> T {
+	row.0
 }

-impl<T> std::ops::Deref for Protobuf<T> {
-	type Target = T;
-
-	fn deref(&self) -> &Self::Target {
-		&self.0
-	}
+#[inline]
+pub fn protobuf_vec<T>(row: Vec<Protobuf<T>>) -> Vec<T> {
+	row.into_iter().map(|protobuf| protobuf.0).collect()
 }

-impl<T> std::ops::DerefMut for Protobuf<T> {
-	fn deref_mut(&mut self) -> &mut Self::Target {
-		&mut self.0
-	}
+#[inline]
+pub fn protobuf_opt<T>(row: Option<Protobuf<T>>) -> Option<T> {
+	row.map(protobuf)
 }

-impl<T> From<T> for Protobuf<T> {
-	fn from(inner: T) -> Self {
-		Self(inner)
-	}
+#[inline]
+pub fn protobuf_vec_opt<T>(row: Option<Vec<Protobuf<T>>>) -> Option<Vec<T>> {
+	row.map(protobuf_vec)
 }
diff --git a/common/src/database/query_builder.rs b/common/src/database/query_builder.rs
new file mode 100644
index 00000000..3e45de14
--- /dev/null
+++ b/common/src/database/query_builder.rs
@@ -0,0 +1,433 @@
+use std::sync::Arc;
+
+use futures_util::{Stream, StreamExt};
+use postgres_from_row::FromRow;
+use postgres_types::{FromSql, ToSql};
+use tokio_postgres::{Error, Row};
+
+pub fn query<'a>(query: impl ToString) -> QueryBuilder<'a> {
+	QueryBuilder::new(query)
+}
+
+#[derive(Default)]
+pub struct QueryBuilder<'a> {
+	query: String,
+	params: Vec<Box<dyn ToSql + Send + Sync + 'a>>,
+}
+
+impl<'args> QueryBuilder<'args> {
+	pub fn new(query: impl ToString) -> Self {
+		Self {
+			query: query.to_string(),
+			params: Vec::new(),
+		}
+	}
+
+	pub fn push_bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self {
+		self.params.push(Box::new(param));
+		self.query.push_str(format!("${}", self.params.len()).as_str());
+		self
+	}
+
+	pub fn bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self {
+		self.params.push(Box::new(param));
+		self
+	}
+
+	pub fn push(&mut self, query: impl AsRef<str>) -> &mut Self {
+		self.query.push_str(query.as_ref());
+		self
+	}
+
+	pub fn separated(&mut self, sep: &'args str) -> Separated<'_, 'args> {
+		Separated {
+			sep: sep.as_ref(),
+			first: true,
+			query_builder: self,
+		}
+	}
+
+	pub fn push_tuples<T>(
+		&mut self,
+		tuples: impl IntoIterator<Item = T>,
+		mut f: impl FnMut(Separated<'_, 'args>, T),
+	) -> &mut Self {
+		self.push(" (");
+
+		let mut separated = self.separated(",");
+
+		for tuple in tuples {
+			separated.push("(");
+
+			f(separated.query_builder.separated(", "), tuple);
+
+			separated.push_unseparated(")");
+		}
+
+		separated.push_unseparated(")");
+
+		separated.query_builder
+	}
+
+	pub fn push_values<T>(
+		&mut self,
+		values: impl IntoIterator<Item = T>,
+		mut f: impl FnMut(Separated<'_, 'args>, T),
+	) -> &mut Self {
+		self.push("VALUES ");
+
+		let mut separated = self.separated(",");
+
+		for value in values {
+			separated.push("(");
+
+			f(separated.query_builder.separated(", "), value);
+
+			separated.push_unseparated(")");
+		}
+
+		separated.query_builder
+	}
+
+	pub fn build(&self) -> Query<'_, NoParse, Row> {
+		Query {
+			query: &self.query,
+			params: &self.params,
+			_marker: std::marker::PhantomData,
+		}
+	}
+
+	pub fn build_query_as<T: FromRow>(&self) -> Query<'_, FromRowParse<T>, T> {
+		Query {
+			query: &self.query,
+			params: &self.params,
+			_marker: std::marker::PhantomData,
+		}
+	}
+
+	pub fn build_query_scalar<T: SqlScalar>(&self) -> Query<'_, ScalarParse<T>, T> {
+		Query {
+			query: &self.query,
+			params:
&self.params,
+			_marker: std::marker::PhantomData,
+		}
+	}
+
+	pub fn build_query_single_scalar<T: for<'a> FromSql<'a>>(&self) -> Query<'_, SingleScalarParse<T>, T> {
+		Query {
+			query: &self.query,
+			params: &self.params,
+			_marker: std::marker::PhantomData,
+		}
+	}
+
+	pub fn sql(&self) -> &str {
+		self.query.as_str()
+	}
+}
+
+pub struct ScalarParse<T>(std::marker::PhantomData<T>);
+
+pub struct SingleScalarParse<T>(std::marker::PhantomData<T>);
+
+pub struct FromRowParse<T>(std::marker::PhantomData<T>);
+pub struct NoParse;
+
+impl<T: SqlScalar> RowParse for ScalarParse<T> {
+	type Item = T;
+
+	#[inline]
+	fn try_from_row(row: Row) -> Result<Self::Item, Error> {
+		T::from_row(&row)
+	}
+}
+
+impl<T> RowParse for SingleScalarParse<T>
+where
+	T: for<'a> FromSql<'a>,
+{
+	type Item = T;
+
+	#[inline]
+	fn try_from_row(row: Row) -> Result<Self::Item, Error> {
+		row.try_get(0)
+	}
+}
+
+impl<T: FromRow> RowParse for FromRowParse<T> {
+	type Item = T;
+
+	#[inline]
+	fn try_from_row(row: Row) -> Result<Self::Item, Error> {
+		T::try_from_row(&row)
+	}
+}
+
+impl RowParse for NoParse {
+	type Item = Row;
+
+	#[inline]
+	fn try_from_row(row: Row) -> Result<Self::Item, Error> {
+		Ok(row)
+	}
+}
+
+pub trait RowParse {
+	type Item;
+
+	fn try_from_row(row: Row) -> Result<Self::Item, Error>;
+}
+
+pub struct Query<'a, T: RowParse, O> {
+	query: &'a str,
+	params: &'a [Box<dyn ToSql + Send + Sync + 'a>],
+	_marker: std::marker::PhantomData<(T, O)>,
+}
+
+fn params<'a>(params: &'a [Box<dyn ToSql + Send + Sync + 'a>]) -> Vec<&'a (dyn ToSql + Sync)> {
+	params.iter().map(|param| param.as_ref() as _).collect()
+}
+
+pub enum Client<'a, C> {
+	Owned(C),
+	Borrowed(&'a C),
+}
+
+impl<C: AsRef<tokio_postgres::Client>> std::ops::Deref for Client<'_, C> {
+	type Target = tokio_postgres::Client;
+
+	fn deref(&self) -> &Self::Target {
+		match self {
+			Client::Owned(client) => client.as_ref(),
+			Client::Borrowed(client) => client.as_ref(),
+		}
+	}
+}
+
+#[allow(async_fn_in_trait)]
+pub trait IntoClient {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError>;
+}
+
+struct ClientWrapper<'a>(&'a tokio_postgres::Client);
+
+impl AsRef<tokio_postgres::Client> for ClientWrapper<'_> {
+	fn as_ref(&self) -> &tokio_postgres::Client {
+		self.0
+	}
+}
+
+struct TransactionWrapper<'a>(&'a tokio_postgres::Transaction<'a>);
+
+impl AsRef<tokio_postgres::Client> for TransactionWrapper<'_> {
+	fn as_ref(&self) -> &tokio_postgres::Client {
+		self.0.client()
+	}
+}
+
+struct PoolClientWrapperOwned(deadpool_postgres::Client);
+
+impl AsRef<tokio_postgres::Client> for PoolClientWrapperOwned {
+	fn as_ref(&self) -> &tokio_postgres::Client {
+		self.0.as_ref()
+	}
+}
+
+struct PoolClientWrapperBorrowed<'a>(&'a deadpool_postgres::Client);
+
+impl AsRef<tokio_postgres::Client> for PoolClientWrapperBorrowed<'_> {
+	fn as_ref(&self) -> &tokio_postgres::Client {
+		self.0.as_ref()
+	}
+}
+
+struct PoolTransactionWrapper<'a>(&'a deadpool_postgres::Transaction<'a>);
+
+impl AsRef<tokio_postgres::Client> for PoolTransactionWrapper<'_> {
+	fn as_ref(&self) -> &tokio_postgres::Client {
+		self.0.client()
+	}
+}
+
+impl IntoClient for tokio_postgres::Client {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		Ok(ClientWrapper(self))
+	}
+}
+
+impl IntoClient for tokio_postgres::Transaction<'_> {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		Ok(TransactionWrapper(self))
+	}
+}
+
+impl IntoClient for deadpool_postgres::Pool {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		Ok(PoolClientWrapperOwned(self.get().await?))
+	}
+}
+
+impl IntoClient for deadpool_postgres::Client {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		Ok(PoolClientWrapperBorrowed(self))
+	}
+}
+
+impl IntoClient for deadpool_postgres::Transaction<'_> {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_,
deadpool_postgres::PoolError> {
+		Ok(PoolTransactionWrapper(self))
+	}
+}
+
+impl<T: IntoClient> IntoClient for Arc<T> {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		self.as_ref().get_client().await
+	}
+}
+
+impl<T: IntoClient> IntoClient for &T {
+	async fn get_client(&self) -> Result<impl AsRef<tokio_postgres::Client> + '_, deadpool_postgres::PoolError> {
+		(*self).get_client().await
+	}
+}
+
+impl<T: RowParse<Item = O>, O> Query<'_, T, O> {
+	pub async fn execute(self, conn: impl IntoClient) -> Result<u64, deadpool_postgres::PoolError> {
+		Ok(conn
+			.get_client()
+			.await?
+			.as_ref()
+			.execute(self.query, &params(self.params))
+			.await?)
+	}
+
+	pub async fn fetch_all(self, conn: impl IntoClient) -> Result<Vec<O>, deadpool_postgres::PoolError> {
+		Ok(conn
+			.get_client()
+			.await?
+			.as_ref()
+			.query(self.query, &params(self.params))
+			.await?
+			.into_iter()
+			.map(T::try_from_row)
+			.collect::<Result<Vec<_>, _>>()?)
+	}
+
+	pub async fn fetch_one(self, conn: impl IntoClient) -> Result<O, deadpool_postgres::PoolError> {
+		Ok(T::try_from_row(
+			conn.get_client()
+				.await?
+				.as_ref()
+				.query_one(self.query, &params(self.params))
+				.await?,
+		)?)
+	}
+
+	pub async fn fetch_optional(self, conn: impl IntoClient) -> Result<Option<O>, deadpool_postgres::PoolError> {
+		Ok(conn
+			.get_client()
+			.await?
+			.as_ref()
+			.query_opt(self.query, &params(self.params))
+			.await?
+			.map(T::try_from_row)
+			.transpose()?)
+	}
+
+	pub async fn fetch_many(
+		self,
+		conn: impl IntoClient,
+	) -> Result<impl Stream<Item = Result<O, deadpool_postgres::PoolError>> + Send + Sync, deadpool_postgres::PoolError> {
+		Ok(conn
+			.get_client()
+			.await?
+			.as_ref()
+			.query_raw(self.query, params(self.params).into_iter())
+			.await?
+			.map(|row| Ok(T::try_from_row(row?)?)))
+	}
+}
+
+pub trait SqlScalar {
+	fn from_row(row: &Row) -> Result<Self, Error>
+	where
+		Self: Sized;
+}
+
+macro_rules! impl_sql_scalar {
+	($($ty:ident),*) => {
+		#[allow(unused_parens)]
+		impl<$($ty),*> SqlScalar for ($($ty),*,)
+		where
+			$($ty: for<'a> FromSql<'a>),*
+		{
+			#[allow(non_snake_case)]
+			#[allow(unused_assignments)]
+			fn from_row(row: &Row) -> Result<Self, Error> {
+				let mut i = 0;
+				$(
+					let $ty = row.try_get::<_, $ty>(i)?;
+					i += 1;
+				)*
+
+				Ok(($($ty),*,))
+			}
+		}
+	};
+}
+
+macro_rules!
impl_recursive {
+	// Match for a single type
+	($ty:ident) => {
+		impl_sql_scalar!($ty);
+	};
+
+	// Match for multiple types
+	($first:ident, $($rest:ident),+) => {
+		// Recursively call for the rest of the types
+		impl_sql_scalar!($first, $($rest),*);
+		impl_recursive!($($rest),*);
+	};
+}
+
+impl_recursive!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
+
+pub struct Separated<'b, 'args> {
+	sep: &'b str,
+	first: bool,
+	query_builder: &'b mut QueryBuilder<'args>,
+}
+
+impl<'args> Separated<'_, 'args> {
+	pub fn push_bind(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self {
+		if self.first {
+			self.first = false;
+		} else {
+			self.query_builder.push(self.sep);
+		}
+
+		self.query_builder.push_bind(param);
+		self
+	}
+
+	pub fn push(&mut self, query: impl AsRef<str>) -> &mut Self {
+		if self.first {
+			self.first = false;
+		} else {
+			self.query_builder.push(self.sep);
+		}
+
+		self.query_builder.push(query.as_ref());
+		self
+	}
+
+	pub fn push_unseparated(&mut self, query: impl AsRef<str>) -> &mut Self {
+		self.query_builder.push(query.as_ref());
+		self
+	}
+
+	pub fn push_bind_unseparated(&mut self, param: impl ToSql + Send + Sync + 'args) -> &mut Self {
+		self.query_builder.push_bind(param);
+		self
+	}
+}
diff --git a/common/src/database/ulid.rs b/common/src/database/ulid.rs
deleted file mode 100644
index 8055fb1f..00000000
--- a/common/src/database/ulid.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[repr(transparent)]
-pub struct Ulid(pub ulid::Ulid);
-
-impl sqlx::postgres::PgHasArrayType for Ulid {
-	fn array_compatible(ty: &sqlx_postgres::PgTypeInfo) -> bool {
-		<sqlx::types::Uuid as sqlx::postgres::PgHasArrayType>::array_compatible(ty)
-	}
-
-	fn array_type_info() -> sqlx_postgres::PgTypeInfo {
-		<sqlx::types::Uuid as sqlx::postgres::PgHasArrayType>::array_type_info()
-	}
-}
-
-impl Default for Ulid {
-	fn default() -> Self {
-		Self(ulid::Ulid::nil())
-	}
-}
-
-impl std::fmt::Display for Ulid {
-	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-		self.0.to_string().fmt(f)
-	}
-}
-
-impl std::fmt::Debug for Ulid {
-	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-		self.0.fmt(f)
-	}
-}
-
-impl sqlx::Decode<'_, sqlx::Postgres> for Ulid {
-	fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result<Self, sqlx::error::BoxDynError> {
-		let id = <sqlx::types::Uuid as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
-		Ok(Ulid(ulid::Ulid::from(id)))
-	}
-}
-
-impl sqlx::Encode<'_, sqlx::Postgres> for Ulid {
-	fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull {
-		<sqlx::types::Uuid as sqlx::Encode<sqlx::Postgres>>::encode_by_ref(&self.0.into(), buf)
-	}
-}
-
-impl sqlx::Type<sqlx::Postgres> for Ulid {
-	fn type_info() -> sqlx::postgres::PgTypeInfo {
-		<sqlx::types::Uuid as sqlx::Type<sqlx::Postgres>>::type_info()
-	}
-}
-
-impl From<Ulid> for ulid::Ulid {
-	fn from(id: Ulid) -> Self {
-		id.0
-	}
-}
-
-impl From<ulid::Ulid> for Ulid {
-	fn from(id: ulid::Ulid) -> Self {
-		Ulid(id)
-	}
-}
-
-impl From<uuid::Uuid> for Ulid {
-	fn from(id: uuid::Uuid) -> Self {
-		Ulid(ulid::Ulid::from(id))
-	}
-}
-
-impl From<Ulid> for uuid::Uuid {
-	fn from(id: Ulid) -> Self {
-		id.0.into()
-	}
-}
diff --git a/common/src/dataloader/mod.rs b/common/src/dataloader/mod.rs
index da9c32c8..8af86ce7 100644
--- a/common/src/dataloader/mod.rs
+++ b/common/src/dataloader/mod.rs
@@ -129,14 +129,14 @@ impl<L: Loader<S>, S: Send + Sync + Default + BuildHasher + 'static> DataLoader<
 }

 	#[inline(always)]
-	pub async fn load_many(&self, keys: impl Iterator<Item = L::Key>) -> LoaderOutput<L, S> {
+	pub async fn load_many(&self, keys: impl IntoIterator<Item = L::Key>) -> LoaderOutput<L, S> {
 		self.load_many_with_cache(NoCache, keys).await
 	}

 	pub async fn load_many_with_cache<C: Cache<L, S>>(
 		&self,
 		mut cache: C,
-		keys: impl Iterator<Item = L::Key>,
+		keys: impl IntoIterator<Item = L::Key>,
 	) ->
LoaderOutput<L, S> {
 		let mut results = HashMap::default();
diff --git a/common/src/global.rs b/common/src/global.rs
index 4a996bbd..ccb91fc8 100644
--- a/common/src/global.rs
+++ b/common/src/global.rs
@@ -25,8 +25,9 @@ pub trait GlobalNats {
 	fn jetstream(&self) -> &async_nats::jetstream::Context;
 }

+#[allow(async_fn_in_trait)]
 pub trait GlobalDb {
-	fn db(&self) -> &Arc<sqlx::PgPool>;
+	fn db(&self) -> &Arc<Pool>;
 }

 pub trait GlobalRedis {
diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml
index d6388878..ab2f8348 100644
--- a/platform/api/Cargo.toml
+++ b/platform/api/Cargo.toml
@@ -14,8 +14,6 @@ common = { workspace = true, features = ["default"] }
 rustls = "0.22"
 rustls-pemfile = "2.0"
 tokio-rustls = "0.25"
-sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "json", "chrono", "uuid"] }
-sqlx-postgres = "0.7"
 path-tree = "0.7"
 serde_json = "1.0"
 reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false}
@@ -52,6 +50,8 @@ http-body-util = "0.1"
 hyper-util = "0.1"
 pin-project = "1.1"
 base64 = "0.21"
+postgres-from-row = "0.5"
+postgres-types = "0.2"

 config = { workspace = true }
 pb = { workspace = true }
diff --git a/platform/api/src/api/auth.rs b/platform/api/src/api/auth.rs
index ae01d8e3..a8189782 100644
--- a/platform/api/src/api/auth.rs
+++ b/platform/api/src/api/auth.rs
@@ -64,7 +64,7 @@ impl AuthData {
 		let mut user_roles: Vec<database::Role> = global
 			.role_by_id_loader()
-			.load_many(user.roles.iter().map(|i| i.0))
+			.load_many(user.roles.clone())
 			.await
 			.map_err(|_| AuthError::FetchRoles)?
 			.into_values()
@@ -97,7 +97,7 @@ impl AuthData {
 	pub async fn from_session<G: ApiGlobal>(global: &Arc<G>, session: Session) -> Result<Self, AuthError> {
 		let user = global
 			.user_by_id_loader()
-			.load(session.user_id.0)
+			.load(session.user_id)
 			.await
 			.map_err(|_| AuthError::FetchUser)?
.ok_or(AuthError::UserNotFound)?;
diff --git a/platform/api/src/api/error.rs b/platform/api/src/api/error.rs
index f7347e1c..dc1d3dc7 100644
--- a/platform/api/src/api/error.rs
+++ b/platform/api/src/api/error.rs
@@ -18,5 +18,11 @@ pub enum ApiError {
 	#[error("failed to query turnstile: {0}")]
 	Turnstile(#[from] TurnstileError),
 	#[error("failed to query database: {0}")]
-	Database(#[from] sqlx::Error),
+	Database(#[from] common::database::deadpool_postgres::PoolError),
+}
+
+impl From<common::database::tokio_postgres::Error> for ApiError {
+	fn from(value: common::database::tokio_postgres::Error) -> Self {
+		Self::Database(value.into())
+	}
 }
diff --git a/platform/api/src/api/jwt.rs b/platform/api/src/api/jwt.rs
index 3ab2c114..e9058b59 100644
--- a/platform/api/src/api/jwt.rs
+++ b/platform/api/src/api/jwt.rs
@@ -121,8 +121,8 @@ impl JwtState for AuthJwtPayload {
 impl From<Session> for AuthJwtPayload {
 	fn from(session: Session) -> Self {
 		AuthJwtPayload {
-			user_id: session.user_id.0,
-			session_id: session.id.0,
+			user_id: session.user_id,
+			session_id: session.id,
 			expiration: Some(session.expires_at),
 			issued_at: Ulid::from(session.id).datetime().into(),
 			not_before: None,
diff --git a/platform/api/src/api/request_context.rs b/platform/api/src/api/request_context.rs
index 6ee388dc..733f6a42 100644
--- a/platform/api/src/api/request_context.rs
+++ b/platform/api/src/api/request_context.rs
@@ -38,7 +38,7 @@ impl RequestContext {
 		if !auth.session.is_valid() {
 			Err(AuthError::SessionExpired)
 		} else if inner.websocket {
-			let auth = AuthData::from_session_id(global, auth.session.id.0).await?;
+			let auth = AuthData::from_session_id(global, auth.session.id).await?;
 			if auth.session.is_valid() {
 				self.set_auth(auth.clone()).await;
 				Ok(Some(auth))
diff --git a/platform/api/src/api/v1/gql/error.rs b/platform/api/src/api/v1/gql/error.rs
index 56a26a17..22e90f28 100644
--- a/platform/api/src/api/v1/gql/error.rs
+++ b/platform/api/src/api/v1/gql/error.rs
@@ -37,7 +37,7 @@ pub enum GqlError {
 	InternalServerError(&'static str),
 	/// A database error occurred.
 	#[error("database error: {0}")]
-	Sqlx(#[from] Arc<sqlx::Error>),
+	Database(#[from] Arc<common::database::deadpool_postgres::PoolError>),
 	/// The input was invalid.
 	#[error("invalid input for {fields:?}: {message}")]
 	InvalidInput {
@@ -73,6 +73,12 @@ pub enum GqlError {
 	Subscription(#[from] Arc<SubscriptionManagerError>),
 }

+impl From<common::database::tokio_postgres::Error> for GqlError {
+	fn from(value: common::database::tokio_postgres::Error) -> Self {
+		Self::Database(Arc::new(value.into()))
+	}
+}
+
 macro_rules!
impl_arc_from { }; } -impl_arc_from!(sqlx::Error); +impl_arc_from!(common::database::deadpool_postgres::PoolError); impl_arc_from!(turnstile::TurnstileError); impl_arc_from!(async_nats::PublishError); impl_arc_from!(SubscriptionManagerError); @@ -93,7 +99,7 @@ impl GqlError { matches!( self, GqlError::InternalServerError(_) - | GqlError::Sqlx(_) + | GqlError::Database(_) | GqlError::Turnstile(_) | GqlError::Publish(_) | GqlError::Subscription(_) @@ -122,7 +128,7 @@ impl GqlError { GqlError::Turnstile(_) => "Turnstile", GqlError::Publish(_) => "Publish", GqlError::InternalServerError(_) => "InternalServerError", - GqlError::Sqlx(_) => "Sqlx", + GqlError::Database(_) => "Database", GqlError::VideoApi(_) => "VideoApi", GqlError::Subscription(_) => "Subscription", } diff --git a/platform/api/src/api/v1/gql/models/category.rs b/platform/api/src/api/v1/gql/models/category.rs index a64f493b..1dabc809 100644 --- a/platform/api/src/api/v1/gql/models/category.rs +++ b/platform/api/src/api/v1/gql/models/category.rs @@ -15,7 +15,7 @@ pub struct Category { impl From for Category { fn from(value: database::Category) -> Self { Self { - id: value.id.0.into(), + id: value.id.into(), name: value.name, revision: value.revision, updated_at: value.updated_at.into(), diff --git a/platform/api/src/api/v1/gql/models/channel.rs b/platform/api/src/api/v1/gql/models/channel.rs index c62f6664..1e53f1b5 100644 --- a/platform/api/src/api/v1/gql/models/channel.rs +++ b/platform/api/src/api/v1/gql/models/channel.rs @@ -1,6 +1,5 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; use chrono::Utc; -use common::database::Ulid; use jwt_next::SignWithKey; use super::category::Category; @@ -58,7 +57,7 @@ impl Channel { async fn followers_count(&self, ctx: &Context<'_>) -> Result { let global = ctx.get_global::(); - let (followers,) = sqlx::query_as( + let followers = common::database::query( r#" SELECT COUNT(*) @@ -69,8 +68,9 @@ impl Channel { AND following = true "#, ) - .bind(self.id.to_uuid()) - .fetch_one(global.db().as_ref()) + .bind(self.id.to_ulid()) + .build_query_single_scalar() + .fetch_one(global.db()) .await .map_err_gql("failed to fetch followers")?; @@ -125,12 +125,13 @@ impl ChannelLive { .await .map_err_gql("failed to fetch playback session count")?; - sqlx::query( + common::database::query( "UPDATE users SET channel_live_viewer_count = $1, channel_live_viewer_count_updated_at = NOW() WHERE id = $2", ) .bind(live_viewer_count) - .bind(Ulid::from(self.channel_id)) - .execute(global.db().as_ref()) + .bind(self.channel_id) + .build() + .execute(global.db()) .await .map_err_gql("failed to update live viewer count")?; @@ -197,18 +198,18 @@ impl From for Channel { fn from(value: database::Channel) -> Self { let stream_key_ = value.get_stream_key(); Self { - id: value.id.0.into(), + id: value.id.into(), title: value.title, description: value.description, - links: value.links.0, - custom_thumbnail_id: value.custom_thumbnail_id.map(|v| Into::into(v.0)), - offline_banner_id: value.offline_banner_id.map(|v| Into::into(v.0)), - category_id: value.category_id.map(|v| Into::into(v.0)), + links: value.links, + custom_thumbnail_id: value.custom_thumbnail_id.map(Into::into), + offline_banner_id: value.offline_banner_id.map(Into::into), + category_id: value.category_id.map(Into::into), live: value.active_connection_id.map(|_| ChannelLive { - room_id: value.room_id.0.into(), + room_id: value.room_id.into(), live_viewer_count_: value.live_viewer_count, live_viewer_count_updated_at_: 
value.live_viewer_count_updated_at.map(DateRFC3339), - channel_id: value.id.0, + channel_id: value.id, _phantom: std::marker::PhantomData, }), last_live_at: value.last_live_at.map(DateRFC3339), diff --git a/platform/api/src/api/v1/gql/models/chat_message.rs b/platform/api/src/api/v1/gql/models/chat_message.rs index 3d896650..b51cd1e4 100644 --- a/platform/api/src/api/v1/gql/models/chat_message.rs +++ b/platform/api/src/api/v1/gql/models/chat_message.rs @@ -64,9 +64,9 @@ impl ChatMessage { impl From for ChatMessage { fn from(model: database::ChatMessage) -> Self { Self { - id: model.id.0.into(), - channel_id: model.channel_id.0.into(), - user_id: model.user_id.0.into(), + id: model.id.into(), + channel_id: model.channel_id.into(), + user_id: model.user_id.into(), content: model.content, r#type: MessageType::User, _phantom: std::marker::PhantomData, diff --git a/platform/api/src/api/v1/gql/models/image_upload.rs b/platform/api/src/api/v1/gql/models/image_upload.rs index c44c053f..6023f197 100644 --- a/platform/api/src/api/v1/gql/models/image_upload.rs +++ b/platform/api/src/api/v1/gql/models/image_upload.rs @@ -55,8 +55,8 @@ impl ImageUpload { return Ok(None); } - if let Some(uploaded_file_metadata::Metadata::Image(image)) = uploaded_file.metadata.0.metadata { - Ok(Some(Self::new(uploaded_file.id.0, image))) + if let Some(uploaded_file_metadata::Metadata::Image(image)) = uploaded_file.metadata.metadata { + Ok(Some(Self::new(uploaded_file.id, image))) } else { Err(GqlError::InternalServerError("uploaded file is not an image").into()) } diff --git a/platform/api/src/api/v1/gql/models/role.rs b/platform/api/src/api/v1/gql/models/role.rs index 1b9969a4..de1049c6 100644 --- a/platform/api/src/api/v1/gql/models/role.rs +++ b/platform/api/src/api/v1/gql/models/role.rs @@ -16,8 +16,8 @@ pub struct Role { impl From for Role { fn from(value: database::Role) -> Self { Self { - id: value.id.0.into(), - channel_id: value.channel_id.map(|v| v.0.into()), + id: value.id.into(), + channel_id: value.channel_id.map(Into::into), name: value.name, description: value.description, allowed_permissions: value.allowed_permissions.bits(), diff --git a/platform/api/src/api/v1/gql/models/user.rs b/platform/api/src/api/v1/gql/models/user.rs index 0afd3357..2803c83b 100644 --- a/platform/api/src/api/v1/gql/models/user.rs +++ b/platform/api/src/api/v1/gql/models/user.rs @@ -75,17 +75,17 @@ impl User { impl From for User { fn from(value: database::User) -> Self { Self { - id: value.id.0.into(), + id: value.id.into(), username: value.username, display_name: value.display_name, display_color: value.display_color.into(), channel: value.channel.into(), - pending_profile_picture_id: value.pending_profile_picture_id.map(|u| u.0.into()), + pending_profile_picture_id: value.pending_profile_picture_id.map(Into::into), email_: value.email, email_verified_: value.email_verified, last_login_at_: value.last_login_at.into(), totp_enabled_: value.totp_enabled, - profile_picture_: value.profile_picture_id.map(|u| u.0), + profile_picture_: value.profile_picture_id, } } } diff --git a/platform/api/src/api/v1/gql/mutations/auth.rs b/platform/api/src/api/v1/gql/mutations/auth.rs index 891abb0d..a46af816 100644 --- a/platform/api/src/api/v1/gql/mutations/auth.rs +++ b/platform/api/src/api/v1/gql/mutations/auth.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use async_graphql::{Context, Object, Union}; use chrono::{Duration, Utc}; -use common::database::{TraitProtobuf, Ulid}; use 
pb::scuffle::platform::internal::two_fa::two_fa_request_action::{Action, Login}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; use prost::Message; @@ -91,7 +90,7 @@ impl AuthMutation { if user.totp_enabled { let request_id = ulid::Ulid::new(); - sqlx::query( + common::database::query( r#" INSERT INTO two_fa_requests ( id, @@ -103,7 +102,7 @@ impl AuthMutation { $3 )"#, ) - .bind(Ulid::from(request_id)) + .bind(request_id) .bind(user.id) .bind( TwoFaRequestAction { @@ -111,7 +110,8 @@ impl AuthMutation { } .encode_to_vec(), ) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; Ok(TwoFaResponse::TwoFaRequest(TwoFaRequest { id: request_id.into() })) } else { @@ -129,9 +129,9 @@ impl AuthMutation { } Ok(TwoFaResponse::Success(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), })) @@ -149,7 +149,7 @@ impl AuthMutation { let request_context = ctx.get_req_context(); // TODO: Make this a dataloader - let request: database::TwoFaRequest = sqlx::query_as( + let request: database::TwoFaRequest = common::database::query( r#" SELECT * @@ -159,8 +159,9 @@ impl AuthMutation { id = $1 "#, ) - .bind(Ulid::from(id.to_ulid())) - .fetch_optional(global.db().as_ref()) + .bind(id.to_ulid()) + .build_query_as() + .fetch_optional(global.db()) .await? .ok_or(GqlError::NotFound("2fa request"))?; @@ -179,7 +180,7 @@ impl AuthMutation { .into()); } - sqlx::query( + common::database::query( r#" DELETE FROM two_fa_requests @@ -188,10 +189,11 @@ impl AuthMutation { "#, ) .bind(request.id) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; - match request.action.into_inner().action { + match request.action.action { Some(Action::Login(action)) => { let update_context = action.update_context; let session = action.execute(global, user.id).await?; @@ -207,9 +209,9 @@ impl AuthMutation { } Ok(Some(TwoFaRequestFulfillResponse::Login(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }))) @@ -240,7 +242,7 @@ impl AuthMutation { })?; // TODO: maybe look to batch this - let session: database::Session = sqlx::query_as( + let session: database::Session = common::database::query( r#" UPDATE user_sessions @@ -252,8 +254,9 @@ impl AuthMutation { * "#, ) - .bind(Ulid::from(jwt.session_id)) - .fetch_optional(global.db().as_ref()) + .bind(jwt.session_id) + .build_query_as() + .fetch_optional(global.db()) .await? .map_err_gql(GqlError::InvalidInput { fields: vec!["sessionToken"], @@ -271,9 +274,9 @@ impl AuthMutation { } Ok(Session { - id: session.id.0.into(), + id: session.id.into(), token: session_token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }) @@ -347,10 +350,12 @@ impl AuthMutation { // TODO: what do we do when the next step fails? delete the room again? 
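// The hunk below swaps sqlx's `global.db().begin()` for the pooled
// tokio-postgres flow used throughout this patch. A minimal sketch of that
// flow, assuming `global.db()` hands back a deadpool_postgres-style pool:
//
//     let mut client = global.db().get().await?; // check a connection out of the pool
//     let tx = client.transaction().await?;      // BEGIN; `tx` mutably borrows `client`
//
//     common::database::query("INSERT INTO users (id) VALUES ($1)")
//         .bind(ulid::Ulid::new())
//         .build()
//         .execute(&tx) // statements run on `&tx` so they join the transaction
//         .await?;
//
//     tx.commit().await?; // COMMIT; dropping `tx` uncommitted rolls back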
- let mut tx = global.db().begin().await?; + let mut client = global.db().get().await?; + + let tx = client.transaction().await?; // TODO: maybe look to batch this - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" INSERT INTO users ( id, @@ -373,22 +378,23 @@ impl AuthMutation { ) RETURNING * "#, ) - .bind(Ulid::from(ulid::Ulid::new())) + .bind(ulid::Ulid::new()) .bind(username) .bind(display_name) .bind(database::User::generate_display_color()) .bind(database::User::hash_password(&password)) .bind(email) - .bind(Ulid::from(channel_room_id)) + .bind(channel_room_id) .bind(res.stream_key) - .fetch_one(&mut *tx) + .build_query_as() + .fetch_one(&tx) .await?; let login_duration = validity.unwrap_or(60 * 60 * 24 * 7); // 7 days let expires_at = Utc::now() + Duration::seconds(login_duration as i64); // TODO: maybe look to batch this - let session: database::Session = sqlx::query_as( + let session: database::Session = common::database::query( r#" INSERT INTO user_sessions ( id, @@ -401,10 +407,11 @@ impl AuthMutation { ) RETURNING * "#, ) - .bind(Ulid::from(ulid::Ulid::new())) + .bind(ulid::Ulid::new()) .bind(user.id) .bind(expires_at) - .fetch_one(&mut *tx) + .build_query_as() + .fetch_one(&tx) .await?; let jwt = AuthJwtPayload::from(session.clone()); @@ -412,6 +419,7 @@ impl AuthMutation { let token = jwt.serialize(global).map_err_gql("failed to serialize JWT")?; tx.commit().await?; + drop(client); // We need to update the request context with the new session if update_context.unwrap_or(true) { @@ -431,9 +439,9 @@ impl AuthMutation { } Ok(Session { - id: session.id.0.into(), + id: session.id.into(), token, - user_id: session.user_id.0.into(), + user_id: session.user_id.into(), expires_at: session.expires_at.into(), last_used_at: session.last_used_at.into(), }) @@ -465,11 +473,10 @@ impl AuthMutation { .map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))? .session .id - .0 }; // TODO: maybe look to batch this - sqlx::query( + common::database::query( r#" DELETE FROM user_sessions @@ -477,8 +484,9 @@ impl AuthMutation { id = $1 "#, ) - .bind(Ulid::from(session_id)) - .execute(global.db().as_ref()) + .bind(session_id) + .build() + .execute(global.db()) .await?; if session_token.is_none() { diff --git a/platform/api/src/api/v1/gql/mutations/channel.rs b/platform/api/src/api/v1/gql/mutations/channel.rs index ff25c2a2..8acacf78 100644 --- a/platform/api/src/api/v1/gql/mutations/channel.rs +++ b/platform/api/src/api/v1/gql/mutations/channel.rs @@ -29,7 +29,7 @@ impl ChannelMutation { .await? 
.map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -42,17 +42,16 @@ ) .bind(title.clone()) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; - let channel_id = user.id.0; - global .nats() .publish( - SubscriptionTopic::ChannelTitle(channel_id), + SubscriptionTopic::ChannelTitle(user.id), pb::scuffle::platform::internal::events::ChannelTitle { - channel_id: Some(channel_id.into()), + channel_id: Some(user.id.into()), title, } .encode_to_vec() diff --git a/platform/api/src/api/v1/gql/mutations/chat.rs b/platform/api/src/api/v1/gql/mutations/chat.rs index 73ae9a4a..b5360963 100644 --- a/platform/api/src/api/v1/gql/mutations/chat.rs +++ b/platform/api/src/api/v1/gql/mutations/chat.rs @@ -2,7 +2,6 @@ use async_graphql::{Context, Object}; use prost::Message; use tracing::error; use ulid::Ulid; -use uuid::Uuid; use crate::api::auth::AuthError; use crate::api::v1::gql::error::ext::*; @@ -42,7 +41,7 @@ impl ChatMutation { // TODO: Check if the user is allowed to send messages in this chat let message_id = Ulid::new(); - let chat_message: database::ChatMessage = sqlx::query_as( + let chat_message: database::ChatMessage = common::database::query( r#" INSERT INTO chat_messages ( id, @@ -57,11 +56,12 @@ ) RETURNING * "#, ) - .bind(Uuid::from(message_id)) + .bind(message_id) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) + .bind(channel_id.to_ulid()) .bind(content.clone()) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; match global diff --git a/platform/api/src/api/v1/gql/mutations/user.rs b/platform/api/src/api/v1/gql/mutations/user.rs index 2b232b20..9d544d59 100644 --- a/platform/api/src/api/v1/gql/mutations/user.rs +++ b/platform/api/src/api/v1/gql/mutations/user.rs @@ -1,6 +1,5 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; use bytes::Bytes; -use common::database::Ulid; use pb::scuffle::platform::internal::two_fa::two_fa_request_action::{Action, ChangePassword}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; use prost::Message; @@ -51,7 +50,7 @@ impl UserMutation { .await? .map_err_gql(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -65,7 +64,8 @@ ) .bind(email) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; Ok(user.into()) @@ -88,7 +88,7 @@ // TODO: Can we combine the two queries into one? let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")?
.map_err_gql(GqlError::NotFound("user"))?; @@ -102,7 +102,7 @@ impl UserMutation { .into()); } - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -117,15 +117,16 @@ impl UserMutation { .bind(display_name.clone()) .bind(auth.session.user_id) .bind(user.username) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserDisplayName(user.id.0), + SubscriptionTopic::UserDisplayName(user.id), pb::scuffle::platform::internal::events::UserDisplayName { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), display_name, } .encode_to_vec() @@ -151,7 +152,7 @@ impl UserMutation { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -164,15 +165,16 @@ impl UserMutation { ) .bind(*color) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserDisplayColor(user.id.0), + SubscriptionTopic::UserDisplayColor(user.id), pb::scuffle::platform::internal::events::UserDisplayColor { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), display_color: *color, } .encode_to_vec() @@ -194,19 +196,20 @@ impl UserMutation { .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( "UPDATE users SET profile_picture_id = NULL, pending_profile_picture_id = NULL WHERE id = $1 RETURNING *", ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; global .nats() .publish( - SubscriptionTopic::UserProfilePicture(user.id.0), + SubscriptionTopic::UserProfilePicture(user.id), pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(user.id.0.into()), + user_id: Some(user.id.into()), profile_picture_id: None, } .encode_to_vec() @@ -234,7 +237,7 @@ impl UserMutation { let user = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -249,12 +252,12 @@ impl UserMutation { let change_password = ChangePassword { new_password_hash: database::User::hash_password(&new_password), - current_session_id: Some(auth.session.id.0.into()), + current_session_id: Some(auth.session.id.into()), }; if user.totp_enabled { let request_id = ulid::Ulid::new(); - sqlx::query( + common::database::query( r#" INSERT INTO two_fa_requests ( id, @@ -267,7 +270,7 @@ impl UserMutation { ) "#, ) - .bind(Ulid::from(request_id)) + .bind(request_id) .bind(user.id) .bind( TwoFaRequestAction { @@ -275,7 +278,8 @@ impl UserMutation { } .encode_to_vec(), ) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; Ok(TwoFaResponse::TwoFaRequest(TwoFaRequest { id: request_id.into() })) } else { @@ -299,7 +303,7 @@ impl UserMutation { .await? 
.ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - if auth.session.user_id.0 == channel_id.to_ulid() { + if auth.session.user_id == channel_id.to_ulid() { return Err(GqlError::InvalidInput { fields: vec!["channelId"], message: "Cannot follow yourself", @@ -307,7 +311,7 @@ .into()); } - sqlx::query( + common::database::query( r#" UPSERT INTO channel_user ( user_id, channel_id, following ) VALUES ( $1, $2, $3 ) "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) + .bind(channel_id.to_ulid()) .bind(follow) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; let channel_id = channel_id.to_ulid(); - let user_subject = SubscriptionTopic::UserFollows(auth.session.user_id.0); + let user_subject = SubscriptionTopic::UserFollows(auth.session.user_id); let channel_subject = SubscriptionTopic::ChannelFollows(channel_id); let msg = Bytes::from( pb::scuffle::platform::internal::events::UserFollowChannel { - user_id: Some(auth.session.user_id.0.into()), + user_id: Some(auth.session.user_id.into()), channel_id: Some(channel_id.into()), following: follow, } diff --git a/platform/api/src/api/v1/gql/mutations/user/two_fa.rs b/platform/api/src/api/v1/gql/mutations/user/two_fa.rs index 21d71b0c..c498ab60 100644 --- a/platform/api/src/api/v1/gql/mutations/user/two_fa.rs +++ b/platform/api/src/api/v1/gql/mutations/user/two_fa.rs @@ -34,7 +34,7 @@ impl TwoFaMutation { let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -67,7 +67,7 @@ let hex_backup_codes = backup_codes.iter().map(|c| format!("{:08x}", c)).collect(); // Save secret and backup codes to database. - sqlx::query( + common::database::query( r#" UPDATE users @@ -82,7 +82,8 @@ .bind(secret) .bind(backup_codes) .bind(auth.session.user_id) - .execute(global.db().as_ref()) + .build() + .execute(global.db()) .await?; let qr_code = totp.get_qr_base64().map_err_ignored_gql("failed to generate qr code")?; @@ -105,7 +106,7 @@ let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -129,7 +130,7 @@ } // Enable 2fa - let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users @@ -142,7 +143,8 @@ "#, ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; // TODO: Log out all other sessions? @@ -162,7 +164,7 @@ let user: database::User = global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await .map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user"))?; @@ -177,7 +179,7 @@ } // Disable 2fa, remove secret and backup codes.
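// The disable path below goes through `build_query_as()`, the same finisher
// used for every row-returning statement in this patch. For reference, the
// three finishers that appear across these hunks (a sketch inferred from this
// diff; identifiers such as `session_id` and `user_id` are illustrative):
//
//     // statements with no result rows
//     common::database::query("DELETE FROM user_sessions WHERE id = $1")
//         .bind(session_id)
//         .build()
//         .execute(global.db())
//         .await?;
//
//     // rows decoded through `postgres_from_row::FromRow`
//     let user: database::User = common::database::query("SELECT * FROM users WHERE id = $1")
//         .bind(user_id)
//         .build_query_as()
//         .fetch_one(global.db())
//         .await?;
//
//     // single-column results such as counts and flags
//     let count: i64 = common::database::query("SELECT COUNT(*) FROM user_sessions")
//         .build_query_single_scalar()
//         .fetch_one(global.db())
//         .await?;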
- let user: database::User = sqlx::query_as( + let user: database::User = common::database::query( r#" UPDATE users SET @@ -191,7 +193,8 @@ impl TwoFaMutation { "#, ) .bind(auth.session.user_id) - .fetch_one(global.db().as_ref()) + .build_query_as() + .fetch_one(global.db()) .await?; Ok(user.into()) diff --git a/platform/api/src/api/v1/gql/queries/category.rs b/platform/api/src/api/v1/gql/queries/category.rs index 303778a5..5016495b 100644 --- a/platform/api/src/api/v1/gql/queries/category.rs +++ b/platform/api/src/api/v1/gql/queries/category.rs @@ -3,10 +3,10 @@ use async_graphql::{Context, Object, SimpleObject}; use crate::api::v1::gql::error::ext::*; use crate::api::v1::gql::error::Result; use crate::api::v1::gql::ext::ContextExt; +use crate::api::v1::gql::models; use crate::api::v1::gql::models::category::Category; use crate::api::v1::gql::models::search_result::SearchResult; use crate::api::v1::gql::models::ulid::GqlUlid; -use crate::api::v1::gql::models::{self}; use crate::database; use crate::global::ApiGlobal; @@ -61,11 +61,12 @@ impl CategoryQuery { ) -> Result { let global = ctx.get_global::(); - let categories: Vec> = sqlx::query_as("SELECT categories.*, similarity(name, $1), COUNT(*) OVER() AS total_count FROM categories WHERE name % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") + let categories: Vec> = common::database::query("SELECT categories.*, similarity(name, $1), COUNT(*) OVER() AS total_count FROM categories WHERE name % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search categories")?; diff --git a/platform/api/src/api/v1/gql/queries/mod.rs b/platform/api/src/api/v1/gql/queries/mod.rs index 972deb8a..e354e31a 100644 --- a/platform/api/src/api/v1/gql/queries/mod.rs +++ b/platform/api/src/api/v1/gql/queries/mod.rs @@ -1,6 +1,6 @@ use async_graphql::{ComplexObject, Context, SimpleObject}; -use common::database::Ulid; -use sqlx::FromRow; +use postgres_from_row::FromRow; +use ulid::Ulid; use super::error::ext::*; use super::ext::ContextExt; @@ -48,7 +48,7 @@ impl Query { ) -> Result> { let global = ctx.get_global::(); - let query_results: Vec = sqlx::query_as( + let query_results: Vec = common::database::query( r#" WITH CombinedResults AS ( SELECT @@ -84,7 +84,8 @@ impl Query { .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search")?; @@ -96,7 +97,7 @@ impl Query { 1 => &mut store.1, _ => unreachable!(), } - .push(item.id.0); + .push(item.id); store }); @@ -111,8 +112,8 @@ impl Query { .iter() .filter_map(|r| { let object = match r.r#type { - 0 => SearchAllResultData::User(Box::new(User::from(users.get(&r.id.0)?.clone()))), - 1 => SearchAllResultData::Category(categories.get(&r.id.0)?.clone().into()), + 0 => SearchAllResultData::User(Box::new(User::from(users.get(&r.id)?.clone()))), + 1 => SearchAllResultData::Category(categories.get(&r.id)?.clone().into()), _ => unreachable!(), }; diff --git a/platform/api/src/api/v1/gql/queries/user.rs b/platform/api/src/api/v1/gql/queries/user.rs index 9d4d42d0..a38eaab1 100644 --- a/platform/api/src/api/v1/gql/queries/user.rs +++ b/platform/api/src/api/v1/gql/queries/user.rs @@ -48,7 +48,7 @@ impl UserQuery { global .user_by_id_loader() - .load(auth.session.user_id.0) + .load(auth.session.user_id) .await 
.map_err_ignored_gql("failed to fetch user")? .map_err_gql(GqlError::NotFound("user")) @@ -93,16 +93,17 @@ &self, ctx: &Context<'_>, #[graphql(desc = "The search query.")] query: String, - #[graphql(desc = "The result limit, default: 5", validator(minimum = 0, maximum = 50))] limit: Option, - #[graphql(desc = "The result offset, default: 0", validator(minimum = 0, maximum = 950))] offset: Option, + #[graphql(desc = "The result limit, default: 5", validator(minimum = 0, maximum = 50))] limit: Option, + #[graphql(desc = "The result offset, default: 0", validator(minimum = 0, maximum = 950))] offset: Option, ) -> Result> { let global = ctx.get_global::(); - let users: Vec> = sqlx::query_as("SELECT users.*, similarity(username, $1), COUNT(*) OVER() AS total_count FROM users WHERE username % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") + let users: Vec> = common::database::query("SELECT users.*, similarity(username, $1), COUNT(*) OVER() AS total_count FROM users WHERE username % $1 ORDER BY similarity DESC LIMIT $2 OFFSET $3") .bind(query) .bind(limit.unwrap_or(5)) .bind(offset.unwrap_or(0)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to search users")?; @@ -119,7 +120,7 @@ .await? .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; - let (is_following,): (bool,) = sqlx::query_as( + let is_following = common::database::query( r#" SELECT following @@ -131,10 +132,11 @@ "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) - .fetch_optional(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar::() + .fetch_optional(global.db()) .await? - .unwrap_or((false,)); + .unwrap_or_default(); Ok(is_following) } @@ -154,12 +156,12 @@ .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; // TODO: Also allow users with permission - if id.to_ulid() != auth.session.user_id.0 { + if id.to_ulid() != auth.session.user_id { return Err(GqlError::Unauthorized { field: "following" }.into()); } // This query is not very good, we should have some paging mechanism with ids.
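// One shape the paging TODO above could take: ids are ULIDs, which sort by
// creation time, so keyset pagination on `id` works (illustrative sketch only,
// not part of this patch; `last_seen_id` and `page_size` are hypothetical
// cursor parameters):
//
//     let channels: Vec<database::User> = common::database::query(
//         "SELECT users.* FROM users
//          WHERE users.id IN (
//              SELECT channel_id FROM channel_user
//              WHERE user_id = $1 AND following = true
//          )
//          AND users.id > $2
//          ORDER BY users.id
//          LIMIT $3",
//     )
//     .bind(user_id)
//     .bind(last_seen_id) // last id of the previous page; a zero ULID for the first page
//     .bind(page_size)
//     .build_query_as()
//     .fetch_all(global.db())
//     .await?;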
- let channels: Vec = sqlx::query_as( + let channels: Vec = common::database::query( r#" SELECT users.* @@ -178,9 +180,10 @@ impl UserQuery { LIMIT $2 "#, ) - .bind(id.to_uuid()) + .bind(id.to_ulid()) .bind(limit.map(|l| l as i64)) - .fetch_all(global.db().as_ref()) + .build_query_as() + .fetch_all(global.db()) .await?; Ok(channels.into_iter().map(Into::into).collect()) diff --git a/platform/api/src/api/v1/gql/subscription/channel.rs b/platform/api/src/api/v1/gql/subscription/channel.rs index 732fd999..61516e8f 100644 --- a/platform/api/src/api/v1/gql/subscription/channel.rs +++ b/platform/api/src/api/v1/gql/subscription/channel.rs @@ -49,7 +49,7 @@ impl ChannelSubscription { .ok_or(GqlError::Auth(AuthError::NotLoggedIn))?; // TODO: allow other users with permissions - if auth.session.user_id.0 != channel_id.to_ulid() { + if auth.session.user_id != channel_id.to_ulid() { return Err(GqlError::Unauthorized { field: "channel_follows", } @@ -88,7 +88,7 @@ impl ChannelSubscription { let stream = self.channel_follows(ctx, channel_id).await?; - let (mut followers,) = sqlx::query_as( + let mut followers = common::database::query( r#" SELECT COUNT(*) @@ -99,8 +99,9 @@ impl ChannelSubscription { AND following = true "#, ) - .bind(channel_id.to_uuid()) - .fetch_one(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar() + .fetch_one(global.db()) .await?; Ok(stream.map(move |value| { diff --git a/platform/api/src/api/v1/gql/subscription/chat.rs b/platform/api/src/api/v1/gql/subscription/chat.rs index 086cba5e..1b60acaa 100644 --- a/platform/api/src/api/v1/gql/subscription/chat.rs +++ b/platform/api/src/api/v1/gql/subscription/chat.rs @@ -52,12 +52,13 @@ impl ChatSubscription { // load old messages not older than 10 minutes, max 100 messages let not_older_than = chrono::Utc::now() - chrono::Duration::minutes(10); let not_older_than = ulid::Ulid::from_parts(not_older_than.timestamp() as u64, u128::MAX); - let messages: Vec = sqlx::query_as( + let messages: Vec = common::database::query( "SELECT * FROM chat_messages WHERE channel_id = $1 AND deleted_at IS NULL AND id >= $2 ORDER BY id LIMIT 100", ) - .bind(common::database::Ulid::from(channel_id.to_ulid())) - .bind(common::database::Ulid::from(not_older_than)) - .fetch_all(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .bind(not_older_than) + .build_query_as() + .fetch_all(global.db()) .await .map_err_gql("failed to fetch chat messages")?; diff --git a/platform/api/src/api/v1/gql/subscription/file.rs b/platform/api/src/api/v1/gql/subscription/file.rs index 2d335e0d..38b4fa2f 100644 --- a/platform/api/src/api/v1/gql/subscription/file.rs +++ b/platform/api/src/api/v1/gql/subscription/file.rs @@ -71,7 +71,7 @@ impl FileSubscription { FileStatus::Success }; yield Ok(FileStatusStream { - file_id: file.id.0.into(), + file_id: file.id.into(), status, reason: file.failed, // TODO: we don't have access to the friendly message here because it isn't in the db diff --git a/platform/api/src/api/v1/gql/subscription/user.rs b/platform/api/src/api/v1/gql/subscription/user.rs index 78bbeed0..76f0e3d1 100644 --- a/platform/api/src/api/v1/gql/subscription/user.rs +++ b/platform/api/src/api/v1/gql/subscription/user.rs @@ -164,7 +164,7 @@ impl UserSubscription { let profile_picture = if let Some(profile_picture_id) = profile_picture_id { global .uploaded_file_by_id_loader() - .load(profile_picture_id.0) + .load(profile_picture_id) .await .map_err_ignored_gql("failed to fetch profile picture")? 
.map(ImageUpload::from_uploaded_file) @@ -231,7 +231,7 @@ impl UserSubscription { Ok(async_stream::stream!({ if let Some(channel_id) = channel_id { - let (is_following,): (bool,) = sqlx::query_as( + let is_following = common::database::query( r#" SELECT following @@ -243,11 +243,12 @@ impl UserSubscription { "#, ) .bind(auth.session.user_id) - .bind(channel_id.to_uuid()) - .fetch_optional(global.db().as_ref()) + .bind(channel_id.to_ulid()) + .build_query_single_scalar::() + .fetch_optional(global.db()) .await .map_err_gql("failed to fetch channel_user")? - .unwrap_or((false,)); + .unwrap_or_default(); yield Ok(FollowStream { user_id: user_id.into(), diff --git a/platform/api/src/api/v1/upload/profile_picture.rs b/platform/api/src/api/v1/upload/profile_picture.rs index 2a455890..cca332a8 100644 --- a/platform/api/src/api/v1/upload/profile_picture.rs +++ b/platform/api/src/api/v1/upload/profile_picture.rs @@ -173,27 +173,32 @@ impl UploadType for ProfilePicture { image_format.ext() ); - let mut tx = global + let mut client = global .db() - .begin() + .get() .await - .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to begin transaction"))?; + .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to get database connection"))?; + let tx = client + .transaction() + .await + .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to start transaction"))?; - sqlx::query("INSERT INTO image_jobs (id, priority, task) VALUES ($1, $2, $3)") - .bind(common::database::Ulid(file_id)) + common::database::query("INSERT INTO image_jobs (id, priority, task) VALUES ($1, $2, $3)") + .bind(file_id) .bind(config.profile_picture_task_priority) .bind(common::database::Protobuf(create_task( file_id, &input_path, config, - auth.session.user_id.0, + auth.session.user_id, ))) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert image job"))?; - sqlx::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") - .bind(common::database::Ulid(file_id)) // id + common::database::query("INSERT INTO uploaded_files(id, owner_id, uploader_id, name, type, metadata, total_size, pending, path) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)") + .bind(file_id) // id .bind(auth.session.user_id) // owner_id .bind(auth.session.user_id) // uploader_id .bind(name.unwrap_or_else(|| format!("untitled.{}", image_format.ext()))) // name @@ -206,15 +211,17 @@ impl UploadType for ProfilePicture { .bind(file.len() as i64) // total_size .bind(true) // pending .bind(&input_path) // path - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to insert uploaded file"))?; if self.set_active { - sqlx::query("UPDATE users SET pending_profile_picture_id = $1 WHERE id = $2") - .bind(common::database::Ulid(file_id)) + common::database::query("UPDATE users SET pending_profile_picture_id = $1 WHERE id = $2") + .bind(file_id) .bind(auth.session.user_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await .map_err_route((StatusCode::INTERNAL_SERVER_ERROR, "failed to update user"))?; } diff --git a/platform/api/src/database/category.rs b/platform/api/src/database/category.rs index be26d2da..a2fdc1d0 100644 --- a/platform/api/src/database/category.rs +++ b/platform/api/src/database/category.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, 
Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Category { pub id: Ulid, pub name: String, diff --git a/platform/api/src/database/channel.rs b/platform/api/src/database/channel.rs index f1e24ad8..597a46fe 100644 --- a/platform/api/src/database/channel.rs +++ b/platform/api/src/database/channel.rs @@ -1,55 +1,56 @@ use async_graphql::SimpleObject; use chrono::{DateTime, Utc}; -use common::database::Ulid; +use common::database::json; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Channel { /// Ulid of the channel pub id: Ulid, /// Video room id - #[sqlx(rename = "channel_room_id")] + #[from_row(rename = "channel_room_id")] pub room_id: Ulid, /// Active connection id - #[sqlx(rename = "channel_active_connection_id")] + #[from_row(rename = "channel_active_connection_id")] pub active_connection_id: Option, /// The current stream's title - #[sqlx(rename = "channel_title")] + #[from_row(rename = "channel_title")] pub title: Option, /// The current stream's live viewer count - #[sqlx(rename = "channel_live_viewer_count")] + #[from_row(rename = "channel_live_viewer_count")] pub live_viewer_count: Option, /// The time the current stream's live viewer count was last updated - #[sqlx(rename = "channel_live_viewer_count_updated_at")] + #[from_row(rename = "channel_live_viewer_count_updated_at")] pub live_viewer_count_updated_at: Option>, /// The current stream's description - #[sqlx(rename = "channel_description")] + #[from_row(rename = "channel_description")] pub description: Option, /// The social links - #[sqlx(rename = "channel_links")] - pub links: sqlx::types::Json>, + #[from_row(rename = "channel_links", from_fn = "json")] + pub links: Vec, /// The current stream's thumbnail - #[sqlx(rename = "channel_custom_thumbnail_id")] + #[from_row(rename = "channel_custom_thumbnail_id")] pub custom_thumbnail_id: Option, /// The offline banner of the channel - #[sqlx(rename = "channel_offline_banner_id")] + #[from_row(rename = "channel_offline_banner_id")] pub offline_banner_id: Option, /// The current stream's category - #[sqlx(rename = "channel_category_id")] + #[from_row(rename = "channel_category_id")] pub category_id: Option, /// Channel stream key - #[sqlx(rename = "channel_stream_key")] + #[from_row(rename = "channel_stream_key")] pub stream_key: Option, /// Channel roles order - #[sqlx(rename = "channel_role_order")] + #[from_row(rename = "channel_role_order")] pub role_order: Vec, /// Channel default permissions - #[sqlx(rename = "channel_default_permissions")] + #[from_row(rename = "channel_default_permissions")] pub default_permissions: i64, /// Channel permissions for followers - #[sqlx(rename = "channel_following_permission")] + #[from_row(rename = "channel_following_permission")] pub following_permission: i64, /// The time the channel was last live - #[sqlx(rename = "channel_last_live_at")] + #[from_row(rename = "channel_last_live_at")] pub last_live_at: Option>, } diff --git a/platform/api/src/database/chat_message.rs b/platform/api/src/database/chat_message.rs index b6186c2a..5f27c08e 100644 --- a/platform/api/src/database/chat_message.rs +++ b/platform/api/src/database/chat_message.rs @@ -1,6 +1,6 @@ -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct ChatMessage { /// The unique identifier for the chat message. 
pub id: Ulid, @@ -17,9 +17,9 @@ pub struct ChatMessage { impl ChatMessage { pub fn to_protobuf(&self) -> pb::scuffle::platform::internal::events::ChatMessage { pb::scuffle::platform::internal::events::ChatMessage { - id: Some(self.id.0.into()), - channel_id: Some(self.channel_id.0.into()), - user_id: Some(self.user_id.0.into()), + id: Some(self.id.into()), + channel_id: Some(self.channel_id.into()), + user_id: Some(self.user_id.into()), content: self.content.clone(), } } diff --git a/platform/api/src/database/file_type.rs b/platform/api/src/database/file_type.rs index af05e759..2332dea9 100644 --- a/platform/api/src/database/file_type.rs +++ b/platform/api/src/database/file_type.rs @@ -1,13 +1,15 @@ -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, sqlx::Type)] +use postgres_types::{FromSql, ToSql}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ToSql, FromSql)] pub enum FileType { - #[sqlx(rename = "custom_thumbnail")] + #[postgres(name = "custom_thumbnail")] CustomThumbnail, - #[sqlx(rename = "profile_picture")] + #[postgres(name = "profile_picture")] ProfilePicture, - #[sqlx(rename = "offline_banner")] + #[postgres(name = "offline_banner")] OfflineBanner, - #[sqlx(rename = "role_badge")] + #[postgres(name = "role_badge")] RoleBadge, - #[sqlx(rename = "channel_role_badge")] + #[postgres(name = "channel_role_badge")] ChannelRoleBadge, } diff --git a/platform/api/src/database/global_state.rs b/platform/api/src/database/global_state.rs index ba0c6122..0def5554 100644 --- a/platform/api/src/database/global_state.rs +++ b/platform/api/src/database/global_state.rs @@ -1,8 +1,8 @@ -use common::database::Ulid; +use ulid::Ulid; use super::RolePermission; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct GlobalState { pub role_order: Vec, pub default_permissions: RolePermission, diff --git a/platform/api/src/database/role.rs b/platform/api/src/database/role.rs index 82554d17..e93fc253 100644 --- a/platform/api/src/database/role.rs +++ b/platform/api/src/database/role.rs @@ -1,7 +1,7 @@ use bitmask_enum::bitmask; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] /// A role that can be granted to a user. /// Roles can allow or deny permissions to a user. 
pub struct Role { @@ -33,15 +33,29 @@ pub enum RolePermission { UploadProfilePicture, } -impl sqlx::Decode<'_, sqlx::Postgres> for RolePermission { - fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result> { - >::decode(value).map(Self::from) +impl<'a> postgres_types::FromSql<'a> for RolePermission { + fn accepts(ty: &postgres_types::Type) -> bool { + ::accepts(ty) + } + + fn from_sql(ty: &postgres_types::Type, raw: &'a [u8]) -> Result> { + ::from_sql(ty, raw).map(Self::from) } } -impl sqlx::Type for RolePermission { - fn type_info() -> sqlx::postgres::PgTypeInfo { - >::type_info() +impl postgres_types::ToSql for RolePermission { + postgres_types::to_sql_checked!(); + + fn to_sql( + &self, + ty: &postgres_types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + ::to_sql(&self.bits(), ty, out) + } + + fn accepts(ty: &postgres_types::Type) -> bool { + ::accepts(ty) } } diff --git a/platform/api/src/database/search_result.rs b/platform/api/src/database/search_result.rs index f0035ee2..0ea00096 100644 --- a/platform/api/src/database/search_result.rs +++ b/platform/api/src/database/search_result.rs @@ -1,7 +1,7 @@ -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct SearchResult { /// The category. - #[sqlx(flatten)] + #[from_row(flatten)] pub object: T, /// The similarity of the search query to the category's name. pub similarity: f64, diff --git a/platform/api/src/database/session.rs b/platform/api/src/database/session.rs index 19b50164..f6c09c0b 100644 --- a/platform/api/src/database/session.rs +++ b/platform/api/src/database/session.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; -use common::database::Ulid; +use ulid::Ulid; -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct Session { /// The unique identifier for the session. 
pub id: Ulid, diff --git a/platform/api/src/database/two_fa_request.rs b/platform/api/src/database/two_fa_request.rs index b086a246..984bcc47 100644 --- a/platform/api/src/database/two_fa_request.rs +++ b/platform/api/src/database/two_fa_request.rs @@ -1,19 +1,22 @@ use std::sync::Arc; use chrono::{Duration, Utc}; -use common::database::{Protobuf, Ulid}; +use common::database::protobuf; use pb::ext::UlidExt; use pb::scuffle::platform::internal::two_fa::two_fa_request_action::{ChangePassword, Login}; use pb::scuffle::platform::internal::two_fa::TwoFaRequestAction; +use ulid::Ulid; use super::{Session, User}; use crate::global::ApiGlobal; -#[derive(Debug, Clone, sqlx::FromRow)] +#[derive(Debug, Clone, postgres_from_row::FromRow)] pub struct TwoFaRequest { pub id: Ulid, pub user_id: Ulid, - pub action: Protobuf, + + #[from_row(from_fn = "protobuf")] + pub action: TwoFaRequestAction, } #[allow(async_fn_in_trait)] @@ -24,15 +27,16 @@ pub trait TwoFaRequestActionTrait { } impl TwoFaRequestActionTrait for Login { - type Result = sqlx::Result; + type Result = Result; async fn execute(self, global: &Arc, user_id: Ulid) -> Self::Result { let expires_at = Utc::now() + Duration::seconds(self.login_duration as i64); // TODO: maybe look to batch this - let mut tx = global.db().begin().await?; + let mut client = global.db().get().await?; + let tx = client.transaction().await?; - let session: Session = sqlx::query_as( + let session = common::database::query( r#" INSERT INTO user_sessions ( id, @@ -48,10 +52,11 @@ impl TwoFaRequestActionTrait for Login { .bind(Ulid::from(ulid::Ulid::new())) .bind(user_id) .bind(expires_at) - .fetch_one(tx.as_mut()) + .build_query_as() + .fetch_one(&tx) .await?; - sqlx::query( + common::database::query( r#" UPDATE users SET @@ -60,7 +65,8 @@ impl TwoFaRequestActionTrait for Login { "#, ) .bind(user_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await?; tx.commit().await?; @@ -70,12 +76,13 @@ impl TwoFaRequestActionTrait for Login { } impl TwoFaRequestActionTrait for ChangePassword { - type Result = sqlx::Result<()>; + type Result = Result<(), common::database::deadpool_postgres::PoolError>; - async fn execute(self, global: &Arc, user_id: Ulid) -> sqlx::Result<()> { - let mut tx = global.db().begin().await?; + async fn execute(self, global: &Arc, user_id: Ulid) -> Self::Result { + let mut client = global.db().get().await?; + let tx = client.transaction().await?; - let user: User = sqlx::query_as( + let user: User = common::database::query( r#" UPDATE users @@ -88,11 +95,12 @@ impl TwoFaRequestActionTrait for ChangePassword { ) .bind(self.new_password_hash) .bind(user_id) - .fetch_one(tx.as_mut()) + .build_query_as() + .fetch_one(&tx) .await?; // Delete all sessions except current - sqlx::query( + common::database::query( r#" DELETE FROM user_sessions @@ -102,8 +110,9 @@ impl TwoFaRequestActionTrait for ChangePassword { "#, ) .bind(user.id) - .bind(Ulid::from(self.current_session_id.into_ulid())) - .execute(tx.as_mut()) + .bind(self.current_session_id.into_ulid()) + .build() + .execute(&tx) .await?; tx.commit().await?; diff --git a/platform/api/src/database/uploaded_file.rs b/platform/api/src/database/uploaded_file.rs index 37c923e0..6534c448 100644 --- a/platform/api/src/database/uploaded_file.rs +++ b/platform/api/src/database/uploaded_file.rs @@ -1,16 +1,18 @@ -use common::database::{Protobuf, Ulid}; +use common::database::protobuf; +use ulid::Ulid; use super::FileType; -#[derive(Debug, Clone, sqlx::FromRow)] +#[derive(Debug, Clone, postgres_from_row::FromRow)] 
pub struct UploadedFile { pub id: Ulid, pub owner_id: Ulid, pub uploader_id: Ulid, pub name: String, - #[sqlx(rename = "type")] + #[from_row(rename = "type")] pub ty: FileType, - pub metadata: Protobuf, + #[from_row(from_fn = "protobuf")] + pub metadata: pb::scuffle::platform::internal::types::UploadedFileMetadata, pub total_size: i64, pub pending: bool, pub path: String, diff --git a/platform/api/src/database/user.rs b/platform/api/src/database/user.rs index f2e230d3..8967b6f3 100644 --- a/platform/api/src/database/user.rs +++ b/platform/api/src/database/user.rs @@ -2,8 +2,8 @@ use argon2::password_hash::rand_core::OsRng; use argon2::password_hash::SaltString; use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; use chrono::{DateTime, Utc}; -use common::database::Ulid; use rand::Rng; +use ulid::Ulid; use super::Channel; @@ -17,7 +17,7 @@ pub enum TotpError { Generate, } -#[derive(Debug, Clone, Default, sqlx::FromRow)] +#[derive(Debug, Clone, Default, postgres_from_row::FromRow)] pub struct User { /// The unique identifier for the user. pub id: Ulid, @@ -52,7 +52,7 @@ pub struct User { pub roles: Vec, /// Channel - #[sqlx(flatten)] + #[from_row(flatten)] pub channel: Channel, } diff --git a/platform/api/src/dataloader/category.rs b/platform/api/src/dataloader/category.rs index 4a461bb2..3b092555 100644 --- a/platform/api/src/dataloader/category.rs +++ b/platform/api/src/dataloader/category.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Category; pub struct CategoryByIdLoader { - db: Arc, + db: Arc, } impl CategoryByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for CategoryByIdLoader { type Value = Category; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM categories WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) - .fetch_all(self.db.as_ref()) + let results: Vec = common::database::query("SELECT * FROM categories WHERE id = ANY($1)") + .bind(keys) + .build_query_as() + .fetch_all(&self.db) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch categories by id"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/global_state.rs b/platform/api/src/dataloader/global_state.rs index 5282bc21..5548a1b4 100644 --- a/platform/api/src/dataloader/global_state.rs +++ b/platform/api/src/dataloader/global_state.rs @@ -6,11 +6,11 @@ use common::dataloader::{DataLoader, Loader, LoaderOutput}; use crate::database::GlobalState; pub struct GlobalStateLoader { - db: Arc, + db: Arc, } impl GlobalStateLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -21,8 +21,9 @@ impl Loader for GlobalStateLoader { type Value = GlobalState; async fn load(&self, _: &[Self::Key]) -> LoaderOutput { - let state = sqlx::query_as("SELECT * FROM global_state") - .fetch_one(self.db.as_ref()) + let state = common::database::query("SELECT * FROM global_state") + .build_query_as() + .fetch_one(&self.db) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch global state"); diff --git a/platform/api/src/dataloader/role.rs b/platform/api/src/dataloader/role.rs index 7eff9da8..bfb1c92b 100644 --- 
a/platform/api/src/dataloader/role.rs +++ b/platform/api/src/dataloader/role.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Role; pub struct RoleByIdLoader { - db: Arc, + db: Arc, } impl RoleByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for RoleByIdLoader { type Value = Role; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM roles WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM roles WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch roles"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/session.rs b/platform/api/src/dataloader/session.rs index 4c95ddcd..024e5d01 100644 --- a/platform/api/src/dataloader/session.rs +++ b/platform/api/src/dataloader/session.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::Session; pub struct SessionByIdLoader { - db: Arc, + db: Arc, } impl SessionByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,14 +21,15 @@ impl Loader for SessionByIdLoader { type Value = Session; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM user_sessions WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM user_sessions WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch sessions"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/uploaded_file.rs b/platform/api/src/dataloader/uploaded_file.rs index 7df07f14..9820e23a 100644 --- a/platform/api/src/dataloader/uploaded_file.rs +++ b/platform/api/src/dataloader/uploaded_file.rs @@ -6,11 +6,11 @@ use ulid::Ulid; use crate::database::UploadedFile; pub struct UploadedFileByIdLoader { - db: Arc, + db: Arc, } impl UploadedFileByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -21,14 +21,15 @@ impl Loader for UploadedFileByIdLoader { type Value = UploadedFile; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM uploaded_files WHERE id = ANY($1)") - .bind(keys.iter().copied().map(common::database::Ulid).collect::>()) + let results: Vec = common::database::query("SELECT * FROM uploaded_files WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch users by username"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/dataloader/user.rs b/platform/api/src/dataloader/user.rs index b59adc2b..591d9f12 100644 --- 
a/platform/api/src/dataloader/user.rs +++ b/platform/api/src/dataloader/user.rs @@ -2,16 +2,15 @@ use std::sync::Arc; use common::dataloader::{DataLoader, Loader, LoaderOutput}; use ulid::Ulid; -use uuid::Uuid; use crate::database::User; pub struct UserByUsernameLoader { - db: Arc, + db: Arc, } impl UserByUsernameLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -22,8 +21,9 @@ impl Loader for UserByUsernameLoader { type Value = User; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM users WHERE username = ANY($1)") + let results: Vec = common::database::query("SELECT * FROM users WHERE username = ANY($1)") .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { @@ -35,11 +35,11 @@ impl Loader for UserByUsernameLoader { } pub struct UserByIdLoader { - db: Arc, + db: Arc, } impl UserByIdLoader { - pub fn new(db: Arc) -> DataLoader { + pub fn new(db: Arc) -> DataLoader { DataLoader::new(Self { db }) } } @@ -50,14 +50,15 @@ impl Loader for UserByIdLoader { type Value = User; async fn load(&self, keys: &[Self::Key]) -> LoaderOutput { - let results: Vec = sqlx::query_as("SELECT * FROM users WHERE id = ANY($1)") - .bind(keys.iter().copied().map(Uuid::from).collect::>()) + let results: Vec = common::database::query("SELECT * FROM users WHERE id = ANY($1)") + .bind(keys) + .build_query_as() .fetch_all(self.db.as_ref()) .await .map_err(|e| { tracing::error!(err = %e, "failed to fetch users by id"); })?; - Ok(results.into_iter().map(|r| (r.id.0, r)).collect()) + Ok(results.into_iter().map(|r| (r.id, r)).collect()) } } diff --git a/platform/api/src/image_upload_callback.rs b/platform/api/src/image_upload_callback.rs index 213d3e3d..9fa43de0 100644 --- a/platform/api/src/image_upload_callback.rs +++ b/platform/api/src/image_upload_callback.rs @@ -68,18 +68,20 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { }; tracing::debug!("received profile picture job result: {:?}", job_result); - let mut tx = global.db().begin().await.context("failed to begin transaction")?; + let mut client = global.db().get().await.context("failed to get db connection")?; + let tx = client.transaction().await.context("failed to start transaction")?; match job_result { processed_image::Result::Success(processed_image::Success { variants }) => { - let uploaded_file: UploadedFile = match sqlx::query_as("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, metadata = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") .bind(common::database::Protobuf(UploadedFileMetadata { metadata: Some(uploaded_file_metadata::Metadata::Image(uploaded_file_metadata::Image { versions: variants, })), })) - .bind(common::database::Ulid(job_id.into_ulid())) - .fetch_optional(tx.as_mut()) + .bind(job_id.into_ulid()) + .build_query_as() + .fetch_optional(&tx) .await .context("failed to get uploaded file")? 
{ Some(uploaded_file) => uploaded_file, @@ -93,21 +95,22 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { global .nats() .publish( - SubscriptionTopic::UploadedFileStatus(uploaded_file.id.0), + SubscriptionTopic::UploadedFileStatus(uploaded_file.id), pb::scuffle::platform::internal::events::UploadedFileStatus { - file_id: Some(uploaded_file.id.0.into()), + file_id: Some(uploaded_file.id.into()), status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Success(pb::scuffle::platform::internal::events::uploaded_file_status::Success {})), }.encode_to_vec().into(), ) .await .context("failed to publish file update event")?; - let user_updated = sqlx::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $1") + let user_updated = common::database::query("UPDATE users SET profile_picture_id = $1, pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $2 AND pending_profile_picture_id = $1") .bind(uploaded_file.id) .bind(uploaded_file.owner_id) - .execute(tx.as_mut()) + .build() + .execute(&tx) .await - .context("failed to update user")?.rows_affected() == 1; + .context("failed to update user")? == 1; if let Err(err) = tx.commit().await.context("failed to commit transaction") { tracing::warn!(error = %err, "failed to commit transaction"); @@ -119,10 +122,10 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { global .nats() .publish( - SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id.0), + SubscriptionTopic::UserProfilePicture(uploaded_file.owner_id), pb::scuffle::platform::internal::events::UserProfilePicture { - user_id: Some(uploaded_file.owner_id.0.into()), - profile_picture_id: Some(uploaded_file.id.0.into()), + user_id: Some(uploaded_file.owner_id.into()), + profile_picture_id: Some(uploaded_file.id.into()), }.encode_to_vec().into(), ) .await @@ -130,10 +133,11 @@ pub async fn run(global: Arc) -> anyhow::Result<()> { } }, processed_image::Result::Failure(processed_image::Failure { reason, friendly_message }) => { - let uploaded_file: UploadedFile = match sqlx::query_as("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") + let uploaded_file: UploadedFile = match common::database::query("UPDATE uploaded_files SET pending = FALSE, failed = $1, updated_at = NOW() WHERE id = $2 AND pending = TRUE RETURNING *") .bind(reason.clone()) - .bind(common::database::Ulid(job_id.into_ulid())) - .fetch_optional(tx.as_mut()) + .bind(job_id.into_ulid()) + .build_query_as() + .fetch_optional(&tx) .await .context("failed to get uploaded file")? 
 				{
 					Some(uploaded_file) => uploaded_file,
@@ -147,9 +151,9 @@ pub async fn run(global: Arc<GlobalState>) -> anyhow::Result<()> {
 				global
 					.nats()
 					.publish(
-						SubscriptionTopic::UploadedFileStatus(uploaded_file.id.0),
+						SubscriptionTopic::UploadedFileStatus(uploaded_file.id),
 						pb::scuffle::platform::internal::events::UploadedFileStatus {
-							file_id: Some(uploaded_file.id.0.into()),
+							file_id: Some(uploaded_file.id.into()),
 							status: Some(pb::scuffle::platform::internal::events::uploaded_file_status::Status::Failure(pb::scuffle::platform::internal::events::uploaded_file_status::Failure {
 								reason,
 								friendly_message,
@@ -159,10 +163,11 @@ pub async fn run(global: Arc<GlobalState>) -> anyhow::Result<()> {
 					.await
 					.context("failed to publish file update event")?;
 
-				sqlx::query("UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2")
+				common::database::query("UPDATE users SET pending_profile_picture_id = NULL, updated_at = NOW() WHERE id = $1 AND pending_profile_picture_id = $2")
 					.bind(uploaded_file.owner_id)
 					.bind(uploaded_file.id)
-					.execute(tx.as_mut())
+					.build()
+					.execute(&tx)
 					.await
 					.context("failed to update user")?;
 
diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs
index a3e06a2e..4d356a3c 100644
--- a/platform/api/src/main.rs
+++ b/platform/api/src/main.rs
@@ -80,7 +80,7 @@ struct GlobalState {
 	config: AppConfig,
 	nats: async_nats::Client,
 	jetstream: async_nats::jetstream::Context,
-	db: Arc<sqlx::PgPool>,
+	db: Arc<common::database::Pool>,
 
 	category_by_id_loader: DataLoader<CategoryByIdLoader>,
 	global_state_loader: DataLoader<GlobalStateLoader>,
diff --git a/platform/api/src/video_event_handler.rs b/platform/api/src/video_event_handler.rs
index 70726d07..0b9a729e 100644
--- a/platform/api/src/video_event_handler.rs
+++ b/platform/api/src/video_event_handler.rs
@@ -1,7 +1,6 @@
 use std::sync::Arc;
 
 use anyhow::Context;
-use common::database::Ulid;
 use pb::scuffle::video::v1::types::{event, Event};
 use pb::scuffle::video::v1::{EventsAckRequest, EventsFetchRequest};
 use prost::Message;
@@ -63,19 +62,21 @@ async fn handle_room_event(global: &Arc<GlobalState>, event: event::Room, ti
 				.await
 				.context("failed to fetch playback session count")?;
 
-			let (channel_id,): (common::database::Ulid,) = sqlx::query_as("UPDATE users SET channel_active_connection_id = $1, channel_live_viewer_count = $2, channel_live_viewer_count_updated_at = NOW(), channel_last_live_at = $3 WHERE channel_room_id = $4 RETURNING id")
-				.bind(Ulid::from(connection_id.into_ulid()))
+			let channel_id = common::database::query("UPDATE users SET channel_active_connection_id = $1, channel_live_viewer_count = $2, channel_live_viewer_count_updated_at = NOW(), channel_last_live_at = $3 WHERE channel_room_id = $4 RETURNING id")
+				.bind(connection_id.into_ulid())
 				.bind(live_viewer_count)
 				.bind(chrono::NaiveDateTime::from_timestamp_millis(timestamp))
-				.bind(Ulid::from(room_id.into_ulid()))
-				.fetch_one(global.db().as_ref())
+				.bind(room_id.into_ulid())
+				.build_query_single_scalar()
+				.fetch_one(global.db())
 				.await?;
+
 			global
 				.nats()
 				.publish(
-					SubscriptionTopic::ChannelLive(channel_id.0),
+					SubscriptionTopic::ChannelLive(channel_id),
 					pb::scuffle::platform::internal::events::ChannelLive {
-						channel_id: Some(channel_id.0.into()),
+						channel_id: Some(channel_id.into()),
 						live: true,
 					}
 					.encode_to_vec()
@@ -88,18 +89,20 @@ async fn handle_room_event(global: &Arc<GlobalState>, event: event::Room, ti
 			connection_id: Some(connection_id),
 			..
 		}) => {
-			let res: Option<(common::database::Ulid,)> = sqlx::query_as("UPDATE users SET channel_active_connection_id = NULL, channel_live_viewer_count = 0, channel_live_viewer_count_updated_at = NOW() WHERE channel_room_id = $1 AND channel_active_connection_id = $2 RETURNING id")
-				.bind(Ulid::from(room_id.into_ulid()))
-				.bind(Ulid::from(connection_id.into_ulid()))
-				.fetch_optional(global.db().as_ref())
+			let res = common::database::query("UPDATE users SET channel_active_connection_id = NULL, channel_live_viewer_count = 0, channel_live_viewer_count_updated_at = NOW() WHERE channel_room_id = $1 AND channel_active_connection_id = $2 RETURNING id")
+				.bind(room_id.into_ulid())
+				.bind(connection_id.into_ulid())
+				.build_query_single_scalar()
+				.fetch_optional(global.db())
 				.await?;
-			if let Some((channel_id,)) = res {
+
+			if let Some(channel_id) = res {
 				global
 					.nats()
 					.publish(
-						SubscriptionTopic::ChannelLive(channel_id.0),
+						SubscriptionTopic::ChannelLive(channel_id),
 						pb::scuffle::platform::internal::events::ChannelLive {
-							channel_id: Some(channel_id.0.into()),
+							channel_id: Some(channel_id.into()),
 							live: false,
 						}
 						.encode_to_vec()
diff --git a/platform/image_processor/Cargo.toml b/platform/image_processor/Cargo.toml
index ed5805e5..422f1f95 100644
--- a/platform/image_processor/Cargo.toml
+++ b/platform/image_processor/Cargo.toml
@@ -10,8 +10,8 @@ tracing = "0.1"
 tokio = { version = "1.34", features = ["full"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "json", "chrono", "uuid"] }
 ulid = { version = "1.1", features = ["uuid"] }
+postgres-from-row = "0.5"
 prost = "0.12"
 aws-config = "1.1"
 aws-sdk-s3 = "1.12"
diff --git a/platform/image_processor/src/database.rs b/platform/image_processor/src/database.rs
index b70b4e12..2a4b292c 100644
--- a/platform/image_processor/src/database.rs
+++ b/platform/image_processor/src/database.rs
@@ -1,11 +1,13 @@
-use common::database::{Protobuf, Ulid};
+use common::database::protobuf;
 use pb::scuffle::platform::internal::image_processor::Task;
+use ulid::Ulid;
 
 // The actual table has more columns but we only need id and task to process a
 // job
-#[derive(Debug, Clone, Default, sqlx::FromRow)]
+#[derive(Debug, Clone, Default, postgres_from_row::FromRow)]
 pub struct Job {
 	pub id: Ulid,
-	pub task: Protobuf<Task>,
+	#[from_row(from_fn = "protobuf")]
+	pub task: Task,
 }
diff --git a/platform/image_processor/src/main.rs b/platform/image_processor/src/main.rs
index 4fd2b501..ad191f85 100644
--- a/platform/image_processor/src/main.rs
+++ b/platform/image_processor/src/main.rs
@@ -23,7 +23,7 @@ type AppConfig = binary_helper::config::AppConfig;
 
 struct GlobalState {
 	ctx: Context,
-	db: Arc<sqlx::PgPool>,
+	db: Arc<common::database::Pool>,
 	config: AppConfig,
 	nats: async_nats::Client,
 	jetstream: async_nats::jetstream::Context,
diff --git a/platform/image_processor/src/processor/error.rs b/platform/image_processor/src/processor/error.rs
index c34c021d..dfca20c5 100644
--- a/platform/image_processor/src/processor/error.rs
+++ b/platform/image_processor/src/processor/error.rs
@@ -21,8 +21,11 @@ pub enum ProcessorError {
 	#[error("semaphore ticket acquire: {0}")]
 	SemaphoreAcquire(#[from] tokio::sync::AcquireError),
 
-	#[error("sqlx: {0}")]
-	Sqlx(#[from] sqlx::Error),
+	#[error("database: {0}")]
+	Database(#[from] common::database::tokio_postgres::Error),
+
+	#[error("database pool: {0}")]
+	DatabasePool(#[from] common::database::deadpool_postgres::PoolError),
 
 	#[error("lost job")]
 	LostJob,
diff --git a/platform/image_processor/src/processor/job/mod.rs b/platform/image_processor/src/processor/job/mod.rs
index deea821c..575a144a 100644
--- a/platform/image_processor/src/processor/job/mod.rs
+++ b/platform/image_processor/src/processor/job/mod.rs
@@ -86,7 +86,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 			.publish(
 				self.job.task.callback_subject.clone(),
 				pb::scuffle::platform::internal::events::ProcessedImage {
-					job_id: Some(self.job.id.0.into()),
+					job_id: Some(self.job.id.into()),
 					result: Some(pb::scuffle::platform::internal::events::processed_image::Result::Failure(
 						pb::scuffle::platform::internal::events::processed_image::Failure {
 							reason: e.to_string(),
@@ -106,7 +106,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 		}
 
 		// delete job
-		utils::delete_job(self.global, self.job.id.0).await?;
+		utils::delete_job(self.global, self.job.id).await?;
 
 		Ok(())
 	}
@@ -114,7 +114,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 	async fn process_with_timeout(&self) -> Result<()> {
 		let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
 
-		let job_id = self.job.id.0;
+		let job_id = self.job.id;
 		let max_processing_time_ms = self.job.task.limits.as_ref().map(|l| l.max_processing_time_ms);
 
 		let time_limit = async {
@@ -208,7 +208,7 @@ impl<'a, G: ImageProcessorGlobal> Job<'a, G> {
 			.publish(
 				self.job.task.callback_subject.clone(),
 				pb::scuffle::platform::internal::events::ProcessedImage {
-					job_id: Some(self.job.id.0.into()),
+					job_id: Some(self.job.id.into()),
 					result: Some(pb::scuffle::platform::internal::events::processed_image::Result::Success(
 						pb::scuffle::platform::internal::events::processed_image::Success {
 							variants: images
diff --git a/platform/image_processor/src/processor/utils.rs b/platform/image_processor/src/processor/utils.rs
index f573242a..9d187e09 100644
--- a/platform/image_processor/src/processor/utils.rs
+++ b/platform/image_processor/src/processor/utils.rs
@@ -8,7 +8,7 @@ use crate::global::ImageProcessorGlobal;
 use crate::processor::error::Result;
 
 pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>) -> Result<Option<Job>> {
-	Ok(sqlx::query_as(
+	Ok(common::database::query(
 		"UPDATE image_jobs
 		SET claimed_by = $1,
 		hold_until = NOW() + INTERVAL '30 seconds'
@@ -23,33 +23,32 @@ pub async fn query_job(global: &Arc<impl ImageProcessorGlobal>) -> Result
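
The replacement pattern, gathered in one place (a minimal sketch, not part of the diff: `common::database::query`, `bind`, `build_query_as`, `build_query_single_scalar` and the executor arguments are taken from the hunks above; the `User` row type, table layout, and helper names here are illustrative assumptions):

	use postgres_from_row::FromRow;
	use ulid::Ulid;

	// Hypothetical row type for illustration. Decoding Ulid columns is
	// assumed to be wired up by common::database, as in the patch.
	#[derive(Debug, FromRow)]
	struct User {
		id: Ulid,
		username: String,
	}

	// Typed row fetch: build_query_as() maps rows through FromRow, taking
	// the place of sqlx::query_as. The executor can be a pool reference or
	// a tokio-postgres transaction, as the hunks above show.
	async fn user_by_username(db: &common::database::Pool, username: &str) -> anyhow::Result<Option<User>> {
		let user = common::database::query("SELECT * FROM users WHERE username = $1")
			.bind(username)
			.build_query_as()
			.fetch_optional(db)
			.await?;

		Ok(user)
	}

	// Single-column fetch: build_query_single_scalar() replaces the old
	// one-element-tuple destructuring around sqlx::query_as.
	async fn user_id(db: &common::database::Pool, username: &str) -> anyhow::Result<Option<Ulid>> {
		let id = common::database::query("SELECT id FROM users WHERE username = $1")
			.bind(username)
			.build_query_single_scalar()
			.fetch_optional(db)
			.await?;

		Ok(id)
	}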