diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/nucleus/emitter/NucleusEmitterIntegrationTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/nucleus/emitter/NucleusEmitterIntegrationTest.java index 690b62c..dd3ce1e 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/nucleus/emitter/NucleusEmitterIntegrationTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/nucleus/emitter/NucleusEmitterIntegrationTest.java @@ -115,9 +115,9 @@ private void defaultInitialization() throws Exception { })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(DEFAULT_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(firstConfigLog.await(15, TimeUnit.SECONDS), "Running with default config."); - assertTrue(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected."); + assertTrue(firstConfigLog.await(30, TimeUnit.SECONDS), "Running with default config."); + assertTrue(firstPubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(firstMqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log not detected."); checkForPubSubMessages(130000); } @@ -134,7 +134,7 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { defaultInitialization(); //-------------------------------------------------- - //Change mqttTopic=test/topic, pubsubPublish=false, telemetryPublishInterval=5000ms + //Change telemetryPublishInterval=5000ms final CountDownLatch firstPubsubLog = new CountDownLatch(1); final CountDownLatch firstMqttLog = new CountDownLatch(1); @@ -142,8 +142,8 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> { String stdoutStr = m.getMessage(); if (stdoutStr == null || stdoutStr.length() == 0) {return;} - //Config should now be pubsub:false, pubsub_topic:$local/greengrass/telemetry, mqtt_topic:test/topic, telemetryPublishIntervalMs:5000 - if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, TEST_MQTT_TOPIC, "5000"))) { + //Config should now be pubsub:true, pubsub_topic:$local/greengrass/telemetry, mqtt_topic:, telemetryPublishIntervalMs:5000 + if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) { firstConfigLog.countDown(); } if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) { @@ -153,16 +153,11 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { firstMqttLog.countDown(); } })) { - //Turn MQTT publishing on - getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue(TEST_MQTT_TOPIC); - //Turn pubsub off - getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(false); - //Change publish interval to 5s to speed up testing getConfigTopic(TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME).withValue(5000); assertTrue(firstConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config."); - assertFalse(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected."); - assertTrue(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(firstPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(firstMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected."); } //-------------------------------------------------- @@ -181,8 +176,8 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) { secondMqttLog.countDown(); } - //Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry", telemetryPublishIntervalMs:5000 - if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) { + //Config should now be pubSubPublish:true, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry", telemetryPublishIntervalMs:5000 + if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) { secondConfigLog.countDown(); } })) { @@ -190,12 +185,14 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { //Change MQTT Topic getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue("greengrass/nucleus/telemetry"); - assertFalse(secondPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertTrue(secondMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); assertTrue(secondConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config."); + assertTrue(secondPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertTrue(secondMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + checkForPubSubMessages(20000); } + //-------------------------------------------------- - //Change mqttPublish=false, pubsubPublish=true + //Change pubsubPublish=false final CountDownLatch thirdPubsubLog = new CountDownLatch(1); final CountDownLatch thirdMqttLog = new CountDownLatch(1); @@ -203,8 +200,8 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> { String stdoutStr = m.getMessage(); if (stdoutStr == null || stdoutStr.length() == 0) {return;} - //Config should now be pubSubPublish:true, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"" telemetryPublishIntervalMs:5000 - if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "true", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) { + //Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"greengrass/nucleus/telemetry" telemetryPublishIntervalMs:5000 + if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "greengrass/nucleus/telemetry", "5000"))) { thirdConfigLog.countDown(); } if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) { @@ -213,16 +210,41 @@ void GIVEN_config_options_changing_THEN_it_works() throws Exception { if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) { thirdMqttLog.countDown(); } + })) { + //Turn pubsub off + getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(false); + + assertTrue(thirdConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config."); + assertFalse(thirdPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected."); + assertTrue(thirdMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + } + + //-------------------------------------------------- + //Change mqttPublish=false + + final CountDownLatch fourthPubsubLog = new CountDownLatch(1); + final CountDownLatch fourthMqttLog = new CountDownLatch(1); + final CountDownLatch fourthConfigLog = new CountDownLatch(1); + try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> { + String stdoutStr = m.getMessage(); + if (stdoutStr == null || stdoutStr.length() == 0) {return;} + //Config should now be pubSubPublish:false, pubSubTopic:$local/greengrass/telemetry, mqttTopic:"" telemetryPublishIntervalMs:5000 + if (stdoutStr.contains(format(STARTUP_CONFIGURATION_LOG, "false", REGEX_DEFAULT_TELEMETRY_PUBSUB_TOPIC, "", "5000"))) { + fourthConfigLog.countDown(); + } + if (stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) { + fourthPubsubLog.countDown(); + } + if (stdoutStr.contains(MQTT_PUBLISH_STARTING)) { + fourthMqttLog.countDown(); + } })) { //Turn MQTT publishing off getConfigTopic(MQTT_TOPIC_CONFIG_NAME).withValue(""); - //Turn pubsub on - getConfigTopic(PUBSUB_PUBLISH_CONFIG_NAME).withValue(true); - assertTrue(thirdConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config."); - assertTrue(thirdPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(thirdMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected."); - checkForPubSubMessages(20000); + assertTrue(fourthConfigLog.await(30, TimeUnit.SECONDS), "Running with expected config."); + assertFalse(fourthPubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log not detected."); + assertFalse(fourthMqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log not detected."); } } @@ -250,10 +272,10 @@ void GIVEN_invalid_publish_threshold_THEN_it_reverts_to_minimum() throws Excepti } })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_THRESHOLD_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid threshold detected."); - assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with expected config."); - assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertTrue(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid threshold detected."); + assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with expected config."); + assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertTrue(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected."); checkForPubSubMessages(20000); } } @@ -282,10 +304,10 @@ void GIVEN_no_config_options_THEN_it_uses_default() throws Exception { } })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(NO_CONFIG_OPTIONS_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected."); - assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config."); - assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected."); + assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config."); + assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected."); checkForPubSubMessages(130000); } } @@ -314,10 +336,10 @@ void GIVEN_invalid_pubSubPublish_option_THEN_it_uses_default() throws Exception } })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_PUBSUB_PUBLISH_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected."); - assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config."); - assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected."); + assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config."); + assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected."); checkForPubSubMessages(130000); } } @@ -346,10 +368,10 @@ void GIVEN_invalid_telemetryPublishIntervalMs_option_THEN_it_uses_default() thro } })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_TELEMETRY_PUBLISH_INTERVALMS_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected."); - assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config."); - assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected."); + assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config."); + assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected."); checkForPubSubMessages(130000); } } @@ -378,10 +400,10 @@ void GIVEN_invalid_mqttTopic_option_THEN_it_uses_default() throws Exception { } })) { startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(INVALID_MQTT_TOPIC_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir); - assertTrue(logFound.await(15, TimeUnit.SECONDS), "Invalid config options detected."); - assertTrue(configLog.await(15, TimeUnit.SECONDS), "Running with default config."); - assertTrue(pubsubLog.await(15, TimeUnit.SECONDS), "Pub/sub publish log detected."); - assertFalse(mqttLog.await(15, TimeUnit.SECONDS), "MQTT publish log detected."); + assertTrue(logFound.await(30, TimeUnit.SECONDS), "Invalid config options detected."); + assertTrue(configLog.await(30, TimeUnit.SECONDS), "Running with default config."); + assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected."); + assertFalse(mqttLog.await(30, TimeUnit.SECONDS), "MQTT publish log detected."); checkForPubSubMessages(130000); } } diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java index 0b7c825..852a420 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/Constants.java @@ -23,12 +23,12 @@ public class Constants { + "'$local/greengrass/telemetry'"; public static final String PUBSUB_PUBLISH_FAILURE_LOG = "Failed to publish local pub/sub message on topic " + "'$local/greengrass/telemetry'"; + public static final String TELEMETRY_PUBLISH_SCHEDULED = "Scheduling telemetry publish"; + public static final String TELEMETRY_PUBLISH_STOPPING = "Stopping telemetry publish"; public static final String PUBSUB_PUBLISH_STARTING = "Starting local pub/sub publishing"; - public static final String PUBSUB_PUBLISH_STOPPING = "Stopping local pub/sub publishing"; public static final String MQTT_PUBLISH_SUCCESS_LOG = "Published MQTT message on topic '{}'"; public static final String MQTT_PUBLISH_FAILURE_LOG = "Failed to publish MQTT message on topic '{}'"; public static final String MQTT_PUBLISH_STARTING = "Starting MQTT publishing"; - public static final String MQTT_PUBLISH_STOPPING = "Stopping MQTT publishing"; public static final String INVALID_PUBLISH_THRESHOLD_LOG = "Publish interval should not be smaller than 500ms. " + "Using minimum of 500ms"; public static final String STARTUP_CONFIGURATION_LOG = "Starting telemetry emission with pubSubPublish:{}, " diff --git a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java index 67e7dd5..aa57f7e 100644 --- a/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java +++ b/src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java @@ -40,19 +40,17 @@ import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.JSON_PARSE_ERROR_LOG; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MIN_TELEMETRY_PUBLISH_INTERVAL_MS; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_PUBLISH_STARTING; -import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_PUBLISH_STOPPING; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_STARTING; -import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_STOPPING; import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.STARTUP_CONFIGURATION_LOG; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_SCHEDULED; +import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_STOPPING; @ImplementsService(name = AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER) public class NucleusEmitter extends PluginService { private final ScheduledExecutorService ses; - private final Object telemetryPubSubPublishInProgressLock = new Object(); - private final Object telemetryMqttPublishInProgressLock = new Object(); - private ScheduledFuture telemetryPubSubPublishFuture; - private ScheduledFuture telemetryMqttPublishFuture; + private final Object telemetryPublishInProgressLock = new Object(); + private ScheduledFuture telemetryPublishFuture; @Getter(AccessLevel.PACKAGE) // Needed for unit tests. private final AtomicReference currentConfiguration = @@ -63,7 +61,7 @@ public class NucleusEmitter extends PluginService { private final SystemMetricsEmitter sme; private final KernelMetricsEmitter kme; - //RTT publishers + //Metric publishers private final PubSubPublisher pubSubPublisher; private final MqttPublisher mqttPublisher; @@ -107,6 +105,10 @@ private void handleConfiguration(Topics configurationTopics) { boolean telemetryPublishIntervalMsChanged = configuration.getTelemetryPublishIntervalMs() != newConfiguration.getTelemetryPublishIntervalMs(); + if (!pubSubPublishChanged && !mqttTopicChanged && !telemetryPublishIntervalMsChanged) { + return; + } + //If the new requested publish interval is below the minimum, use the minimum if (newConfiguration.getTelemetryPublishIntervalMs() < MIN_TELEMETRY_PUBLISH_INTERVAL_MS) { logger.warn(INVALID_PUBLISH_THRESHOLD_LOG, MIN_TELEMETRY_PUBLISH_INTERVAL_MS, @@ -117,14 +119,9 @@ private void handleConfiguration(Topics configurationTopics) { .telemetryPublishIntervalMs(MIN_TELEMETRY_PUBLISH_INTERVAL_MS) .build(); } - + currentConfiguration.set(newConfiguration); - - if (telemetryPublishIntervalMsChanged) { - scheduleTelemetryPublish(true, true); - } else if (pubSubPublishChanged || mqttTopicChanged) { - scheduleTelemetryPublish(pubSubPublishChanged, mqttTopicChanged); - } + scheduleTelemetryPublish(); } @SuppressWarnings("UseSpecificCatch") @@ -132,54 +129,41 @@ private void handleConfiguration(Topics configurationTopics) { public void startup() { reportState(State.RUNNING); config.lookupTopics(CONFIGURATION_CONFIG_KEY).subscribe(subscribeToConfigChanges); - scheduleTelemetryPublish(true, true); + scheduleTelemetryPublish(); } - private void publishMqttTelemetry(String mqttTopic) { + private void publishTelemetry(boolean pubSubPublish, boolean mqttPublish, String mqttTopic) { String jsonString = retrieveMetricsJson(jsonMapper); - this.mqttPublisher.publishMessage(jsonString, mqttTopic); - } - - private void publishPubSubTelemetry() { - String jsonString = retrieveMetricsJson(jsonMapper); - this.pubSubPublisher.publishMessage(jsonString, DEFAULT_TELEMETRY_PUBSUB_TOPIC); + if (pubSubPublish) { + this.pubSubPublisher.publishMessage(jsonString, DEFAULT_TELEMETRY_PUBSUB_TOPIC); + } + if (mqttPublish) { + this.mqttPublisher.publishMessage(jsonString, mqttTopic); + } } - private void scheduleTelemetryPublish(boolean pubSubPublishChanged, boolean mqttTopicChanged) { + private void scheduleTelemetryPublish() { final NucleusEmitterConfiguration configuration = currentConfiguration.get(); final boolean newPubPublish = configuration.isPubsubPublish(); final String newMqttTopic = configuration.getMqttTopic(); final long newTelemetryPublishIntervalMs = configuration.getTelemetryPublishIntervalMs(); - //Only change if either telemetryPublishIntervalMs or pubSubPublish is changed - synchronized (telemetryPubSubPublishInProgressLock) { - if (pubSubPublishChanged) { - if (telemetryPubSubPublishFuture != null) { - logger.debug(PUBSUB_PUBLISH_STOPPING); - cancelJob(telemetryPubSubPublishFuture, telemetryPubSubPublishInProgressLock, false); - } - if (newPubPublish) { - logger.debug(PUBSUB_PUBLISH_STARTING); - telemetryPubSubPublishFuture = ses.scheduleAtFixedRate( - this::publishPubSubTelemetry, 0, - newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS); - } + //Start publish thread + synchronized (telemetryPublishInProgressLock) { + if (telemetryPublishFuture != null) { + logger.debug(TELEMETRY_PUBLISH_STOPPING); + cancelJob(telemetryPublishFuture, telemetryPublishInProgressLock, false); } - } - //Only change if either telemetryPublishIntervalMs or mqttTopic is changed - synchronized (telemetryMqttPublishInProgressLock) { - if (mqttTopicChanged) { - if (telemetryMqttPublishFuture != null) { - logger.debug(MQTT_PUBLISH_STOPPING); - cancelJob(telemetryMqttPublishFuture, telemetryMqttPublishInProgressLock, false); - } - if (!Utils.isEmpty(newMqttTopic)) { - logger.debug(MQTT_PUBLISH_STARTING); - telemetryMqttPublishFuture = ses.scheduleAtFixedRate( - () -> publishMqttTelemetry(newMqttTopic), 0, - newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS); - } + if (newPubPublish) { + logger.debug(PUBSUB_PUBLISH_STARTING); + } + if (!Utils.isEmpty(newMqttTopic)) { + logger.debug(MQTT_PUBLISH_STARTING); } + logger.debug(TELEMETRY_PUBLISH_SCHEDULED); + telemetryPublishFuture = ses.scheduleAtFixedRate( + () -> publishTelemetry(newPubPublish,!Utils.isEmpty(newMqttTopic), newMqttTopic), 0, + newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS); } logger.info(STARTUP_CONFIGURATION_LOG, newPubPublish, @@ -202,8 +186,7 @@ protected String retrieveMetricsJson(ObjectMapper jsonMapper) { @Override public void shutdown() { - cancelJob(telemetryPubSubPublishFuture, telemetryPubSubPublishInProgressLock, true); - cancelJob(telemetryMqttPublishFuture, telemetryMqttPublishInProgressLock, true); + cancelJob(telemetryPublishFuture, telemetryPublishInProgressLock, true); } private void cancelJob(ScheduledFuture future, Object lock, boolean immediately) {