diff --git a/platform/api/Cargo.toml b/platform/api/Cargo.toml index 052e9acc..caf82cd7 100644 --- a/platform/api/Cargo.toml +++ b/platform/api/Cargo.toml @@ -50,11 +50,11 @@ http-body = "1.0" http-body-util = "0.1" hyper-util = "0.1" pin-project = "1.1" +base64 = "0.21" config = { workspace = true } pb = { workspace = true } binary-helper = { workspace = true } -base64 = "0.21.5" [dev-dependencies] tempfile = "3.8" diff --git a/platform/api/src/main.rs b/platform/api/src/main.rs index 8695c70e..586151a5 100644 --- a/platform/api/src/main.rs +++ b/platform/api/src/main.rs @@ -7,7 +7,7 @@ use binary_helper::{bootstrap, grpc_health, grpc_server, impl_global_traits}; use common::context::Context; use common::dataloader::DataLoader; use common::global::*; -use platform_api::config::{ApiConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; +use platform_api::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig}; use platform_api::dataloader::category::CategoryByIdLoader; use platform_api::dataloader::global_state::GlobalStateLoader; use platform_api::dataloader::role::RoleByIdLoader; @@ -184,7 +184,6 @@ impl binary_helper::Global for GlobalState { let subscription_manager = SubscriptionManager::default(); - let image_processor_s3 = config .extra .image_uploader diff --git a/platform/website/src/components/dev-banner.svelte b/platform/website/src/components/dev-banner.svelte index 34bba3c7..68eeae9f 100644 --- a/platform/website/src/components/dev-banner.svelte +++ b/platform/website/src/components/dev-banner.svelte @@ -20,7 +20,7 @@ PUBLIC_BASE_URL, PUBLIC_TWITTER_HANDLE, PUBLIC_ORG_ID, - PUBLIC_EDGE_ENDPOINT + PUBLIC_EDGE_ENDPOINT, } from "$env/static/public"; import { websocketOpen } from "$/store/websocket"; import { dev } from "$app/environment"; diff --git a/platform/website/src/components/home/big-stream-preview.svelte b/platform/website/src/components/home/big-stream-preview.svelte index 24f794ec..efb6ca75 100644 --- a/platform/website/src/components/home/big-stream-preview.svelte +++ b/platform/website/src/components/home/big-stream-preview.svelte @@ -38,11 +38,7 @@ {#if playing} - + {:else} Stream Thumbnail diff --git a/proto/scuffle/video/internal/events/organization_event.proto b/proto/scuffle/video/internal/events/organization_event.proto deleted file mode 100644 index e94e6fa9..00000000 --- a/proto/scuffle/video/internal/events/organization_event.proto +++ /dev/null @@ -1,33 +0,0 @@ -syntax = "proto3"; - -package scuffle.video.internal.events; - -import "scuffle/types/ulid.proto"; - -message OrganizationEvent { - int64 timestamp = 1; - scuffle.types.Ulid id = 2; - - message RoomLive { - scuffle.types.Ulid connection_id = 1; - scuffle.types.Ulid room_id = 2; - } - - message RoomReady { - scuffle.types.Ulid connection_id = 1; - scuffle.types.Ulid room_id = 2; - } - - message RoomDisconnect { - scuffle.types.Ulid connection_id = 1; - scuffle.types.Ulid room_id = 2; - bool clean = 3; - optional string error = 4; - } - - oneof event { - RoomLive room_live = 3; - RoomDisconnect room_disconnect = 4; - RoomReady room_ready = 5; - } -} \ No newline at end of file diff --git a/proto/scuffle/video/v1/types/event.proto b/proto/scuffle/video/v1/types/event.proto index c3db41aa..2a1c6346 100644 --- a/proto/scuffle/video/v1/types/event.proto +++ b/proto/scuffle/video/v1/types/event.proto @@ -96,25 +96,8 @@ message Event { // If the disconnection was clean. Meaning the client disconnected and it // was handled gracefully. bool clean = 2; - - // The cause of the disconnection. - enum Cause { - // The cause is unknown. - DISCONNECTED_CAUSE_UNKNOWN = 0; - // The server requested the disconnection. This is usually because an - // API request on Room.Disconnect was made. Or because another client - // connected to the room, or the room was deleted. - DISCONNECTED_CAUSE_DISCONNECT_REQUEST = 1; - // The ingest bitrate was too high. - DISCONNECTED_CAUSE_HIGH_BITRATE = 2; - // The ingest resolution was too high. - DISCONNECTED_CAUSE_HIGH_RESOLUTION = 3; - // The ingest FPS was too high. - DISCONNECTED_CAUSE_HIGH_FPS = 4; - } - // The cause of the disconnection. - Cause cause = 3; + optional string cause = 3; } // If the room is ready to be watched. @@ -131,8 +114,9 @@ message Event { // If the room failed to be go live. message Failed { + scuffle.types.Ulid connection_id = 1; // The error that occurred. - string error = 1; + string error = 2; } // The event that occurred. diff --git a/video/api/src/api/access_token/create.rs b/video/api/src/api/access_token/create.rs index 7daebfe1..b1a65752 100644 --- a/video/api/src/api/access_token/create.rs +++ b/video/api/src/api/access_token/create.rs @@ -107,7 +107,7 @@ impl ApiRequest for tonic::Request for tonic::Request for tonic::Request for tonic::Request for tonic::Request { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::PlaybackKeyPair, event::Event::PlaybackKeyPair(event::PlaybackKeyPair { diff --git a/video/api/src/api/recording/modify.rs b/video/api/src/api/recording/modify.rs index 36743a63..ea6e31c9 100644 --- a/video/api/src/api/recording/modify.rs +++ b/video/api/src/api/recording/modify.rs @@ -114,7 +114,7 @@ impl ApiRequest for tonic::Request for tonic::Request for tonic::Request for tonic::Request { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::RecordingConfig, event::Event::RecordingConfig(event::RecordingConfig { diff --git a/video/api/src/api/room/create.rs b/video/api/src/api/room/create.rs index e4aeddf5..e6861d54 100644 --- a/video/api/src/api/room/create.rs +++ b/video/api/src/api/room/create.rs @@ -123,7 +123,7 @@ impl ApiRequest for tonic::Request { })?; video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::Room, event::Event::Room(event::Room { diff --git a/video/api/src/api/room/delete.rs b/video/api/src/api/room/delete.rs index b25b80ea..8b336178 100644 --- a/video/api/src/api/room/delete.rs +++ b/video/api/src/api/room/delete.rs @@ -95,7 +95,7 @@ impl ApiRequest for tonic::Request { for id in deleted_ids.iter().copied() { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::Room, event::Event::Room(event::Room { diff --git a/video/api/src/api/room/modify.rs b/video/api/src/api/room/modify.rs index 19472f6f..015020e8 100644 --- a/video/api/src/api/room/modify.rs +++ b/video/api/src/api/room/modify.rs @@ -144,7 +144,7 @@ impl ApiRequest for tonic::Request { }; video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::Room, event::Event::Room(event::Room { diff --git a/video/api/src/api/room/reset_key.rs b/video/api/src/api/room/reset_key.rs index 899a9a93..ea232548 100644 --- a/video/api/src/api/room/reset_key.rs +++ b/video/api/src/api/room/reset_key.rs @@ -93,7 +93,7 @@ impl ApiRequest for tonic::Request { for RoomKeyPair { id, .. } in rooms.iter() { if let Some(id) = id { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::Room, event::Event::Room(event::Room { diff --git a/video/api/src/api/s3_bucket/create.rs b/video/api/src/api/s3_bucket/create.rs index 32ad1474..c6a0488a 100644 --- a/video/api/src/api/s3_bucket/create.rs +++ b/video/api/src/api/s3_bucket/create.rs @@ -111,7 +111,7 @@ impl ApiRequest for tonic::Request for tonic::Request for tonic::Request { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::S3Bucket, event::Event::S3Bucket(event::S3Bucket { diff --git a/video/api/src/api/transcoding_config/create.rs b/video/api/src/api/transcoding_config/create.rs index 6b75d7cf..b19aa178 100644 --- a/video/api/src/api/transcoding_config/create.rs +++ b/video/api/src/api/transcoding_config/create.rs @@ -88,7 +88,7 @@ impl ApiRequest for tonic::Request for tonic::Request for tonic::Request { video_common::events::emit( - global.jetstream(), + global.nats(), access_token.organization_id.0, Target::TranscodingConfig, event::Event::TranscodingConfig(event::TranscodingConfig { diff --git a/video/api/src/api/utils/tags.rs b/video/api/src/api/utils/tags.rs index a745cd75..f42b6beb 100644 --- a/video/api/src/api/utils/tags.rs +++ b/video/api/src/api/utils/tags.rs @@ -179,7 +179,7 @@ macro_rules! impl_tag_req { })?; let $id = pb::ext::UlidExt::into_ulid(req.id); - video_common::events::emit(global.jetstream(), access_token.organization_id.0, $event_target, $event).await; + video_common::events::emit(global.nats(), access_token.organization_id.0, $event_target, $event).await; Ok(tonic::Response::new($resp { tags: Some(result.into_tags()?) @@ -218,7 +218,7 @@ macro_rules! impl_untag_req { })?; let $id = pb::ext::UlidExt::into_ulid(req.id); - video_common::events::emit(global.jetstream(), access_token.organization_id.0, $event_target, $event).await; + video_common::events::emit(global.nats(), access_token.organization_id.0, $event_target, $event).await; Ok(tonic::Response::new($resp { tags: Some(result.into_tags()?) diff --git a/video/cli/src/cli/events/fetch.rs b/video/cli/src/cli/events/fetch.rs index ade49efc..d4dbadda 100644 --- a/video/cli/src/cli/events/fetch.rs +++ b/video/cli/src/cli/events/fetch.rs @@ -287,7 +287,7 @@ impl Invokable for Fetch { action: "disconnected".to_owned(), connection_id: Some(disconnected.connection_id.into_ulid()), clean: Some(disconnected.clean), - cause: Some(disconnected.cause().as_str_name().into()), + cause: disconnected.cause, ..Default::default() }, Some(event::room::Event::Ready(ready)) => EventPayload { diff --git a/video/common/src/events.rs b/video/common/src/events.rs index 5fc8ade7..756b8851 100644 --- a/video/common/src/events.rs +++ b/video/common/src/events.rs @@ -4,21 +4,20 @@ use prost::Message; use crate::keys::event_subject; -pub async fn emit(jetstream: &async_nats::jetstream::Context, org_id: ulid::Ulid, target: Target, event: event::Event) { - jetstream - .publish( - event_subject(org_id, target), - Event { - timestamp: chrono::Utc::now().timestamp_millis(), - event_id: Some(ulid::Ulid::new().into()), - event: Some(event), - } - .encode_to_vec() - .into(), - ) - .await - .map_err(|e| { - tracing::error!(err = %e, "failed to publish event"); - }) - .ok(); +pub async fn emit(nats: &async_nats::Client, org_id: ulid::Ulid, target: Target, event: event::Event) { + nats.publish( + event_subject(org_id, target), + Event { + timestamp: chrono::Utc::now().timestamp_millis(), + event_id: Some(ulid::Ulid::new().into()), + event: Some(event), + } + .encode_to_vec() + .into(), + ) + .await + .map_err(|e| { + tracing::error!(err = %e, "failed to publish event"); + }) + .ok(); } diff --git a/video/ingest/src/config.rs b/video/ingest/src/config.rs index 067f0aed..4079546f 100644 --- a/video/ingest/src/config.rs +++ b/video/ingest/src/config.rs @@ -28,9 +28,6 @@ pub struct IngestConfig { /// NATS subject to send transcoder requests to pub transcoder_request_subject: String, - /// NATS subject for events - pub events_subject: String, - /// The interval in to update the bitrate for a room pub bitrate_update_interval: Duration, @@ -58,7 +55,6 @@ impl Default for IngestConfig { fn default() -> Self { Self { transcoder_request_subject: "transcoder-request".to_string(), - events_subject: "events".to_string(), bitrate_update_interval: Duration::from_secs(5), max_bitrate: 12000 * 1024, max_bytes_between_keyframes: 5 * 12000 * 1024 / 8, diff --git a/video/ingest/src/ingest/connection.rs b/video/ingest/src/ingest/connection.rs index be66ae07..c3e734fc 100644 --- a/video/ingest/src/ingest/connection.rs +++ b/video/ingest/src/ingest/connection.rs @@ -7,11 +7,10 @@ use anyhow::Result; use base64::Engine; use bytes::Bytes; use bytesio::bytesio::AsyncReadWrite; -use chrono::Utc; use flv::{FlvTag, FlvTagData, FlvTagType}; use futures::Future; use futures_util::StreamExt; -use pb::scuffle::video::internal::events::{organization_event, OrganizationEvent, TranscoderRequestTask}; +use pb::scuffle::video::internal::events::TranscoderRequestTask; use pb::scuffle::video::internal::{ingest_watch_request, ingest_watch_response, IngestWatchRequest, IngestWatchResponse}; use pb::scuffle::video::v1::events_fetch_request::Target; use pb::scuffle::video::v1::types::{event, Rendition}; @@ -125,42 +124,8 @@ pub async fn handle(global: Arc, socket: } }; - // emit room connected event - video_common::events::emit( - global.jetstream(), - connection.organization_id, - Target::Room, - event::Event::Room(event::Room { - room_id: Some(connection.room_id.into()), - event: Some(event::room::Event::Connected(event::room::Connected { - connection_id: Some(connection.id.into()), - })), - }), - ) - .await; - let clean_disconnect = connection.run(&global, session).await; - // emit room disconnected event - video_common::events::emit( - global.jetstream(), - connection.organization_id, - Target::Room, - event::Event::Room(event::Room { - room_id: Some(connection.room_id.into()), - event: Some(event::room::Event::Disconnected(event::room::Disconnected { - connection_id: Some(connection.id.into()), - clean: clean_disconnect, - cause: connection - .error - .clone() - .map(|e| e.into()) - .unwrap_or(event::room::disconnected::Cause::DisconnectedCauseUnknown) as i32, - })), - }), - ) - .await; - if let Err(err) = connection.cleanup(&global, clean_disconnect).await { tracing::error!(error = %err, "failed to cleanup connection") } @@ -755,25 +720,18 @@ impl Connection { } } - if let Err(err) = global - .nats() - .publish( - format!("{}.{}", global.config::().events_subject, self.organization_id), - OrganizationEvent { - timestamp: Utc::now().timestamp_micros(), - id: Some(self.organization_id.into()), - event: Some(organization_event::Event::RoomLive(organization_event::RoomLive { - connection_id: Some(self.id.into()), - room_id: Some(self.room_id.into()), - })), - } - .encode_to_vec() - .into(), - ) - .await - { - tracing::error!(error = %err, "failed to publish disconnect event"); - } + video_common::events::emit( + global.nats(), + self.organization_id, + Target::Room, + event::Event::Room(event::Room { + room_id: Some(self.room_id.into()), + event: Some(event::room::Event::Connected(event::room::Connected { + connection_id: Some(self.id.into()), + })), + }), + ) + .await; self.request_transcoder(global).await } @@ -1083,29 +1041,20 @@ impl Connection { .ok(); } - if let Err(err) = global - .nats() - .publish( - format!("{}.{}", global.config::().events_subject, self.organization_id), - OrganizationEvent { - timestamp: Utc::now().timestamp_micros(), - id: Some(self.organization_id.into()), - event: Some(organization_event::Event::RoomDisconnect( - organization_event::RoomDisconnect { - connection_id: Some(self.id.into()), - room_id: Some(self.room_id.into()), - clean: clean_disconnect, - error: self.error.as_ref().map(|e| e.to_string()), - }, - )), - } - .encode_to_vec() - .into(), - ) - .await - { - tracing::error!(error = %err, "failed to publish room disconnect event"); - } + video_common::events::emit( + global.nats(), + self.organization_id, + Target::Room, + event::Event::Room(event::Room { + room_id: Some(self.room_id.into()), + event: Some(event::room::Event::Disconnected(event::room::Disconnected { + connection_id: Some(self.id.into()), + clean: clean_disconnect, + cause: self.error.as_ref().map(|e| e.to_string()), + })), + }), + ) + .await; sqlx::query( r#" diff --git a/video/ingest/src/ingest/errors.rs b/video/ingest/src/ingest/errors.rs index c533da39..004682ab 100644 --- a/video/ingest/src/ingest/errors.rs +++ b/video/ingest/src/ingest/errors.rs @@ -19,19 +19,6 @@ pub enum IngestError { FailedToUpdateRoom, } -impl From for pb::scuffle::video::v1::types::event::room::disconnected::Cause { - fn from(value: IngestError) -> Self { - // TODO: return the correct cause - match value { - IngestError::KeyframeBitrateDistance(_, _) => Self::DisconnectedCauseHighBitrate, - IngestError::BitrateLimit(_, _) => Self::DisconnectedCauseHighBitrate, - IngestError::KeyframeTimeLimit(_) => Self::DisconnectedCauseHighBitrate, - IngestError::DisconnectRequested => Self::DisconnectedCauseDisconnectRequest, - _ => Self::DisconnectedCauseUnknown, - } - } -} - impl std::fmt::Display for IngestError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/video/ingest/src/tests/ingest.rs b/video/ingest/src/tests/ingest.rs index a0cea71d..ee18e66a 100644 --- a/video/ingest/src/tests/ingest.rs +++ b/video/ingest/src/tests/ingest.rs @@ -1,3 +1,4 @@ +use std::ops::Sub; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::process::Stdio; @@ -12,10 +13,11 @@ use common::global::*; use common::prelude::FutureTimeout; use futures::StreamExt; use pb::ext::UlidExt; -use pb::scuffle::video::internal::events::{organization_event, OrganizationEvent, TranscoderRequestTask}; +use pb::scuffle::video::internal::events::TranscoderRequestTask; use pb::scuffle::video::internal::ingest_client::IngestClient; use pb::scuffle::video::internal::{ingest_watch_request, ingest_watch_response, IngestWatchRequest, IngestWatchResponse}; -use pb::scuffle::video::v1::types::Rendition; +use pb::scuffle::video::v1::events_fetch_request::Target; +use pb::scuffle::video::v1::types::{event, Event, Rendition}; use prost::Message; use tokio::io::AsyncWriteExt; use tokio::process::Command; @@ -25,7 +27,7 @@ use tokio::task::JoinHandle; use ulid::Ulid; use uuid::Uuid; use video_common::database::Room; -use video_common::keys; +use video_common::keys::{self, event_subject}; use super::global::GlobalState; use crate::config::{IngestConfig, RtmpConfig}; @@ -153,7 +155,7 @@ struct TestState { pub global: Arc, pub handler: common::context::Handler, pub transcoder_requests: Pin>>, - pub organization_events: Pin>>, + pub events: Pin>>, pub ingest_handle: JoinHandle>, pub grpc_handle: JoinHandle>, } @@ -178,7 +180,6 @@ impl TestState { let rtmp_port = portpicker::pick_unused_port().unwrap(); let (global, handler) = mock_global_state(IngestConfig { - events_subject: Uuid::new_v4().to_string(), transcoder_request_subject: Uuid::new_v4().to_string(), bitrate_update_interval: Duration::from_secs(1), grpc_advertise_address: format!("127.0.0.1:{grpc_port}"), @@ -225,19 +226,17 @@ impl TestState { }) }; - let organization_events = { + let org_id = Ulid::new(); + + let events = { let global = global.clone(); - let mut stream = global - .nats() - .subscribe(format!("{}.*", global.config::().events_subject)) - .await - .unwrap(); + let mut stream = global.nats().subscribe(event_subject(org_id, Target::Room)).await.unwrap(); stream!({ loop { select! { message = stream.next() => { let message = message.unwrap(); - yield OrganizationEvent::decode(message.payload).unwrap(); + yield Event::decode(message.payload).unwrap(); } _ = global.ctx().done() => { break; @@ -247,8 +246,6 @@ impl TestState { }) }; - let org_id = Ulid::new(); - sqlx::query("INSERT INTO organizations (id, name) VALUES ($1, $2)") .bind(Uuid::from(org_id)) .bind("test") @@ -272,7 +269,7 @@ impl TestState { rtmp_port, global, handler, - organization_events: Box::pin(organization_events), + events: Box::pin(events), transcoder_requests: Box::pin(transcoder_requests), ingest_handle, grpc_handle, @@ -286,8 +283,8 @@ impl TestState { .expect("failed to receive event") } - async fn organization_event(&mut self) -> OrganizationEvent { - tokio::time::timeout(Duration::from_secs(2), self.organization_events.next()) + async fn organization_event(&mut self) -> Event { + tokio::time::timeout(Duration::from_secs(2), self.events.next()) .await .expect("failed to receive event") .expect("failed to receive event") @@ -315,12 +312,17 @@ async fn test_ingest_stream() { ); let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); match update.event { - Some(organization_event::Event::RoomLive(room_live)) => { - assert_eq!(room_live.room_id.into_ulid(), state.room_id); - assert!(!room_live.connection_id.into_ulid().is_nil()); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -473,14 +475,19 @@ async fn test_ingest_stream() { tokio::time::sleep(Duration::from_millis(200)).await; let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); match update.event { - Some(organization_event::Event::RoomDisconnect(room_disconnect)) => { - assert_eq!(room_disconnect.room_id.into_ulid(), state.room_id); - assert!(!room_disconnect.connection_id.into_ulid().is_nil()); - assert!(!room_disconnect.clean); - assert_eq!(room_disconnect.error, None); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Disconnected(disconnected)) => { + assert!(!disconnected.connection_id.into_ulid().is_nil()); + assert!(!disconnected.clean); + assert!(disconnected.cause.is_none()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -513,12 +520,17 @@ async fn test_ingest_stream_transcoder_disconnect() { ); let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); match update.event { - Some(organization_event::Event::RoomLive(room_live)) => { - assert_eq!(room_live.room_id.into_ulid(), state.room_id); - assert!(!room_live.connection_id.into_ulid().is_nil()); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -588,14 +600,19 @@ async fn test_ingest_stream_transcoder_disconnect() { ffmpeg.kill().await.unwrap(); let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); match update.event { - Some(organization_event::Event::RoomDisconnect(room_disconnect)) => { - assert_eq!(room_disconnect.room_id.into_ulid(), state.room_id); - assert!(!room_disconnect.connection_id.into_ulid().is_nil()); - assert!(!room_disconnect.clean); - assert_eq!(room_disconnect.error, None); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Disconnected(disconnected)) => { + assert!(!disconnected.connection_id.into_ulid().is_nil()); + assert!(!disconnected.clean); + assert!(disconnected.cause.is_none()) + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -613,17 +630,22 @@ async fn test_ingest_stream_shutdown() { ); let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); - - let connection_id = match update.event { - Some(organization_event::Event::RoomLive(room_live)) => { - assert_eq!(room_live.room_id.into_ulid(), state.room_id); - assert!(!room_live.connection_id.into_ulid().is_nil()); - room_live.connection_id.into_ulid() + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + let connection_id: Ulid; + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + connection_id = live.connection_id.into_ulid(); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), - }; + } state .global @@ -637,16 +659,19 @@ async fn test_ingest_stream_shutdown() { assert!(ffmpeg.wait().timeout(Duration::from_secs(1)).await.is_ok()); let update = state.organization_event().await; - - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); - + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); match update.event { - Some(organization_event::Event::RoomDisconnect(room_disconnect)) => { - assert_eq!(room_disconnect.room_id.into_ulid(), state.room_id); - assert_eq!(room_disconnect.connection_id.into_ulid(), connection_id); - assert!(room_disconnect.clean); - assert_eq!(room_disconnect.error, Some("I14: Disconnect requested".into())); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Disconnected(disconnected)) => { + assert!(!disconnected.connection_id.into_ulid().is_nil()); + assert!(disconnected.clean); + assert_eq!(disconnected.cause(), "I14: Disconnect requested"); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -676,17 +701,20 @@ async fn test_ingest_stream_transcoder_full() { ); let update = state.organization_event().await; - assert!(update.timestamp > 0); - assert_eq!(update.id.into_ulid(), state.org_id); - - let connection_id = match update.event { - Some(organization_event::Event::RoomLive(room_live)) => { - assert_eq!(room_live.room_id.into_ulid(), state.room_id); - assert!(!room_live.connection_id.into_ulid().is_nil()); - room_live.connection_id + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), - }; + } let room: Room = sqlx::query_as("SELECT * FROM rooms WHERE organization_id = $1 AND id = $2") .bind(Uuid::from(state.org_id)) @@ -769,16 +797,20 @@ async fn test_ingest_stream_transcoder_full() { assert!(ffmpeg.try_wait().is_ok()); - let room_disconnect = state.organization_event().await; - assert!(room_disconnect.timestamp > 0); - assert_eq!(room_disconnect.id.into_ulid(), state.org_id); - - match room_disconnect.event { - Some(organization_event::Event::RoomDisconnect(room_disconnect)) => { - assert_eq!(room_disconnect.room_id.into_ulid(), state.room_id); - assert_eq!(room_disconnect.connection_id, connection_id); - assert!(room_disconnect.clean); - assert!(room_disconnect.error.is_none()); + let update = state.organization_event().await; + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Disconnected(disconnected)) => { + assert!(!disconnected.connection_id.into_ulid().is_nil()); + assert!(disconnected.clean); + assert!(disconnected.cause.is_none()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -817,14 +849,18 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { &generate_key(state.org_id, state.room_id), ); - let live = state.organization_event().await; - assert!(live.timestamp > 0); - assert_eq!(live.id.into_ulid(), state.org_id); - - match live.event { - Some(organization_event::Event::RoomLive(live)) => { - assert_eq!(live.room_id.into_ulid(), state.room_id); - assert!(!live.connection_id.into_ulid().is_nil()); + let update = state.organization_event().await; + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -882,16 +918,20 @@ async fn test_ingest_stream_transcoder_full_tls(tls_dir: PathBuf) { assert!(ffmpeg.try_wait().is_ok()); - let room_disconnect = state.organization_event().await; - assert!(room_disconnect.timestamp > 0); - assert_eq!(room_disconnect.id.into_ulid(), state.org_id); - - match room_disconnect.event { - Some(organization_event::Event::RoomDisconnect(room_disconnect)) => { - assert_eq!(room_disconnect.room_id.into_ulid(), state.room_id); - assert!(!room_disconnect.connection_id.into_ulid().is_nil()); - assert!(room_disconnect.clean); - assert!(room_disconnect.error.is_none()); + let update = state.organization_event().await; + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Disconnected(disconnected)) => { + assert!(!disconnected.connection_id.into_ulid().is_nil()); + assert!(disconnected.clean); + assert!(disconnected.cause.is_none()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } @@ -918,14 +958,18 @@ async fn test_ingest_stream_transcoder_probe() { &generate_key(state.org_id, state.room_id), ); - let live = state.organization_event().await; - assert!(live.timestamp > 0); - assert_eq!(live.id.into_ulid(), state.org_id); - - match live.event { - Some(organization_event::Event::RoomLive(live)) => { - assert_eq!(live.room_id.into_ulid(), state.room_id); - assert!(!live.connection_id.into_ulid().is_nil()); + let update = state.organization_event().await; + assert!(update.timestamp.sub(chrono::Utc::now().timestamp_millis()) < 1000); + assert!(!update.event_id.into_ulid().is_nil(), "event_id is nil"); + match update.event { + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), state.room_id); + match room.event { + Some(event::room::Event::Connected(live)) => { + assert!(!live.connection_id.into_ulid().is_nil()); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), } diff --git a/video/transcoder/src/config.rs b/video/transcoder/src/config.rs index df789268..56c22e84 100644 --- a/video/transcoder/src/config.rs +++ b/video/transcoder/src/config.rs @@ -11,9 +11,6 @@ pub struct TranscoderConfig { /// The name of the transcoder requests queue to use pub transcoder_request_subject: String, - /// The name of the events queue to use - pub events_subject: String, - /// The uid to use for the unix socket and ffmpeg process pub ffmpeg_uid: u32, @@ -45,7 +42,6 @@ pub struct TranscoderConfig { impl Default for TranscoderConfig { fn default() -> Self { Self { - events_subject: "events".to_string(), transcoder_request_subject: "transcoder-request".to_string(), socket_dir: format!("/tmp/{}", std::process::id()), ffmpeg_uid: 1000, diff --git a/video/transcoder/src/tests/transcoder/mod.rs b/video/transcoder/src/tests/transcoder/mod.rs index ee8ec374..4edaa188 100644 --- a/video/transcoder/src/tests/transcoder/mod.rs +++ b/video/transcoder/src/tests/transcoder/mod.rs @@ -13,12 +13,13 @@ use common::global::*; use common::prelude::FutureTimeout; use futures_util::Stream; use pb::ext::UlidExt; -use pb::scuffle::video::internal::events::{organization_event, OrganizationEvent, TranscoderRequestTask}; +use pb::scuffle::video::internal::events::TranscoderRequestTask; use pb::scuffle::video::internal::ingest_server::{Ingest, IngestServer}; use pb::scuffle::video::internal::{ ingest_watch_request, ingest_watch_response, IngestWatchRequest, IngestWatchResponse, LiveRenditionManifest, }; -use pb::scuffle::video::v1::types::{AudioConfig, Rendition, VideoConfig}; +use pb::scuffle::video::v1::events_fetch_request::Target; +use pb::scuffle::video::v1::types::{event, AudioConfig, Event, Rendition, VideoConfig}; use prost::Message; use tokio::process::Command; use tokio::sync::mpsc; @@ -84,7 +85,6 @@ async fn test_transcode() { let port = portpicker::pick_unused_port().unwrap(); let (global, handler) = crate::tests::global::mock_global_state(TranscoderConfig { - events_subject: Ulid::new().to_string(), transcoder_request_subject: Ulid::new().to_string(), metadata_kv_store: Ulid::new().to_string(), media_ob_store: Ulid::new().to_string(), @@ -92,9 +92,11 @@ async fn test_transcode() { }) .await; + let org_id = Ulid::new(); + let mut event_stream = global .nats() - .subscribe(global.config::().events_subject.clone()) + .subscribe(video_common::keys::event_subject(org_id, Target::Room)) .await .unwrap(); @@ -107,7 +109,6 @@ async fn test_transcode() { let req_id = Ulid::new(); let room_id = Ulid::new(); - let org_id = Ulid::new(); let connection_id = Ulid::new(); sqlx::query( @@ -269,13 +270,18 @@ async fn test_transcode() { } { - let event = OrganizationEvent::decode(event_stream.next().await.unwrap().payload).unwrap(); - assert_eq!(event.id.into_ulid(), org_id); - assert!(event.timestamp > 0); + let event = Event::decode(event_stream.next().await.unwrap().payload).unwrap(); + assert!(event.timestamp - chrono::Utc::now().timestamp_millis() < 1000); match event.event { - Some(organization_event::Event::RoomReady(r)) => { - assert_eq!(r.room_id.into_ulid(), room_id); - assert_eq!(r.connection_id.into_ulid(), connection_id); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), room_id); + + match room.event { + Some(event::room::Event::Ready(ready)) => { + assert_eq!(ready.connection_id.into_ulid(), connection_id); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), }; @@ -604,7 +610,6 @@ async fn test_transcode_reconnect() { let port = portpicker::pick_unused_port().unwrap(); let (global, handler) = crate::tests::global::mock_global_state(TranscoderConfig { - events_subject: Ulid::new().to_string(), transcoder_request_subject: Ulid::new().to_string(), metadata_kv_store: Ulid::new().to_string(), media_ob_store: Ulid::new().to_string(), @@ -623,7 +628,13 @@ async fn test_transcode_reconnect() { .await .unwrap(); - let mut event_stream = global.nats().subscribe(config.events_subject.clone()).await.unwrap(); + let org_id = Ulid::new(); + + let mut event_stream = global + .nats() + .subscribe(video_common::keys::event_subject(org_id, Target::Room)) + .await + .unwrap(); let addr = SocketAddr::from(([127, 0, 0, 1], port)); @@ -634,7 +645,6 @@ async fn test_transcode_reconnect() { let req_id = Ulid::new(); let room_id = Ulid::new(); - let org_id = Ulid::new(); let connection_id = Ulid::new(); sqlx::query( @@ -808,13 +818,18 @@ async fn test_transcode_reconnect() { } { - let event = OrganizationEvent::decode(event_stream.next().await.unwrap().payload).unwrap(); - assert_eq!(event.id.into_ulid(), org_id); - assert!(event.timestamp > 0); + let event = Event::decode(event_stream.next().await.unwrap().payload).unwrap(); + assert!(event.timestamp - chrono::Utc::now().timestamp_millis() < 1000); match event.event { - Some(organization_event::Event::RoomReady(r)) => { - assert_eq!(r.room_id.into_ulid(), room_id); - assert_eq!(r.connection_id.into_ulid(), connection_id); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), room_id); + + match room.event { + Some(event::room::Event::Ready(ready)) => { + assert_eq!(ready.connection_id.into_ulid(), connection_id); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), }; @@ -977,13 +992,18 @@ async fn test_transcode_reconnect() { } { - let event = OrganizationEvent::decode(event_stream.next().await.unwrap().payload).unwrap(); - assert_eq!(event.id.into_ulid(), org_id); - assert!(event.timestamp > 0); + let event = Event::decode(event_stream.next().await.unwrap().payload).unwrap(); + assert!(event.timestamp - chrono::Utc::now().timestamp_millis() < 1000); match event.event { - Some(organization_event::Event::RoomReady(r)) => { - assert_eq!(r.room_id.into_ulid(), room_id); - assert_eq!(r.connection_id.into_ulid(), connection_id); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), room_id); + + match room.event { + Some(event::room::Event::Ready(ready)) => { + assert_eq!(ready.connection_id.into_ulid(), connection_id); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), }; @@ -1152,13 +1172,18 @@ async fn test_transcode_reconnect() { } { - let event = OrganizationEvent::decode(event_stream.next().await.unwrap().payload).unwrap(); - assert_eq!(event.id.into_ulid(), org_id); - assert!(event.timestamp > 0); + let event = Event::decode(event_stream.next().await.unwrap().payload).unwrap(); + assert!(event.timestamp - chrono::Utc::now().timestamp_millis() < 1000); match event.event { - Some(organization_event::Event::RoomReady(r)) => { - assert_eq!(r.room_id.into_ulid(), room_id); - assert_eq!(r.connection_id.into_ulid(), connection_id); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), room_id); + + match room.event { + Some(event::room::Event::Ready(ready)) => { + assert_eq!(ready.connection_id.into_ulid(), connection_id); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), }; @@ -1332,13 +1357,18 @@ async fn test_transcode_reconnect() { } { - let event = OrganizationEvent::decode(event_stream.next().await.unwrap().payload).unwrap(); - assert_eq!(event.id.into_ulid(), org_id); - assert!(event.timestamp > 0); + let event = Event::decode(event_stream.next().await.unwrap().payload).unwrap(); + assert!(event.timestamp - chrono::Utc::now().timestamp_millis() < 1000); match event.event { - Some(organization_event::Event::RoomReady(r)) => { - assert_eq!(r.room_id.into_ulid(), room_id); - assert_eq!(r.connection_id.into_ulid(), connection_id); + Some(event::Event::Room(room)) => { + assert_eq!(room.room_id.into_ulid(), room_id); + + match room.event { + Some(event::room::Event::Ready(ready)) => { + assert_eq!(ready.connection_id.into_ulid(), connection_id); + } + _ => panic!("unexpected event"), + } } _ => panic!("unexpected event"), }; diff --git a/video/transcoder/src/transcoder/job/mod.rs b/video/transcoder/src/transcoder/job/mod.rs index c9693f04..8b549993 100644 --- a/video/transcoder/src/transcoder/job/mod.rs +++ b/video/transcoder/src/transcoder/job/mod.rs @@ -12,7 +12,7 @@ use common::prelude::FutureTimeout; use futures::{FutureExt, StreamExt}; use futures_util::{Future, Stream, TryFutureExt}; use pb::ext::UlidExt; -use pb::scuffle::video::internal::events::{organization_event, OrganizationEvent, TranscoderRequestTask}; +use pb::scuffle::video::internal::events::TranscoderRequestTask; use pb::scuffle::video::internal::ingest_client::IngestClient; use pb::scuffle::video::internal::{ ingest_watch_request, ingest_watch_response, live_rendition_manifest, IngestWatchRequest, IngestWatchResponse, @@ -73,6 +73,20 @@ pub async fn handle_message(global: Arc, msg: Message, s ); if let Err(err) = job.run(&global, shutdown_token, &mut streams).await { + video_common::events::emit( + global.nats(), + job.organization_id, + Target::Room, + event::Event::Room(event::Room { + room_id: Some(job.room_id.into()), + event: Some(event::room::Event::Failed(event::room::Failed { + connection_id: Some(job.connection_id.into()), + error: err.to_string(), + })), + }), + ) + .await; + tracing::error!(error = %err, "failed to run transcoder"); } @@ -99,7 +113,8 @@ struct Job { video_input: VideoConfig, recording: Option, - ready: bool, + ingest_ready: bool, + transcoder_ready: bool, init_segment: Option, track_state: HashMap, @@ -248,7 +263,8 @@ impl Job { .iter() .map(|(_, rendition)| (*rendition, LiveRenditionManifest::default())) .collect(), - ready: false, + ingest_ready: false, + transcoder_ready: false, track_state: tracks .iter() .map(|(_, rendition)| (*rendition, TrackState::default())) @@ -359,6 +375,7 @@ impl Job { } self.update_manifest(); + self.ready(); } r = streams.next() => { let Some((result, rendition)) = r else { @@ -377,6 +394,63 @@ impl Job { Ok(()) } + fn ready(&mut self) { + if self.transcoder_ready { + return; + } + + self.transcoder_ready = true; + + let organization_id = self.organization_id; + let connection_id = self.connection_id; + let room_id = self.room_id; + + self.tasker + .submit_with_abort(TaskDomain::Generic, "room_ready", move |global| { + let global = global.clone(); + Box::pin(async move { + let resp = sqlx::query( + r#" + UPDATE rooms + SET + updated_at = NOW(), + status = $1 + WHERE + organization_id = $2 AND + id = $3 AND + active_ingest_connection_id = $4 + "#, + ) + .bind(RoomStatus::Ready) + .bind(Uuid::from(organization_id)) + .bind(Uuid::from(room_id)) + .bind(Uuid::from(connection_id)) + .execute(global.db().as_ref()) + .await + .map_err(|e| TaskError::Custom(e.into()))?; + + if resp.rows_affected() != 1 { + return Err(TaskError::Custom(anyhow::anyhow!("failed to update room status"))); + } + + video_common::events::emit( + global.nats(), + organization_id, + Target::Room, + event::Event::Room(event::Room { + room_id: Some(room_id.into()), + event: Some(event::room::Event::Ready(event::room::Ready { + connection_id: Some(connection_id.into()), + })), + }), + ) + .await; + + Ok(()) + }) + }); + } + async fn handle_msg(&mut self, global: &Arc, msg: Option>) -> Result<()> { tracing::trace!("recieved message"); @@ -425,7 +499,7 @@ impl Job { self.ffmpeg.stdin.take(); } ingest_watch_response::Message::Ready(_) => { - self.ready = true; + self.ingest_ready = true; self.fetch_manifests(global).await?; self.put_init_segments()?; for rendition in self.track_state.keys().cloned().collect::>() { @@ -439,7 +513,7 @@ impl Job { } fn take_screenshot(&mut self, global: &Arc, data: &Bytes, start_time: f64) -> Result<()> { - if !self.ready { + if !self.ingest_ready { return Ok(()); } @@ -495,7 +569,7 @@ impl Job { } fn put_init_segments(&mut self) -> Result<()> { - if !self.ready { + if !self.ingest_ready { return Ok(()); } @@ -514,74 +588,6 @@ impl Job { if self.first_init_put { self.first_init_put = false; - let event = Bytes::from( - OrganizationEvent { - id: Some(self.organization_id.into()), - timestamp: chrono::Utc::now().timestamp_micros(), - event: Some(organization_event::Event::RoomReady(organization_event::RoomReady { - room_id: Some(self.room_id.into()), - connection_id: Some(self.connection_id.into()), - })), - } - .encode_to_vec(), - ); - - let organization_id = self.organization_id; - let connection_id = self.connection_id; - let room_id = self.room_id; - - self.tasker - .submit_with_abort(TaskDomain::Generic, "room_ready", move |global| { - let global = global.clone(); - let event = event.clone(); - Box::pin(async move { - let resp = sqlx::query( - r#" - UPDATE rooms - SET - updated_at = NOW(), - status = $1 - WHERE - organization_id = $2 AND - id = $3 AND - active_ingest_connection_id = $4 - "#, - ) - .bind(RoomStatus::Ready) - .bind(Uuid::from(organization_id)) - .bind(Uuid::from(room_id)) - .bind(Uuid::from(connection_id)) - .execute(global.db().as_ref()) - .await - .map_err(|e| TaskError::Custom(e.into()))?; - - video_common::events::emit( - global.jetstream(), - organization_id, - Target::Room, - event::Event::Room(event::Room { - room_id: Some(room_id.into()), - event: Some(event::room::Event::Ready(event::room::Ready { - connection_id: Some(connection_id.into()), - })), - }), - ) - .await; - - if resp.rows_affected() != 1 { - return Err(TaskError::Custom(anyhow::anyhow!("failed to update room status"))); - } - - global - .nats() - .publish(global.config::().events_subject.clone(), event) - .await - .map_err(|e| TaskError::Custom(e.into()))?; - - Ok(()) - }) - }); - if let Some(recording) = &mut self.recording { self.track_state.iter().for_each(|(rendition, state)| { self.tasker @@ -594,7 +600,7 @@ impl Job { } fn handle_sample(&mut self, global: &Arc, rendition: Rendition) -> Result<()> { - if !self.ready || self.first_init_put { + if !self.ingest_ready || self.first_init_put { return Ok(()); } @@ -815,7 +821,7 @@ impl Job { } fn update_manifest(&mut self) { - if !self.ready { + if !self.ingest_ready { return; } @@ -832,7 +838,7 @@ impl Job { } fn update_rendition_manifest(&mut self, rendition: Rendition) { - if !self.ready { + if !self.ingest_ready { return; }