Skip to content

Commit

Permalink
feat: make pubsub topic configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo committed Mar 26, 2024
1 parent 01b04e8 commit 7602a06
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class Constants {
public static final String AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER = "aws.greengrass.telemetry.NucleusEmitter";

public static final String PUBSUB_PUBLISH_CONFIG_NAME = "pubSubPublish";
public static final String PUBSUB_TOPIC_CONFIG_NAME = "pubSubTopic";
public static final String MQTT_TOPIC_CONFIG_NAME = "mqttTopic";
public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME = "telemetryPublishIntervalMs";
public static final String DEFAULT_TELEMETRY_PUBSUB_TOPIC = "$local/greengrass/telemetry";
Expand All @@ -20,9 +21,9 @@ public class Constants {
public static final long MIN_TELEMETRY_PUBLISH_INTERVAL_MS = 500;

public static final String PUBSUB_PUBLISH_SUCCESS_LOG = "Published local pub/sub message on topic "
+ "'$local/greengrass/telemetry'";
+ "'{}'";
public static final String PUBSUB_PUBLISH_FAILURE_LOG = "Failed to publish local pub/sub message on topic "
+ "'$local/greengrass/telemetry'";
+ "'{}'";
public static final String TELEMETRY_PUBLISH_SCHEDULED = "Scheduling telemetry publish";
public static final String TELEMETRY_PUBLISH_STOPPING = "Stopping telemetry publish";
public static final String PUBSUB_PUBLISH_STARTING = "Starting local pub/sub publishing";
Expand All @@ -40,6 +41,9 @@ public class Constants {
+ " configuration.";
public static final String PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubPublish config option"
+ " {}. Please make sure this is set to a valid boolean value";

public static final String PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the pubSubTopic config option {}."
+ " Please make sure this is set to a valid topic string value";
public static final String MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG = "Could not parse the mqttTopic config option {}."
+ " Please make sure this is set to a valid topic string value";
public static final String TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG = "Could not parse the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_UPDATE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.INVALID_PUBLISH_THRESHOLD_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.JSON_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MIN_TELEMETRY_PUBLISH_INTERVAL_MS;
Expand Down Expand Up @@ -132,10 +131,10 @@ public void startup() {
scheduleTelemetryPublish();
}

private void publishTelemetry(boolean pubSubPublish, boolean mqttPublish, String mqttTopic) {
private void publishTelemetry(boolean pubSubPublish, String pubSubTopic, boolean mqttPublish, String mqttTopic) {
String jsonString = retrieveMetricsJson(jsonMapper);
if (pubSubPublish) {
this.pubSubPublisher.publishMessage(jsonString, DEFAULT_TELEMETRY_PUBSUB_TOPIC);
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
}
if (mqttPublish) {
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
Expand All @@ -162,12 +161,14 @@ private void scheduleTelemetryPublish() {
}
logger.debug(TELEMETRY_PUBLISH_SCHEDULED);
telemetryPublishFuture = ses.scheduleAtFixedRate(
() -> publishTelemetry(newPubPublish,!Utils.isEmpty(newMqttTopic), newMqttTopic), 0,
() -> publishTelemetry(newPubPublish, configuration.getPubsubTopic(),
!Utils.isEmpty(newMqttTopic), newMqttTopic),
0,
newTelemetryPublishIntervalMs, TimeUnit.MILLISECONDS);
}

logger.info(STARTUP_CONFIGURATION_LOG, newPubPublish,
DEFAULT_TELEMETRY_PUBSUB_TOPIC, newMqttTopic, newTelemetryPublishIntervalMs);
configuration.getPubsubTopic(), newMqttTopic, newTelemetryPublishIntervalMs);
}

protected String retrieveMetricsJson(ObjectMapper jsonMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@

import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.CONFIG_INVALID_OPTION_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.DEFAULT_TELEMETRY_PUBSUB_TOPIC;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG;

Expand All @@ -30,6 +33,9 @@ public class NucleusEmitterConfiguration {
//Only local pub/sub is enabled by default
@Builder.Default
boolean pubsubPublish = true;
@Builder.Default
String pubsubTopic = DEFAULT_TELEMETRY_PUBSUB_TOPIC;

@Builder.Default
String mqttTopic = "";
@Builder.Default
Expand All @@ -39,15 +45,14 @@ public class NucleusEmitterConfiguration {
* Get the Nucleus Emitter configuration from the POJO map.
* @param pojo POJO Topics object.
* @param logger Greengrass logger.
* @return the Nucleus Emitter configuration.
* @return the Nucleus Emitter configuration.
*/
public static NucleusEmitterConfiguration fromPojo(Map<String, Object> pojo, Logger logger) {
if (pojo.isEmpty()) {
return null;
}
long telemetryPublishIntervalMs = DEFAULT_TELEMETRY_PUBLISH_INTERVAL_MS;
boolean pubsubPublish = true;
String mqttTopic = "";
NucleusEmitterConfigurationBuilder config = NucleusEmitterConfiguration.builder();

for (Map.Entry<String, Object> entry : pojo.entrySet()) {
switch (entry.getKey()) {
case PUBSUB_PUBLISH_CONFIG_NAME:
Expand All @@ -58,42 +63,47 @@ public static NucleusEmitterConfiguration fromPojo(Map<String, Object> pojo, Log
logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
pubsubPublish = parsedBoolean;
config.pubsubPublish(parsedBoolean);
break;
} else {
logger.error(PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME:
if (entry.getValue() instanceof Number || entry.getValue() instanceof String) {
telemetryPublishIntervalMs = Coerce.toLong(entry.getValue());
long telemetryPublishIntervalMs = Coerce.toLong(entry.getValue());
if (telemetryPublishIntervalMs == 0L) { //If value is 0 or non-numeric String
logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
config.telemetryPublishIntervalMs(telemetryPublishIntervalMs);
break;
} else { //If not a Number or String
logger.error(TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case MQTT_TOPIC_CONFIG_NAME:
if (entry.getValue() instanceof String) {
mqttTopic = Coerce.toString(entry.getValue());
config.mqttTopic(Coerce.toString(entry.getValue()));
break;
} else {
logger.error(MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
case PUBSUB_TOPIC_CONFIG_NAME:
if (entry.getValue() instanceof String) {
config.pubsubTopic(Coerce.toString(entry.getValue()));
break;
} else {
logger.error(PUBSUB_TOPIC_CONFIG_PARSE_ERROR_LOG, entry.getValue());
return null;
}
default:
logger.error(CONFIG_INVALID_OPTION_ERROR_LOG, entry.getKey());
return null;
}
}

return NucleusEmitterConfiguration.builder()
.pubsubPublish(pubsubPublish)
.mqttTopic(mqttTopic)
.telemetryPublishIntervalMs(telemetryPublishIntervalMs)
.build();
return config.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void publishMessage(String message, String topic) {
try {
this.pubSubIPCEventStreamAgent.publish(topic, message.getBytes(StandardCharsets.UTF_8),
AWS_GREENGRASS_TELEMETRY_NUCLEUS_EMITTER);
logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG);
logger.trace(PUBSUB_PUBLISH_SUCCESS_LOG, topic);
} catch (InvalidArgumentsError e) {
logger.error(PUBSUB_PUBLISH_FAILURE_LOG, e);
logger.error(PUBSUB_PUBLISH_FAILURE_LOG, topic, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.MQTT_TOPIC_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_PUBLISH_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.PUBSUB_TOPIC_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_NAME;
import static com.aws.greengrass.telemetry.nucleus.emitter.Constants.TELEMETRY_PUBLISH_INTERVAL_CONFIG_PARSE_ERROR_LOG;
import static com.aws.greengrass.telemetry.nucleus.emitter.NucleusEmitterConfiguration.fromPojo;
Expand Down Expand Up @@ -58,6 +59,14 @@ void GIVEN_valid_string_config_options_THEN_parses_correctly() {
assertEquals(defaultConfiguration, generatedConfiguration);
}

@Test
void GIVEN_valid_nondefault_string_config_options_THEN_parses_correctly() {
Map<String, Object> pojo = new TreeMap<>();
pojo.put(PUBSUB_TOPIC_CONFIG_NAME,"pubsub");
NucleusEmitterConfiguration generatedConfiguration = fromPojo(pojo, logger);
assertEquals("pubsub", generatedConfiguration.getPubsubTopic());
}

@Test
void GIVEN_invalid_pubSubPublish_option_THEN_fails() {
Map<String, Object> pojo = new TreeMap<>();
Expand Down

0 comments on commit 7602a06

Please sign in to comment.