Skip to content

Commit

Permalink
Updated Spark to 3.x, Scala to 2.12 and ElasticSearch to 7.13.x (#201)
Browse files Browse the repository at this point in the history
Used the lowest major versions that work

Fixes #119

Signed-off-by: Wiktor Stasiak <[email protected]>
  • Loading branch information
WiktorS authored Dec 7, 2023
1 parent 4ee021b commit 602c47f
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -39,6 +39,7 @@
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
Expand Down Expand Up @@ -180,7 +181,8 @@ public void run() {
try {
JavaRDD<DependencyLink> links = flatMapToLinksByTraceId(
javaFunctions(sc).cassandraTable(keyspace, "span"), microsUpper, microsLower).values()
.mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
.mapToPair((PairFunction<DependencyLink, Tuple2<String, String>, DependencyLink>) l ->
new Tuple2<Tuple2<String, String>, DependencyLink>(new Tuple2<>(l.parent(), l.child()), l))
.reduceByKey((l, r) -> DependencyLink.newBuilder()
.parent(l.parent())
.child(l.child())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,16 +14,17 @@
package zipkin2.dependencies.cassandra3;

import com.datastax.spark.connector.japi.CassandraRow;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Serializable;
import zipkin2.DependencyLink;
import zipkin2.Span;

final class CassandraRowsToDependencyLinks
implements Serializable, Function<Iterable<CassandraRow>, Iterable<DependencyLink>> {
implements Serializable, FlatMapFunction<Iterable<CassandraRow>, DependencyLink> {
private static final long serialVersionUID = 0L;

@Nullable final Runnable logInitializer;
Expand All @@ -34,7 +35,7 @@ final class CassandraRowsToDependencyLinks
this.spansToDependencyLinks = new SpansToDependencyLinks(logInitializer, startTs, endTs);
}

@Override public Iterable<DependencyLink> call(Iterable<CassandraRow> rows) {
@Override public Iterator<DependencyLink> call(Iterable<CassandraRow> rows) {
if (logInitializer != null) logInitializer.run();
// use a hash set to dedupe any redundantly accepted spans
Set<Span> sameTraceId = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -15,16 +15,17 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Serializable;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.internal.DependencyLinker;

final class SpansToDependencyLinks
implements Serializable, Function<Iterable<Span>, Iterable<DependencyLink>> {
implements Serializable, FlatMapFunction<Iterable<Span>, DependencyLink> {
private static final long serialVersionUID = 0L;

@Nullable final Runnable logInitializer;
Expand All @@ -37,19 +38,19 @@ final class SpansToDependencyLinks
this.endTs = endTs;
}

@Override public Iterable<DependencyLink> call(Iterable<Span> spans) {
@Override public Iterator<DependencyLink> call(Iterable<Span> spans) {
if (logInitializer != null) logInitializer.run();
List<Span> sameTraceId = new ArrayList<>();
for (Span span : spans) {
// check to see if the trace is within the interval
if (span.parentId() == null) {
long timestamp = span.timestampAsLong();
if (timestamp == 0 || timestamp < startTs || timestamp > endTs) {
return Collections.emptyList();
return Collections.emptyIterator();
}
}
sameTraceId.add(span);
}
return new DependencyLinker().putTrace(sameTraceId).link();
return new DependencyLinker().putTrace(sameTraceId).link().iterator();
}
}
6 changes: 3 additions & 3 deletions elasticsearch/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016-2021 The OpenZipkin Authors
Copyright 2016-2022 The OpenZipkin Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -34,8 +34,8 @@
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
<version>7.10.1</version>
<artifactId>elasticsearch-spark-30_${scala.binary.version}</artifactId>
<version>7.13.4</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -30,6 +30,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -189,7 +190,8 @@ void run(String spanResource, String dependencyLinkResource, SpanBytesDecoder de
.groupBy(JSON_TRACE_ID)
.flatMapValues(new TraceIdAndJsonToDependencyLinks(logInitializer, decoder))
.values()
.mapToPair(l -> Tuple2.apply(Tuple2.apply(l.parent(), l.child()), l))
.mapToPair((PairFunction<DependencyLink, Tuple2<String, String>, DependencyLink>) l ->
new Tuple2<Tuple2<String, String>, DependencyLink>(new Tuple2<>(l.parent(), l.child()), l))
.reduceByKey((l, r) -> DependencyLink.newBuilder()
.parent(l.parent())
.child(l.child())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,9 +14,10 @@
package zipkin2.dependencies.elasticsearch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Serializable;
Expand All @@ -27,7 +28,7 @@
import zipkin2.internal.DependencyLinker;

final class TraceIdAndJsonToDependencyLinks
implements Serializable, Function<Iterable<Tuple2<String, String>>, Iterable<DependencyLink>> {
implements Serializable, FlatMapFunction<Iterable<Tuple2<String, String>>, DependencyLink> {
private static final long serialVersionUID = 0L;
private static final Logger log = LoggerFactory.getLogger(TraceIdAndJsonToDependencyLinks.class);

Expand All @@ -40,7 +41,7 @@ final class TraceIdAndJsonToDependencyLinks
}

@Override
public Iterable<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJson) {
public Iterator<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJson) {
if (logInitializer != null) logInitializer.run();
List<Span> sameTraceId = new ArrayList<>();
for (Tuple2<String, String> row : traceIdJson) {
Expand All @@ -52,6 +53,6 @@ public Iterable<DependencyLink> call(Iterable<Tuple2<String, String>> traceIdJso
}
DependencyLinker linker = new DependencyLinker();
linker.putTrace(sameTraceId);
return linker.link();
return linker.link().iterator();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 The OpenZipkin Authors
* Copyright 2016-2022 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -18,15 +18,15 @@
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import scala.Serializable;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.internal.DependencyLinker;

final class RowsToDependencyLinks
implements Serializable, Function<Iterable<Row>, Iterable<DependencyLink>> {
implements Serializable, FlatMapFunction<Iterable<Row>, DependencyLink> {
private static final long serialVersionUID = 0L;

@Nullable final Runnable logInitializer;
Expand All @@ -37,12 +37,12 @@ final class RowsToDependencyLinks
this.hasTraceIdHigh = hasTraceIdHigh;
}

@Override public Iterable<DependencyLink> call(Iterable<Row> rows) {
@Override public Iterator<DependencyLink> call(Iterable<Row> rows) {
if (logInitializer != null) logInitializer.run();
Iterator<Iterator<Span>> traces =
new DependencyLinkSpanIterator.ByTraceId(rows.iterator(), hasTraceIdHigh);

if (!traces.hasNext()) return Collections.emptyList();
if (!traces.hasNext()) return Collections.emptyIterator();

DependencyLinker linker = new DependencyLinker();
List<Span> nextTrace = new ArrayList<>();
Expand All @@ -52,6 +52,6 @@ final class RowsToDependencyLinks
linker.putTrace(nextTrace);
nextTrace.clear();
}
return linker.link();
return linker.link().iterator();
}
}
21 changes: 14 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016-2021 The OpenZipkin Authors
Copyright 2016-2022 The OpenZipkin Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -44,9 +44,8 @@
<errorprone.version>2.4.0</errorprone.version>
<javac.version>9+181-r4173-1</javac.version>

<!-- can't switch to 2.12 until elasticsearch-spark updates -->
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.7</spark.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.3</spark.version>

<!-- When updating, also update IT*.java -->
<zipkin.version>2.23.2</zipkin.version>
Expand Down Expand Up @@ -378,10 +377,8 @@
<!-- Enforcement is only needed when running tests -->
<skip>${skipTests}</skip>
<rules>
<!-- Spark doesn't support Java 9+ until v3.0
https://issues.apache.org/jira/browse/SPARK-24421 -->
<requireJavaVersion>
<version>[1.8,9)</version>
<version>[1.8,12)</version>
</requireJavaVersion>
</rules>
</configuration>
Expand Down Expand Up @@ -601,5 +598,15 @@
</plugins>
</build>
</profile>

<profile>
<id>java-8-api</id>
<activation>
<jdk>[9,)</jdk>
</activation>
<properties>
<maven.compiler.release>8</maven.compiler.release>
</properties>
</profile>
</profiles>
</project>

0 comments on commit 602c47f

Please sign in to comment.