From 3710267e6a2ac5e7b03755e2927f864e78846592 Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Fri, 12 Feb 2021 14:44:19 +0900 Subject: [PATCH] Upgrade Jetty from 9.4.x to 11.0.2 to fix CVE-2020-27218 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Change Java EE dependencies for Jetty 11 - javax.activation:activation:1.1.1 → com.sun.activation:javax.activation:1.2.0 - javax.ws.rs:javax.ws.rs-api 2.1.1 → jakarta.ws.rs:jakarta.ws.rs-api 3.0.0. - javax.xml.bind:jaxb-api:2.3.0 → jakarta.xml.bind:jakarta.xml.bind-api:2.3.0 - Change all javax.ws.rs.* imports into jakarta.ws.rs.*. 2. Upgrade jackson from 2.10.5 to 2.12.1 + Add jakarta classifier to jackson-jaxrs-json-provider for compatibility with jakarta.ws.rs-api - Update jackson-databind dependency: now it uses the same version with the other jackson dependencies. - Add '@JsonIgnoreProperties(ignoreUnknown = true)' annotation to ErrorResponse: Prevent 'com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "servlet"'. - Remove ScalaObjectMapper from ConsumerGroupCommand: deprecated from jackson-module-scala 2.12.1. - Mark Empty with @JsonSerialize, @JsonDeserialize: now serializable with Jetty 11. 3. Upgrade jersey from 2.31 to 3.0.1 4. Add additional dependencies for Jetty 11 - javax.ws.rs:jsr311-api:1.1.1 - jakarta.servlet:jakarta.servlet-api:5.0.0 - org.glassfish.jersey.media:jersey-media-json-jackson:3.0.1: Required to register jackson as a MessageBodyWriter. 5. Upgrade Jetty from 9.4.x to 11.0.2 - Exclude org.slf4j:slf4j-api:2.0.0-alpha1 dependency. - Fix deprecated method calls in RestClient#httpRequest, InternalRequestSignature#addToRequest - Update InternalRequestSignatureTest#addToRequestShouldThrowExceptionOnInvalidSignatureAlgorithm Since Request is an Interface from Jetty 11, mocking without when..then.. clause does not work. So it is now replaced to real instance. - Update SSLUtils From Jetty 11, SslContextFactory is separated into SslContextFactory.Server and SslContextFactory.Client subtypes; So, the return types of SSLUtils#[createServerSideSslContextFactory, createClientSideSslContextFactory] are now changed. Since SslContextFactory.Client does not have getNeedClientAuth, getWantClientAuth methods so SSLUtilsTest is also updated accordingly. - Increase JsonRestServer#GRACEFUL_SHUTDOWN_TIMEOUT_MS from 100 to 3000 Jetty 11 requires more time to tear down their resources than Jetty 9.4.x; without this modification, TimeoutException is thrown during shutdown in AgentTest, CoordinatorTest. --- build.gradle | 38 ++++++++++++---- checkstyle/import-control.xml | 16 +++---- .../connect/rest/ConnectRestExtension.java | 2 +- .../rest/ConnectRestExtensionContext.java | 8 ++-- .../BasicAuthSecurityRestExtension.java | 5 ++- .../auth/extension/JaasBasicAuthFilter.java | 19 ++++---- .../BasicAuthSecurityRestExtensionTest.java | 7 +-- .../extension/JaasBasicAuthFilterTest.java | 18 ++++---- .../distributed/DistributedHerder.java | 8 ++-- .../runtime/rest/ConnectRestConfigurable.java | 4 +- .../rest/ConnectRestExtensionContextImpl.java | 2 +- .../rest/InternalRequestSignature.java | 15 ++++--- .../connect/runtime/rest/RestClient.java | 26 ++++++----- .../connect/runtime/rest/RestServer.java | 10 +++-- .../rest/errors/BadRequestException.java | 2 +- .../rest/errors/ConnectExceptionMapper.java | 11 ++--- .../rest/errors/ConnectRestException.java | 2 +- .../resources/ConnectorPluginsResource.java | 19 ++++---- .../rest/resources/ConnectorsResource.java | 45 +++++++++---------- .../rest/resources/LoggingResource.java | 18 ++++---- .../runtime/rest/resources/RootResource.java | 8 ++-- .../connect/runtime/rest/util/SSLUtils.java | 6 +-- .../RestExtensionIntegrationTest.java | 10 +++-- .../SessionedProtocolIntegrationTest.java | 4 +- .../distributed/DistributedHerderTest.java | 2 +- .../rest/InternalRequestSignatureTest.java | 31 ++++++------- .../connect/runtime/rest/RestServerTest.java | 18 ++++---- .../ConnectorPluginsResourceTest.java | 10 +++-- .../resources/ConnectorsResourceTest.java | 23 +++++----- .../runtime/rest/util/SSLUtilsTest.java | 12 ++--- .../util/clusters/EmbeddedConnectCluster.java | 8 ++-- .../EmbeddedConnectClusterAssertions.java | 4 +- .../kafka/admin/ConsumerGroupCommand.scala | 3 +- gradle/dependencies.gradle | 26 ++++++----- .../kafka/trogdor/agent/AgentClient.java | 19 ++++---- .../trogdor/agent/AgentRestResource.java | 26 ++++++----- .../coordinator/CoordinatorClient.java | 27 +++++------ .../coordinator/CoordinatorRestResource.java | 32 ++++++------- .../org/apache/kafka/trogdor/rest/Empty.java | 6 +++ .../kafka/trogdor/rest/ErrorResponse.java | 7 ++- .../kafka/trogdor/rest/JsonRestServer.java | 2 +- .../trogdor/rest/RestExceptionMapper.java | 11 ++--- .../trogdor/coordinator/CoordinatorTest.java | 10 +++-- .../trogdor/rest/RestExceptionMapperTest.java | 15 ++++--- 44 files changed, 328 insertions(+), 267 deletions(-) diff --git a/build.gradle b/build.gradle index 78a60ea0a809f..c67170dd39a5c 100644 --- a/build.gradle +++ b/build.gradle @@ -1525,11 +1525,20 @@ project(':trogdor') { implementation libs.jacksonJaxrsJsonProvider implementation libs.jerseyContainerServlet implementation libs.jerseyHk2 + implementation libs.jerseyMediaJsonJackson implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 implementation libs.activation // Jersey dependency that was available in the JDK before Java 9 - implementation libs.jettyServer - implementation libs.jettyServlet - implementation libs.jettyServlets + implementation (libs.jettyServer) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlet) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlets) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation libs.jakartaServletApi + implementation libs.jsr311Api testImplementation project(':clients') testImplementation libs.junitJupiter @@ -2120,7 +2129,7 @@ project(':connect:api') { dependencies { api project(':clients') implementation libs.slf4jApi - implementation libs.jaxrsApi + implementation libs.jakartaRsApi testImplementation libs.junitJupiter testRuntimeOnly libs.slf4jlog4j @@ -2245,12 +2254,23 @@ project(':connect:runtime') { implementation libs.jacksonJaxrsJsonProvider implementation libs.jerseyContainerServlet implementation libs.jerseyHk2 + implementation libs.jerseyMediaJsonJackson implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 implementation libs.activation // Jersey dependency that was available in the JDK before Java 9 - implementation libs.jettyServer - implementation libs.jettyServlet - implementation libs.jettyServlets - implementation libs.jettyClient + implementation (libs.jettyServer) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlet) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlets) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyClient) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation libs.jakartaServletApi + implementation libs.jsr311Api implementation libs.reflections implementation libs.mavenArtifact @@ -2377,7 +2397,7 @@ project(':connect:basic-auth-extension') { dependencies { implementation project(':connect:api') implementation libs.slf4jApi - implementation libs.jaxrsApi + implementation libs.jakartaRsApi testImplementation libs.bcpkix testImplementation libs.easymock diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e7e2e4dca414f..16dcc3724c3a4 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -302,8 +302,8 @@ - - + + @@ -451,7 +451,7 @@ - + @@ -484,8 +484,8 @@ - - + + @@ -501,7 +501,7 @@ - + @@ -530,7 +530,7 @@ - + @@ -540,7 +540,7 @@ - + diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java index aa479a3d449ac..ffe358fb41c31 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java @@ -50,7 +50,7 @@ public interface ConnectRestExtension extends Configurable, Versioned, Closeable * method. The Connect framework will invoke this method after registering the default Connect resources. If the implementations attempt * to re-register any of the Connect resources, it will be be ignored and will be logged. * - * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link + * @param restPluginContext The context provides access to JAX-RS {@link jakarta.ws.rs.core.Configurable} and {@link * ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link * ConnectRestExtensionContext#configurable()} */ diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java index 76085971c379b..6a72d2573730d 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java @@ -19,19 +19,19 @@ import org.apache.kafka.connect.health.ConnectClusterState; -import javax.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configurable; /** * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS - * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided + * {@link jakarta.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided * by the Connect framework. */ public interface ConnectRestExtensionContext { /** - * Provides an implementation of {@link javax.ws.rs.core.Configurable} that be used to register JAX-RS resources. + * Provides an implementation of {@link jakarta.ws.rs.core.Configurable} that be used to register JAX-RS resources. * - * @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null} + * @return @return the JAX-RS {@link jakarta.ws.rs.core.Configurable}; never {@code null} */ Configurable configurable(); diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java index 8c41762d76b66..d61698a0fab17 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java @@ -21,13 +21,14 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.Configuration; import java.io.IOException; import java.util.Map; import java.util.function.Supplier; +import javax.security.auth.login.Configuration; /** * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link @@ -50,7 +51,7 @@ * * *

This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link - * javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link + * jakarta.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link * ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} with the entry * {@code org.apache.kafka.connect.extension.auth.jaas.BasicAuthSecurityRestExtension} diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java index 0299cbba0b546..5fadb5f209766 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java @@ -17,30 +17,31 @@ package org.apache.kafka.connect.rest.basic.auth.extension; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Pattern; -import javax.security.auth.login.Configuration; -import javax.ws.rs.HttpMethod; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; - +import java.util.List; +import java.util.regex.Pattern; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; -import javax.ws.rs.container.ContainerRequestContext; -import javax.ws.rs.container.ContainerRequestFilter; -import javax.ws.rs.core.Response; + +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.core.Response; public class JaasBasicAuthFilter implements ContainerRequestFilter { diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java index a0ec4bba553f5..18aa6639f650b 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java @@ -19,25 +19,26 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; + import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import javax.security.auth.login.Configuration; -import javax.ws.rs.core.Configurable; - import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import javax.security.auth.login.Configuration; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import jakarta.ws.rs.core.Configurable; + public class BasicAuthSecurityRestExtensionTest { Configuration priorConfiguration; diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java index c6c674a6338c6..b361d55c447fa 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java @@ -17,14 +17,9 @@ package org.apache.kafka.connect.rest.basic.auth.extension; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.ChoiceCallback; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.UriInfo; - import org.apache.kafka.common.security.authenticator.TestJaasConfig; import org.apache.kafka.connect.errors.ConnectException; + import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -37,13 +32,18 @@ import java.util.Collections; import java.util.List; import java.util.Map; - -import javax.ws.rs.container.ContainerRequestContext; -import javax.ws.rs.core.Response; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.ChoiceCallback; import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertThrows; +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriInfo; + public class JaasBasicAuthFilterTest { private static final String LOGIN_MODULE = diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b3b9ba9b94d2e..f465f697ed62f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -61,11 +61,9 @@ import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; + import org.slf4j.Logger; -import javax.crypto.KeyGenerator; -import javax.crypto.SecretKey; -import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -90,6 +88,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; @@ -97,6 +97,8 @@ import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; +import jakarta.ws.rs.core.Response; + /** *

* Distributed "herder" that coordinates with other workers to spread work across multiple processes. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java index 0d5cbd6333b48..55f0f678141c0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.Objects; -import javax.ws.rs.core.Configurable; -import javax.ws.rs.core.Configuration; +import jakarta.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configuration; /** * The implementation delegates to {@link ResourceConfig} so that we can handle duplicate diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java index cdf282fc90768..0de8f582251ed 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java @@ -20,7 +20,7 @@ import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; -import javax.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configurable; public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index d59425b13f6e2..8e653de1504a9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -18,16 +18,19 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; + import org.eclipse.jetty.client.api.Request; -import javax.crypto.Mac; -import javax.crypto.SecretKey; -import javax.ws.rs.core.HttpHeaders; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Base64; import java.util.Objects; +import javax.crypto.Mac; +import javax.crypto.SecretKey; + +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.jetty.http.HttpFields; public class InternalRequestSignature { @@ -53,8 +56,10 @@ public static void addToRequest(SecretKey key, byte[] requestBody, String signat throw new ConnectException(e); } byte[] requestSignature = sign(mac, key, requestBody); - request.header(InternalRequestSignature.SIGNATURE_HEADER, Base64.getEncoder().encodeToString(requestSignature)) - .header(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER, signatureAlgorithm); + request.headers((HttpFields.Mutable field) -> { + field.add(InternalRequestSignature.SIGNATURE_HEADER, Base64.getEncoder().encodeToString(requestSignature)); + field.add(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER, signatureAlgorithm); + }); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java index 58d7df0ca4d77..5048a1a7fe664 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java @@ -17,33 +17,35 @@ package org.apache.kafka.connect.runtime.rest; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - -import javax.crypto.SecretKey; -import javax.ws.rs.core.HttpHeaders; - import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.runtime.rest.util.SSLUtils; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; +import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ClientConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.Response; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import javax.crypto.SecretKey; + +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; public class RestClient { private static final Logger log = LoggerFactory.getLogger(RestClient.class); @@ -86,7 +88,9 @@ public static HttpResponse httpRequest(String url, String method, HttpHea HttpClient client; if (url.startsWith("https://")) { - client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(SSLUtils.createClientSideSslContextFactory(config)); + client = new HttpClient(new HttpClientTransportDynamic(clientConnector)); } else { client = new HttpClient(); } @@ -111,7 +115,7 @@ public static HttpResponse httpRequest(String url, String method, HttpHea addHeadersToRequest(headers, req); if (serializedBody != null) { - req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json"); + req.body(new StringRequestContent("application/json", serializedBody, StandardCharsets.UTF_8)); if (sessionKey != null && requestSignatureAlgorithm != null) { InternalRequestSignature.addToRequest( sessionKey, @@ -161,7 +165,7 @@ private static void addHeadersToRequest(HttpHeaders headers, Request req) { if (headers != null) { String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION); if (credentialAuthorization != null) { - req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization); + req.headers((HttpFields.Mutable field) -> field.add(HttpHeaders.AUTHORIZATION, credentialAuthorization)); } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 8f371bbfaee40..2df91824a7301 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; @@ -33,6 +32,8 @@ import org.apache.kafka.connect.runtime.rest.resources.LoggingResource; import org.apache.kafka.connect.runtime.rest.resources.RootResource; import org.apache.kafka.connect.runtime.rest.util.SSLUtils; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.Handler; @@ -55,8 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.UriBuilder; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -69,6 +68,9 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX; +import jakarta.servlet.DispatcherType; +import jakarta.ws.rs.core.UriBuilder; + /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ @@ -172,7 +174,7 @@ public Connector createConnector(String listener, boolean isAdmin) { ServerConnector connector; if (PROTOCOL_HTTPS.equals(protocol)) { - SslContextFactory ssl; + SslContextFactory.Server ssl; if (isAdmin) { ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX); } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java index bc9c7f273bf10..42a5094235e86 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.errors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; public class BadRequestException extends ConnectRestException { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java index 8678fbf16cd58..75075b0aece68 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java @@ -19,14 +19,15 @@ import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; -import javax.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriInfo; +import jakarta.ws.rs.ext.ExceptionMapper; public class ConnectExceptionMapper implements ExceptionMapper { private static final Logger log = LoggerFactory.getLogger(ConnectExceptionMapper.class); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java index f45f72ddd8bd3..0d45ea578be86 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java @@ -18,7 +18,7 @@ import org.apache.kafka.connect.errors.ConnectException; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; public class ConnectRestException extends ConnectException { private final int statusCode; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 1f6161e2ebfb8..4979383a71577 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -31,15 +31,6 @@ import org.apache.kafka.connect.tools.VerifiableSourceConnector; import org.apache.kafka.connect.util.FutureCallback; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -48,6 +39,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + @Path("/connector-plugins") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 27c250af6ba01..ea3e2d5c9e505 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -16,11 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import com.fasterxml.jackson.core.type.TypeReference; - -import javax.ws.rs.core.HttpHeaders; - -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -37,26 +32,12 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.FutureCallback; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.ServletContext; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; - import java.net.URI; import java.util.Collections; import java.util.HashMap; @@ -69,6 +50,24 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; +import jakarta.ws.rs.core.UriInfo; + @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @@ -88,7 +87,7 @@ public class ConnectorsResource { private final Herder herder; private final WorkerConfig config; - @javax.ws.rs.core.Context + @Context private ServletContext context; private final boolean isTopicTrackingDisabled; private final boolean isTopicTrackingResetDisabled; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index 05796600a936b..e4c75092274ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -18,18 +18,11 @@ import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; + import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; @@ -39,6 +32,15 @@ import java.util.Objects; import java.util.TreeMap; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + /** * A set of endpoints to adjust the log levels of runtime loggers. */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 9666bf15954f9..d9ec611370e81 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -19,10 +19,10 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; @Path("/") @Produces(MediaType.APPLICATION_JSON) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java index bf22bb6d67033..1b0f9b732cd82 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java @@ -38,7 +38,7 @@ public class SSLUtils { /** * Configures SSL/TLS for HTTPS Jetty Server using configs with the given prefix */ - public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config, String prefix) { + public static SslContextFactory.Server createServerSideSslContextFactory(WorkerConfig config, String prefix) { Map sslConfigValues = config.valuesWithPrefixAllOrNothing(prefix); final SslContextFactory.Server ssl = new SslContextFactory.Server(); @@ -54,14 +54,14 @@ public static SslContextFactory createServerSideSslContextFactory(WorkerConfig c /** * Configures SSL/TLS for HTTPS Jetty Server */ - public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config) { + public static SslContextFactory.Server createServerSideSslContextFactory(WorkerConfig config) { return createServerSideSslContextFactory(config, "listeners.https."); } /** * Configures SSL/TLS for HTTPS Jetty Client */ - public static SslContextFactory createClientSideSslContextFactory(WorkerConfig config) { + public static SslContextFactory.Client createClientSideSslContextFactory(WorkerConfig config) { Map sslConfigValues = config.valuesWithPrefixAllOrNothing("listeners.https."); final SslContextFactory.Client ssl = new SslContextFactory.Client(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index 6ec86bd9342b1..d0c874b6e1242 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -27,19 +27,17 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.test.IntegrationTest; + import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.core.Response; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; @@ -48,6 +46,10 @@ import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertEquals; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; + /** * A simple integration test to ensure that REST extensions are registered correctly. */ diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 8956a86e7c73c..238630e0b776d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -32,8 +32,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static javax.ws.rs.core.Response.Status.BAD_REQUEST; -import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 65ec89c8cf99b..3c7c7ecf59989 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -85,7 +85,7 @@ import java.util.concurrent.TimeoutException; import static java.util.Collections.singletonList; -import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java index f60ad3584a4fd..80feb9d4156d6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java @@ -19,16 +19,17 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; + + +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.junit.Test; -import org.mockito.ArgumentCaptor; +import java.net.URI; +import java.util.Base64; import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; -import javax.ws.rs.core.HttpHeaders; - -import java.util.Base64; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,6 +41,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import jakarta.ws.rs.core.HttpHeaders; + public class InternalRequestSignatureTest { private static final byte[] REQUEST_BODY = @@ -94,35 +97,27 @@ public void fromHeadersShouldReturnNonNullResultOnValidSignatureAndSignatureAlgo @Test public void addToRequestShouldThrowExceptionOnInvalidSignatureAlgorithm() { - Request request = mock(Request.class); + HttpClient httpClient = new HttpClient(); + Request request = httpClient.newRequest(URI.create("http://localhost")); assertThrows(ConnectException.class, () -> InternalRequestSignature.addToRequest(KEY, REQUEST_BODY, "doesn'texist", request)); } @Test public void addToRequestShouldAddHeadersOnValidSignatureAlgorithm() { - Request request = mock(Request.class); - ArgumentCaptor signatureCapture = ArgumentCaptor.forClass(String.class); - ArgumentCaptor signatureAlgorithmCapture = ArgumentCaptor.forClass(String.class); - when(request.header( - eq(InternalRequestSignature.SIGNATURE_HEADER), - signatureCapture.capture() - )).thenReturn(request); - when(request.header( - eq(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER), - signatureAlgorithmCapture.capture() - )).thenReturn(request); + HttpClient httpClient = new HttpClient(); + Request request = httpClient.newRequest(URI.create("http://localhost")); InternalRequestSignature.addToRequest(KEY, REQUEST_BODY, SIGNATURE_ALGORITHM, request); assertEquals( "Request should have valid base 64-encoded signature added as header", ENCODED_SIGNATURE, - signatureCapture.getValue() + request.getHeaders().get(InternalRequestSignature.SIGNATURE_HEADER) ); assertEquals( "Request should have provided signature algorithm added as header", SIGNATURE_ALGORITHM, - signatureAlgorithmCapture.getValue() + request.getHeaders().get(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER) ); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index ff3af44aa3715..150a0bac470f3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -16,6 +16,14 @@ */ package org.apache.kafka.connect.runtime.rest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.connect.rest.ConnectRestExtension; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; + import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpHost; @@ -28,13 +36,6 @@ import org.apache.http.impl.client.BasicResponseHandler; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.connect.rest.ConnectRestExtension; -import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.runtime.distributed.DistributedConfig; -import org.apache.kafka.connect.runtime.isolation.Plugins; -import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -46,7 +47,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.MediaType; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -62,6 +62,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import jakarta.ws.rs.core.MediaType; + @RunWith(PowerMockRunner.class) @PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"}) public class RestServerTest { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 148b782e9fead..4be69fc1855e3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import javax.ws.rs.core.HttpHeaders; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -53,6 +50,9 @@ import org.apache.kafka.connect.tools.VerifiableSinkConnector; import org.apache.kafka.connect.tools.VerifiableSourceConnector; import org.apache.kafka.connect.util.Callback; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -65,7 +65,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import javax.ws.rs.BadRequestException; import java.net.URL; import java.util.Arrays; import java.util.Collections; @@ -82,6 +81,9 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.core.HttpHeaders; + @RunWith(PowerMockRunner.class) @PrepareForTest(RestClient.class) @PowerMockIgnore("javax.management.*") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index f3d42a2f588e3..e68383348c114 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -16,12 +16,6 @@ */ package org.apache.kafka.connect.runtime.rest.resources; -import com.fasterxml.jackson.core.type.TypeReference; - -import javax.crypto.Mac; -import javax.ws.rs.core.HttpHeaders; - -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.errors.AlreadyExistsException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -40,6 +34,9 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; @@ -53,12 +50,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; - import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -71,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.crypto.Mac; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; @@ -78,6 +70,13 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MultivaluedHashMap; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriInfo; + @RunWith(PowerMockRunner.class) @PrepareForTest(RestClient.class) @PowerMockIgnore({"javax.management.*", "javax.crypto.*"}) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java index 8959a6c6e0b1f..6dc0269c3bf7e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java @@ -79,7 +79,7 @@ public void testCreateServerSideSslContextFactory() { configMap.put("ssl.trustmanager.algorithm", "PKIX"); DistributedConfig config = new DistributedConfig(configMap); - SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config); + SslContextFactory.Server ssl = SSLUtils.createServerSideSslContextFactory(config); Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath()); Assert.assertEquals("file:///path/to/truststore", ssl.getTrustStorePath()); @@ -117,15 +117,13 @@ public void testCreateClientSideSslContextFactory() { configMap.put("ssl.trustmanager.algorithm", "PKIX"); DistributedConfig config = new DistributedConfig(configMap); - SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config); + SslContextFactory.Client ssl = SSLUtils.createClientSideSslContextFactory(config); Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath()); Assert.assertEquals("file:///path/to/truststore", ssl.getTrustStorePath()); Assert.assertEquals("SunJSSE", ssl.getProvider()); Assert.assertArrayEquals(new String[] {"SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_RC4_128_MD5"}, ssl.getIncludeCipherSuites()); Assert.assertEquals("SHA1PRNG", ssl.getSecureRandomAlgorithm()); - Assert.assertFalse(ssl.getNeedClientAuth()); - Assert.assertFalse(ssl.getWantClientAuth()); Assert.assertEquals("JKS", ssl.getKeyStoreType()); Assert.assertEquals("JKS", ssl.getTrustStoreType()); Assert.assertEquals("TLS", ssl.getProtocol()); @@ -152,7 +150,7 @@ public void testCreateServerSideSslContextFactoryDefaultValues() { configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); DistributedConfig config = new DistributedConfig(configMap); - SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config); + SslContextFactory.Server ssl = SSLUtils.createServerSideSslContextFactory(config); Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType()); Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ssl.getTrustStoreType()); @@ -182,7 +180,7 @@ public void testCreateClientSideSslContextFactoryDefaultValues() { configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); DistributedConfig config = new DistributedConfig(configMap); - SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config); + SslContextFactory.Client ssl = SSLUtils.createClientSideSslContextFactory(config); Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType()); Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ssl.getTrustStoreType()); @@ -190,7 +188,5 @@ public void testCreateClientSideSslContextFactoryDefaultValues() { Assert.assertArrayEquals(Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")).toArray(), ssl.getIncludeProtocols()); Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ssl.getKeyManagerFactoryAlgorithm()); Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ssl.getTrustManagerFactoryAlgorithm()); - Assert.assertFalse(ssl.getNeedClientAuth()); - Assert.assertFalse(ssl.getWantClientAuth()); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index b7111d008b0dd..d9a7e87a0e367 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.connect.util.clusters; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -26,10 +24,12 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.Response; import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; @@ -62,6 +62,8 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; +import jakarta.ws.rs.core.Response; + /** * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, setup any tmp * directories and clean up them on them. Methods on the same {@code EmbeddedConnectCluster} are diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java index 6e1f87e307e18..ded28408abee4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java @@ -21,10 +21,10 @@ import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.Response; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -38,6 +38,8 @@ import static org.apache.kafka.test.TestUtils.waitForCondition; +import jakarta.ws.rs.core.Response; + /** * A set of common assertions that can be applied to a Connect cluster during integration testing */ diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2c0dc8c423ef7..6c6090f006dee 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -21,7 +21,6 @@ import java.time.{Duration, Instant} import java.util.Properties import com.fasterxml.jackson.dataformat.csv.CsvMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule -import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import kafka.utils._ import kafka.utils.Implicits._ import org.apache.kafka.clients.admin._ @@ -146,7 +145,7 @@ object ConsumerGroupCommand extends Logging { } // Example: CsvUtils().readerFor[CsvRecordWithoutGroup] private[admin] case class CsvUtils() { - val mapper = new CsvMapper with ScalaObjectMapper + val mapper = new CsvMapper mapper.registerModule(DefaultScalaModule) def readerFor[T <: CsvRecord : ClassTag] = { val schema = getSchema[T] diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 91df77adcb2a7..0dfe75a2f42ae 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -54,7 +54,7 @@ if ( !versions.scala.contains('-') ) { } versions += [ - activation: "1.1.1", + activation: "1.2.0", apacheda: "1.0.2", apacheds: "2.0.0-M24", argparse4j: "0.7.0", @@ -65,21 +65,22 @@ versions += [ grgit: "4.1.0", httpclient: "4.5.13", easymock: "4.2", - jackson: "2.10.5", - jacksonDatabind: "2.10.5.1", + jackson: "2.12.1", jacoco: "0.8.5", javassist: "3.27.0-GA", - jetty: "9.4.39.v20210325", - jersey: "2.31", + jetty: "11.0.2", + jersey: "3.0.1", jline: "3.12.1", jmh: "1.27", hamcrest: "2.2", log4j: "1.2.17", scalaLogging: "3.9.2", jaxb: "2.3.0", - jaxrs: "2.1.1", + jakartaRs: "3.0.0", + jakartaServlet: "5.0.0", jfreechart: "1.0.0", jopt: "5.0.4", + jsr311: "1.1.1", junit: "5.7.1", jqwik: "1.5.0", kafka_0100: "0.10.0.1", @@ -116,7 +117,7 @@ versions += [ zstd: "1.4.9-1" ] libs += [ - activation: "javax.activation:activation:$versions.activation", + activation: "com.sun.activation:javax.activation:$versions.activation", apacheda: "org.apache.directory.api:api-all:$versions.apacheda", apachedsCoreApi: "org.apache.directory.server:apacheds-core-api:$versions.apacheds", apachedsInterceptorKerberos: "org.apache.directory.server:apacheds-interceptor-kerberos:$versions.apacheds", @@ -131,14 +132,15 @@ libs += [ commonsCli: "commons-cli:commons-cli:$versions.commonsCli", easymock: "org.easymock:easymock:$versions.easymock", jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", - jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind", + jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", jacksonDataformatCsv: "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:$versions.jackson", jacksonModuleScala: "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson", jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson", - jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", - jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb", - jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs", + jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson:jakarta", + jakartaRsApi: "jakarta.ws.rs:jakarta.ws.rs-api:$versions.jakartaRs", + jakartaServletApi: "jakarta.servlet:jakarta.servlet-api:$versions.jakartaServlet", javassist: "org.javassist:javassist:$versions.javassist", + jaxbApi: "jakarta.xml.bind:jakarta.xml.bind-api:$versions.jaxb", jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty", jettyClient: "org.eclipse.jetty:jetty-client:$versions.jetty", jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty", @@ -146,10 +148,12 @@ libs += [ jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey", jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey", jline: "org.jline:jline:$versions.jline", + jerseyMediaJsonJackson: "org.glassfish.jersey.media:jersey-media-json-jackson:$versions.jersey", jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh", jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh", jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", + jsr311Api: "javax.ws.rs:jsr311-api:$versions.jsr311", junitJupiter: "org.junit.jupiter:junit-jupiter:$versions.junit", junitJupiterApi: "org.junit.jupiter:junit-jupiter-api:$versions.junit", junitVintageEngine: "org.junit.vintage:junit-vintage-engine:$versions.junit", diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java index 0f47e92a92e8f..f1904ebf020e8 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java @@ -17,12 +17,6 @@ package org.apache.kafka.trogdor.agent; -import com.fasterxml.jackson.core.type.TypeReference; -import net.sourceforge.argparse4j.ArgumentParsers; -import net.sourceforge.argparse4j.inf.ArgumentParser; -import net.sourceforge.argparse4j.inf.Namespace; -import net.sourceforge.argparse4j.inf.Subparser; -import net.sourceforge.argparse4j.inf.Subparsers; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.StringFormatter; @@ -33,14 +27,19 @@ import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.apache.kafka.trogdor.rest.WorkerState; import org.apache.kafka.trogdor.task.TaskSpec; -import org.apache.kafka.trogdor.rest.UptimeResponse; + +import com.fasterxml.jackson.core.type.TypeReference; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.UriBuilder; - import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; @@ -53,6 +52,8 @@ import static org.apache.kafka.trogdor.common.StringFormatter.dateString; import static org.apache.kafka.trogdor.common.StringFormatter.durationString; +import jakarta.ws.rs.core.UriBuilder; + /** * A client for the Trogdor agent. */ diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java index ec3df8bf84696..3078ca2f09cb6 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java @@ -23,19 +23,21 @@ import org.apache.kafka.trogdor.rest.StopWorkerRequest; import org.apache.kafka.trogdor.rest.UptimeResponse; -import javax.servlet.ServletContext; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; import java.util.concurrent.atomic.AtomicReference; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; + /** * The REST resource for the Agent. This describes the RPCs which the agent can accept. * @@ -53,7 +55,7 @@ public class AgentRestResource { private final AtomicReference agent = new AtomicReference<>(null); - @javax.ws.rs.core.Context + @Context private ServletContext context; public void setAgent(Agent myAgent) { diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 078dbbcd158ae..3e892be61a46a 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -17,13 +17,6 @@ package org.apache.kafka.trogdor.coordinator; -import com.fasterxml.jackson.core.type.TypeReference; -import net.sourceforge.argparse4j.ArgumentParsers; -import net.sourceforge.argparse4j.inf.ArgumentParser; -import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; -import net.sourceforge.argparse4j.inf.Namespace; -import net.sourceforge.argparse4j.inf.Subparser; -import net.sourceforge.argparse4j.inf.Subparsers; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.StringFormatter; @@ -33,25 +26,30 @@ import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; +import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TaskRunning; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TasksRequest; -import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; -import org.apache.kafka.trogdor.task.TaskSpec; -import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.UptimeResponse; +import org.apache.kafka.trogdor.task.TaskSpec; + +import com.fasterxml.jackson.core.type.TypeReference; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.UriBuilder; - import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; @@ -69,6 +67,9 @@ import static org.apache.kafka.trogdor.common.StringFormatter.dateString; import static org.apache.kafka.trogdor.common.StringFormatter.durationString; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.UriBuilder; + /** * A client for the Trogdor coordinator. */ diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index 337f2b46ebc1c..44b98abaa9f7e 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -29,24 +29,26 @@ import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.UptimeResponse; -import javax.servlet.ServletContext; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + /** * The REST resource for the Coordinator. This describes the RPCs which the coordinator * can accept. @@ -65,7 +67,7 @@ public class CoordinatorRestResource { private final AtomicReference coordinator = new AtomicReference(); - @javax.ws.rs.core.Context + @Context private ServletContext context; public void setCoordinator(Coordinator myCoordinator) { diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/Empty.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/Empty.java index da2fcbaefbe3e..5b4ed4b243916 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/Empty.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/Empty.java @@ -18,11 +18,17 @@ package org.apache.kafka.trogdor.rest; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + + import org.apache.kafka.trogdor.common.JsonUtil; /** * An empty request or response. */ +@JsonSerialize +@JsonDeserialize public class Empty { public static final Empty INSTANCE = new Empty(); diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java index 08bf6cdad12b2..45651b11a36d8 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/ErrorResponse.java @@ -17,15 +17,18 @@ package org.apache.kafka.trogdor.rest; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.kafka.trogdor.common.JsonUtil; import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + /** * An error response. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class ErrorResponse { private final int code; private final String message; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java index d1f1948e2fba1..e785804d3c5a1 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java @@ -56,7 +56,7 @@ public class JsonRestServer { private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class); - private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 100; + private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 3000; private final ScheduledExecutorService shutdownExecutor; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java index 57c54ec04d846..6f85562bba88d 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java @@ -16,16 +16,17 @@ */ package org.apache.kafka.trogdor.rest; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.SerializationException; + +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; public class RestExceptionMapper implements ExceptionMapper { private static final Logger log = LoggerFactory.getLogger(RestExceptionMapper.class); diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index b77b2b0007409..4de0db854327e 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -17,9 +17,6 @@ package org.apache.kafka.trogdor.coordinator; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.common.utils.MockScheduler; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Scheduler; @@ -49,13 +46,16 @@ import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; import org.apache.kafka.trogdor.task.SampleTaskSpec; + +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.NotFoundException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -69,6 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import jakarta.ws.rs.NotFoundException; + @Tag("integration") @Timeout(value = 120000, unit = MILLISECONDS) public class CoordinatorTest { diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java index e61aec0165e71..4e1c4636f86e5 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java @@ -16,20 +16,21 @@ */ package org.apache.kafka.trogdor.rest; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.SerializationException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.Response; - -import org.apache.kafka.common.errors.InvalidRequestException; -import org.apache.kafka.common.errors.SerializationException; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.Response; + public class RestExceptionMapperTest { @Test