Skip to content

Commit

Permalink
feat: remove old event system
Browse files Browse the repository at this point in the history
For some reason we had an event system in place already. Who knew?! I
removed it and converted it to the NEW event system.
  • Loading branch information
TroyKomodo committed Jan 1, 2024
1 parent 762d35a commit 826d6ac
Show file tree
Hide file tree
Showing 35 changed files with 371 additions and 418 deletions.
2 changes: 1 addition & 1 deletion platform/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ http-body = "1.0"
http-body-util = "0.1"
hyper-util = "0.1"
pin-project = "1.1"
base64 = "0.21"

config = { workspace = true }
pb = { workspace = true }
binary-helper = { workspace = true }
base64 = "0.21.5"

[dev-dependencies]
tempfile = "3.8"
Expand Down
3 changes: 1 addition & 2 deletions platform/api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use binary_helper::{bootstrap, grpc_health, grpc_server, impl_global_traits};
use common::context::Context;
use common::dataloader::DataLoader;
use common::global::*;
use platform_api::config::{ApiConfig, JwtConfig, TurnstileConfig, VideoApiConfig};
use platform_api::config::{ApiConfig, ImageUploaderConfig, JwtConfig, TurnstileConfig, VideoApiConfig};
use platform_api::dataloader::category::CategoryByIdLoader;
use platform_api::dataloader::global_state::GlobalStateLoader;
use platform_api::dataloader::role::RoleByIdLoader;
Expand Down Expand Up @@ -184,7 +184,6 @@ impl binary_helper::Global<AppConfig> for GlobalState {

let subscription_manager = SubscriptionManager::default();


let image_processor_s3 = config
.extra
.image_uploader
Expand Down
2 changes: 1 addition & 1 deletion platform/website/src/components/dev-banner.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
PUBLIC_BASE_URL,
PUBLIC_TWITTER_HANDLE,
PUBLIC_ORG_ID,
PUBLIC_EDGE_ENDPOINT
PUBLIC_EDGE_ENDPOINT,
} from "$env/static/public";
import { websocketOpen } from "$/store/websocket";
import { dev } from "$app/environment";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
</div>
<a class="video" href="/{user.username}">
{#if playing}
<Player
roomId={user.channel.roomId}
showPip={false}
showTheater={false}
/>
<Player roomId={user.channel.roomId} showPip={false} showTheater={false} />
{:else}
<img src={preview} alt="Stream Thumbnail" class="blurred" aria-hidden="true" />
<img src={preview} alt="Stream Thumbnail" class="thumbnail" />
Expand Down
33 changes: 0 additions & 33 deletions proto/scuffle/video/internal/events/organization_event.proto

This file was deleted.

22 changes: 3 additions & 19 deletions proto/scuffle/video/v1/types/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,8 @@ message Event {
// If the disconnection was clean. Meaning the client disconnected and it
// was handled gracefully.
bool clean = 2;

// The cause of the disconnection.
enum Cause {
// The cause is unknown.
DISCONNECTED_CAUSE_UNKNOWN = 0;
// The server requested the disconnection. This is usually because an
// API request on Room.Disconnect was made. Or because another client
// connected to the room, or the room was deleted.
DISCONNECTED_CAUSE_DISCONNECT_REQUEST = 1;
// The ingest bitrate was too high.
DISCONNECTED_CAUSE_HIGH_BITRATE = 2;
// The ingest resolution was too high.
DISCONNECTED_CAUSE_HIGH_RESOLUTION = 3;
// The ingest FPS was too high.
DISCONNECTED_CAUSE_HIGH_FPS = 4;
}

// The cause of the disconnection.
Cause cause = 3;
optional string cause = 3;
}

// If the room is ready to be watched.
Expand All @@ -131,8 +114,9 @@ message Event {

// If the room failed to be go live.
message Failed {
scuffle.types.Ulid connection_id = 1;
// The error that occurred.
string error = 1;
string error = 2;
}

// The event that occurred.
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/access_token/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl ApiRequest<AccessTokenCreateResponse> for tonic::Request<AccessTokenCreateR
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::AccessToken,
event::Event::AccessToken(event::AccessToken {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/access_token/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl ApiRequest<AccessTokenDeleteResponse> for tonic::Request<AccessTokenDeleteR

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::AccessToken,
event::Event::AccessToken(event::AccessToken {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/playback_key_pair/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl ApiRequest<PlaybackKeyPairCreateResponse> for tonic::Request<PlaybackKeyPai
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::PlaybackKeyPair,
event::Event::PlaybackKeyPair(event::PlaybackKeyPair {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/playback_key_pair/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl ApiRequest<PlaybackKeyPairDeleteResponse> for tonic::Request<PlaybackKeyPai

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::PlaybackKeyPair,
event::Event::PlaybackKeyPair(event::PlaybackKeyPair {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/playback_key_pair/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl ApiRequest<PlaybackKeyPairModifyResponse> for tonic::Request<PlaybackKeyPai
match result {
Some(result) => {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::PlaybackKeyPair,
event::Event::PlaybackKeyPair(event::PlaybackKeyPair {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/recording/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl ApiRequest<RecordingModifyResponse> for tonic::Request<RecordingModifyReque
.unwrap_or_default();

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::Recording,
event::Event::Recording(event::Recording {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/recording_config/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl ApiRequest<RecordingConfigCreateResponse> for tonic::Request<RecordingConfi
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::RecordingConfig,
event::Event::RecordingConfig(event::RecordingConfig {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/recording_config/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ApiRequest<RecordingConfigDeleteResponse> for tonic::Request<RecordingConfi

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::RecordingConfig,
event::Event::RecordingConfig(event::RecordingConfig {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/recording_config/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ApiRequest<RecordingConfigModifyResponse> for tonic::Request<RecordingConfi
match result {
Some(result) => {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::RecordingConfig,
event::Event::RecordingConfig(event::RecordingConfig {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/room/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl ApiRequest<RoomCreateResponse> for tonic::Request<RoomCreateRequest> {
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::Room,
event::Event::Room(event::Room {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/room/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl ApiRequest<RoomDeleteResponse> for tonic::Request<RoomDeleteRequest> {

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::Room,
event::Event::Room(event::Room {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/room/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl ApiRequest<RoomModifyResponse> for tonic::Request<RoomModifyRequest> {
};

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::Room,
event::Event::Room(event::Room {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/room/reset_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ApiRequest<RoomResetKeyResponse> for tonic::Request<RoomResetKeyRequest> {
for RoomKeyPair { id, .. } in rooms.iter() {
if let Some(id) = id {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::Room,
event::Event::Room(event::Room {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/s3_bucket/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl ApiRequest<S3BucketCreateResponse> for tonic::Request<S3BucketCreateRequest
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::S3Bucket,
event::Event::S3Bucket(event::S3Bucket {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/s3_bucket/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ApiRequest<S3BucketDeleteResponse> for tonic::Request<S3BucketDeleteRequest

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::S3Bucket,
event::Event::S3Bucket(event::S3Bucket {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/s3_bucket/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl ApiRequest<S3BucketModifyResponse> for tonic::Request<S3BucketModifyRequest
match result {
Some(result) => {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::S3Bucket,
event::Event::S3Bucket(event::S3Bucket {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/transcoding_config/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ApiRequest<TranscodingConfigCreateResponse> for tonic::Request<TranscodingC
})?;

video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::TranscodingConfig,
event::Event::TranscodingConfig(event::TranscodingConfig {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/transcoding_config/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl ApiRequest<TranscodingConfigDeleteResponse> for tonic::Request<TranscodingC

for id in deleted_ids.iter().copied() {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::TranscodingConfig,
event::Event::TranscodingConfig(event::TranscodingConfig {
Expand Down
2 changes: 1 addition & 1 deletion video/api/src/api/transcoding_config/modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl ApiRequest<TranscodingConfigModifyResponse> for tonic::Request<TranscodingC
match result {
Some(result) => {
video_common::events::emit(
global.jetstream(),
global.nats(),
access_token.organization_id.0,
Target::TranscodingConfig,
event::Event::TranscodingConfig(event::TranscodingConfig {
Expand Down
4 changes: 2 additions & 2 deletions video/api/src/api/utils/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ macro_rules! impl_tag_req {
})?;

let $id = pb::ext::UlidExt::into_ulid(req.id);
video_common::events::emit(global.jetstream(), access_token.organization_id.0, $event_target, $event).await;
video_common::events::emit(global.nats(), access_token.organization_id.0, $event_target, $event).await;

Ok(tonic::Response::new($resp {
tags: Some(result.into_tags()?)
Expand Down Expand Up @@ -218,7 +218,7 @@ macro_rules! impl_untag_req {
})?;

let $id = pb::ext::UlidExt::into_ulid(req.id);
video_common::events::emit(global.jetstream(), access_token.organization_id.0, $event_target, $event).await;
video_common::events::emit(global.nats(), access_token.organization_id.0, $event_target, $event).await;

Ok(tonic::Response::new($resp {
tags: Some(result.into_tags()?)
Expand Down
2 changes: 1 addition & 1 deletion video/cli/src/cli/events/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl Invokable for Fetch {
action: "disconnected".to_owned(),
connection_id: Some(disconnected.connection_id.into_ulid()),
clean: Some(disconnected.clean),
cause: Some(disconnected.cause().as_str_name().into()),
cause: disconnected.cause,
..Default::default()
},
Some(event::room::Event::Ready(ready)) => EventPayload {
Expand Down
33 changes: 16 additions & 17 deletions video/common/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ use prost::Message;

use crate::keys::event_subject;

pub async fn emit(jetstream: &async_nats::jetstream::Context, org_id: ulid::Ulid, target: Target, event: event::Event) {
jetstream
.publish(
event_subject(org_id, target),
Event {
timestamp: chrono::Utc::now().timestamp_millis(),
event_id: Some(ulid::Ulid::new().into()),
event: Some(event),
}
.encode_to_vec()
.into(),
)
.await
.map_err(|e| {
tracing::error!(err = %e, "failed to publish event");
})
.ok();
pub async fn emit(nats: &async_nats::Client, org_id: ulid::Ulid, target: Target, event: event::Event) {
nats.publish(
event_subject(org_id, target),
Event {
timestamp: chrono::Utc::now().timestamp_millis(),
event_id: Some(ulid::Ulid::new().into()),
event: Some(event),
}
.encode_to_vec()
.into(),
)
.await
.map_err(|e| {
tracing::error!(err = %e, "failed to publish event");
})
.ok();
}
4 changes: 0 additions & 4 deletions video/ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ pub struct IngestConfig {
/// NATS subject to send transcoder requests to
pub transcoder_request_subject: String,

/// NATS subject for events
pub events_subject: String,

/// The interval in to update the bitrate for a room
pub bitrate_update_interval: Duration,

Expand Down Expand Up @@ -58,7 +55,6 @@ impl Default for IngestConfig {
fn default() -> Self {
Self {
transcoder_request_subject: "transcoder-request".to_string(),
events_subject: "events".to_string(),
bitrate_update_interval: Duration::from_secs(5),
max_bitrate: 12000 * 1024,
max_bytes_between_keyframes: 5 * 12000 * 1024 / 8,
Expand Down
Loading

0 comments on commit 826d6ac

Please sign in to comment.