Add benchmark suite for MSQ window functions (apache#17377)
* Add benchmark suite for MSQ window functions

* Fix inspection checks

* Address review comment: Rename method
Akshat-Jain authored Oct 30, 2024
1 parent 63c91ad commit 21e7e5c
Showing 11 changed files with 335 additions and 9 deletions.
17 changes: 17 additions & 0 deletions benchmarks/pom.xml
@@ -47,6 +47,16 @@
<version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
@@ -217,6 +227,13 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-multi-stage-query</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>

<properties>
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.benchmark.query;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.ExtractResultsFactory;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.StandardMSQComponentSupplier;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Benchmark that tests various SQL queries with window functions against the MSQ engine.
*/
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 5)
@SqlTestFrameworkConfig.ComponentSupplier(MSQWindowFunctionsBenchmark.MSQComponentSupplier.class)
public class MSQWindowFunctionsBenchmark extends BaseCalciteQueryTest
{
static {
NullHandling.initializeForTests();
}

private static final Logger log = new Logger(MSQWindowFunctionsBenchmark.class);
private final Closer closer = Closer.create();

@Param({"20000000"})
private int rowsPerSegment;

@Param({"2", "5"})
private int maxNumTasks;

private List<Annotation> annotations;

@Setup(Level.Trial)
public void setup()
{
annotations = Arrays.asList(MSQWindowFunctionsBenchmark.class.getAnnotations());

// Populate the QueryableIndex for the benchmark datasource.
TestDataBuilder.makeQueryableIndexForBenchmarkDatasource(closer, rowsPerSegment);
}

@TearDown(Level.Trial)
public void tearDown() throws Exception
{
closer.close();
}

@Benchmark
public void windowWithoutGroupBy(Blackhole blackhole)
{
String sql = "SELECT ROW_NUMBER() "
+ "OVER (PARTITION BY dimUniform ORDER BY dimSequential) "
+ "FROM benchmark_ds";
querySql(sql, blackhole);
}

@Benchmark
public void windowWithoutSorting(Blackhole blackhole)
{
String sql = "SELECT dimZipf, dimSequential,"
+ "ROW_NUMBER() "
+ "OVER (PARTITION BY dimZipf) "
+ "from benchmark_ds\n"
+ "group by dimZipf, dimSequential";
querySql(sql, blackhole);
}

@Benchmark
public void windowWithSorting(Blackhole blackhole)
{
String sql = "SELECT dimZipf, dimSequential,"
+ "ROW_NUMBER() "
+ "OVER (PARTITION BY dimZipf ORDER BY dimSequential) "
+ "from benchmark_ds\n"
+ "group by dimZipf, dimSequential";
querySql(sql, blackhole);
}

@Benchmark
public void windowWithHighCardinalityPartitionBy(Blackhole blackhole)
{
String sql = "select\n"
+ "__time,\n"
+ "row_number() over (partition by __time) as c1\n"
+ "from benchmark_ds\n"
+ "group by __time";
querySql(sql, blackhole);
}

@Benchmark
public void windowWithLowCardinalityPartitionBy(Blackhole blackhole)
{
String sql = "select\n"
+ "dimZipf,\n"
+ "row_number() over (partition by dimZipf) as c1\n"
+ "from benchmark_ds\n"
+ "group by dimZipf";
querySql(sql, blackhole);
}

@Benchmark
public void multipleWindows(Blackhole blackhole)
{
String sql = "select\n"
+ "dimZipf, dimSequential, minFloatZipf,\n"
+ "row_number() over (partition by dimSequential order by minFloatZipf) as c1,\n"
+ "row_number() over (partition by dimZipf order by minFloatZipf) as c2,\n"
+ "row_number() over (partition by minFloatZipf order by minFloatZipf) as c3,\n"
+ "row_number() over (partition by dimSequential, dimZipf order by minFloatZipf, dimSequential) as c4,\n"
+ "row_number() over (partition by minFloatZipf, dimZipf order by dimSequential) as c5,\n"
+ "row_number() over (partition by minFloatZipf, dimSequential order by dimZipf) as c6,\n"
+ "row_number() over (partition by dimSequential, minFloatZipf, dimZipf order by dimZipf, minFloatZipf) as c7,\n"
+ "row_number() over (partition by dimSequential, minFloatZipf, dimZipf order by minFloatZipf) as c8\n"
+ "from benchmark_ds\n"
+ "group by dimZipf, dimSequential, minFloatZipf";
querySql(sql, blackhole);
}

public void querySql(String sql, Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
MultiStageQueryContext.CTX_MAX_NUM_TASKS, maxNumTasks
);
CalciteTestConfig calciteTestConfig = createCalciteTestConfig();
QueryTestBuilder queryTestBuilder = new QueryTestBuilder(calciteTestConfig)
.addCustomRunner(
new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient())
);

queryFrameworkRule.setConfig(new SqlTestFrameworkConfig(annotations));
final List<Object[]> resultList = queryTestBuilder
.skipVectorize(true)
.queryContext(context)
.sql(sql)
.results()
.results;

if (!resultList.isEmpty()) {
log.info("Total number of rows returned by query: %d", resultList.size());
Object[] lastRow = resultList.get(resultList.size() - 1);
blackhole.consume(lastRow);
} else {
log.info("No rows returned by the query.");
}
}

protected static class MSQComponentSupplier extends StandardMSQComponentSupplier
{
public MSQComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
}

@Override
public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
QueryRunnerFactoryConglomerate conglomerate,
JoinableFactoryWrapper joinableFactory,
Injector injector
)
{
final SpecificSegmentsQuerySegmentWalker retVal = super.createQuerySegmentWalker(
conglomerate,
joinableFactory,
injector);
TestDataBuilder.attachIndexesForBenchmarkDatasource(retVal);
return retVal;
}
}
}
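
As a usage sketch (not part of this commit), a JMH benchmark class like the one above can be launched programmatically through the standard JMH Runner API. The runner class below is hypothetical; only MSQWindowFunctionsBenchmark and its maxNumTasks @Param come from the diff above, and the package placement and parameter override are illustrative assumptions:

package org.apache.druid.benchmark.query;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Hypothetical launcher class, not part of this commit.
public class MSQWindowFunctionsBenchmarkRunner
{
  public static void main(String[] args) throws RunnerException
  {
    // Select the benchmark class added in this commit and override one @Param;
    // fork, warmup, and measurement counts come from the class annotations
    // (@Fork(1), @Warmup(iterations = 1), @Measurement(iterations = 5)).
    Options options = new OptionsBuilder()
        .include(MSQWindowFunctionsBenchmark.class.getSimpleName())
        .param("maxNumTasks", "2")
        .build();
    new Runner(options).run();
  }
}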
11 changes: 11 additions & 0 deletions extensions-core/multi-stage-query/pom.xml
@@ -352,6 +352,17 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
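
(For context: the test-jar execution added above attaches this module's test classes as a separate "tests"-classified artifact, which is what lets benchmarks/pom.xml, earlier in this diff, depend on druid-multi-stage-query with <classifier>tests</classifier> and reuse MSQ test fixtures such as MSQTestOverlordServiceClient and StandardMSQComponentSupplier.)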
@@ -416,6 +416,9 @@ protected static Supplier<ResourceHolder<CompleteSegment>> getSupplierForSegment
case CalciteTests.T_ALL_TYPE_PARQUET:
index = TestDataBuilder.getQueryableIndexForDrillDatasource(segmentId.getDataSource(), tempFolderProducer.apply("tmpDir"));
break;
case CalciteTests.BENCHMARK_DATASOURCE:
index = TestDataBuilder.getQueryableIndexForBenchmarkDatasource();
break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);

@@ -23,7 +23,6 @@
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.QueryTestRunner;
import org.junit.Assert;
@@ -57,7 +56,6 @@ public QueryTestRunner.QueryRunStep make(QueryTestBuilder builder, QueryTestRunn
return new QueryTestRunner.BaseExecuteQuery(builder)
{
final List<QueryTestRunner.QueryResults> extractedResults = new ArrayList<>();
final RowSignature resultsSignature = null;

final MSQTestOverlordServiceClient overlordClient = overlordClientSupplier.get();

@@ -31,7 +31,7 @@
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;

public final class StandardMSQComponentSupplier extends StandardComponentSupplier
public class StandardMSQComponentSupplier extends StandardComponentSupplier
{
public StandardMSQComponentSupplier(TempDirProducer tempFolderProducer)
{
@@ -645,7 +645,7 @@ protected static DruidExceptionMatcher invalidSqlContains(String s)
}

@RegisterExtension
static SqlTestFrameworkConfig.Rule queryFrameworkRule = new SqlTestFrameworkConfig.Rule();
protected static SqlTestFrameworkConfig.Rule queryFrameworkRule = new SqlTestFrameworkConfig.Rule();

public SqlTestFramework queryFramework()
{
@@ -896,6 +896,11 @@ protected QueryTestBuilder testBuilder()
.skipVectorize(skipVectorize);
}

public CalciteTestConfig createCalciteTestConfig()
{
return new CalciteTestConfig();
}

public class CalciteTestConfig implements QueryTestBuilder.QueryTestConfig
{
private boolean isRunningMSQ = false;
@@ -772,8 +772,11 @@ public void run()

public QueryResults resultsOnly()
{
ExecuteQuery execStep = (ExecuteQuery) runSteps.get(0);
execStep.run();
for (QueryRunStep runStep : runSteps) {
runStep.run();
}

BaseExecuteQuery execStep = (BaseExecuteQuery) runSteps.get(runSteps.size() - 1);
return execStep.results().get(0);
}
}
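
(With this change, resultsOnly() runs every registered step rather than only the first, and reads results from the final BaseExecuteQuery step, presumably so that custom run steps, such as the one the benchmark registers via addCustomRunner(new ExtractResultsFactory(...)), also execute before results are collected.)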
@@ -298,16 +298,21 @@ public void afterAll(ExtensionContext context)
@Override
public void beforeEach(ExtensionContext context)
{
setConfig(context);
makeConfigFromContext(context);
}

private void setConfig(ExtensionContext context)
public void makeConfigFromContext(ExtensionContext context)
{
testName = buildTestCaseName(context);
method = context.getTestMethod().get();
Class<?> testClass = context.getTestClass().get();
List<Annotation> annotations = collectAnnotations(testClass, method);
config = new SqlTestFrameworkConfig(annotations);
setConfig(new SqlTestFrameworkConfig(annotations));
}

public void setConfig(SqlTestFrameworkConfig config)
{
this.config = config;
}

/**
@@ -130,6 +130,7 @@ public class CalciteTests
public static final String ALL_TYPES_UNIQ_PARQUET = "allTypsUniq.parquet";
public static final String FEW_ROWS_ALL_DATA_PARQUET = "fewRowsAllData.parquet";
public static final String T_ALL_TYPE_PARQUET = "t_alltype.parquet";
public static final String BENCHMARK_DATASOURCE = "benchmark_ds";

public static final String TEST_SUPERUSER_NAME = "testSuperuser";
public static final AuthorizerMapper TEST_AUTHORIZER_MAPPER = new AuthorizerMapper(null)