diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index b6846d62e..259a139ce 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -89,7 +89,7 @@ pub struct Aggregator { poll_ops: Vec, /// The time "state" of the aggregator, such as paused or live. - temporality: Temporality, + temporality: proto::instrument::Temporality, /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a /// timestamp that can be sent over the wire. @@ -102,11 +102,6 @@ pub(crate) struct Flush { triggered: AtomicBool, } -#[derive(Debug)] -enum Temporality { - Live, - Paused, -} // Represent static data for resources struct Resource { id: Id, @@ -162,7 +157,7 @@ impl Aggregator { async_ops: IdData::default(), async_op_stats: IdData::default(), poll_ops: Default::default(), - temporality: Temporality::Live, + temporality: proto::instrument::Temporality::Live, base_time, } } @@ -179,8 +174,8 @@ impl Aggregator { // if the flush interval elapses, flush data to the client _ = publish.tick() => { match self.temporality { - Temporality::Live => true, - Temporality::Paused => false, + proto::instrument::Temporality::Live => true, + proto::instrument::Temporality::Paused => false, } } @@ -199,11 +194,17 @@ impl Aggregator { Some(Command::WatchTaskDetail(watch_request)) => { self.add_task_detail_subscription(watch_request); }, + Some(Command::WatchState(subscription)) => { + let state = proto::instrument::State { + temporality: self.temporality.into(), + }; + subscription.update(&state); + }, Some(Command::Pause) => { - self.temporality = Temporality::Paused; + self.temporality = proto::instrument::Temporality::Paused; } Some(Command::Resume) => { - self.temporality = Temporality::Live; + self.temporality = proto::instrument::Temporality::Live; } None => { tracing::debug!("rpc channel closed, terminating"); diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index aa0085796..d2bb52e8e 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -178,6 +178,7 @@ struct Watch(mpsc::Sender>); enum Command { Instrument(Watch), WatchTaskDetail(WatchRequest), + WatchState(Watch), Pause, Resume, } @@ -1190,6 +1191,8 @@ impl proto::instrument::instrument_server::Instrument for Server { tokio_stream::wrappers::ReceiverStream>; type WatchTaskDetailsStream = tokio_stream::wrappers::ReceiverStream>; + type WatchStateStream = + tokio_stream::wrappers::ReceiverStream>; async fn watch_updates( &self, req: tonic::Request, @@ -1245,6 +1248,18 @@ impl proto::instrument::instrument_server::Instrument for Server { Ok(tonic::Response::new(stream)) } + async fn watch_state( + &self, + _req: tonic::Request, + ) -> Result, tonic::Status> { + let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer); + self.subscribe.send(Command::WatchState(Watch(stream_sender))).await.map_err(|_| { + tonic::Status::internal("cannot get state, aggregation task is not running") + })?; + let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv); + Ok(tonic::Response::new(stream)) + } + async fn pause( &self, _req: tonic::Request,