[SDCISA-16147, #583] Some cleanup
hiddenalpha committed Jun 4, 2024
1 parent ae29409 commit 721a69a
Showing 4 changed files with 59 additions and 61 deletions.
KafkaHandler.java
@@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
@@ -56,41 +53,41 @@ public class KafkaHandler extends ConfigurationResourceConsumer {

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}
// /** @deprecated Use {@link #builder()} */
// @Deprecated
// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
// this(configurationResourceManager, null, repository, kafkaMessageSender,
// configResourceUri, streamingPath);
// }
//
// /** @deprecated Use {@link #builder()} */
// @Deprecated
// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
// KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
// String streamingPath) {
// this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
// configResourceUri, streamingPath, new HashMap<>());
// }
//
// /** @deprecated Use {@link #builder()} */
// @Deprecated
// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
//
// this(configurationResourceManager, null, repository, kafkaMessageSender,
// configResourceUri, streamingPath, properties);
// }
//
// /** @deprecated Use {@link #builder()} */
// @Deprecated
// public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
// KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
// this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
// kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
// properties);
// log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
// }

/** Use {@link #builder()} to get an instance. */
KafkaHandler(
@@ -181,7 +178,7 @@ public boolean handle(final HttpServerRequest request) {
// TODO refactor away this callback-hell (Counts for the COMPLETE method
// surrounding this line, named 'KafkaHandler.handle()', NOT only
// those lines below).
kafkaProducerRecordBuilder.buildRecords(topic, payload).compose((List<KafkaProducerRecord<String, String>> kafkaProducerRecords) -> {
kafkaProducerRecordBuilder.buildRecordsAsync(topic, payload).compose((List<KafkaProducerRecord<String, String>> kafkaProducerRecords) -> {
maybeValidate(request, kafkaProducerRecords).onComplete(validationEvent -> {
if(validationEvent.succeeded()) {
if(validationEvent.result().isSuccess()) {
@@ -208,6 +205,7 @@ public boolean handle(final HttpServerRequest request) {
return;
}
log.error("TODO error handling", exceptionFactory.newException(ex));
respondWith(StatusCode.INTERNAL_SERVER_ERROR, ex.getMessage(), request);
});
});
return true;
KafkaProducerRecordBuilder.java
@@ -51,7 +51,7 @@ class KafkaProducerRecordBuilder {
* @return A list of {@link KafkaProducerRecord}s created from the provided payload
* @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.)
*/
Future<List<KafkaProducerRecord<String, String>>> buildRecords(String topic, Buffer payload) {
Future<List<KafkaProducerRecord<String, String>>> buildRecordsAsync(String topic, Buffer payload) {
return Future.<Void>succeededFuture().compose((Void v) -> {
JsonObject payloadObj;
try {
@@ -82,9 +82,9 @@ Future<List<KafkaProducerRecord<String, String>>> buildRecords(String topic, Buf
});
}

/** @deprecated Use {@link #buildRecords(String, Buffer)}. */
/** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */
@Deprecated
static List<KafkaProducerRecord<String, String>> buildRecordsBlocking(String topic, Buffer payload) throws ValidationException {
static List<KafkaProducerRecord<String, String>> buildRecords(String topic, Buffer payload) throws ValidationException {
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
JsonObject payloadObj;
try {
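For context, this file's change is essentially a name swap: the Future-returning builder is now buildRecordsAsync(String, Buffer), while the deprecated blocking variant takes back the plain buildRecords(String, Buffer) name and keeps throwing ValidationException. A minimal caller-side sketch of the two shapes might look as follows; same-package placement and the no-arg construction of KafkaProducerRecordBuilder are assumptions made for illustration, not details taken from this diff.

package org.swisspush.gateleen.kafka;

// Illustrative sketch (not part of the commit): caller-side view of the renamed methods.
// Same-package placement and no-arg construction of KafkaProducerRecordBuilder are
// assumptions made for this example only.
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.List;

class RecordBuilderUsageSketch {

    void demo() {
        Buffer payload = Buffer.buffer("{\"records\":[{\"key\":\"someKey\",\"value\":{}}]}");

        // Non-blocking variant, now named buildRecordsAsync: reports invalid payloads
        // through a failed Future instead of a thrown ValidationException.
        new KafkaProducerRecordBuilder().buildRecordsAsync("myTopic", payload).onComplete(event -> {
            if (event.succeeded()) {
                System.out.println("built " + event.result().size() + " record(s)");
            } else {
                System.err.println("payload rejected: " + event.cause().getMessage());
            }
        });

        // Blocking variant keeps the original buildRecords name but is now @Deprecated.
        try {
            List<KafkaProducerRecord<String, String>> records =
                    KafkaProducerRecordBuilder.buildRecords("myTopic", payload);
            System.out.println("built " + records.size() + " record(s)");
        } catch (ValidationException ex) {
            System.err.println("invalid payload: " + ex.getMessage());
        }
    }
}

The tests below exercise exactly this deprecated static variant, which is why their imports and call sites switch from buildRecordsBlocking back to buildRecords.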
KafkaMessageSenderTest.java
@@ -22,7 +22,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;

/**
* Test class for the {@link KafkaMessageSender}
@@ -47,7 +47,7 @@ public void sendSingleMessage(TestContext context) throws ValidationException {
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode()));
buildRecords(topic, Buffer.buffer(buildSingleRecordPayload("someKey").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

Expand All @@ -64,7 +64,7 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking(topic, Buffer.buffer(buildSingleRecordPayload(null).encode()));
buildRecords(topic, Buffer.buffer(buildSingleRecordPayload(null).encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

Expand All @@ -81,7 +81,7 @@ public void sendMultipleMessages(TestContext context) throws ValidationException
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));
buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));

Expand All @@ -105,7 +105,7 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V
Async async = context.async();
String topic = "myTopic";
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));
buildRecords(topic, Buffer.buffer(buildThreeRecordsPayload("key_1", "key_2", "key_3").encode()));

when(producer.send(any())).thenReturn(Future.succeededFuture(new RecordMetadata(1,1,1, topic)));
when(producer.send(eq(records.get(1)))).thenReturn(Future.failedFuture("Message with key '" + records.get(1).key() + "' failed."));
KafkaProducerRecordBuilderTest.java
@@ -17,7 +17,7 @@

import java.util.List;

import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecordsBlocking;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;

/**
* Test class for the {@link KafkaProducerRecordBuilder}
@@ -34,77 +34,77 @@ public class KafkaProducerRecordBuilderTest {
public void buildRecordsInvalidJson() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Error while parsing payload");
buildRecordsBlocking("myTopic", Buffer.buffer("notValidJson"));
buildRecords("myTopic", Buffer.buffer("notValidJson"));
}

@Test
public void buildRecordsMissingRecordsArray() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Missing 'records' array");
buildRecordsBlocking("myTopic", Buffer.buffer("{}"));
buildRecords("myTopic", Buffer.buffer("{}"));
}

@Test
public void buildRecordsNotArray() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
}

@Test
public void buildRecordsInvalidRecordsType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [123]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
}

@Test
public void buildRecordsEmptyRecordsArray(TestContext context) throws ValidationException {
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": []}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": []}"));
context.assertTrue(records.isEmpty());
}

@Test
public void buildRecordsInvalidKeyType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'key' must be of type String");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
}

@Test
public void buildRecordsInvalidValueType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'value' must be of type JsonObject");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
}

@Test
public void buildRecordsMissingValue() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'value' is required");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\":[{}]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
}

@Test
public void buildRecordsInvalidHeadersType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'headers' must be of type JsonObject");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
}

@Test
public void buildRecordsInvalidHeadersValueType() throws ValidationException {
thrown.expect( ValidationException.class );
thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only");
buildRecordsBlocking("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
}

@Test
public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws ValidationException {
JsonObject payload = buildPayload(null, null);
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode()));
buildRecords("myTopic", Buffer.buffer(payload.encode()));
context.assertFalse(records.isEmpty());
context.assertEquals(1, records.size());
context.assertEquals("myTopic", records.get(0).topic());
@@ -118,7 +118,7 @@ public void buildRecordsValidNoKeyNoHeaders(TestContext context) throws Validati
public void buildRecordsValidWithKeyNoHeaders(TestContext context) throws ValidationException {
JsonObject payload = buildPayload("someKey", null);
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode()));
buildRecords("myTopic", Buffer.buffer(payload.encode()));
context.assertFalse(records.isEmpty());
context.assertEquals(1, records.size());
context.assertEquals("myTopic", records.get(0).topic());
@@ -135,7 +135,7 @@ public void buildRecordsValidWithKeyWithHeaders(TestContext context) throws Vali
.add("header_1", "value_1")
.add("header_2", "value_2"));
final List<KafkaProducerRecord<String, String>> records =
buildRecordsBlocking("myTopic", Buffer.buffer(payload.encode()));
buildRecords("myTopic", Buffer.buffer(payload.encode()));
context.assertFalse(records.isEmpty());
context.assertEquals(1, records.size());
context.assertEquals("myTopic", records.get(0).topic());
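Read together, the validation tests above pin down the payload contract both variants accept: a required 'records' JsonArray of JsonObjects, an optional String 'key', a required JsonObject 'value', and an optional 'headers' JsonObject holding String values only. A sketch of one accepted payload, built with the Vert.x JSON types the tests already use, is shown here; the class and method names in the sketch are illustrative, not taken from the repository.

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

// Illustrative sketch (not part of the commit): one payload shape the validation
// in KafkaProducerRecordBuilder accepts, per the test expectations above.
class PayloadShapeSketch {
    static JsonObject validPayload() {
        return new JsonObject().put("records", new JsonArray().add(new JsonObject()
                .put("key", "someKey")                     // optional, must be a String
                .put("value", new JsonObject())            // required, must be a JsonObject
                .put("headers", new JsonObject()           // optional, String values only
                        .put("header_1", "value_1")
                        .put("header_2", "value_2"))));
    }
}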
