Skip to content

Commit

Permalink
feat: implement watch_state
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <[email protected]>
  • Loading branch information
Rustin170506 committed Sep 5, 2024
1 parent 1b485cc commit d0810c6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
23 changes: 12 additions & 11 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct Aggregator {
poll_ops: Vec<proto::resources::PollOp>,

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,
temporality: proto::instrument::Temporality,

/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
Expand All @@ -102,11 +102,6 @@ pub(crate) struct Flush {
triggered: AtomicBool,
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}
// Represent static data for resources
struct Resource {
id: Id,
Expand Down Expand Up @@ -162,7 +157,7 @@ impl Aggregator {
async_ops: IdData::default(),
async_op_stats: IdData::default(),
poll_ops: Default::default(),
temporality: Temporality::Live,
temporality: proto::instrument::Temporality::Live,
base_time,
}
}
Expand All @@ -179,8 +174,8 @@ impl Aggregator {
// if the flush interval elapses, flush data to the client
_ = publish.tick() => {
match self.temporality {
Temporality::Live => true,
Temporality::Paused => false,
proto::instrument::Temporality::Live => true,
proto::instrument::Temporality::Paused => false,
}
}

Expand All @@ -199,11 +194,17 @@ impl Aggregator {
Some(Command::WatchTaskDetail(watch_request)) => {
self.add_task_detail_subscription(watch_request);
},
Some(Command::WatchState(subscription)) => {
let state = proto::instrument::State {
temporality: self.temporality.into(),
};
subscription.update(&state);
},
Some(Command::Pause) => {
self.temporality = Temporality::Paused;
self.temporality = proto::instrument::Temporality::Paused;
}
Some(Command::Resume) => {
self.temporality = Temporality::Live;
self.temporality = proto::instrument::Temporality::Live;
}
None => {
tracing::debug!("rpc channel closed, terminating");
Expand Down
15 changes: 15 additions & 0 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);
enum Command {
Instrument(Watch<proto::instrument::Update>),
WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
WatchState(Watch<proto::instrument::State>),
Pause,
Resume,
}
Expand Down Expand Up @@ -1190,6 +1191,8 @@ impl proto::instrument::instrument_server::Instrument for Server {
tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
type WatchTaskDetailsStream =
tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
type WatchStateStream =
tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::State, tonic::Status>>;
async fn watch_updates(
&self,
req: tonic::Request<proto::instrument::InstrumentRequest>,
Expand Down Expand Up @@ -1245,6 +1248,18 @@ impl proto::instrument::instrument_server::Instrument for Server {
Ok(tonic::Response::new(stream))
}

async fn watch_state(
&self,
_req: tonic::Request<proto::instrument::StateRequest>,
) -> Result<tonic::Response<Self::WatchStateStream>, tonic::Status> {
let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
self.subscribe.send(Command::WatchState(Watch(stream_sender))).await.map_err(|_| {
tonic::Status::internal("cannot get state, aggregation task is not running")
})?;
let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv);
Ok(tonic::Response::new(stream))
}

async fn pause(
&self,
_req: tonic::Request<proto::instrument::PauseRequest>,
Expand Down

0 comments on commit d0810c6

Please sign in to comment.