Skip to content

Commit

Permalink
Apply idea code analysis suggestions M-N
Browse files Browse the repository at this point in the history
  • Loading branch information
mocenas committed Nov 15, 2024
1 parent 8b757df commit cdcfb93
Show file tree
Hide file tree
Showing 30 changed files with 45 additions and 96 deletions.
34 changes: 3 additions & 31 deletions messaging/infinispan-grpc-kafka/README.md
Original file line number Diff line number Diff line change
@@ -1,43 +1,15 @@
# Table of Contents
1. [Quarkus infinispan grpc kafka](#Quarkus-infinispan-grpc-kafka)
2. [Quarkus SSL/TLS Infinispan scenario ](#Quarkus-SSL/TLS-Infinispan-scenario )
3. [Quarkus Grateful Shutdown for Kafka connectors](#Quarkus-Grateful-Shutdown-for-Kafka-connectors)
2. [Kafka-client SSL SASL](#Kafka-client-SSL-SASL)

## Quarkus infinispan grpc kafka
Module that test whether gRPC, Infinispan and Kafka extensions work together:
- for gRPC: there is a simple greetings endpoint. This example will use a `helloworld.proto` file to generate the required sources.
- for Infinispan: to check whether the cache persistence is working fine
- for Kafka: to verify the messages are working in a chain workflow.

## Kafka-client SASL
## Kafka-client SSL SASL
Verifies SASL authentication through Quarkus Kafka client extension.
Endpoint `SaslKafkaEndpoint` is able to produce events
Endpoint `SaslSslKafkaEndpoint` is able to produce events
and consume events and check topics through `AdminClient` and `KafkaConsumer`.

## Quarkus Infinispan scenario

##Infinispan server SSL/TLS
TrustStore is used to store certificates from Certified Authorities (CA) that verify the certificate presented by the server
in an SSL connection. While Keystore is used to store private key and identity certificates that a specific program should present to
both parties (server or client) for verification.

Security infinispan documentation:
- https://infinispan.org/docs/stable/titles/server/server.html#authentication-mechanisms
- https://infinispan.org/docs/stable/titles/server/server.html#security-realms

We have used the following commands in order to generate the required certificates.

Create the Keystore certificate
```shell
keytool -v -genkeypair -keyalg RSA -dname "cn=Quarkus, ou=Quarkus, o=Redhat, L=San Francisco, st=CA, c=US" -ext SAN="DNS:localhost,IP:127.0.0.1" -validity 3825 -alias 1 -keystore keystore.jks -keypass password -storepass password
```

Export the Certificate to add it into Truststore
```shell
keytool -export -alias 1 -file localhost.cer -keystore keystore.jks -storepass password
```

Create a Trustore certificate
```shell
keytool -import -v -trustcacerts -alias 1 -file localhost.cer -keystore truststore.jks -storepass password
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.infinispan.commons.configuration.StringConfiguration;
import org.jboss.logging.Logger;

import io.quarkus.runtime.StartupEvent;
Expand All @@ -26,7 +26,7 @@ public class InfinispanPopulated {
void onStart(@Observes StartupEvent ev) {
LOGGER.info("Create or get cache named mycache with the default configuration");
RemoteCache<Object, Object> cache = cacheManager.administration().getOrCreateCache("mycache",
new XMLStringConfiguration(CACHE_CONFIG));
new StringConfiguration(CACHE_CONFIG));
cache.put("hello", "Hello World, Infinispan is up!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import jakarta.inject.Inject;

import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.infinispan.commons.configuration.StringConfiguration;

import io.quarkus.runtime.StartupEvent;

Expand All @@ -23,7 +23,7 @@ public class BookCacheInitializer {

void onStart(@Observes StartupEvent ev) {
cacheManager.administration().getOrCreateCache(CACHE_NAME,
new XMLStringConfiguration(String.format(CACHE_CONFIG, CACHE_NAME)));
new StringConfiguration(String.format(CACHE_CONFIG, CACHE_NAME)));

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.quarkus.ts.messaging.infinispan.grpc.kafka.books;

import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
import org.infinispan.protostream.annotations.ProtoSchema;

@AutoProtoSchemaBuilder(includeClasses = { Book.class }, schemaPackageName = "book_sample")
@ProtoSchema(includeClasses = { Book.class }, schemaPackageName = "book_sample")
interface BookContextInitializer extends SerializationContextInitializer {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class BookResource {
@Path("/{title}")
@Produces(MediaType.APPLICATION_JSON)
public Book getBook(@PathParam("title") String title) {
Book found = cache.get(title);
return found;
return cache.get(title);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.smallrye.mutiny.Multi;

/**
* A bean producing random prices every 1 seconds.
* A bean producing random prices every 1 second.
* The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
*/
@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class KafkaEndpoint {
protected void initialize(KafkaConsumer<String, String> consumer) {
consumer.subscribe(Collections.singleton(TOPIC));
new Thread(() -> {
try {
try (consumer) {
while (true) {
final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

Expand All @@ -38,8 +38,6 @@ protected void initialize(KafkaConsumer<String, String> consumer) {
}
} catch (Exception e) {
LOG.error(e.getMessage());
} finally {
consumer.close();
}
}).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,12 +28,11 @@ abstract class BaseKafkaAvroGroupIdIT {

private String endpoint;
private Client client = ClientBuilder.newClient();
private List<String> receive = new CopyOnWriteArrayList<>();
private boolean completed;
private Random rand = new Random();

@Test
public void testAlertMonitorEventStream() throws InterruptedException {
public void testAlertMonitorEventStream() {
GivenSomeStockPrices(getAppA(), EVENTS_AMOUNT);
AndApplicationEndpoint(getEndpoint(getAppA()) + "/stock-price/stream");
whenRequestSomeEvents(EVENTS_AMOUNT);
Expand Down Expand Up @@ -73,7 +70,6 @@ private void whenRequestSomeEvents(int expectedAmount) {
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> {
final var data = inboundSseEvent.readData(String.class, MediaType.APPLICATION_JSON_TYPE);
receive.add(data);
totalAmountReceived.incrementAndGet();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void onStop(@Observes ShutdownEvent ev) {
private void addRoute(HttpMethod method, String path, Handler<RoutingContext> handler) {
Route route = this.router.route(method, path)
.handler(LoggerHandler.create())
.handler(CorsHandler.create("*"));
.handler(CorsHandler.create().addRelativeOrigin(".*"));

if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT))
route.handler(BodyHandler.create());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ public class FailureHandler {
public void handler(final RoutingContext ctx) {
JsonObject error = defaultError(ctx.normalizedPath());

if (ctx.failure() instanceof HttpException) {
HttpException httpExp = (HttpException) ctx.failure();
if (ctx.failure() instanceof HttpException httpExp) {
error.put("status", httpExp.getStatusCode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void pushEvent(final RoutingContext context) {
}

public void pushMessageEvent(final RoutingContext routingContext) {
String message = routingContext.getBodyAsString();
String message = routingContext.body().asString();
emitter.send(message)
.onFailure().invoke(exception -> {
routingContext.response()
Expand Down Expand Up @@ -73,7 +73,7 @@ public void pushEventToTopic(final RoutingContext context) {
.withNack(handlerError(context, startMs))
.addMetadata(metadata);

emitter.send(msg);
emitter.sendMessageAndForget(msg);
}

private Function<Throwable, CompletionStage<Void>> handlerError(RoutingContext context, Long startMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Topology buildTopology() {

builder.stream(LOGIN_ATTEMPTS_TOPIC, Consumed.with(Serdes.String(), loginAttemptSerde))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(windowsLoginSec)))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(windowsLoginSec)))
.aggregate(LoginAggregation::new,
(id, value, aggregation) -> aggregation.updateFrom(value),
Materialized.<String, LoginAggregation, WindowStore<Bytes, byte[]>> as(LOGIN_AGGREGATION_STORE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@DisabledOnRHBQandWindows(reason = "QUARKUS-3434")
public class DevModeRedPandaDevServiceUserExperienceIT {

private static final String RED_PANDA_VERSION = getImageVersion("redpanda.image");;
private static final String RED_PANDA_VERSION = getImageVersion("redpanda.image");
private static final String RED_PANDA_IMAGE = getImageName("redpanda.image");

@DevModeQuarkusApplication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,12 +28,11 @@ abstract class BaseKafkaAvroGroupIdIT {

private String endpoint;
private Client client = ClientBuilder.newClient();
private List<String> receive = new CopyOnWriteArrayList<>();
private boolean completed;
private Random rand = new Random();

@Test
public void testAlertMonitorEventStream() throws InterruptedException {
public void testAlertMonitorEventStream() {
GivenSomeStockPrices(getAppA(), EVENTS_AMOUNT);
AndApplicationEndpoint(getEndpoint(getAppA()) + "/stock-price/stream");
whenRequestSomeEvents(EVENTS_AMOUNT);
Expand Down Expand Up @@ -73,7 +70,6 @@ private void whenRequestSomeEvents(int expectedAmount) {
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> {
final var data = inboundSseEvent.readData(String.class, MediaType.APPLICATION_JSON_TYPE);
receive.add(data);
totalAmountReceived.incrementAndGet();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class KafkaEndpoint {
protected void initialize(KafkaConsumer<String, String> consumer) {
consumer.subscribe(Collections.singleton(TOPIC));
new Thread(() -> {
try {
try (consumer) {
while (true) {
final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

Expand All @@ -38,8 +38,6 @@ protected void initialize(KafkaConsumer<String, String> consumer) {
}
} catch (Exception e) {
LOG.error(e.getMessage());
} finally {
consumer.close();
}
}).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ public class PriceConsumer {
public String getLastPrice() {
try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);
JMSConsumer consumer = context.createConsumer(context.createTopic("prices"))) {
while (true) {
Message message = consumer.receive();
if (message == null) {
// receive returns `null` if the JMSConsumer is closed
return "";
}

return message.getBody(String.class);
Message message = consumer.receive();
if (message == null) {
// receive returns `null` if the JMSConsumer is closed
return "";
}

return message.getBody(String.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void thenKafkaConsumerMetricsAreFound() throws Exception {
thenMetricIsExposedInPrometheus(KAFKA_CONSUMER_COUNT_METRIC, any());
}

private void thenMetricIsExposedInPrometheus(String name, Predicate<String> valueMatcher) throws Exception {
private void thenMetricIsExposedInPrometheus(String name, Predicate<String> valueMatcher) {
await().ignoreExceptions().atMost(ASSERT_PROMETHEUS_TIMEOUT_MINUTES, TimeUnit.MINUTES).untilAsserted(() -> {
String output = runPrometheusCommandInPod(PROMETHEUS_POD, name);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.quarkus.ts.micrometer.prometheus.kafka.reactive;

import static io.restassured.RestAssured.given;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void thenKafkaConsumerMetricsAreFound() throws Exception {
thenMetricIsExposedInPrometheus(KAFKA_CONSUMER_COUNT_METRIC, any());
}

private void thenMetricIsExposedInPrometheus(String name, Predicate<String> valueMatcher) throws Exception {
private void thenMetricIsExposedInPrometheus(String name, Predicate<String> valueMatcher) {
await().ignoreExceptions().atMost(ASSERT_PROMETHEUS_TIMEOUT_MINUTES, TimeUnit.MINUTES).untilAsserted(() -> {
String output = runPrometheusCommandInPod(PROMETHEUS_POD, name);
assertTrue(output.contains("\"status\":\"success\""), "Verify the status was ok");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void whenCheckPrimeNumber(int number) {
app.given().get("/check/" + number).then().statusCode(HttpStatus.SC_OK);
}

private void thenMetricIsExposedInPrometheus(String name, Integer expected) throws Exception {
private void thenMetricIsExposedInPrometheus(String name, Integer expected) {
await().ignoreExceptions().atMost(ASSERT_PROMETHEUS_TIMEOUT_MINUTES, TimeUnit.MINUTES).untilAsserted(() -> {
String output = runPrometheusCommandInPod(PROMETHEUS_POD, primeNumberCustomMetricName(name));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public class OpenTelemetryGrpcIT {
private static final String SAY_PONG_PROTO = "SayPong";

@Test
public void testServerClientTrace() throws InterruptedException {
public void testServerClientTrace() {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
// Then both ping and pong rest endpoints should have the same trace ID.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public class OpenTelemetrySseIT {
private static final String PONG_ENDPOINT = "/server-sent-events-pong";

@Test
public void testServerClientTrace() throws InterruptedException {
public void testServerClientTrace() {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
// Then both ping and pong rest endpoints should have the same trace ID.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public class OpenTelemetryGrpcIT {
private static final String SAY_PONG_PROTO = "SayPong";

@Test
public void testServerClientTrace() throws InterruptedException {
public void testServerClientTrace() {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
// Then both ping and pong rest endpoints should have the same trace ID.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ public class OpenTelemetrySseIT {
private static final String PONG_ENDPOINT = "/server-sent-events-pong";

@Test
public void testServerClientTrace() throws InterruptedException {
public void testServerClientTrace() {
// When calling ping, the rest will invoke also the pong rest endpoint.
given()
.when().get(PING_ENDPOINT)
.then().statusCode(HttpStatus.SC_OK)
.contentType(MediaType.SERVER_SENT_EVENTS)
.body(containsString("ping pong"));

// Then both ping and pong rest endpoints should have the same trace Id.
// Then both ping and pong rest endpoints should have the same trace ID.
String pingTraceId = given()
.when().get(PING_ENDPOINT + "/lastTraceId")
.then().statusCode(HttpStatus.SC_OK).and().extract().asString();
Expand Down
Loading

0 comments on commit cdcfb93

Please sign in to comment.