From e01ff822c912b5185298ad2321b83c172ac09fc8 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Fri, 18 Oct 2024 14:41:01 -0700
Subject: [PATCH 1/4] Add a new BrokerClient in the sql package and add class
for Explain Plan.
- Add BrokerClient that leverages the ServiceClient functionality. Add a couple of
client APIs that are useful to the scheduled batch supervisor implementation.
- Add ExplainPlanInformation class that contains information about a single plan.
The BrokerClient leverages this.
---
.../apache/druid/discovery/BrokerClient.java | 4 +
.../druid/rpc/guice/ServiceClientModule.java | 2 +-
.../calcite/planner/ExplainAttributes.java | 28 ++++
.../org/apache/druid/sql/client/Broker.java | 34 ++++
.../apache/druid/sql/client/BrokerClient.java | 51 ++++++
.../druid/sql/client/BrokerClientImpl.java | 84 ++++++++++
.../druid/sql/guice/BrokerServiceModule.java | 74 +++++++++
.../sql/http/ExplainPlanInformation.java | 112 +++++++++++++
.../sql/client/BrokerClientImplTest.java | 148 ++++++++++++++++++
.../sql/http/ExplainPlanInformationTest.java | 122 +++++++++++++++
10 files changed, 658 insertions(+), 1 deletion(-)
create mode 100644 sql/src/main/java/org/apache/druid/sql/client/Broker.java
create mode 100644 sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
create mode 100644 sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
create mode 100644 sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
create mode 100644 sql/src/main/java/org/apache/druid/sql/http/ExplainPlanInformation.java
create mode 100644 sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
create mode 100644 sql/src/test/java/org/apache/druid/sql/http/ExplainPlanInformationTest.java
diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
index bdee9b8dfe4a..a0ddbf42bed8 100644
--- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java
@@ -29,6 +29,7 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.rpc.ServiceClient;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -41,7 +42,10 @@
/**
* This class facilitates interaction with Broker.
+ * Note that this should be removed and reconciled with org.apache.druid.sql.client.BrokerClient, which has the
+ * built-in functionality of {@link ServiceClient}, and proper Guice and service discovery wired in.
*/
+@Deprecated
public class BrokerClient
{
private static final int MAX_RETRIES = 5;
diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
index 51dd2b89d736..80cf59e42d94 100644
--- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
+++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
@@ -48,7 +48,7 @@
public class ServiceClientModule implements DruidModule
{
private static final int CONNECT_EXEC_THREADS = 4;
- private static final int CLIENT_MAX_ATTEMPTS = 6;
+ protected static final int CLIENT_MAX_ATTEMPTS = 6;
@Override
public void configure(Binder binder)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
index e2ae4fa7a10c..23852846fce2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
@@ -19,12 +19,14 @@
package org.apache.druid.sql.calcite.planner;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Objects;
/**
* ExplainAttributes holds the attributes of a SQL statement that is used in the EXPLAIN PLAN result.
@@ -45,6 +47,7 @@ public final class ExplainAttributes
@Nullable
private final String replaceTimeChunks;
+ @JsonCreator
public ExplainAttributes(
@JsonProperty("statementType") final String statementType,
@JsonProperty("targetDataSource") @Nullable final String targetDataSource,
@@ -117,6 +120,31 @@ public String getReplaceTimeChunks()
return replaceTimeChunks;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExplainAttributes that = (ExplainAttributes) o;
+ return Objects.equals(statementType, that.statementType) && Objects.equals(
+ targetDataSource,
+ that.targetDataSource
+ ) && Objects.equals(partitionedBy, that.partitionedBy) && Objects.equals(
+ clusteredBy,
+ that.clusteredBy
+ ) && Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(statementType, targetDataSource, partitionedBy, clusteredBy, replaceTimeChunks);
+ }
+
@Override
public String toString()
{
diff --git a/sql/src/main/java/org/apache/druid/sql/client/Broker.java b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
new file mode 100644
index 000000000000..fb20c5166c8f
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/Broker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Broker
+{
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
new file mode 100644
index 000000000000..7df744929fee
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.sql.http.ExplainPlanInformation;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+
+import java.util.List;
+
+/**
+ * High-level Broker client.
+ *
+ * All methods return futures, enabling asynchronous logic. If you want a synchronous response, use
+ * {@code FutureUtils.get} or {@code FutureUtils.getUnchecked}.
+ * Futures resolve to exceptions in the manner described by {@link org.apache.druid.rpc.ServiceClient#asyncRequest}.
+ *
+ * Typically acquired via Guice, where it is registered using {@link org.apache.druid.rpc.guice.ServiceClientModule}.
+ */
+public interface BrokerClient
+{
+ /**
+ * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint.
+ */
+ ListenableFuture submitSqlTask(SqlQuery sqlQuery);
+
+ /**
+ * Fetches the explain plan information for the given {@code sqlQuery}.
+ *
+ * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} information is to be fetched
+ */
+ ListenableFuture> fetchExplainPlanInformation(SqlQuery sqlQuery);
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
new file mode 100644
index 000000000000..74d1ee56eb74
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClientImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.sql.http.ExplainPlanInformation;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.util.List;
+
+public class BrokerClientImpl implements BrokerClient
+{
+ private final ServiceClient client;
+ private final ObjectMapper jsonMapper;
+
+ public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMapper)
+ {
+ this.client = client;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @Override
+ public ListenableFuture submitSqlTask(final SqlQuery sqlQuery)
+ {
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, sqlQuery),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), SqlTaskStatus.class)
+ );
+ }
+
+ @Override
+ public ListenableFuture> fetchExplainPlanInformation(final SqlQuery sqlQuery)
+ {
+ final SqlQuery explainSqlQuery = new SqlQuery(
+ StringUtils.format("EXPLAIN PLAN FOR %s", sqlQuery.getQuery()),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ );
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, explainSqlQuery),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference>() {})
+ );
+ }
+}
+
diff --git a/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
new file mode 100644
index 000000000000..1e280f123cf1
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/guice/BrokerServiceModule.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Provides;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.rpc.DiscoveryServiceLocator;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.guice.ServiceClientModule;
+import org.apache.druid.sql.client.Broker;
+import org.apache.druid.sql.client.BrokerClient;
+import org.apache.druid.sql.client.BrokerClientImpl;
+
+/**
+ * Module that processes can bind to if they require a {@link BrokerClient}.
+ *
+ * This extend {@link ServiceClientModule} because the {@link BrokerClient} requires
+ * classes present in the sql module.
+ *
+ */
+public class BrokerServiceModule extends ServiceClientModule
+{
+ @Provides
+ @ManageLifecycle
+ @Broker
+ public ServiceLocator makeBrokerServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider)
+ {
+ return new DiscoveryServiceLocator(discoveryProvider, NodeRole.BROKER);
+ }
+
+ @Provides
+ @LazySingleton
+ public BrokerClient makeBrokerClient(
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @Broker final ServiceLocator serviceLocator
+ )
+ {
+ return new BrokerClientImpl(
+ clientFactory.makeClient(
+ NodeRole.BROKER.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
+ ),
+ jsonMapper
+ );
+ }
+}
+
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlanInformation.java b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlanInformation.java
new file mode 100644
index 000000000000..dc48e72d50de
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlanInformation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Class that encapsulates the information of a single plan for an {@code EXPLAIN PLAN FOR} query.
+ *
+ * Similar to {@link #getAttributes()}, it's possible to provide more structure to {@link #getPlan()},
+ * at least for the native query explain, but there's currently no use case for it.
+ *
+ */
+public class ExplainPlanInformation
+{
+ @JsonProperty("PLAN")
+ private final String plan;
+
+ @JsonProperty("RESOURCES")
+ private final String resources;
+
+ @JsonProperty("ATTRIBUTES")
+ @JsonDeserialize(using = ExplainAttributesDeserializer.class)
+ private final ExplainAttributes attributes;
+
+ @JsonCreator
+ public ExplainPlanInformation(
+ @JsonProperty("PLAN") final String plan,
+ @JsonProperty("RESOURCES") final String resources,
+ @JsonProperty("ATTRIBUTES") final ExplainAttributes attributes
+ )
+ {
+ this.plan = plan;
+ this.resources = resources;
+ this.attributes = attributes;
+ }
+
+ public String getPlan()
+ {
+ return plan;
+ }
+
+ public String getResources()
+ {
+ return resources;
+ }
+
+ public ExplainAttributes getAttributes()
+ {
+ return attributes;
+ }
+
+ private static class ExplainAttributesDeserializer extends JsonDeserializer
+ {
+ @Override
+ public ExplainAttributes deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException
+ {
+ final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
+ return objectMapper.readValue(jsonParser.getText(), ExplainAttributes.class);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExplainPlanInformation that = (ExplainPlanInformation) o;
+ return Objects.equals(plan, that.plan)
+ && Objects.equals(resources, that.resources)
+ && Objects.equals(attributes, that.attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(plan, resources, attributes);
+ }
+}
+
+
diff --git a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
new file mode 100644
index 000000000000..0a9f05f10e09
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.avatica.SqlType;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.rpc.MockServiceClient;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.sql.calcite.planner.ExplainAttributes;
+import org.apache.druid.sql.http.ExplainPlanInformation;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlParameter;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlTaskStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class BrokerClientImplTest
+{
+ private ObjectMapper jsonMapper;
+ private MockServiceClient serviceClient;
+ private BrokerClient brokerClient;
+
+ @Before
+ public void setup()
+ {
+ jsonMapper = new DefaultObjectMapper();
+ serviceClient = new MockServiceClient();
+ brokerClient = new BrokerClientImpl(serviceClient, jsonMapper);
+ }
+
+ @After
+ public void tearDown()
+ {
+ serviceClient.verify();
+ }
+
+ @Test
+ public void testSubmitSqlTask() throws Exception
+ {
+ final SqlQuery query = new SqlQuery(
+ "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+ ResultFormat.ARRAY,
+ true,
+ true,
+ true,
+ ImmutableMap.of("useCache", false),
+ ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+ );
+ final SqlTaskStatus taskStatus = new SqlTaskStatus("taskId1", TaskState.RUNNING, null);
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task/")
+ .jsonContent(jsonMapper, query),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(taskStatus)
+ );
+
+ assertEquals(taskStatus, brokerClient.submitSqlTask(query).get());
+ }
+
+ @Test
+ public void testFetchExplainPlanInformation() throws Exception
+ {
+ final SqlQuery query = new SqlQuery(
+ "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
+ ResultFormat.ARRAY,
+ true,
+ true,
+ true,
+ ImmutableMap.of("useCache", false),
+ ImmutableList.of(new SqlParameter(SqlType.INTEGER, 1))
+ );
+ final SqlQuery explainQuery = new SqlQuery(
+ StringUtils.format("EXPLAIN PLAN FOR %s", query.getQuery()),
+ null,
+ false,
+ false,
+ false,
+ null,
+ null
+ );
+
+ final String plan = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"inline\",\"data\":\"a,b,1\\nc,d,2\\n\"},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"sqlReplaceTimeChunks\":\"all\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
+ final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
+ final ExplainAttributes attributes = new ExplainAttributes("REPLACE", "foo", Granularities.ALL, null, "all");
+
+ final List
*/
-public class ExplainPlanInformation
+public class ExplainPlan
{
@JsonProperty("PLAN")
private final String plan;
@@ -51,7 +51,7 @@ public class ExplainPlanInformation
private final ExplainAttributes attributes;
@JsonCreator
- public ExplainPlanInformation(
+ public ExplainPlan(
@JsonProperty("PLAN") final String plan,
@JsonProperty("RESOURCES") final String resources,
@JsonProperty("ATTRIBUTES") final ExplainAttributes attributes
@@ -96,7 +96,7 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
- ExplainPlanInformation that = (ExplainPlanInformation) o;
+ ExplainPlan that = (ExplainPlan) o;
return Objects.equals(plan, that.plan)
&& Objects.equals(resources, that.resources)
&& Objects.equals(attributes, that.attributes);
diff --git a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
index 0a9f05f10e09..31f16dd42eaf 100644
--- a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
@@ -30,7 +30,7 @@
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.sql.calcite.planner.ExplainAttributes;
-import org.apache.druid.sql.http.ExplainPlanInformation;
+import org.apache.druid.sql.http.ExplainPlan;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.sql.http.SqlQuery;
@@ -140,8 +140,8 @@ public void testFetchExplainPlanInformation() throws Exception
);
assertEquals(
- ImmutableList.of(new ExplainPlanInformation(plan, resources, attributes)),
- brokerClient.fetchExplainPlanInformation(query).get()
+ ImmutableList.of(new ExplainPlan(plan, resources, attributes)),
+ brokerClient.fetchExplainPlan(query).get()
);
}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanInformationTest.java b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
similarity index 90%
rename from sql/src/test/java/org/apache/druid/sql/http/ExplainPlanInformationTest.java
rename to sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
index d4059a6be461..e3385fc5f518 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanInformationTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/ExplainPlanTest.java
@@ -35,7 +35,7 @@
import static org.junit.Assert.assertEquals;
-public class ExplainPlanInformationTest
+public class ExplainPlanTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@@ -57,7 +57,7 @@ public void testExplainPlanSerdeForSelectQuery() throws JsonProcessingException
)
);
- testSerde(givenPlans, ImmutableList.of(new ExplainPlanInformation(plan, resources, attributes)));
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes)));
}
@Test
@@ -78,7 +78,7 @@ public void testExplainPlanSerdeForReplaceQuery() throws JsonProcessingException
)
);
- testSerde(givenPlans, ImmutableList.of(new ExplainPlanInformation(plan, resources, attributes)));
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes)));
}
@Test
@@ -99,24 +99,24 @@ public void testExplainPlanSerdeForInsertQuery() throws JsonProcessingException
)
);
- testSerde(givenPlans, ImmutableList.of(new ExplainPlanInformation(plan, resources, attributes)));
+ testSerde(givenPlans, ImmutableList.of(new ExplainPlan(plan, resources, attributes)));
}
private void testSerde(
final List> givenPlans,
- final List expectedExplainPlanInfos
+ final List expectedExplainPlans
)
{
- final List observedExplainPlanInfos;
+ final List observedExplainPlans;
try {
- observedExplainPlanInfos = MAPPER.readValue(
+ observedExplainPlans = MAPPER.readValue(
MAPPER.writeValueAsString(givenPlans),
- new TypeReference>() {}
+ new TypeReference>() {}
);
}
catch (Exception e) {
- throw DruidException.defensive(e, "Error deserializing given plans[%s] into explain plan infos.", givenPlans);
+ throw DruidException.defensive(e, "Error deserializing given plans[%s] into explain plans.", givenPlans);
}
- assertEquals(expectedExplainPlanInfos, observedExplainPlanInfos);
+ assertEquals(expectedExplainPlans, observedExplainPlans);
}
}
From 2613bc38d1fa387771abd054da5828edb7c138bd Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Sat, 19 Oct 2024 18:45:18 -0700
Subject: [PATCH 3/4] Add comment for the deserializer and cleanup
ExplainAttributesTest.
---
.../calcite/planner/ExplainAttributes.java | 2 +
.../apache/druid/sql/client/BrokerClient.java | 2 +-
.../apache/druid/sql/http/ExplainPlan.java | 25 +++---
.../planner/ExplainAttributesTest.java | 87 +++++++++++++------
.../sql/client/BrokerClientImplTest.java | 2 +-
5 files changed, 78 insertions(+), 40 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
index 533de7d58f2f..d0b9ebc5f94d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
@@ -22,9 +22,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
import java.util.Objects;
diff --git a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
index 9bf5f45f1db5..14cbbb7bff6a 100644
--- a/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
+++ b/sql/src/main/java/org/apache/druid/sql/client/BrokerClient.java
@@ -43,7 +43,7 @@ public interface BrokerClient
ListenableFuture submitSqlTask(SqlQuery sqlQuery);
/**
- * Fetches the explain plan for the given {@code sqlQuery}.
+ * Fetches the explain plan for the given {@code sqlQuery} from the Broker's SQL task endpoint.
*
* @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} information is to be fetched
*/
diff --git a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
index ce0c68473b40..68defc4b2a4d 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/ExplainPlan.java
@@ -77,16 +77,6 @@ public ExplainAttributes getAttributes()
return attributes;
}
- private static class ExplainAttributesDeserializer extends JsonDeserializer
- {
- @Override
- public ExplainAttributes deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException
- {
- final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
- return objectMapper.readValue(jsonParser.getText(), ExplainAttributes.class);
- }
- }
-
@Override
public boolean equals(Object o)
{
@@ -107,6 +97,21 @@ public int hashCode()
{
return Objects.hash(plan, resources, attributes);
}
+
+ /**
+ * Custom deserializer for {@link ExplainAttributes} because the value for {@link #attributes} in the plan
+ * is encoded as a JSON string. This deserializer tells Jackson on how to parse the JSON string
+ * and map it to the fields in the {@link ExplainAttributes} class.
+ */
+ private static class ExplainAttributesDeserializer extends JsonDeserializer
+ {
+ @Override
+ public ExplainAttributes deserialize(JsonParser jsonParser, DeserializationContext context) throws IOException
+ {
+ final ObjectMapper objectMapper = (ObjectMapper) jsonParser.getCodec();
+ return objectMapper.readValue(jsonParser.getText(), ExplainAttributes.class);
+ }
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
index 53e2abf2749d..252c2e588849 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/ExplainAttributesTest.java
@@ -19,8 +19,8 @@
package org.apache.druid.sql.calcite.planner;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.junit.Assert;
@@ -28,14 +28,22 @@
import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+
public class ExplainAttributesTest
{
- private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new DefaultObjectMapper();
+ private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Test
- public void testSimpleGetters()
+ public void testGetters()
{
- ExplainAttributes selectAttributes = new ExplainAttributes("SELECT", null, null, null, null);
+ final ExplainAttributes selectAttributes = new ExplainAttributes(
+ "SELECT",
+ null,
+ null,
+ null,
+ null
+ );
Assert.assertEquals("SELECT", selectAttributes.getStatementType());
Assert.assertNull(selectAttributes.getTargetDataSource());
Assert.assertNull(selectAttributes.getPartitionedBy());
@@ -44,9 +52,9 @@ public void testSimpleGetters()
}
@Test
- public void testSerializeSelectAttributes() throws JsonProcessingException
+ public void testSerdeOfSelectAttributes()
{
- ExplainAttributes selectAttributes = new ExplainAttributes(
+ final ExplainAttributes selectAttributes = new ExplainAttributes(
"SELECT",
null,
null,
@@ -56,13 +64,14 @@ public void testSerializeSelectAttributes() throws JsonProcessingException
final String expectedAttributes = "{"
+ "\"statementType\":\"SELECT\""
+ "}";
- Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(selectAttributes));
+
+ testSerde(selectAttributes, expectedAttributes);
}
@Test
- public void testSerializeInsertAttributes() throws JsonProcessingException
+ public void testSerdeOfInsertAttributes()
{
- ExplainAttributes insertAttributes = new ExplainAttributes(
+ final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.DAY,
@@ -74,13 +83,13 @@ public void testSerializeInsertAttributes() throws JsonProcessingException
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"DAY\""
+ "}";
- Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+ testSerde(insertAttributes, expectedAttributes);
}
@Test
- public void testSerializeInsertAllAttributes() throws JsonProcessingException
+ public void testSerdeOfInsertAllAttributes()
{
- ExplainAttributes insertAttributes = new ExplainAttributes(
+ final ExplainAttributes insertAttributes = new ExplainAttributes(
"INSERT",
"foo",
Granularities.ALL,
@@ -92,78 +101,100 @@ public void testSerializeInsertAllAttributes() throws JsonProcessingException
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":{\"type\":\"all\"}"
+ "}";
- Assert.assertEquals(expectedAttributes, DEFAULT_OBJECT_MAPPER.writeValueAsString(insertAttributes));
+ testSerde(insertAttributes, expectedAttributes);
}
@Test
- public void testSerializeReplaceAttributes() throws JsonProcessingException
+ public void testSerdeOfReplaceAttributes()
{
- ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"ALL"
);
- final String expectedAttributes1 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
- Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
+ testSerde(replaceAttributes, expectedAttributes);
+ }
- ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+ @Test
+ public void testSerdeOfReplaceAttributesWithTimeChunks()
+ {
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
null,
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
- final String expectedAttributes2 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
- Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+ testSerde(replaceAttributes, expectedAttributes);
}
@Test
- public void testSerializeReplaceWithClusteredByAttributes() throws JsonProcessingException
+ public void testReplaceAttributesWithClusteredBy()
{
- ExplainAttributes replaceAttributes1 = new ExplainAttributes(
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "CEIL(`f2`)"),
"ALL"
);
- final String expectedAttributes1 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"clusteredBy\":[\"foo\",\"CEIL(`f2`)\"],"
+ "\"replaceTimeChunks\":\"ALL\""
+ "}";
- Assert.assertEquals(expectedAttributes1, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes1));
-
+ testSerde(replaceAttributes, expectedAttributes);
+ }
- ExplainAttributes replaceAttributes2 = new ExplainAttributes(
+ @Test
+ public void testReplaceAttributesWithClusteredByAndTimeChunks()
+ {
+ final ExplainAttributes replaceAttributes = new ExplainAttributes(
"REPLACE",
"foo",
Granularities.HOUR,
Arrays.asList("foo", "boo"),
"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z"
);
- final String expectedAttributes2 = "{"
+ final String expectedAttributes = "{"
+ "\"statementType\":\"REPLACE\","
+ "\"targetDataSource\":\"foo\","
+ "\"partitionedBy\":\"HOUR\","
+ "\"clusteredBy\":[\"foo\",\"boo\"],"
+ "\"replaceTimeChunks\":\"2019-08-25T02:00:00.000Z/2019-08-25T03:00:00.000Z\""
+ "}";
- Assert.assertEquals(expectedAttributes2, DEFAULT_OBJECT_MAPPER.writeValueAsString(replaceAttributes2));
+ testSerde(replaceAttributes, expectedAttributes);
}
+
+ private void testSerde(final ExplainAttributes explainAttributes, final String expectedSerializedAttributes)
+ {
+ final ExplainAttributes observedAttributes;
+ try {
+ final String observedSerializedAttributes = MAPPER.writeValueAsString(explainAttributes);
+ assertEquals(expectedSerializedAttributes, observedSerializedAttributes);
+ observedAttributes = MAPPER.readValue(observedSerializedAttributes, ExplainAttributes.class);
+ }
+ catch (Exception e) {
+ throw DruidException.defensive(e, "Error serializing/deserializing explain plan[%s].", explainAttributes);
+ }
+ assertEquals(explainAttributes, observedAttributes);
+ }
+
}
diff --git a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
index 31f16dd42eaf..51d66f03816d 100644
--- a/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/client/BrokerClientImplTest.java
@@ -95,7 +95,7 @@ public void testSubmitSqlTask() throws Exception
}
@Test
- public void testFetchExplainPlanInformation() throws Exception
+ public void testFetchExplainPlan() throws Exception
{
final SqlQuery query = new SqlQuery(
"REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY ALL",
From 14b4dae2cb75db4d0f772243f27e6e101bb1bf86 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Sat, 19 Oct 2024 18:46:40 -0700
Subject: [PATCH 4/4] Unused import.
---
.../org/apache/druid/sql/calcite/planner/ExplainAttributes.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
index d0b9ebc5f94d..533de7d58f2f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/ExplainAttributes.java
@@ -22,11 +22,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularity;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.List;
import java.util.Objects;