Skip to content

Commit

Permalink
BE: Chore: Upgrade kafka to confluent 7.8.0 (#710)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeikel authored Dec 23, 2024
1 parent e44d16f commit a9bc82c
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 59 deletions.
10 changes: 5 additions & 5 deletions .dev/dev_arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
user: "0:0"
hostname: kafka0
container_name: kafka0
Expand Down Expand Up @@ -60,7 +60,7 @@ services:
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
image: confluentinc/cp-schema-registry:7.6.0.arm64
image: confluentinc/cp-schema-registry:7.8.0.arm64
ports:
- 8085:8085
depends_on:
Expand All @@ -76,7 +76,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.6.0.arm64
image: confluentinc/cp-kafka-connect:7.8.0.arm64
ports:
- 8083:8083
depends_on:
Expand All @@ -101,7 +101,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.6.0.arm64
image: confluentinc/cp-ksqldb-server:7.8.0.arm64
depends_on:
- kafka0
- kafka-connect0
Expand All @@ -119,7 +119,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0

kafka-init-topics:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.8.0.arm64
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ build/
*.tgz

/docker/*.override.yaml
/e2e-tests/allure-results/
5 changes: 4 additions & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
<!-- ccs stands for Confluent Community Edition
See https://www.confluent.io/confluent-community-license-faq/
-->
<version>${confluent.version}-ccs</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
108 changes: 67 additions & 41 deletions api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
Expand Down Expand Up @@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
(WebClientResponseException.Conflict) signal.failure()));
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static @NotNull Retry retryOnRebalance() {
return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {

if (e instanceof WebClientResponseException.InternalServerError exception) {
final var errorMessage = getMessage(exception);
return StringUtils.equals(errorMessage,
// From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
"Request cannot be completed because a rebalance is expected");
}
return false;
});
}

private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
return publisher
.retryWhen(retryOnRebalance())
.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
return publisher.retryWhen(retryOnRebalance());
}


private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class,
Expand All @@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
return Mono.error(new ValidationException(getMessage(parseException)));
}

private static String getMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
return Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message();
}

@Override
public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
return withBadRequestErrorHandling(
super.createConnector(newConnector)
withRetryOnRebalance(super.createConnector(newConnector))
);
}

@Override
public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
throws RestClientException {
return withBadRequestErrorHandling(
super.setConnectorConfig(connectorName, requestBody)
withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
);
}

@Override
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
}

@Override
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnector(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
}


@Override
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnector(connectorName));
return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfig(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfig(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorConfigWithHttpInfo(connectorName));
}

@Override
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPlugins());
return withRetryOnConflictOrRebalance(super.getConnectorPlugins());
}

@Override
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
return withRetryOnConflictOrRebalance(super.getConnectorPluginsWithHttpInfo());
}

@Override
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatus(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatus(connectorName));
}

@Override
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorStatusWithHttpInfo(connectorName));
}

@Override
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatus(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
}

@Override
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasks(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasks(connectorName));
}

@Override
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTasksWithHttpInfo(connectorName));
}

@Override
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopics(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopics(connectorName));
}

@Override
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.getConnectorTopicsWithHttpInfo(connectorName));
}

@Override
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
return withRetryOnConflictOrRebalance(super.getConnectors(search));
}

@Override
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
}

@Override
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnector(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnector(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
Boolean onlyFailed) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
return withRetryOnConflictOrRebalance(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
}

@Override
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTask(connectorName, taskId));
}

@Override
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}

@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return super.resumeConnector(connectorName);
return withRetryOnRebalance(super.resumeConnector(connectorName));
}

@Override
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
}

@Override
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
return withRetryOnConflictOrRebalance(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
}

@Override
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfig(pluginName, requestBody));
}

@Override
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
}

private static class RetryingApiClient extends ApiClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class AbstractIntegrationTest {
private static final boolean IS_ARM =
System.getProperty("os.arch").contains("arm") || System.getProperty("os.arch").contains("aarch64");

private static final String CONFLUENT_PLATFORM_VERSION = IS_ARM ? "7.2.1.arm64" : "7.2.1";
private static final String CONFLUENT_PLATFORM_VERSION = IS_ARM ? "7.8.0.arm64" : "7.8.0";

public static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
Expand Down
Loading

0 comments on commit a9bc82c

Please sign in to comment.