Skip to content

Commit

Permalink
use finally block for manual acknowledgment
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasBrand committed Sep 26, 2023
1 parent cdfe798 commit 38843a1
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,39 +50,43 @@ public class SubscribeMqtt3PublishCallback implements Consumer<Mqtt3Publish> {

@Override
public void accept(final @NotNull Mqtt3Publish mqtt3Publish) {
String message;
try {
if (isJsonOutput) {
message = new JsonMqttPublish(mqtt3Publish, isBase64).toString();
} else {
message = MqttPublishUtils.formatPayload(mqtt3Publish.getPayloadAsBytes(), isBase64);
}
String message;
try {
if (isJsonOutput) {
message = new JsonMqttPublish(mqtt3Publish, isBase64).toString();
} else {
message = MqttPublishUtils.formatPayload(mqtt3Publish.getPayloadAsBytes(), isBase64);
}

if (showTopics) {
message = mqtt3Publish.getTopic() + ": " + message;
}
if (showTopics) {
message = mqtt3Publish.getTopic() + ": " + message;
}

Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt3Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
mqtt3Publish.acknowledge();
return;
}
Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt3Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt3Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
return;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}
if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}

if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
}
System.out.println(message);
}
System.out.println(message);

} finally {
//Necessary to ensure log ordering
mqtt3Publish.acknowledge();
}
mqtt3Publish.acknowledge();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,39 +50,43 @@ public class SubscribeMqtt5PublishCallback implements Consumer<Mqtt5Publish> {

@Override
public void accept(final @NotNull Mqtt5Publish mqtt5Publish) {
String message;
try {
if (isJsonOutput) {
message = new JsonMqttPublish(mqtt5Publish, isBase64).toString();
} else {
message = MqttPublishUtils.formatPayload(mqtt5Publish.getPayloadAsBytes(), isBase64);
}
String message;
try {
if (isJsonOutput) {
message = new JsonMqttPublish(mqtt5Publish, isBase64).toString();
} else {
message = MqttPublishUtils.formatPayload(mqtt5Publish.getPayloadAsBytes(), isBase64);
}

if (showTopics) {
message = mqtt5Publish.getTopic() + ": " + message;
}
if (showTopics) {
message = mqtt5Publish.getTopic() + ": " + message;
}

Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt5Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt5Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
mqtt5Publish.acknowledge();
return;
}
Logger.debug("{} received PUBLISH ('{}')\n {}",
LoggerUtils.getClientPrefix(client.getConfig()),
new String(mqtt5Publish.getPayloadAsBytes(), StandardCharsets.UTF_8),
mqtt5Publish);
} catch (final Exception e) {
Logger.error("An error occurred while processing an incoming PUBLISH.", e);
return;
}

if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}
if (outputFile != null) {
MqttPublishUtils.printToFile(outputFile, message);
}

if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
if (printToStdout) {
if (System.out.checkError()) {
//TODO: Handle SIGPIPE
//throw new RuntimeException("SIGNAL RECEIVED. PIPE CLOSED");
}
System.out.println(message);
}
System.out.println(message);

} finally {
//Necessary to ensure log ordering
mqtt5Publish.acknowledge();
}
mqtt5Publish.acknowledge();
}
}

0 comments on commit 38843a1

Please sign in to comment.