Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
findingrish committed May 17, 2024
2 parents 81dc604 + ed9881d commit 4a63cc8
Show file tree
Hide file tree
Showing 191 changed files with 7,230 additions and 1,871 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
dist
longCompress
target
*.iml
*.ipr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -47,6 +49,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.nested.StructuredData;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -82,6 +85,7 @@ public class FrameChannelMergerBenchmark
{
static {
NullHandling.initializeForTests();
NestedDataModule.registerHandlersAndSerde();
}

private static final String KEY = "key";
Expand All @@ -99,6 +103,9 @@ public class FrameChannelMergerBenchmark
@Param({"100"})
private int rowLength;

@Param({"string", "nested"})
private String columnType;

/**
* Linked to {@link KeyGenerator}.
*/
Expand All @@ -121,13 +128,20 @@ enum KeyGenerator
*/
RANDOM {
@Override
public String generateKey(int rowNumber, int keyLength)
public Comparable generateKey(int rowNumber, int keyLength, String columnType)
{
final StringBuilder builder = new StringBuilder(keyLength);
for (int i = 0; i < keyLength; i++) {
builder.append((char) ('a' + ThreadLocalRandom.current().nextInt(26)));
}
return builder.toString();
String str = builder.toString();
if ("string".equals(columnType)) {
return str;
} else if ("nested".equals(columnType)) {
return StructuredData.wrap(str);
} else {
throw new IAE("unsupported column type");
}
}
},

Expand All @@ -136,13 +150,20 @@ public String generateKey(int rowNumber, int keyLength)
*/
SEQUENTIAL {
@Override
public String generateKey(int rowNumber, int keyLength)
public Comparable generateKey(int rowNumber, int keyLength, String columnType)
{
return StringUtils.format("%0" + keyLength + "d", rowNumber);
String str = StringUtils.format("%0" + keyLength + "d", rowNumber);
if ("string".equals(columnType)) {
return str;
} else if ("nested".equals(columnType)) {
return StructuredData.wrap(str);
} else {
throw new IAE("unsupported column type");
}
}
};

// Produces the sort key for one row; returns String for "string" columns and
// StructuredData for "nested" columns, hence the Comparable return type.
public abstract Comparable generateKey(int rowNumber, int keyLength, String columnType);
}

/**
Expand Down Expand Up @@ -176,13 +197,9 @@ public int getChannelNumber(int rowNumber, int numRows, int numChannels)
public abstract int getChannelNumber(int rowNumber, int numRows, int numChannels);
}

private final RowSignature signature =
RowSignature.builder()
.add(KEY, ColumnType.STRING)
.add(VALUE, ColumnType.STRING)
.build();
private RowSignature signature;
private FrameReader frameReader;

private final FrameReader frameReader = FrameReader.create(signature);
private final List<KeyColumn> sortKey = ImmutableList.of(new KeyColumn(KEY, KeyOrder.ASCENDING));

private List<List<Frame>> channelFrames;
Expand All @@ -200,6 +217,14 @@ public int getChannelNumber(int rowNumber, int numRows, int numChannels)
@Setup(Level.Trial)
public void setupTrial()
{
signature =
RowSignature.builder()
.add(KEY, createKeyColumnTypeFromTypeString(columnType))
.add(VALUE, ColumnType.STRING)
.build();

frameReader = FrameReader.create(signature);

exec = new FrameProcessorExecutor(
MoreExecutors.listeningDecorator(
Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName()))
Expand All @@ -211,14 +236,15 @@ public void setupTrial()
ChannelDistribution.valueOf(StringUtils.toUpperCase(channelDistributionString));

// Create channelRows which holds rows for each channel.
final List<List<NonnullPair<String, String>>> channelRows = new ArrayList<>();
final List<List<NonnullPair<Comparable, String>>> channelRows = new ArrayList<>();
channelFrames = new ArrayList<>();
for (int channelNumber = 0; channelNumber < numChannels; channelNumber++) {
channelRows.add(new ArrayList<>());
channelFrames.add(new ArrayList<>());
}

// Create "valueString", a string full of spaces to pad out the row.
// Create "valueString", a string full of spaces to pad out the row. Nested columns wrap up strings with the
// corresponding keyLength, therefore the padding works out for the nested types as well.
final StringBuilder valueStringBuilder = new StringBuilder();
for (int i = 0; i < rowLength - keyLength; i++) {
valueStringBuilder.append(' ');
Expand All @@ -227,20 +253,20 @@ public void setupTrial()

// Populate "channelRows".
for (int rowNumber = 0; rowNumber < numRows; rowNumber++) {
final String keyString = keyGenerator.generateKey(rowNumber, keyLength);
final NonnullPair<String, String> row = new NonnullPair<>(keyString, valueString);
final Comparable keyObject = keyGenerator.generateKey(rowNumber, keyLength, columnType);
final NonnullPair<Comparable, String> row = new NonnullPair<>(keyObject, valueString);
channelRows.get(channelDistribution.getChannelNumber(rowNumber, numRows, numChannels)).add(row);
}

// Sort each "channelRows".
for (List<NonnullPair<String, String>> rows : channelRows) {
for (List<NonnullPair<Comparable, String>> rows : channelRows) {
rows.sort(Comparator.comparing(row -> row.lhs));
}

// Populate each "channelFrames".
for (int channelNumber = 0; channelNumber < numChannels; channelNumber++) {
final List<NonnullPair<String, String>> rows = channelRows.get(channelNumber);
final RowBasedSegment<NonnullPair<String, String>> segment =
final List<NonnullPair<Comparable, String>> rows = channelRows.get(channelNumber);
final RowBasedSegment<NonnullPair<Comparable, String>> segment =
new RowBasedSegment<>(
SegmentId.dummy("__dummy"),
Sequences.simple(rows),
Expand Down Expand Up @@ -350,4 +376,14 @@ public void mergeChannels(Blackhole blackhole)
throw new ISE("Incorrect numRows[%s], expected[%s]", FutureUtils.getUncheckedImmediately(retVal), numRows);
}
}

/**
 * Maps the benchmark's {@code columnType} parameter string to the ColumnType
 * used for the KEY column in the row signature.
 *
 * @throws IAE if the string is neither "string" nor "nested"
 */
private ColumnType createKeyColumnTypeFromTypeString(final String columnTypeString)
{
  switch (columnTypeString) {
    case "string":
      return ColumnType.STRING;
    case "nested":
      return ColumnType.NESTED_DATA;
    default:
      throw new IAE("Unsupported type [%s]", columnTypeString);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,27 @@ public void planNotEquals(Blackhole blackhole)
blackhole.consume(plannerResult);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void planEqualsInsideAndOutsideCase(Blackhole blackhole)
{
  // Plans a filter where LOOKUP appears both inside a CASE branch and in the
  // surrounding IN clause; measures planner overhead of reconciling the two.
  final String query = StringUtils.format(
      "SELECT COUNT(*) FROM foo\n"
      + "WHERE\n"
      + " CASE WHEN LOOKUP(dimZipf, 'benchmark-lookup', 'N/A') = '%s'\n"
      + " THEN NULL\n"
      + " ELSE LOOKUP(dimZipf, 'benchmark-lookup', 'N/A')\n"
      + " END IN ('%s', '%s', '%s')",
      LookupBenchmarkUtil.makeKeyOrValue(0),
      LookupBenchmarkUtil.makeKeyOrValue(1),
      LookupBenchmarkUtil.makeKeyOrValue(2),
      LookupBenchmarkUtil.makeKeyOrValue(3)
  );
  try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, query, ImmutableMap.of())) {
    blackhole.consume(planner.plan());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
Expand All @@ -36,6 +37,7 @@
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
Expand Down Expand Up @@ -204,7 +206,7 @@ public void setup() throws JsonProcessingException
);

String prefix = ("explain plan for select long1 from foo where long1 in ");
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);

final Sequence<Object[]> resultSequence = getPlan(sql, null);
final Object[] planResult = resultSequence.toList().get(0);
Expand All @@ -222,12 +224,13 @@ public void tearDown() throws Exception
closer.close();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryInSql(Blackhole blackhole)
{
  // Plans a bare IN filter with inClauseLiteralsCount LONG literals;
  // measures planning time only (EXPLAIN, no execution).
  String prefix = "explain plan for select long1 from foo where long1 in ";
  final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
  getPlan(sql, blackhole);
}

Expand All @@ -238,7 +241,7 @@ public void queryEqualOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select long1 from foo where string1 = '7' or long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

Expand All @@ -250,28 +253,74 @@ public void queryMultiEqualOrInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select long1 from foo where string1 = '7' or string1 = '8' or long1 in ";
final String sql = createQuery(prefix, inClauseLiteralsCount);
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryJoinEqualOrInSql(Blackhole blackhole)
public void queryStringFunctionInSql(Blackhole blackhole)
{
String prefix =
"explain plan for select count(*) from foo where long1 = 8 or lower(string1) in ";
final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.STRING);
getPlan(sql, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryStringFunctionIsNotNullAndNotInSql(Blackhole blackhole)
{
  // NOT IN over lower(string1) guarded by IS NOT NULL: exercises planning of a
  // negated IN filter on a function expression.
  final String explainPrefix =
      "explain plan for select count(*) from foo where long1 = 8 and lower(string1) is not null and lower(string1) not in ";
  getPlan(createQuery(explainPrefix, inClauseLiteralsCount, ValueType.STRING), blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryStringFunctionIsNullOrInSql(Blackhole blackhole)
{
  // IS NULL OR IN over lower(string1); the trailing ')' closes the
  // parenthesized OR that the prefix opened.
  final String explainPrefix =
      "explain plan for select count(*) from foo where long1 = 8 and (lower(string1) is null or lower(string1) in ";
  final String query = createQuery(explainPrefix, inClauseLiteralsCount, ValueType.STRING) + ')';
  getPlan(query, blackhole);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryJoinEqualOrInSql(Blackhole blackhole)
{
  // IN filter combined with a self-join: planning must push the filter through
  // the join, which is costlier than the single-table cases above.
  String prefix =
      "explain plan for select foo.long1, fooright.string1 from foo inner join foo as fooright on foo.string1 = fooright.string1 where fooright.string1 = '7' or foo.long1 in ";
  final String sql = createQuery(prefix, inClauseLiteralsCount, ValueType.LONG);
  getPlan(sql, blackhole);
}

private String createQuery(String prefix, int inClauseLiteralsCount)
private String createQuery(String prefix, int inClauseLiteralsCount, ValueType type)
{
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append(prefix).append('(');
IntStream.range(1, inClauseLiteralsCount - 1).forEach(i -> sqlBuilder.append(i).append(","));
sqlBuilder.append(inClauseLiteralsCount).append(")");
IntStream.range(1, inClauseLiteralsCount + 1).forEach(
i -> {
if (i > 1) {
sqlBuilder.append(',');
}

if (type == ValueType.LONG) {
sqlBuilder.append(i);
} else if (type == ValueType.STRING) {
sqlBuilder.append("'").append(i).append("'");
} else {
throw new ISE("Cannot generate IN with type[%s]", type);
}
}
);
sqlBuilder.append(")");
return sqlBuilder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public void setup()
schemaInfo,
DimensionsSpec.builder().setDimensions(columnSchemas).build(),
TransformSpec.NONE,
IndexSpec.DEFAULT,
IndexSpec.builder().withStringDictionaryEncoding(getStringEncodingStrategy()).build(),
Granularities.NONE,
rowsPerSegment
);
Expand Down
1 change: 1 addition & 0 deletions codestyle/druid-forbidden-apis.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ org.apache.calcite.sql.type.OperandTypes#NULLABLE_LITERAL @ Create an instance o
org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.TemporaryFolder for tests instead
org.apache.commons.io.FileUtils#deleteDirectory(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils#deleteDirectory()
org.apache.commons.io.FileUtils#forceMkdir(java.io.File) @ Use org.apache.druid.java.util.common.FileUtils.mkdirp instead
org.apache.datasketches.memory.Memory#wrap(byte[], int, int, java.nio.ByteOrder) @ The implementation isn't correct in datasketches-memory-2.2.0. Please refer to https://github.com/apache/datasketches-memory/issues/178. Use wrap(byte[]) and modify the offset by the callers instead
java.lang.Class#getCanonicalName() @ Class.getCanonicalName can return null for anonymous types, use Class.getName instead.
java.util.concurrent.Executors#newFixedThreadPool(int) @ Executor is non-daemon and can prevent JVM shutdown, use org.apache.druid.java.util.common.concurrent.Execs#multiThreaded(int, java.lang.String) instead.

Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions distribution/docker/druid.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ then
setKey _common druid.zk.service.host "${ZOOKEEPER}"
fi

DRUID_SET_HOST=${DRUID_SET_HOST:-1}
if [ "${DRUID_SET_HOST}" = "1" ]
DRUID_SET_HOST_IP=${DRUID_SET_HOST_IP:-0}
if [ "${DRUID_SET_HOST_IP}" = "1" ]
then
setKey $SERVICE druid.host $(ip r get 1 | awk '{print $7;exit}')
fi
Expand Down
4 changes: 2 additions & 2 deletions distribution/docker/peon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ then
setKey _common druid.zk.service.host "${ZOOKEEPER}"
fi

DRUID_SET_HOST=${DRUID_SET_HOST:-1}
if [ "${DRUID_SET_HOST}" = "1" ]
DRUID_SET_HOST_IP=${DRUID_SET_HOST_IP:-0}
if [ "${DRUID_SET_HOST_IP}" = "1" ]
then
setKey $SERVICE druid.host $(ip r get 1 | awk '{print $7;exit}')
fi
Expand Down
Loading

0 comments on commit 4a63cc8

Please sign in to comment.