Skip to content

Commit

Permalink
feat(status): prevent duplicate status publishes
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Nov 28, 2023
1 parent 7c5f5d8 commit f3bb670
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 25 deletions.
9 changes: 7 additions & 2 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async fn main() -> anyhow::Result<()> {
command_topic_prefix: DEFAULT_COMMANDS_TOPIC.trim_matches(trimmer).to_owned(),
publisher: context.clone(),
notify_stream,
status_stream: status_stream.clone(),
};
let events_manager: ConsumerManager<EventConsumer> = ConsumerManager::new(
permit_pool.clone(),
Expand Down Expand Up @@ -373,6 +374,7 @@ struct EventWorkerCreator<StateStore> {
command_topic_prefix: String,
publisher: Context,
notify_stream: Stream,
status_stream: Stream,
}

#[async_trait::async_trait]
Expand All @@ -392,8 +394,11 @@ where
self.publisher.clone(),
&format!("{}.{lattice_id}", self.command_topic_prefix),
);
let status_publisher =
StatusPublisher::new(self.publisher.clone(), &format!("wadm.status.{lattice_id}"));
let status_publisher = StatusPublisher::new(
self.publisher.clone(),
Some(self.status_stream.clone()),
&format!("wadm.status.{lattice_id}"),
);
let manager = ScalerManager::new(
self.publisher.clone(),
self.notify_stream.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/daemonscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ mod test {
},
);
let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher,None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/spreadscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ mod test {

let lattice_source = TestLatticeSource::default();
let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down
26 changes: 13 additions & 13 deletions src/workers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ mod test {
let lattice_id = "all_state";

let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down Expand Up @@ -1490,7 +1490,7 @@ mod test {
ActorDescription {
id: actor1.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand All @@ -1512,7 +1512,7 @@ mod test {
ActorDescription {
id: actor2.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand Down Expand Up @@ -1565,7 +1565,7 @@ mod test {
ActorDescription {
id: actor1.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand All @@ -1587,7 +1587,7 @@ mod test {
ActorDescription {
id: actor2.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand Down Expand Up @@ -1795,7 +1795,7 @@ mod test {
actors: vec![ActorDescription {
id: actor2.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand Down Expand Up @@ -1828,7 +1828,7 @@ mod test {
actors: vec![ActorDescription {
id: actor2.public_key.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand Down Expand Up @@ -1992,7 +1992,7 @@ mod test {
..Default::default()
};
let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down Expand Up @@ -2023,7 +2023,7 @@ mod test {
ActorDescription {
id: actor1_id.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![
ActorInstance {
annotations: None,
Expand All @@ -2045,7 +2045,7 @@ mod test {
ActorDescription {
id: actor2_id.to_string(),
image_ref: None,
/// The individual instances of this actor that are running
// The individual instances of this actor that are running
instances: vec![ActorInstance {
annotations: None,
instance_id: "3".to_string(),
Expand Down Expand Up @@ -2155,7 +2155,7 @@ mod test {
let lattice_source = TestLatticeSource::default();
let lattice_id = "provider_status";
let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down Expand Up @@ -2272,7 +2272,7 @@ mod test {
let lattice_id = "provider_contract_id";

let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down Expand Up @@ -2376,7 +2376,7 @@ mod test {
let lattice_id = "update_data";

let command_publisher = CommandPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, "doesntmatter");
let status_publisher = StatusPublisher::new(NoopPublisher, None, "doesntmatter");
let worker = EventWorker::new(
store.clone(),
lattice_source.clone(),
Expand Down
46 changes: 38 additions & 8 deletions src/workers/event_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use async_nats::jetstream::stream::Stream;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

use tracing::{instrument, warn};
use tracing::{instrument, trace, warn};
use wasmcloud_control_interface::{HostInventory, LinkDefinition};

use crate::{commands::Command, publisher::Publisher, server::StatusInfo, APP_SPEC_ANNOTATION};
Expand Down Expand Up @@ -98,16 +99,23 @@ impl LinkSource for wasmcloud_control_interface::Client {
#[derive(Clone)]
pub struct StatusPublisher<Pub> {
publisher: Pub,
// Stream for querying current status to avoid duplicate updates
status_stream: Option<Stream>,
// Topic prefix, e.g. wadm.status.default
topic_prefix: String,
}

impl<Pub> StatusPublisher<Pub> {
/// Creates an new status publisher configured with the given publisher that will send to the
/// manifest status topic using the given prefix
pub fn new(publisher: Pub, topic_prefix: &str) -> StatusPublisher<Pub> {
pub fn new(
publisher: Pub,
status_stream: Option<Stream>,
topic_prefix: &str,
) -> StatusPublisher<Pub> {
StatusPublisher {
publisher,
status_stream,
topic_prefix: topic_prefix.to_owned(),
}
}
Expand All @@ -116,12 +124,34 @@ impl<Pub> StatusPublisher<Pub> {
impl<Pub: Publisher> StatusPublisher<Pub> {
#[instrument(level = "trace", skip(self))]
pub async fn publish_status(&self, name: &str, status: StatusInfo) -> anyhow::Result<()> {
self.publisher
.publish(
serde_json::to_vec(&status)?,
Some(&format!("{}.{name}", self.topic_prefix)),
)
.await
let topic = format!("{}.{name}", self.topic_prefix);

// NOTE(brooksmtownsend): This direct get may not always query the jetstream leader. In the
// worst case where the last message isn't all the way updated, we may publish a duplicate
// status. This is an acceptable tradeoff to not have to query the leader directly every time.
let prev_status = if let Some(status_stream) = &self.status_stream {
status_stream
.direct_get_last_for_subject(&topic)
.await
.map(|m| serde_json::from_slice::<StatusInfo>(&m.payload).ok())
.ok()
.flatten()
} else {
None
};

match prev_status {
// If the status hasn't changed, skip publishing
Some(prev_status) if prev_status == status => {
trace!(%name, "Status hasn't changed since last update. Skipping");
Ok(())
}
_ => {
self.publisher
.publish(serde_json::to_vec(&status)?, Some(&topic))
.await
}
}
}
}

Expand Down

0 comments on commit f3bb670

Please sign in to comment.