From c57f37cebfd47ba5dc602d60336702a4a5a4d6ce Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 12 Dec 2023 06:47:03 +0700 Subject: [PATCH] Stops publishing kafka08 sender This stops publishing the kafka08 sender, beginning at the next minor version, 2.17. This has been unsupported by Kafka since 2015 and corresponding configuration removed in Zipkin in mid 2019. Removing this removes tech debt around the similarly abandoned junit rule used to test it. See https://github.com/openzipkin/zipkin/pull/2564 Signed-off-by: Adrian Cole --- bom/pom.xml | 5 - kafka08/bnd.bnd | 4 - kafka08/pom.xml | 118 -------- .../zipkin2/reporter/kafka08/KafkaSender.java | 271 ------------------ .../reporter/kafka08/ITKafkaSender.java | 172 ----------- pom.xml | 1 - 6 files changed, 571 deletions(-) delete mode 100644 kafka08/bnd.bnd delete mode 100644 kafka08/pom.xml delete mode 100644 kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java delete mode 100644 kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java diff --git a/bom/pom.xml b/bom/pom.xml index 9a1f0400..dcae322e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -111,11 +111,6 @@ zipkin-sender-urlconnection ${project.version} - - ${project.groupId} - zipkin-sender-kafka08 - ${project.version} - ${project.groupId} zipkin-sender-kafka diff --git a/kafka08/bnd.bnd b/kafka08/bnd.bnd deleted file mode 100644 index 38298c6b..00000000 --- a/kafka08/bnd.bnd +++ /dev/null @@ -1,4 +0,0 @@ -Import-Package: \ - * -Export-Package: \ - zipkin2.reporter.kafka08 diff --git a/kafka08/pom.xml b/kafka08/pom.xml deleted file mode 100644 index d755720e..00000000 --- a/kafka08/pom.xml +++ /dev/null @@ -1,118 +0,0 @@ - - - - 4.0.0 - - - io.zipkin.reporter2 - zipkin-reporter-parent - 2.17.0-SNAPSHOT - - - zipkin-sender-kafka08 - Zipkin Sender: Kafka 0.8 - - - - zipkin2.reporter.kafka08 - - ${project.basedir}/.. - - - - - ${project.groupId} - zipkin-reporter - ${project.version} - - - - org.apache.kafka - kafka-clients - - 0.8.2.2 - - - - org.slf4j - slf4j-log4j12 - - - - - - com.github.charithe - kafka-junit - - 1.7 - test - - - - - - - - maven-failsafe-plugin - ${maven-surefire-plugin.version} - true - - - --add-opens=java.base/java.lang=ALL-UNNAMED - - - - - - - - - release - - - 1.6 - java16 - - - - - maven-enforcer-plugin - ${maven-enforcer-plugin.version} - - - enforce-java - - enforce - - - - - - [11,12) - - - - - - - - - - - diff --git a/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java b/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java deleted file mode 100644 index 8a7ba4ac..00000000 --- a/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright 2016-2019 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.kafka08; - -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import zipkin2.Call; -import zipkin2.Callback; -import zipkin2.CheckResult; -import zipkin2.codec.Encoding; -import zipkin2.reporter.AwaitableCallback; -import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Sender; - -/** - * This sends (usually json v2) encoded spans to a Kafka topic. - * - *

This sender is thread-safe. - * - * @deprecated Please use io.zipkin.reporter2:zipkin-sender-kafka - */ -@Deprecated -public final class KafkaSender extends Sender { - - /** Creates a sender that sends {@link Encoding#JSON} messages. */ - public static KafkaSender create(String bootstrapServers) { - return newBuilder().bootstrapServers(bootstrapServers).build(); - } - - public static Builder newBuilder() { - // Settings below correspond to "Producer Configs" - // http://kafka.apache.org/0102/documentation.html#producerconfigs - Properties properties = new Properties(); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName()); - properties.put(ProducerConfig.ACKS_CONFIG, "0"); - return new Builder(properties); - } - - /** Configuration including defaults needed to send spans to a Kafka topic. */ - public static final class Builder { - final Properties properties; - Encoding encoding = Encoding.JSON; - String topic = "zipkin"; - int messageMaxBytes = 500_000; - - Builder(Properties properties) { - this.properties = properties; - } - - Builder(KafkaSender sender) { - properties = new Properties(); - properties.putAll(sender.properties); - encoding = sender.encoding; - topic = sender.topic; - messageMaxBytes = sender.messageMaxBytes; - } - - /** Topic zipkin spans will be send to. Defaults to "zipkin" */ - public Builder topic(String topic) { - if (topic == null) throw new NullPointerException("topic == null"); - this.topic = topic; - return this; - } - - /** - * Initial set of kafka servers to connect to, rest of cluster will be discovered (comma - * separated). Ex "192.168.99.100:9092" No default - * - * @see ProducerConfig#BOOTSTRAP_SERVERS_CONFIG - */ - public final Builder bootstrapServers(String bootstrapServers) { - if (bootstrapServers == null) throw new NullPointerException("bootstrapServers == null"); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - return this; - } - - /** - * Maximum size of a message. Must be equal to or less than the server's "message.max.bytes". - * Default 500KB. - */ - public Builder messageMaxBytes(int messageMaxBytes) { - this.messageMaxBytes = messageMaxBytes; - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, messageMaxBytes); - return this; - } - - /** - * By default, a producer will be created, targeted to {@link #bootstrapServers(String)} with 0 - * required {@link ProducerConfig#ACKS_CONFIG acks}. Any properties set here will affect the - * producer config. - * - *

For example: Reduce the timeout blocking from one minute to 5 seconds. - *

{@code
-     * Map overrides = new LinkedHashMap<>();
-     * overrides.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "5000");
-     * builder.overrides(overrides);
-     * }
- * - * @see ProducerConfig - */ - public Builder overrides(Map overrides) { - if (overrides == null) throw new NullPointerException("overrides == null"); - properties.putAll(overrides); - return this; - } - - /** - * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} - * - *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. - */ - public Builder encoding(Encoding encoding) { - if (encoding == null) throw new NullPointerException("encoding == null"); - this.encoding = encoding; - return this; - } - - - public KafkaSender build() { - return new KafkaSender(this); - } - } - - final Properties properties; - final String topic; - final Encoding encoding; - final BytesMessageEncoder encoder; - final int messageMaxBytes; - - KafkaSender(Builder builder) { - properties = new Properties(); - properties.putAll(builder.properties); - topic = builder.topic; - encoding = builder.encoding; - encoder = BytesMessageEncoder.forEncoding(builder.encoding); - messageMaxBytes = builder.messageMaxBytes; - } - - public Builder toBuilder() { - return new Builder(this); - } - - /** get and close are typically called from different threads */ - volatile KafkaProducer producer; - volatile boolean closeCalled; - - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding.listSizeInBytes(encodedSizeInBytes); - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageMaxBytes() { - return messageMaxBytes; - } - - /** - * This sends all of the spans as a single message. - * - *

NOTE: this blocks until the metadata server is available. - */ - @Override public Call sendSpans(List encodedSpans) { - if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); - return new KafkaCall(message); - } - - /** Ensures there are no problems reading metadata about the topic. */ - @Override public CheckResult check() { - try { - get().partitionsFor(topic); // make sure we can query the metadata - return CheckResult.OK; - } catch (RuntimeException e) { - return CheckResult.failed(e); - } - } - - KafkaProducer get() { - if (producer == null) { - synchronized (this) { - if (producer == null) { - producer = new KafkaProducer<>(properties); - } - } - } - return producer; - } - - @Override public synchronized void close() { - if (closeCalled) return; - KafkaProducer producer = this.producer; - if (producer != null) producer.close(); - closeCalled = true; - } - - @Override public final String toString() { - return "KafkaSender{" - + "bootstrapServers=" + properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) - + ", topic=" + topic - + "}"; - } - - class KafkaCall extends Call.Base { // KafkaFuture is not cancelable - private final byte[] message; - - KafkaCall(byte[] message) { - this.message = message; - } - - @Override protected Void doExecute() { - AwaitableCallback callback = new AwaitableCallback(); - get().send(new ProducerRecord<>(topic, message), new CallbackAdapter(callback)); - callback.await(); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - get().send(new ProducerRecord<>(topic, message), new CallbackAdapter(callback)); - } - - @Override public Call clone() { - return new KafkaCall(message); - } - } - - static final class CallbackAdapter implements org.apache.kafka.clients.producer.Callback { - final Callback delegate; - - CallbackAdapter(Callback delegate) { - this.delegate = delegate; - } - - @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception == null) { - delegate.onSuccess(null); - } else { - delegate.onError(exception); - } - } - - @Override public String toString() { - return delegate.toString(); - } - } -} diff --git a/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java b/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java deleted file mode 100644 index 509a1ea3..00000000 --- a/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright 2016-2020 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.kafka08; - -import com.github.charithe.kafka.KafkaJunitRule; -import java.lang.management.ManagementFactory; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; -import javax.management.ObjectName; -import kafka.serializer.DefaultDecoder; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import zipkin2.Call; -import zipkin2.CheckResult; -import zipkin2.Span; -import zipkin2.codec.Encoding; -import zipkin2.codec.SpanBytesDecoder; -import zipkin2.codec.SpanBytesEncoder; -import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Sender; - -import static java.util.stream.Collectors.toList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static zipkin2.TestObjects.CLIENT_SPAN; - -public class ITKafkaSender { - @Rule public KafkaJunitRule kafka = new KafkaJunitRule(); - - String bootstrapServers = "localhost:" + kafka.kafkaBrokerPort(); - KafkaSender sender = KafkaSender.create(bootstrapServers); - - @After - public void close() { - sender.close(); - } - - @Test - public void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpans_PROTO3() throws Exception { - sender.close(); - sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpans_THRIFT() throws Exception { - sender.close(); - sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpansToCorrectTopic() throws Exception { - sender.close(); - sender = sender.toBuilder().topic("customzipkintopic").build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessages("customzipkintopic").get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void checkFalseWhenKafkaIsDown() { - kafka.shutdownKafka(); - - // Make a new tracer that fails faster than 60 seconds - sender.close(); - Map overrides = new LinkedHashMap<>(); - overrides.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "100"); - sender = sender.toBuilder().overrides(overrides).build(); - - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(org.apache.kafka.common.errors.TimeoutException.class); - } - - @Test - public void illegalToSendWhenClosed() { - sender.close(); - - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) - .isInstanceOf(IllegalStateException.class); - } - - @Test - public void shouldCloseKafkaProducerOnClose() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - final ObjectName kafkaProducerMXBeanName = new ObjectName("kafka.producer:*"); - final Set withProducers = - ManagementFactory.getPlatformMBeanServer().queryNames(kafkaProducerMXBeanName, null); - assertThat(withProducers).isNotEmpty(); - - sender.close(); - - final Set withNoProducers = - ManagementFactory.getPlatformMBeanServer().queryNames(kafkaProducerMXBeanName, null); - assertThat(withNoProducers).isEmpty(); - } - - /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. - */ - @Test - public void toStringContainsOnlySummaryInformation() { - assertThat(sender.toString()) - .isEqualTo("KafkaSender{bootstrapServers=" + bootstrapServers + ", topic=zipkin}"); - } - - Call send(Span... spans) { - SpanBytesEncoder bytesEncoder; - switch (sender.encoding()) { - case JSON: - bytesEncoder = SpanBytesEncoder.JSON_V2; - break; - case THRIFT: - bytesEncoder = SpanBytesEncoder.THRIFT; - break; - case PROTO3: - bytesEncoder = SpanBytesEncoder.PROTO3; - break; - default: - throw new UnsupportedOperationException("encoding: " + sender.encoding()); - } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); - } - - private List readMessages(String topic) throws TimeoutException { - return kafka.readMessages(topic, 1, new DefaultDecoder(kafka.consumerConfig().props())); - } - - private List readMessages() throws TimeoutException { - return readMessages("zipkin"); - } -} diff --git a/pom.xml b/pom.xml index 01719445..243e40bf 100755 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,6 @@ bom core kafka - kafka08 amqp-client activemq-client urlconnection