Skip to content

Commit

Permalink
add test for DLQ metric counting (#1960)
Browse files Browse the repository at this point in the history
* Add test for DLQ metric counting
  • Loading branch information
mocenas authored Sep 3, 2024
1 parent 38cdbd5 commit f8e76c8
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public final class Channels {
public static final String CHANNEL_SOURCE_ALERTS = "alerts-source";
public static final String CHANNEL_TARGET_ALERTS = "alerts-target";
public static final String ALERTS_STREAM = "alerts-stream";
public static final String CHANNEL_UNDELIVERABLE_SOURCE = "undeliverable-source";
public static final String CHANNEL_UNDELIVERABLE_TARGET = "undeliverable-target";

private Channels() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.ts.micrometer.prometheus.kafka;

import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

public class UndeliverableConsumer {
@Incoming(Channels.CHANNEL_UNDELIVERABLE_TARGET)
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> consume(Message<String> message) {
return message.nack(new IllegalArgumentException("Can't invoke this"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.ts.micrometer.prometheus.kafka;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/undeliverable")
public class UndeliverableProducer {
@Channel(Channels.CHANNEL_UNDELIVERABLE_SOURCE)
Emitter<String> dataEmitter;

@GET
public void sendData() {
dataEmitter.send("random data");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,13 @@ mp.messaging.outgoing.alerts-source.topic=alerts-target
mp.messaging.outgoing.alerts-source.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.alerts-target.connector=smallrye-kafka
mp.messaging.incoming.alerts-target.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.undeliverable-source.connector=smallrye-kafka
mp.messaging.outgoing.undeliverable-source.topic=undeliverable-target
mp.messaging.outgoing.undeliverable-source.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.undeliverable-target.connector=smallrye-kafka
mp.messaging.incoming.undeliverable-target.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.undeliverable-target.failure-strategy=dead-letter-queue
mp.messaging.incoming.dead-letter-topic-undeliverable-target.connector=smallrye-kafka
mp.messaging.incoming.dead-letter-topic-undeliverable-target.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.quarkus.ts.micrometer.prometheus.kafka;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.apache.http.HttpStatus;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.quarkus.test.bootstrap.KafkaService;
import io.quarkus.test.bootstrap.RestService;
import io.quarkus.test.scenarios.QuarkusScenario;
import io.quarkus.test.services.KafkaContainer;
import io.quarkus.test.services.QuarkusApplication;

/**
* Test that messages send to a dead-letter-queue are counted in micrometer metrics.
* See links for more details:
* - https://issues.redhat.com/browse/QUARKUS-4120
* - https://github.com/smallrye/smallrye-reactive-messaging/issues/2473
*/
@QuarkusScenario
@Tag("QUARKUS-4120")
public class DLQMetricCountIT {
private static final String DEAD_LETTER_TOPIC_NAME = "dead-letter-topic-undeliverable-target";
private static final String MESSAGES_SEND_METRIC = "kafka_producer_topic_record_send_total";
private static final String UNDELIVERABLE_ENDPOINT = "undeliverable";
private static final int NUMBER_OF_MESSAGES_SEND = 3;

@KafkaContainer
static final KafkaService kafka = new KafkaService();

@QuarkusApplication
static RestService app = new RestService().withProperty("kafka.bootstrap.servers", kafka::getBootstrapUrl);

@Test
public void testDeadLetterMessagesAreCounted() {
// wait for kafka to be fully loaded
// if we send requests (messages) too soon, they will fail to propagate into topic
kafka.logs().assertContains("Assignment received from leader kafka-consumer-undeliverable-target");

// send requests
for (int i = 0; i < NUMBER_OF_MESSAGES_SEND; i++) {
app.given().get(UNDELIVERABLE_ENDPOINT).then().statusCode(HttpStatus.SC_NO_CONTENT);
}

thenMetricIsExposedInServiceEndpoint(MESSAGES_SEND_METRIC, DEAD_LETTER_TOPIC_NAME,
greaterOrEqual(NUMBER_OF_MESSAGES_SEND));
}

private void thenMetricIsExposedInServiceEndpoint(String name, String key, Predicate<Double> valueMatcher) {
await().ignoreExceptions().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
String response = app.given().get("/q/metrics").then()
.statusCode(HttpStatus.SC_OK)
.extract().asString();

for (String line : response.split("[\r\n]+")) {
if (line.startsWith(name) && line.contains(key)) {
Double value = extractValueFromMetric(line);
assertTrue(valueMatcher.test(value), "Metric value is not expected. Found: " + value);
return;
}
}

fail("Metric " + name + " not found in " + response);
});
}

private Predicate<Double> greaterOrEqual(double expected) {
return actual -> actual >= expected;
}

private Double extractValueFromMetric(String line) {
return Double.parseDouble(line.substring(line.lastIndexOf(" ")));
}
}

0 comments on commit f8e76c8

Please sign in to comment.