diff --git a/bom/pom.xml b/bom/pom.xml index 9a1f0400..dcae322e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -111,11 +111,6 @@ zipkin-sender-urlconnection ${project.version} - - ${project.groupId} - zipkin-sender-kafka08 - ${project.version} - ${project.groupId} zipkin-sender-kafka diff --git a/kafka08/bnd.bnd b/kafka08/bnd.bnd deleted file mode 100644 index 38298c6b..00000000 --- a/kafka08/bnd.bnd +++ /dev/null @@ -1,4 +0,0 @@ -Import-Package: \ - * -Export-Package: \ - zipkin2.reporter.kafka08 diff --git a/kafka08/pom.xml b/kafka08/pom.xml deleted file mode 100644 index d755720e..00000000 --- a/kafka08/pom.xml +++ /dev/null @@ -1,118 +0,0 @@ - - - - 4.0.0 - - - io.zipkin.reporter2 - zipkin-reporter-parent - 2.17.0-SNAPSHOT - - - zipkin-sender-kafka08 - Zipkin Sender: Kafka 0.8 - - - - zipkin2.reporter.kafka08 - - ${project.basedir}/.. - - - - - ${project.groupId} - zipkin-reporter - ${project.version} - - - - org.apache.kafka - kafka-clients - - 0.8.2.2 - - - - org.slf4j - slf4j-log4j12 - - - - - - com.github.charithe - kafka-junit - - 1.7 - test - - - - - - - - maven-failsafe-plugin - ${maven-surefire-plugin.version} - true - - - --add-opens=java.base/java.lang=ALL-UNNAMED - - - - - - - - - release - - - 1.6 - java16 - - - - - maven-enforcer-plugin - ${maven-enforcer-plugin.version} - - - enforce-java - - enforce - - - - - - [11,12) - - - - - - - - - - - diff --git a/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java b/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java deleted file mode 100644 index 8a7ba4ac..00000000 --- a/kafka08/src/main/java/zipkin2/reporter/kafka08/KafkaSender.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright 2016-2019 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.kafka08; - -import java.util.List; -import java.util.Map; -import java.util.Properties; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import zipkin2.Call; -import zipkin2.Callback; -import zipkin2.CheckResult; -import zipkin2.codec.Encoding; -import zipkin2.reporter.AwaitableCallback; -import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Sender; - -/** - * This sends (usually json v2) encoded spans to a Kafka topic. - * - *

This sender is thread-safe. - * - * @deprecated Please use io.zipkin.reporter2:zipkin-sender-kafka - */ -@Deprecated -public final class KafkaSender extends Sender { - - /** Creates a sender that sends {@link Encoding#JSON} messages. */ - public static KafkaSender create(String bootstrapServers) { - return newBuilder().bootstrapServers(bootstrapServers).build(); - } - - public static Builder newBuilder() { - // Settings below correspond to "Producer Configs" - // http://kafka.apache.org/0102/documentation.html#producerconfigs - Properties properties = new Properties(); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - ByteArraySerializer.class.getName()); - properties.put(ProducerConfig.ACKS_CONFIG, "0"); - return new Builder(properties); - } - - /** Configuration including defaults needed to send spans to a Kafka topic. */ - public static final class Builder { - final Properties properties; - Encoding encoding = Encoding.JSON; - String topic = "zipkin"; - int messageMaxBytes = 500_000; - - Builder(Properties properties) { - this.properties = properties; - } - - Builder(KafkaSender sender) { - properties = new Properties(); - properties.putAll(sender.properties); - encoding = sender.encoding; - topic = sender.topic; - messageMaxBytes = sender.messageMaxBytes; - } - - /** Topic zipkin spans will be send to. Defaults to "zipkin" */ - public Builder topic(String topic) { - if (topic == null) throw new NullPointerException("topic == null"); - this.topic = topic; - return this; - } - - /** - * Initial set of kafka servers to connect to, rest of cluster will be discovered (comma - * separated). Ex "192.168.99.100:9092" No default - * - * @see ProducerConfig#BOOTSTRAP_SERVERS_CONFIG - */ - public final Builder bootstrapServers(String bootstrapServers) { - if (bootstrapServers == null) throw new NullPointerException("bootstrapServers == null"); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - return this; - } - - /** - * Maximum size of a message. Must be equal to or less than the server's "message.max.bytes". - * Default 500KB. - */ - public Builder messageMaxBytes(int messageMaxBytes) { - this.messageMaxBytes = messageMaxBytes; - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, messageMaxBytes); - return this; - } - - /** - * By default, a producer will be created, targeted to {@link #bootstrapServers(String)} with 0 - * required {@link ProducerConfig#ACKS_CONFIG acks}. Any properties set here will affect the - * producer config. - * - *

For example: Reduce the timeout blocking from one minute to 5 seconds. - *

{@code
-     * Map overrides = new LinkedHashMap<>();
-     * overrides.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "5000");
-     * builder.overrides(overrides);
-     * }
- * - * @see ProducerConfig - */ - public Builder overrides(Map overrides) { - if (overrides == null) throw new NullPointerException("overrides == null"); - properties.putAll(overrides); - return this; - } - - /** - * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} - * - *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. - */ - public Builder encoding(Encoding encoding) { - if (encoding == null) throw new NullPointerException("encoding == null"); - this.encoding = encoding; - return this; - } - - - public KafkaSender build() { - return new KafkaSender(this); - } - } - - final Properties properties; - final String topic; - final Encoding encoding; - final BytesMessageEncoder encoder; - final int messageMaxBytes; - - KafkaSender(Builder builder) { - properties = new Properties(); - properties.putAll(builder.properties); - topic = builder.topic; - encoding = builder.encoding; - encoder = BytesMessageEncoder.forEncoding(builder.encoding); - messageMaxBytes = builder.messageMaxBytes; - } - - public Builder toBuilder() { - return new Builder(this); - } - - /** get and close are typically called from different threads */ - volatile KafkaProducer producer; - volatile boolean closeCalled; - - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding.listSizeInBytes(encodedSizeInBytes); - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageMaxBytes() { - return messageMaxBytes; - } - - /** - * This sends all of the spans as a single message. - * - *

NOTE: this blocks until the metadata server is available. - */ - @Override public Call sendSpans(List encodedSpans) { - if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); - return new KafkaCall(message); - } - - /** Ensures there are no problems reading metadata about the topic. */ - @Override public CheckResult check() { - try { - get().partitionsFor(topic); // make sure we can query the metadata - return CheckResult.OK; - } catch (RuntimeException e) { - return CheckResult.failed(e); - } - } - - KafkaProducer get() { - if (producer == null) { - synchronized (this) { - if (producer == null) { - producer = new KafkaProducer<>(properties); - } - } - } - return producer; - } - - @Override public synchronized void close() { - if (closeCalled) return; - KafkaProducer producer = this.producer; - if (producer != null) producer.close(); - closeCalled = true; - } - - @Override public final String toString() { - return "KafkaSender{" - + "bootstrapServers=" + properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) - + ", topic=" + topic - + "}"; - } - - class KafkaCall extends Call.Base { // KafkaFuture is not cancelable - private final byte[] message; - - KafkaCall(byte[] message) { - this.message = message; - } - - @Override protected Void doExecute() { - AwaitableCallback callback = new AwaitableCallback(); - get().send(new ProducerRecord<>(topic, message), new CallbackAdapter(callback)); - callback.await(); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - get().send(new ProducerRecord<>(topic, message), new CallbackAdapter(callback)); - } - - @Override public Call clone() { - return new KafkaCall(message); - } - } - - static final class CallbackAdapter implements org.apache.kafka.clients.producer.Callback { - final Callback delegate; - - CallbackAdapter(Callback delegate) { - this.delegate = delegate; - } - - @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception == null) { - delegate.onSuccess(null); - } else { - delegate.onError(exception); - } - } - - @Override public String toString() { - return delegate.toString(); - } - } -} diff --git a/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java b/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java deleted file mode 100644 index 509a1ea3..00000000 --- a/kafka08/src/test/java/zipkin2/reporter/kafka08/ITKafkaSender.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright 2016-2020 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.kafka08; - -import com.github.charithe.kafka.KafkaJunitRule; -import java.lang.management.ManagementFactory; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; -import javax.management.ObjectName; -import kafka.serializer.DefaultDecoder; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import zipkin2.Call; -import zipkin2.CheckResult; -import zipkin2.Span; -import zipkin2.codec.Encoding; -import zipkin2.codec.SpanBytesDecoder; -import zipkin2.codec.SpanBytesEncoder; -import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Sender; - -import static java.util.stream.Collectors.toList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static zipkin2.TestObjects.CLIENT_SPAN; - -public class ITKafkaSender { - @Rule public KafkaJunitRule kafka = new KafkaJunitRule(); - - String bootstrapServers = "localhost:" + kafka.kafkaBrokerPort(); - KafkaSender sender = KafkaSender.create(bootstrapServers); - - @After - public void close() { - sender.close(); - } - - @Test - public void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpans_PROTO3() throws Exception { - sender.close(); - sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpans_THRIFT() throws Exception { - sender.close(); - sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessages().get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void sendsSpansToCorrectTopic() throws Exception { - sender.close(); - sender = sender.toBuilder().topic("customzipkintopic").build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessages("customzipkintopic").get(0))) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); - } - - @Test - public void checkFalseWhenKafkaIsDown() { - kafka.shutdownKafka(); - - // Make a new tracer that fails faster than 60 seconds - sender.close(); - Map overrides = new LinkedHashMap<>(); - overrides.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "100"); - sender = sender.toBuilder().overrides(overrides).build(); - - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(org.apache.kafka.common.errors.TimeoutException.class); - } - - @Test - public void illegalToSendWhenClosed() { - sender.close(); - - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) - .isInstanceOf(IllegalStateException.class); - } - - @Test - public void shouldCloseKafkaProducerOnClose() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - final ObjectName kafkaProducerMXBeanName = new ObjectName("kafka.producer:*"); - final Set withProducers = - ManagementFactory.getPlatformMBeanServer().queryNames(kafkaProducerMXBeanName, null); - assertThat(withProducers).isNotEmpty(); - - sender.close(); - - final Set withNoProducers = - ManagementFactory.getPlatformMBeanServer().queryNames(kafkaProducerMXBeanName, null); - assertThat(withNoProducers).isEmpty(); - } - - /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. - */ - @Test - public void toStringContainsOnlySummaryInformation() { - assertThat(sender.toString()) - .isEqualTo("KafkaSender{bootstrapServers=" + bootstrapServers + ", topic=zipkin}"); - } - - Call send(Span... spans) { - SpanBytesEncoder bytesEncoder; - switch (sender.encoding()) { - case JSON: - bytesEncoder = SpanBytesEncoder.JSON_V2; - break; - case THRIFT: - bytesEncoder = SpanBytesEncoder.THRIFT; - break; - case PROTO3: - bytesEncoder = SpanBytesEncoder.PROTO3; - break; - default: - throw new UnsupportedOperationException("encoding: " + sender.encoding()); - } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); - } - - private List readMessages(String topic) throws TimeoutException { - return kafka.readMessages(topic, 1, new DefaultDecoder(kafka.consumerConfig().props())); - } - - private List readMessages() throws TimeoutException { - return readMessages("zipkin"); - } -} diff --git a/pom.xml b/pom.xml index 01719445..243e40bf 100755 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,6 @@ bom core kafka - kafka08 amqp-client activemq-client urlconnection