Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose session stats #266

Merged
merged 1 commit into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,29 @@ message FfiRequest {
SetSubscribedRequest set_subscribed = 8;
UpdateLocalMetadataRequest update_local_metadata = 9;
UpdateLocalNameRequest update_local_name = 10;
GetSessionStatsRequest get_session_stats = 11;

// Track
CreateVideoTrackRequest create_video_track = 11;
CreateAudioTrackRequest create_audio_track = 12;
GetStatsRequest get_stats = 13;
CreateVideoTrackRequest create_video_track = 12;
CreateAudioTrackRequest create_audio_track = 13;
GetStatsRequest get_stats = 14;

// Video
AllocVideoBufferRequest alloc_video_buffer = 14;
NewVideoStreamRequest new_video_stream = 15;
NewVideoSourceRequest new_video_source = 16;
CaptureVideoFrameRequest capture_video_frame = 17;
ToI420Request to_i420 = 18;
ToArgbRequest to_argb = 19;
AllocVideoBufferRequest alloc_video_buffer = 15;
NewVideoStreamRequest new_video_stream = 16;
NewVideoSourceRequest new_video_source = 17;
CaptureVideoFrameRequest capture_video_frame = 18;
ToI420Request to_i420 = 19;
ToArgbRequest to_argb = 20;

// Audio
AllocAudioBufferRequest alloc_audio_buffer = 20;
NewAudioStreamRequest new_audio_stream = 21;
NewAudioSourceRequest new_audio_source = 22;
CaptureAudioFrameRequest capture_audio_frame = 23;
NewAudioResamplerRequest new_audio_resampler = 24;
RemixAndResampleRequest remix_and_resample = 25;
E2eeRequest e2ee = 26;
AllocAudioBufferRequest alloc_audio_buffer = 21;
NewAudioStreamRequest new_audio_stream = 22;
NewAudioSourceRequest new_audio_source = 23;
CaptureAudioFrameRequest capture_audio_frame = 24;
NewAudioResamplerRequest new_audio_resampler = 25;
RemixAndResampleRequest remix_and_resample = 26;
E2eeRequest e2ee = 27;
}
}

Expand All @@ -104,28 +105,29 @@ message FfiResponse {
SetSubscribedResponse set_subscribed = 8;
UpdateLocalMetadataResponse update_local_metadata = 9;
UpdateLocalNameResponse update_local_name = 10;
GetSessionStatsResponse get_session_stats = 11;

// Track
CreateVideoTrackResponse create_video_track = 11;
CreateAudioTrackResponse create_audio_track = 12;
GetStatsResponse get_stats = 13;
CreateVideoTrackResponse create_video_track = 12;
CreateAudioTrackResponse create_audio_track = 13;
GetStatsResponse get_stats = 14;

// Video
AllocVideoBufferResponse alloc_video_buffer = 14;
NewVideoStreamResponse new_video_stream = 15;
NewVideoSourceResponse new_video_source = 16;
CaptureVideoFrameResponse capture_video_frame = 17;
ToI420Response to_i420 = 18;
ToArgbResponse to_argb = 19;
AllocVideoBufferResponse alloc_video_buffer = 15;
NewVideoStreamResponse new_video_stream = 16;
NewVideoSourceResponse new_video_source = 17;
CaptureVideoFrameResponse capture_video_frame = 18;
ToI420Response to_i420 = 19;
ToArgbResponse to_argb = 20;

// Audio
AllocAudioBufferResponse alloc_audio_buffer = 20;
NewAudioStreamResponse new_audio_stream = 21;
NewAudioSourceResponse new_audio_source = 22;
CaptureAudioFrameResponse capture_audio_frame = 23;
NewAudioResamplerResponse new_audio_resampler = 24;
RemixAndResampleResponse remix_and_resample = 25;
E2eeResponse e2ee = 26;
AllocAudioBufferResponse alloc_audio_buffer = 21;
NewAudioStreamResponse new_audio_stream = 22;
NewAudioSourceResponse new_audio_source = 23;
CaptureAudioFrameResponse capture_audio_frame = 24;
NewAudioResamplerResponse new_audio_resampler = 25;
RemixAndResampleResponse remix_and_resample = 26;
E2eeResponse e2ee = 27;
}
}

Expand All @@ -149,6 +151,7 @@ message FfiEvent {
UpdateLocalNameCallback update_local_name = 13;
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
GetSessionStatsCallback get_session_stats = 16;
}
}

Expand Down
14 changes: 14 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import "handle.proto";
import "participant.proto";
import "track.proto";
import "video_frame.proto";
import "stats.proto";

// Connect to a new LiveKit room
message ConnectRequest {
Expand Down Expand Up @@ -130,6 +131,19 @@ message SetSubscribedRequest {
}
message SetSubscribedResponse {}

message GetSessionStatsRequest {
uint64 room_handle = 1;
}
message GetSessionStatsResponse {
uint64 async_id = 1;
}
message GetSessionStatsCallback {
uint64 async_id = 1;
optional string error = 2;
repeated RtcStats publisher_stats = 3;
repeated RtcStats subscriber_stats = 4;
}


//
// Options
Expand Down
100 changes: 65 additions & 35 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,30 @@ pub struct SetSubscribedRequest {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetSubscribedResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsRequest {
#[prost(uint64, tag="1")]
pub room_handle: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsResponse {
#[prost(uint64, tag="1")]
pub async_id: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsCallback {
#[prost(uint64, tag="1")]
pub async_id: u64,
#[prost(string, optional, tag="2")]
pub error: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, repeated, tag="3")]
pub publisher_stats: ::prost::alloc::vec::Vec<RtcStats>,
#[prost(message, repeated, tag="4")]
pub subscriber_stats: ::prost::alloc::vec::Vec<RtcStats>,
}
//
// Options
//
Expand Down Expand Up @@ -3022,7 +3046,7 @@ impl AudioSourceType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand All @@ -3049,48 +3073,50 @@ pub mod ffi_request {
UpdateLocalMetadata(super::UpdateLocalMetadataRequest),
#[prost(message, tag="10")]
UpdateLocalName(super::UpdateLocalNameRequest),
/// Track
#[prost(message, tag="11")]
CreateVideoTrack(super::CreateVideoTrackRequest),
GetSessionStats(super::GetSessionStatsRequest),
/// Track
#[prost(message, tag="12")]
CreateAudioTrack(super::CreateAudioTrackRequest),
CreateVideoTrack(super::CreateVideoTrackRequest),
#[prost(message, tag="13")]
CreateAudioTrack(super::CreateAudioTrackRequest),
#[prost(message, tag="14")]
GetStats(super::GetStatsRequest),
/// Video
#[prost(message, tag="14")]
AllocVideoBuffer(super::AllocVideoBufferRequest),
#[prost(message, tag="15")]
NewVideoStream(super::NewVideoStreamRequest),
AllocVideoBuffer(super::AllocVideoBufferRequest),
#[prost(message, tag="16")]
NewVideoSource(super::NewVideoSourceRequest),
NewVideoStream(super::NewVideoStreamRequest),
#[prost(message, tag="17")]
CaptureVideoFrame(super::CaptureVideoFrameRequest),
NewVideoSource(super::NewVideoSourceRequest),
#[prost(message, tag="18")]
ToI420(super::ToI420Request),
CaptureVideoFrame(super::CaptureVideoFrameRequest),
#[prost(message, tag="19")]
ToI420(super::ToI420Request),
#[prost(message, tag="20")]
ToArgb(super::ToArgbRequest),
/// Audio
#[prost(message, tag="20")]
AllocAudioBuffer(super::AllocAudioBufferRequest),
#[prost(message, tag="21")]
NewAudioStream(super::NewAudioStreamRequest),
AllocAudioBuffer(super::AllocAudioBufferRequest),
#[prost(message, tag="22")]
NewAudioSource(super::NewAudioSourceRequest),
NewAudioStream(super::NewAudioStreamRequest),
#[prost(message, tag="23")]
CaptureAudioFrame(super::CaptureAudioFrameRequest),
NewAudioSource(super::NewAudioSourceRequest),
#[prost(message, tag="24")]
NewAudioResampler(super::NewAudioResamplerRequest),
CaptureAudioFrame(super::CaptureAudioFrameRequest),
#[prost(message, tag="25")]
RemixAndResample(super::RemixAndResampleRequest),
NewAudioResampler(super::NewAudioResamplerRequest),
#[prost(message, tag="26")]
RemixAndResample(super::RemixAndResampleRequest),
#[prost(message, tag="27")]
E2ee(super::E2eeRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand All @@ -3117,40 +3143,42 @@ pub mod ffi_response {
UpdateLocalMetadata(super::UpdateLocalMetadataResponse),
#[prost(message, tag="10")]
UpdateLocalName(super::UpdateLocalNameResponse),
/// Track
#[prost(message, tag="11")]
CreateVideoTrack(super::CreateVideoTrackResponse),
GetSessionStats(super::GetSessionStatsResponse),
/// Track
#[prost(message, tag="12")]
CreateAudioTrack(super::CreateAudioTrackResponse),
CreateVideoTrack(super::CreateVideoTrackResponse),
#[prost(message, tag="13")]
CreateAudioTrack(super::CreateAudioTrackResponse),
#[prost(message, tag="14")]
GetStats(super::GetStatsResponse),
/// Video
#[prost(message, tag="14")]
AllocVideoBuffer(super::AllocVideoBufferResponse),
#[prost(message, tag="15")]
NewVideoStream(super::NewVideoStreamResponse),
AllocVideoBuffer(super::AllocVideoBufferResponse),
#[prost(message, tag="16")]
NewVideoSource(super::NewVideoSourceResponse),
NewVideoStream(super::NewVideoStreamResponse),
#[prost(message, tag="17")]
CaptureVideoFrame(super::CaptureVideoFrameResponse),
NewVideoSource(super::NewVideoSourceResponse),
#[prost(message, tag="18")]
ToI420(super::ToI420Response),
CaptureVideoFrame(super::CaptureVideoFrameResponse),
#[prost(message, tag="19")]
ToI420(super::ToI420Response),
#[prost(message, tag="20")]
ToArgb(super::ToArgbResponse),
/// Audio
#[prost(message, tag="20")]
AllocAudioBuffer(super::AllocAudioBufferResponse),
#[prost(message, tag="21")]
NewAudioStream(super::NewAudioStreamResponse),
AllocAudioBuffer(super::AllocAudioBufferResponse),
#[prost(message, tag="22")]
NewAudioSource(super::NewAudioSourceResponse),
NewAudioStream(super::NewAudioStreamResponse),
#[prost(message, tag="23")]
CaptureAudioFrame(super::CaptureAudioFrameResponse),
NewAudioSource(super::NewAudioSourceResponse),
#[prost(message, tag="24")]
NewAudioResampler(super::NewAudioResamplerResponse),
CaptureAudioFrame(super::CaptureAudioFrameResponse),
#[prost(message, tag="25")]
RemixAndResample(super::RemixAndResampleResponse),
NewAudioResampler(super::NewAudioResamplerResponse),
#[prost(message, tag="26")]
RemixAndResample(super::RemixAndResampleResponse),
#[prost(message, tag="27")]
E2ee(super::E2eeResponse),
}
}
Expand All @@ -3160,7 +3188,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3198,6 +3226,8 @@ pub mod ffi_event {
GetStats(super::GetStatsCallback),
#[prost(message, tag="15")]
Logs(super::LogBatch),
#[prost(message, tag="16")]
GetSessionStats(super::GetSessionStatsCallback),
}
}
/// Stop all rooms synchronously (Do we need async here?).
Expand Down
54 changes: 54 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,54 @@ fn on_e2ee_request(
Ok(proto::E2eeResponse { message: Some(msg) })
}

fn on_get_session_stats(
server: &'static FfiServer,
get_session_stats: proto::GetSessionStatsRequest,
) -> FfiResult<proto::GetSessionStatsResponse> {
let ffi_room = server
.retrieve_handle::<room::FfiRoom>(get_session_stats.room_handle)?
.clone();
let async_id = server.next_id();

server.async_runtime.spawn(async move {
match ffi_room.inner.room.get_stats().await {
Ok(stats) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: None,
publisher_stats: stats
.publisher_stats
.into_iter()
.map(Into::into)
.collect(),
subscriber_stats: stats
.subscriber_stats
.into_iter()
.map(Into::into)
.collect(),
},
))
.await;
}
Err(err) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: Some(err.to_string()),
..Default::default()
},
))
.await;
}
}
});

Ok(proto::GetSessionStatsResponse { async_id })
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -823,6 +871,12 @@ pub fn handle_request(
proto::ffi_request::Message::E2ee(e2ee) => {
proto::ffi_response::Message::E2ee(on_e2ee_request(server, e2ee)?)
}
proto::ffi_request::Message::GetSessionStats(get_session_stats) => {
proto::ffi_response::Message::GetSessionStats(on_get_session_stats(
server,
get_session_stats,
)?)
}
});

Ok(res)
Expand Down
Loading
Loading