Skip to content

Commit

Permalink
fix: remove duplicate metric collection (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
avipinku authored Sep 29, 2021
1 parent 25bab7d commit 0182a63
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ private void defaultInitialization() throws Exception {
})) {

startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(DEFAULT_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(firstConfigLog.await(15, TimeUnit.SECONDS), "Running with default config.");
assertTrue(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected.");
assertTrue(firstConfigLog.await(30, TimeUnit.SECONDS), "Running with default config.");
assertTrue(firstPubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(firstMqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log not detected.");

checkForPubSubMessages(130000);
}
Expand All @@ -134,16 +134,16 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception {
defaultInitialization();

//--------------------------------------------------
//Change mqttTopic=test/topic, pubsubPublish=false, telemetryPublishInterval=5000ms
//Change telemetryPublishInterval=5000ms

final CountDownLatch firstPubsubLog = new CountDownLatch(1);
final CountDownLatch firstMqttLog = new CountDownLatch(1);
final CountDownLatch firstConfigLog = new CountDownLatch(1);
try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> {
String stdoutStr = m.getMessage();
if (stdoutStr == null || stdoutStr.length() == 0) {return;}
//Config should now be pubsub:false, pubsub_topic:$local/greengrass/telemetry, mqtt_topic:test/topic, telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, TEST_MQTT_TOPIC, "5000"))) {
//Config should now be pubsub:true, pubsub_topic:$local/greengrass/telemetry, mqtt_topic:, telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) {
firstConfigLog.countDown();
}
if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) {
Expand All @@ -153,16 +153,11 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception {
firstMqttLog.countDown();
}
})) {
//Turn MQTT publishing on
getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue(TEST_MQTT_TOPIC);
//Turn pubsub off
getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(false);
//Change publish interval to 5s to speed up testing
getConfigTopic(TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME).withValue(5000);

assertTrue(firstConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertFalse(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected.");
assertTrue(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected.");
}

//--------------------------------------------------
Expand All @@ -181,30 +176,32 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception {
if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) {
secondMqttLog.countDown();
}
//Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry", telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) {
//Config should now be pubSubPublish:true, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry", telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) {
secondConfigLog.countDown();
}
})) {

//Change MQTT Topic
getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue("greengrass/nucleus/telemetry");

assertFalse(secondPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertTrue(secondMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(secondConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertTrue(secondPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertTrue(secondMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(20000);
}

//--------------------------------------------------
//Change mqttPublish=false, pubsubPublish=true
//Change pubsubPublish=false

final CountDownLatch thirdPubsubLog = new CountDownLatch(1);
final CountDownLatch thirdMqttLog = new CountDownLatch(1);
final CountDownLatch thirdConfigLog = new CountDownLatch(1);
try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> {
String stdoutStr = m.getMessage();
if (stdoutStr == null || stdoutStr.length() == 0) {return;}
//Config should now be pubSubPublish:true, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"" telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) {
//Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry" telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) {
thirdConfigLog.countDown();
}
if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) {
Expand All @@ -213,16 +210,41 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception {
if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) {
thirdMqttLog.countDown();
}
})) {
//Turn pubsub off
getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(false);

assertTrue(thirdConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertFalse(thirdPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected.");
assertTrue(thirdMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
}

//--------------------------------------------------
//Change mqttPublish=false

final CountDownLatch fourthPubsubLog = new CountDownLatch(1);
final CountDownLatch fourthMqttLog = new CountDownLatch(1);
final CountDownLatch fourthConfigLog = new CountDownLatch(1);
try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> {
String stdoutStr = m.getMessage();
if (stdoutStr == null || stdoutStr.length() == 0) {return;}
//Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"" telemetryPublishIntervalMs:5000
if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) {
fourthConfigLog.countDown();
}
if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) {
fourthPubsubLog.countDown();
}
if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) {
fourthMqttLog.countDown();
}
})) {
//Turn MQTT publishing off
getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue("");
//Turn pubsub on
getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(true);

assertTrue(thirdConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertTrue(thirdPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(thirdMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected.");
checkForPubSubMessages(20000);
assertTrue(fourthConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertFalse(fourthPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected.");
assertFalse(fourthMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected.");
}
}

Expand Down Expand Up @@ -250,10 +272,10 @@ void GIVEN_invalid_publish_threshold_THEN_it_reverts_to_minimum() throws Excepti
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_THRESHOLD_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid threshold detected.");
assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with expected config.");
assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertTrue(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid threshold detected.");
assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with expected config.");
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertTrue(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(20000);
}
}
Expand Down Expand Up @@ -282,10 +304,10 @@ void GIVEN_no_config_options_THEN_it_uses_default() throws Exception {
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(NO_CONFIG_OPTIONS_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(130000);
}
}
Expand Down Expand Up @@ -314,10 +336,10 @@ void GIVEN_invalid_pubSubPublish_option_THEN_it_uses_default() throws Exception
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_PUBSUB_PUBLISH_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(130000);
}
}
Expand Down Expand Up @@ -346,10 +368,10 @@ void GIVEN_invalid_telemetryPublishIntervalMs_option_THEN_it_uses_default() thro
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_TELEMETRY_PUBLISH_INTERVALMS_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(130000);
}
}
Expand Down Expand Up @@ -378,10 +400,10 @@ void GIVEN_invalid_mqttTopic_option_THEN_it_uses_default() throws Exception {
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_MQTT_TOPIC_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected.");
assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected.");
assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config.");
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected.");
checkForPubSubMessages(130000);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public class Constants {
+ "'$local/greengrass/telemetry'";
public static final String PUBSUB_PUBLISH_FAILURE_LOG = "Failed to publish local pub/sub message on topic "
+ "'$local/greengrass/telemetry'";
public static final String TELEMETRY_PUBLISH_SCHEDULED = "Scheduling telemetry publish";
public static final String TELEMETRY_PUBLISH_STOPPING = "Stopping telemetry publish";
public static final String PUBSUB_PUBLISH_STARTING = "Starting local pub/sub publishing";
public static final String PUBSUB_PUBLISH_STOPPING = "Stopping local pub/sub publishing";
public static final String MQTT_PUBLISH_SUCCESS_LOG = "Published MQTT message on topic '{}'";
public static final String MQTT_PUBLISH_FAILURE_LOG = "Failed to publish MQTT message on topic '{}'";
public static final String MQTT_PUBLISH_STARTING = "Starting MQTT publishing";
public static final String MQTT_PUBLISH_STOPPING = "Stopping MQTT publishing";
public static final String INVALID_PUBLISH_THRESHOLD_LOG = "Publish interval should not be smaller than 500ms. "
+ "Using minimum of 500ms";
public static final String STARTUP_CONFIGURATION_LOG = "Starting telemetry emission with pubSubPublish:{}, "
Expand Down
Loading

0 comments on commit 0182a63

Please sign in to comment.