diff --git a/pom.xml b/pom.xml index c45684ab5a18..e3b5bfcdf9e9 100644 --- a/pom.xml +++ b/pom.xml @@ -199,6 +199,7 @@ presto-singlestore presto-hana presto-openapi + presto-plan-checker-providers @@ -461,6 +462,12 @@ ${project.version} + + com.facebook.presto + presto-plan-checker-providers + ${project.version} + + io.grpc grpc-protobuf diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 8c8713e54ab6..22a96b78fb09 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -331,6 +331,7 @@ public final class SystemSessionProperties public static final String REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION = "rewrite_expression_with_constant_expression"; public static final String PRINT_ESTIMATED_STATS_FROM_CACHE = "print_estimated_stats_from_cache"; public static final String REMOVE_CROSS_JOIN_WITH_CONSTANT_SINGLE_ROW_INPUT = "remove_cross_join_with_constant_single_row_input"; + public static final String QUERY_EXECUTION_FAIL_FAST_ENABLED = "query_execution_fail_fast_enabled"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled"; @@ -1980,6 +1981,11 @@ public SystemSessionProperties( "If one input of the cross join is a single row with constant value, remove this cross join and replace with a project node", featuresConfig.isRemoveCrossJoinWithSingleConstantRow(), false), + booleanProperty( + QUERY_EXECUTION_FAIL_FAST_ENABLED, + "Enable eager building and validation of logical plan before execution", + featuresConfig.isQueryExecutionFailFastEnabled(), + false), new PropertyMetadata<>( DEFAULT_VIEW_SECURITY_MODE, format("Set default view security mode. Options are: %s", @@ -3306,6 +3312,11 @@ public static boolean isRewriteExpressionWithConstantEnabled(Session session) return session.getSystemProperty(REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION, Boolean.class); } + public static boolean isQueryExecutionFailFastEnabled(Session session) + { + return session.getSystemProperty(QUERY_EXECUTION_FAIL_FAST_ENABLED, Boolean.class); + } + public static CreateView.Security getDefaultViewSecurityMode(Session session) { return session.getSystemProperty(DEFAULT_VIEW_SECURITY_MODE, CreateView.Security.class); diff --git a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java index 6764405dca87..1db15405a9f7 100644 --- a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java @@ -19,6 +19,8 @@ import com.facebook.presto.spi.NodeManager; import com.google.common.collect.ImmutableSet; +import javax.inject.Inject; + import java.util.Set; import static java.util.Objects.requireNonNull; @@ -37,6 +39,12 @@ public ConnectorAwareNodeManager(InternalNodeManager nodeManager, String environ this.connectorId = requireNonNull(connectorId, "connectorId is null"); } + @Inject + public ConnectorAwareNodeManager(InternalNodeManager nodeManager) + { + this(nodeManager, "", new ConnectorId("system")); + } + @Override public Set getAllNodes() { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java index 403b3de1ce75..4d2034c05515 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -83,6 +84,7 @@ import static com.facebook.presto.SystemSessionProperties.getExecutionPolicy; import static com.facebook.presto.SystemSessionProperties.getQueryAnalyzerTimeout; import static com.facebook.presto.SystemSessionProperties.isLogInvokedFunctionNamesEnabled; +import static com.facebook.presto.SystemSessionProperties.isQueryExecutionFailFastEnabled; import static com.facebook.presto.SystemSessionProperties.isSpoolingOutputBufferEnabled; import static com.facebook.presto.common.RuntimeMetricName.FRAGMENT_PLAN_TIME_NANOS; import static com.facebook.presto.common.RuntimeMetricName.GET_CANONICAL_INFO_TIME_NANOS; @@ -141,6 +143,7 @@ public class SqlQueryExecution private final PlanCanonicalInfoProvider planCanonicalInfoProvider; private final QueryAnalysis queryAnalysis; private final AnalyzerContext analyzerContext; + private final CompletableFuture planFuture; private SqlQueryExecution( QueryAnalyzer queryAnalyzer, @@ -243,6 +246,9 @@ private SqlQueryExecution( } } } + + // Optionally build and validate plan immediately, before execution begins + planFuture = isQueryExecutionFailFastEnabled(getSession()) ? createLogicalPlanAsync() : null; } } @@ -461,7 +467,7 @@ public void start() timeoutThreadExecutor, getQueryAnalyzerTimeout(getSession()))) { // create logical plan for the query - plan = createLogicalPlanAndOptimize(); + plan = planFuture == null ? createLogicalPlanAndOptimize() : planFuture.get(); } metadata.beginQuery(getSession(), plan.getConnectors()); @@ -528,6 +534,11 @@ public void addFinalQueryInfoListener(StateChangeListener stateChange stateMachine.addQueryInfoStateChangeListener(stateChangeListener); } + private CompletableFuture createLogicalPlanAsync() + { + return CompletableFuture.supplyAsync(this::createLogicalPlanAndOptimize, this.queryExecutor); + } + private PlanRoot createLogicalPlanAndOptimize() { try { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java index 9a6d37fdb950..198f23f1ba22 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java @@ -594,7 +594,7 @@ private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section) .forEach(currentSubPlan -> { Optional newPlanFragment = performRuntimeOptimizations(currentSubPlan); if (newPlanFragment.isPresent()) { - planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), session, metadata, warningCollector); + planChecker.validatePlanFragment(newPlanFragment.get(), session, metadata, warningCollector); oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get()); } }); diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java index 22ecb4b1c473..ec8f097f973e 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java @@ -36,6 +36,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -48,6 +49,7 @@ import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory; import com.facebook.presto.sql.analyzer.AnalyzerProviderManager; import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.tracing.TracerProviderManager; import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager; @@ -131,6 +133,7 @@ public class PluginManager private final AnalyzerProviderManager analyzerProviderManager; private final QueryPreparerProviderManager queryPreparerProviderManager; private final NodeStatusNotificationManager nodeStatusNotificationManager; + private final PlanCheckerProviderManager planCheckerProviderManager; @Inject public PluginManager( @@ -152,7 +155,8 @@ public PluginManager( ClusterTtlProviderManager clusterTtlProviderManager, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, TracerProviderManager tracerProviderManager, - NodeStatusNotificationManager nodeStatusNotificationManager) + NodeStatusNotificationManager nodeStatusNotificationManager, + PlanCheckerProviderManager planCheckerProviderManager) { requireNonNull(nodeInfo, "nodeInfo is null"); requireNonNull(config, "config is null"); @@ -184,6 +188,7 @@ public PluginManager( this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null"); this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null"); this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null"); + this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null"); } public void loadPlugins() @@ -348,6 +353,11 @@ public void installPlugin(Plugin plugin) log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName()); nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory); } + + for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) { + log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName()); + planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory); + } } public void installCoordinatorPlugin(CoordinatorPlugin plugin) diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 8b3aaa3009cc..8d829ac72fb3 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -47,6 +47,7 @@ import com.facebook.presto.server.security.ServerSecurityModule; import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.parser.SqlParserOptions; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.storage.TempStorageModule; import com.facebook.presto.tracing.TracerProviderManager; @@ -177,6 +178,7 @@ public void run() injector.getInstance(TracerProviderManager.class).loadTracerProvider(); injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider(); injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification(); + injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProvider(); startAssociatedProcesses(injector); injector.getInstance(Announcer.class).start(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 85658fe457f7..6c5e6f99f438 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -41,6 +41,7 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.connector.ConnectorAwareNodeManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.ConnectorTypeSerdeManager; import com.facebook.presto.connector.system.SystemConnectorModule; @@ -142,10 +143,13 @@ import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorTypeSerde; +import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.PageIndexerFactory; import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.analyzer.ViewDefinition; import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; import com.facebook.presto.spi.relation.DeterminismEvaluator; import com.facebook.presto.spi.relation.DomainTranslator; import com.facebook.presto.spi.relation.PredicateCompiler; @@ -200,7 +204,9 @@ import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.tree.Expression; @@ -371,6 +377,8 @@ else if (serverConfig.isCoordinator()) { driftClientBinder(binder).bindDriftClient(ThriftServerInfoClient.class, ForNodeManager.class) .withAddressSelector(((addressSelectorBinder, annotation, prefix) -> addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(FixedAddressSelector.class))); + // NodeManager instance for plugins to use + binder.bind(NodeManager.class).to(ConnectorAwareNodeManager.class).in(Scopes.SINGLETON); // node scheduler // TODO: remove from NodePartitioningManager and move to CoordinatorModule @@ -625,6 +633,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon // plan jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class); + binder.bind(SimplePlanFragmentSerde.class).to(JsonCodecSimplePlanFragmentSerde.class).in(Scopes.SINGLETON); // history statistics configBinder(binder).bindConfig(HistoryBasedOptimizationConfig.class); @@ -785,6 +795,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon //Optional Status Detector newOptionalBinder(binder, NodeStatusService.class); binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); + + binder.bind(PlanCheckerProviderManager.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 6157d89c19e0..c0c396b2a85a 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -32,6 +32,7 @@ import com.facebook.airlift.tracetoken.TraceTokenModule; import com.facebook.drift.server.DriftServer; import com.facebook.drift.transport.netty.server.DriftNettyServerTransport; +import com.facebook.presto.connector.ConnectorAwareNodeManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.dispatcher.DispatchManager; @@ -61,6 +62,7 @@ import com.facebook.presto.server.security.ServerSecurityModule; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.CoordinatorPlugin; +import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.eventlistener.EventListener; @@ -320,6 +322,7 @@ public TestingPrestoServer( binder.bind(RequestBlocker.class).in(Scopes.SINGLETON); newSetBinder(binder, Filter.class, TheServlet.class).addBinding() .to(RequestBlocker.class).in(Scopes.SINGLETON); + binder.bind(NodeManager.class).to(ConnectorAwareNodeManager.class); }); if (discoveryUri != null) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index c88271e97dd5..fa41d7d36808 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -296,6 +296,8 @@ public class FeaturesConfig private boolean isInlineProjectionsOnValuesEnabled; + private boolean queryExecutionFailFastEnabled; + public enum PartitioningPrecisionStrategy { // Let Presto decide when to repartition @@ -2969,4 +2971,17 @@ public FeaturesConfig setInlineProjectionsOnValues(boolean isInlineProjectionsOn this.isInlineProjectionsOnValuesEnabled = isInlineProjectionsOnValuesEnabled; return this; } + + @Config("query-execution-fail-fast-enabled") + @ConfigDescription("Enable eager building and validation of logical plan before execution") + public FeaturesConfig setQueryExecutionFailFastEnabled(boolean queryExecutionFailFastEnabled) + { + this.queryExecutionFailFastEnabled = queryExecutionFailFastEnabled; + return this; + } + + public boolean isQueryExecutionFailFastEnabled() + { + return this.queryExecutionFailFastEnabled; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index e8bb552691d9..eb3d4e7ff872 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -140,8 +140,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan properties.getPartitionedSources()); Set fragmentVariableTypes = extractOutputVariables(root); - planChecker.validatePlanFragment(root, session, metadata, warningCollector); - Set tableWriterNodeIds = PlanFragmenterUtils.getTableWriterNodeIds(root); boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains); if (outputTableWriterFragment) { @@ -164,6 +162,8 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan Optional.of(statsAndCosts.getForSubplan(root)), Optional.of(jsonFragmentPlan(root, fragmentVariableTypes, statsAndCosts.getForSubplan(root), metadata.getFunctionAndTypeManager(), session))); + planChecker.validatePlanFragment(fragment, session, metadata, warningCollector); + return new SubPlan(fragment, properties.getChildren()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 3ef3eccaf626..7340cf7b1ca8 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -29,6 +29,7 @@ import com.facebook.presto.sql.planner.BasePlanFragmenter.FragmentProperties; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.google.common.collect.ImmutableList; import javax.inject.Inject; @@ -54,13 +55,13 @@ public class PlanFragmenter private final PlanChecker singleNodePlanChecker; @Inject - public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig) + public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager providerManager) { this.metadata = requireNonNull(metadata, "metadata is null"); this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null"); - this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false); - this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true); + this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, providerManager); + this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, providerManager); } public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java new file mode 100644 index 000000000000..dfd68ca63a57 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.plan; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Inject; + +import java.nio.charset.StandardCharsets; + +import static java.util.Objects.requireNonNull; + +public class JsonCodecSimplePlanFragmentSerde + implements SimplePlanFragmentSerde +{ + private final JsonCodec codec; + + @Inject + public JsonCodecSimplePlanFragmentSerde(JsonCodec codec) + { + this.codec = requireNonNull(codec, "SimplePlanFragment JSON codec is null"); + } + + @Override + public String serialize(SimplePlanFragment planFragment) + { + return new String(codec.toBytes(planFragment), StandardCharsets.UTF_8); + } + + @Override + public SimplePlanFragment deserialize(String value) + { + return codec.fromBytes(value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index 0263b88a93fc..b74b871e1442 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -17,27 +17,41 @@ import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Multimap; import javax.inject.Inject; +import java.util.List; +import java.util.stream.Collectors; + /** * Perform checks on the plan that may generate warnings or errors. */ public final class PlanChecker { - private final Multimap checkers; + private final PlanCheckerProviderManager providerMgr; + private Multimap checkers; + private boolean providedInitialized; @Inject - public PlanChecker(FeaturesConfig featuresConfig) + public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager providerManager) { - this(featuresConfig, false); + this(featuresConfig, false, providerManager); } public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) { + this(featuresConfig, forceSingleNode, null); + } + + public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager providerManager) + { + this.providerMgr = providerManager; ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder(); builder.putAll( Stage.INTERMEDIATE, @@ -77,26 +91,95 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) public void validateFinalPlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) { + ensureProvidedInitialized(); checkers.get(Stage.FINAL).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); } public void validateIntermediatePlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) { + ensureProvidedInitialized(); checkers.get(Stage.INTERMEDIATE).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); } - public void validatePlanFragment(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) + public void validatePlanFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector) + { + ensureProvidedInitialized(); + checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector)); + } + + private static List fromSpi(List checkers) + { + return checkers.stream().map(SpiCheckerAdapter::new).collect(Collectors.toList()); + } + + private void ensureProvidedInitialized() { - checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); + if (!providedInitialized) { + if (providerMgr != null) { + checkers = ImmutableListMultimap.builder().putAll(checkers) + .putAll( + Stage.INTERMEDIATE, + fromSpi(providerMgr.getPlanCheckerProvider().getPlanCheckersIntermediate())) + .putAll( + Stage.FRAGMENT, + fromSpi(providerMgr.getPlanCheckerProvider().getPlanCheckersFragment())) + .putAll( + Stage.FINAL, + fromSpi(providerMgr.getPlanCheckerProvider().getPlanCheckersFinal())) + .build(); + } + providedInitialized = true; + } } public interface Checker { void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector); + + default void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector) + { + validate(planFragment.getRoot(), session, metadata, warningCollector); + } } private enum Stage { INTERMEDIATE, FINAL, FRAGMENT } + + private static class SpiCheckerAdapter + implements Checker + { + private final com.facebook.presto.spi.plan.PlanChecker checker; + + public SpiCheckerAdapter(com.facebook.presto.spi.plan.PlanChecker checker) + { + this.checker = checker; + } + + @Override + public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) + { + checker.validate(planNode, warningCollector); + } + + @Override + public void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector) + { + checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector); + } + + private SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment) + { + return new SimplePlanFragment( + planFragment.getId(), + planFragment.getRoot(), + planFragment.getVariables(), + planFragment.getPartitioning(), + planFragment.getTableScanSchedulingOrder(), + planFragment.getPartitioningScheme(), + planFragment.getStageExecutionDescriptor(), + planFragment.isOutputTableWriterFragment()); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/plancheckerprovidermanagers/PlanCheckerProviderManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/plancheckerprovidermanagers/PlanCheckerProviderManager.java new file mode 100644 index 000000000000..803c7410cdcf --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/plancheckerprovidermanagers/PlanCheckerProviderManager.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers; + +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Inject; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static com.facebook.presto.util.PropertiesUtil.loadProperties; +import static java.util.Objects.requireNonNull; + +public class PlanCheckerProviderManager +{ + private static final File PLAN_CHECKER_PROVIDER_CONFIG = new File("etc/plan-checker-provider.properties"); + private final NodeManager nodeManager; + private final TypeManager typeManager; + private final SimplePlanFragmentSerde simplePlanFragmentSerde; + private PlanCheckerProviderFactory providerFactory; + private PlanCheckerProvider provider; + + @Inject + public PlanCheckerProviderManager(NodeManager nodeManager, TypeManager typeManager, SimplePlanFragmentSerde simplePlanFragmentSerde) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"); + this.provider = new EmptyPlanCheckerProvider(); + } + + public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory providerFactory) + { + if (this.providerFactory != null) { + throw new IllegalArgumentException("PlanCheckerProviderFactory is already added, only one factory supported"); + } + this.providerFactory = requireNonNull(providerFactory, "providerFactory is null"); + } + + public PlanCheckerProvider getPlanCheckerProvider() + { + return provider; + } + + public void loadPlanCheckerProvider() + throws IOException + { + if (providerFactory != null) { + provider = providerFactory.create(getConfig(), nodeManager, simplePlanFragmentSerde); + } + } + + private Map getConfig() + throws IOException + { + Map properties; + if (PLAN_CHECKER_PROVIDER_CONFIG.exists()) { + properties = Collections.unmodifiableMap(new HashMap<>(loadProperties(PLAN_CHECKER_PROVIDER_CONFIG))); + } + else { + properties = Collections.emptyMap(); + } + return properties; + } + + public static class EmptyPlanCheckerProvider + implements PlanCheckerProvider + { + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 93a30a8664d1..5ffe130cf870 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -25,6 +25,7 @@ import com.facebook.presto.common.block.SortOrder; import com.facebook.presto.common.type.BooleanType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.connector.ConnectorAwareNodeManager; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.connector.ConnectorTypeSerdeManager; import com.facebook.presto.connector.system.AnalyzePropertiesSystemTable; @@ -140,6 +141,7 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.spi.plan.StageExecutionDescriptor; import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spiller.FileSingleStreamSpillerFactory; @@ -185,8 +187,10 @@ import com.facebook.presto.sql.planner.RemoteSourceFactory; import com.facebook.presto.sql.planner.SubPlan; import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.planPrinter.PlanPrinter; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.plancheckerprovidermanagers.PlanCheckerProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.tree.AlterFunction; @@ -297,6 +301,7 @@ public class LocalQueryRunner private final SqlParser sqlParser; private final PlanFragmenter planFragmenter; private final InMemoryNodeManager nodeManager; + private final PlanCheckerProviderManager planCheckerProviderManager; private final PageSorter pageSorter; private final PageIndexerFactory pageIndexerFactory; private final MetadataManager metadata; @@ -408,6 +413,10 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, this.blockEncodingManager = new BlockEncodingManager(); featuresConfig.setIgnoreStatsCalculatorFailures(false); + this.planCheckerProviderManager = new PlanCheckerProviderManager( + new ConnectorAwareNodeManager(nodeManager), + getFunctionAndTypeManager(), + new JsonCodecSimplePlanFragmentSerde(jsonCodec(SimplePlanFragment.class))); this.metadata = new MetadataManager( new FunctionAndTypeManager(transactionManager, blockEncodingManager, featuresConfig, functionsConfig, new HandleResolver(), ImmutableSet.of()), @@ -434,7 +443,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, this.splitManager = new SplitManager(metadata, new QueryManagerConfig(), nodeSchedulerConfig); this.distributedPlanChecker = new PlanChecker(featuresConfig, false); this.singleNodePlanChecker = new PlanChecker(featuresConfig, true); - this.planFragmenter = new PlanFragmenter(this.metadata, this.nodePartitioningManager, new QueryManagerConfig(), featuresConfig); + this.planFragmenter = new PlanFragmenter(this.metadata, this.nodePartitioningManager, new QueryManagerConfig(), featuresConfig, planCheckerProviderManager); this.joinCompiler = new JoinCompiler(metadata); this.pageIndexerFactory = new GroupByHashPageIndexerFactory(joinCompiler); this.statsNormalizer = new StatsNormalizer(); @@ -515,7 +524,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new ThrowingClusterTtlProviderManager(), historyBasedPlanStatisticsManager, new TracerProviderManager(new TracingConfig()), - new NodeStatusNotificationManager()); + new NodeStatusNotificationManager(), + planCheckerProviderManager); connectorManager.addConnectorFactory(globalSystemConnectorFactory); connectorManager.createConnection(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of()); diff --git a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java index d19b15f2a185..949af72a68fa 100644 --- a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java +++ b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java @@ -151,7 +151,7 @@ public void setUp() new SimpleTtlNodeSelectorConfig()); PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager, new NodeSelectionStats()); - planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig()); + planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig(), null); translator = new TestingRowExpressionTranslator(); } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 5a1e09d7cc84..3102ee42831b 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -535,6 +535,15 @@ void PrestoServer::run() { prestoServerOperations_->runOperation(message, downstream); }); + httpServer_->registerPost( + "/v1/velox/plan", + [server = this]( + proxygen::HTTPMessage* message, + const std::vector>& body, + proxygen::ResponseHandler* downstream) { + server->convertToVeloxPlan(message, downstream, body); + }); + PRESTO_STARTUP_LOG(INFO) << "Driver CPU executor '" << driverExecutor_->getName() << "' has " << driverExecutor_->numThreads() << " threads."; @@ -1401,6 +1410,45 @@ static protocol::Duration getUptime( return protocol::Duration(seconds, protocol::TimeUnit::SECONDS); } +void PrestoServer::convertToVeloxPlan( + proxygen::HTTPMessage* message, + proxygen::ResponseHandler* downstream, + const std::vector>& body) { + std::string error; + try { + auto headers = message->getHeaders(); + + std::ostringstream oss; + for (auto& buf : body) { + oss << std::string((const char*)buf->data(), buf->length()); + } + std::string planFragmentJson = oss.str(); + protocol::PlanFragment planFragment = json::parse(planFragmentJson); + + auto queryCtx = core::QueryCtx::create(); + VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_.get()); + + // Create static taskId and empty TableWriteInfo needed for plan conversion + protocol::TaskId taskId = "velox-plan-conversion.0.0.0"; + auto tableWriteInfo = std::make_shared(); + + // Attempt to convert the plan fragment to a Velox plan + converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId); + + } catch (VeloxException& e) { + error = e.message(); + } catch (std::exception& e) { + error = e.what(); + } + + // Return ok status if conversion succeeded or error if failed + if (error.empty()) { + http::sendOkResponse(downstream, json(R"({ "status": "ok" })")); + } else { + http::sendErrorResponse(downstream, json(R"({ "status": "error", "message": ")" + error + R"(")})")); + } +} + void PrestoServer::reportMemoryInfo(proxygen::ResponseHandler* downstream) { http::sendOkResponse(downstream, json(**memoryInfo_.rlock())); } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index bee2d8d43391..bc8ade086903 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -201,6 +201,11 @@ class PrestoServer { void addServerPeriodicTasks(); + void convertToVeloxPlan( + proxygen::HTTPMessage* message, + proxygen::ResponseHandler* downstream, + const std::vector>& body); + void reportMemoryInfo(proxygen::ResponseHandler* downstream); void reportServerInfo(proxygen::ResponseHandler* downstream); diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index 9d3245244462..c8944a1d09c2 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -1833,7 +1833,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( case protocol::SystemPartitioning::FIXED: { switch (systemPartitioningHandle->function) { case protocol::SystemPartitionFunction::ROUND_ROBIN: { - auto numPartitions = partitioningScheme.bucketToPartition->size(); + auto numPartitions = partitioningScheme.bucketToPartition ? partitioningScheme.bucketToPartition->size() : 1; if (numPartitions == 1) { planFragment.planNode = core::PartitionedOutputNode::single( @@ -1853,7 +1853,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( return planFragment; } case protocol::SystemPartitionFunction::HASH: { - auto numPartitions = partitioningScheme.bucketToPartition->size(); + auto numPartitions = partitioningScheme.bucketToPartition ? partitioningScheme.bucketToPartition->size() : 1; if (numPartitions == 1) { planFragment.planNode = core::PartitionedOutputNode::single( diff --git a/presto-native-execution/presto_cpp/presto_protocol/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/presto_protocol/tests/CMakeLists.txt index 311ee0e24d5f..a2f830ddf6bc 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/presto_protocol/tests/CMakeLists.txt @@ -26,7 +26,8 @@ add_executable( TupleDomainTest.cpp TypeErrorTest.cpp VariableReferenceExpressionTest.cpp - PlanFragmentTest.cpp) + PlanFragmentTest.cpp +) add_test( NAME presto_protocol_test COMMAND presto_protocol_test diff --git a/presto-plan-checker-providers/pom.xml b/presto-plan-checker-providers/pom.xml new file mode 100644 index 000000000000..e97055e9a67b --- /dev/null +++ b/presto-plan-checker-providers/pom.xml @@ -0,0 +1,128 @@ + + + + presto-root + com.facebook.presto + 0.290-SNAPSHOT + + 4.0.0 + + presto-plan-checker-providers + Presto - Plan Checker Providers + presto-plugin + + + ${project.parent.basedir} + + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + com.squareup.okhttp3 + okhttp + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-core + + + + com.facebook.airlift + bootstrap + + + + com.facebook.airlift + configuration + + + + com.facebook.airlift + log + + + + com.facebook.airlift + json + + + + + com.facebook.presto + presto-spi + provided + + + + io.airlift + units + provided + + + + com.facebook.presto + presto-common + provided + + + + org.testng + testng + test + + + + com.squareup.okhttp3 + mockwebserver + test + + + + org.mockito + mockito-core + 3.4.6 + test + + + net.bytebuddy + byte-buddy-agent + + + + + + org.objenesis + objenesis + 2.6 + test + + + net.bytebuddy + byte-buddy-agent + + + + + + \ No newline at end of file diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/PlanCheckerProviderPlugin.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/PlanCheckerProviderPlugin.java new file mode 100644 index 000000000000..23fd9a3bf547 --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/PlanCheckerProviderPlugin.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders; + +import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanCheckerProviderFactory; +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.google.common.collect.ImmutableList; + +public class PlanCheckerProviderPlugin + implements Plugin +{ + @Override + public Iterable getPlanCheckerProviderFactories() + { + return ImmutableList.of(new NativePlanCheckerProviderFactory(getClassLoader())); + } + + private static ClassLoader getClassLoader() + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = PlanCheckerProviderPlugin.class.getClassLoader(); + } + return classLoader; + } +} diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanChecker.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanChecker.java new file mode 100644 index 000000000000..14cfad2ad137 --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanChecker.java @@ -0,0 +1,182 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders.nativechecker; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.Node; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.FilterNode; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanVisitor; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.TableScanNode; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Set; + +import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; +import static java.util.Objects.requireNonNull; + +/** + * Uses the native sidecar to check verify a plan can be run on a native worker. + */ +public final class NativePlanChecker + implements PlanChecker +{ + private static final Logger LOG = Logger.get(NativePlanChecker.class); + private static final String PRESTO_QUERY_ID = "X-Presto-Query-Id"; + private static final String PRESTO_TIME_ZONE = "X-Presto-Time-Zone"; + private static final String PRESTO_SYSTEM_PROPERTY = "X-Presto-System-Property"; + private static final String PRESTO_CATALOG_PROPERTY = "X-Presto-Catalog-Property"; + private static final MediaType JSON_CONTENT_TYPE = MediaType.parse("application/json; charset=utf-8"); + public static final String PLAN_CONVERSION_ENDPOINT = "/v1/velox/plan"; + + private final NodeManager nodeManager; + private final JsonCodec planFragmentJsonCodec; + private final NativePlanCheckerConfig config; + private final OkHttpClient httpClient; + + public NativePlanChecker(NodeManager nodeManager, JsonCodec planFragmentJsonCodec, NativePlanCheckerConfig config) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.planFragmentJsonCodec = requireNonNull(planFragmentJsonCodec, "planFragmentJsonCodec is null"); + this.config = requireNonNull(config, "config is null"); + this.httpClient = new OkHttpClient.Builder().build(); + } + + private URL getPlanValidateUrl() + { + Set nodes = nodeManager.getAllNodes(); + Node sidecarNode = nodes.stream().filter(Node::isCoordinatorSidecar).findFirst() + .orElseThrow(() -> new PrestoException(NOT_FOUND, "could not find native sidecar node")); + try { + // endpoint to retrieve session properties from native worker + return new URL(sidecarNode.getHttpUri().toString() + PLAN_CONVERSION_ENDPOINT); + } + catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + @Override + public void validate(PlanNode planNode, WarningCollector warningCollector) + { + // NO-OP, only validating fragments + } + + @Override + public void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector) + { + try { + if (!planFragment.getPartitioning().isCoordinatorOnly() && !isInternalSystemConnector(planFragment.getRoot())) { + runValidation(planFragment); + } + else if (LOG.isDebugEnabled()) { + LOG.debug("Skipping Native Plan Validation for plan fragment id: %s", planFragment.getId()); + } + } + catch (final PrestoException e) { + if (config.isQueryFailOnError()) { + throw e; + } + } + catch (final Exception e) { + LOG.error(e, "Failed to run native plan validation"); + if (config.isQueryFailOnError()) { + throw new IllegalStateException("Failed to run native plan validation", e); + } + } + } + + private boolean isInternalSystemConnector(PlanNode planNode) + { + try { + return planNode.accept(new CheckInternalVisitor(), null); + } + catch (final IllegalArgumentException e) { + // An InternalPlanNode will not allow a `PlanVisitor`, allow it to pass + return false; + } + } + + private void runValidation(SimplePlanFragment planFragment) + throws IOException + { + LOG.debug("Starting native plan validation for plan fragment id: %s", planFragment.getId()); + + String requestBodyJson = planFragmentJsonCodec.toJson(planFragment); + LOG.debug("Native validation for plan fragment: %s", requestBodyJson); + + Request request = buildRequest(requestBodyJson); + + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + String responseBody = response.body() != null ? response.body().string() : "{}"; + LOG.error("Native plan checker failed with code: %d, response: %s", response.code(), responseBody); + if (config.isQueryFailOnError()) { + throw new PrestoException(QUERY_REJECTED, "Query failed by native plan checker with code: " + response.code() + ", response: " + responseBody); + } + } + + LOG.debug("Native plan validation complete for plan fragment id: %s, successful=%s failOnError=%s", planFragment.getId(), response.isSuccessful(), config.isQueryFailOnError()); + } + } + + private Request buildRequest(String requestBodyJson) + { + Request.Builder builder = new Request.Builder() + .url(getPlanValidateUrl()) + .addHeader("CONTENT_TYPE", "APPLICATION_JSON") + .post(RequestBody.create(JSON_CONTENT_TYPE, requestBodyJson)); + + return builder.build(); + } + + private static class CheckInternalVisitor + extends PlanVisitor + { + @Override + public Boolean visitTableScan(TableScanNode tableScan, Void context) + { + TableHandle handle = tableScan.getTable(); + return ConnectorId.isInternalSystemConnector(handle.getConnectorId()); + } + + @Override + public Boolean visitFilter(FilterNode filter, Void context) + { + return filter.getSource().accept(this, context); + } + + @Override + public Boolean visitPlan(PlanNode node, Void context) + { + return false; + } + } +} diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerConfig.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerConfig.java new file mode 100644 index 000000000000..5881d365beec --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders.nativechecker; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +public class NativePlanCheckerConfig +{ + public static final String CONFIG_PREFIX = "native-plan-checker"; + private boolean enabled = true; + private boolean queryFailOnError; + + public boolean isPlanValidationEnabled() + { + return enabled; + } + + @Config("plan-validation-enabled") + @ConfigDescription("Set true to enable native plan validation") + public NativePlanCheckerConfig setPlanValidationEnabled(boolean enabled) + { + this.enabled = enabled; + return this; + } + + public boolean isQueryFailOnError() + { + return queryFailOnError; + } + + @Config("query-fail-on-error") + @ConfigDescription("Set true to fail the query if plan does not pass native validation, false will log error only") + public NativePlanCheckerConfig setQueryFailOnError(boolean queryFailOnError) + { + this.queryFailOnError = queryFailOnError; + return this; + } +} diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProvider.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProvider.java new file mode 100644 index 000000000000..769bfdda9c82 --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.plancheckerproviders.nativechecker; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.google.inject.Inject; + +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class NativePlanCheckerProvider + implements PlanCheckerProvider +{ + private final NodeManager nodeManager; + private final JsonCodec planFragmentJsonCodec; + private final NativePlanCheckerConfig config; + + @Inject + public NativePlanCheckerProvider(NodeManager nodeManager, JsonCodec planFragmentJsonCodec, NativePlanCheckerConfig config) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.planFragmentJsonCodec = requireNonNull(planFragmentJsonCodec, "planFragmentJsonCodec is null"); + this.config = requireNonNull(config, "config is null"); + } + + @Override + public List getPlanCheckersFragment() + { + return config.isPlanValidationEnabled() ? + Collections.singletonList(new NativePlanChecker(nodeManager, planFragmentJsonCodec, config)) : + Collections.emptyList(); + } +} diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProviderFactory.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProviderFactory.java new file mode 100644 index 000000000000..035bb8c4e287 --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/NativePlanCheckerProviderFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders.nativechecker; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Injector; +import com.google.inject.Scopes; + +import java.util.Map; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static java.util.Objects.requireNonNull; + +public class NativePlanCheckerProviderFactory + implements PlanCheckerProviderFactory +{ + private final ClassLoader classLoader; + + public NativePlanCheckerProviderFactory(ClassLoader classLoader) + { + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public String getName() + { + return "native"; + } + + @Override + public PlanCheckerProvider create(Map config, NodeManager nodeManager, SimplePlanFragmentSerde simplePlanFragmentSerde) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap( + binder -> { + configBinder(binder).bindConfig(NativePlanCheckerConfig.class, NativePlanCheckerConfig.CONFIG_PREFIX); + binder.install(new JsonModule()); + binder.bind(NodeManager.class).toInstance(nodeManager); + binder.bind(SimplePlanFragmentSerde.class).toInstance(simplePlanFragmentSerde); + jsonBinder(binder).addSerializerBinding(SimplePlanFragment.class).to(SimplePlanFragmentSerializer.class).in(Scopes.SINGLETON); + jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class); + binder.bind(PlanCheckerProvider.class).to(NativePlanCheckerProvider.class).in(Scopes.SINGLETON); + }); + + Injector injector = app + .noStrictConfig() + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .quiet() + .initialize(); + + return injector.getInstance(PlanCheckerProvider.class); + } + } +} diff --git a/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/SimplePlanFragmentSerializer.java b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/SimplePlanFragmentSerializer.java new file mode 100644 index 000000000000..29235968cfed --- /dev/null +++ b/presto-plan-checker-providers/src/main/java/com/facebook/presto/plancheckerproviders/nativechecker/SimplePlanFragmentSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders.nativechecker; + +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.google.inject.Inject; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +public class SimplePlanFragmentSerializer + extends JsonSerializer +{ + private final SimplePlanFragmentSerde simplePlanFragmentSerde; + + @Inject + public SimplePlanFragmentSerializer(SimplePlanFragmentSerde simplePlanFragmentSerde) + { + this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"); + } + + @Override + public void serialize(SimplePlanFragment planNode, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + jsonGenerator.writeRawValue(simplePlanFragmentSerde.serialize(planNode)); + } + + @Override + public void serializeWithType(SimplePlanFragment planNode, JsonGenerator jsonGenerator, SerializerProvider serializerProvider, TypeSerializer typeSerializer) + throws IOException + { + serialize(planNode, jsonGenerator, serializerProvider); + } +} diff --git a/presto-plan-checker-providers/src/test/java/com/facebook/presto/plancheckerproviders/TestPlanCheckerProvider.java b/presto-plan-checker-providers/src/test/java/com/facebook/presto/plancheckerproviders/TestPlanCheckerProvider.java new file mode 100644 index 000000000000..24f26449a72f --- /dev/null +++ b/presto-plan-checker-providers/src/test/java/com/facebook/presto/plancheckerproviders/TestPlanCheckerProvider.java @@ -0,0 +1,205 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plancheckerproviders; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanChecker; +import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanCheckerConfig; +import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanCheckerProvider; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.Node; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.NodePoolType; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.plan.PartitioningHandle; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +public class TestPlanCheckerProvider +{ + private static final JsonCodec PLAN_FRAGMENT_JSON_CODEC = JsonCodec.jsonCodec(SimplePlanFragment.class); + + @Test + public void testGetPlanChecker() + { + NativePlanCheckerConfig config = new NativePlanCheckerConfig(); + assertTrue(config.isPlanValidationEnabled()); + NativePlanCheckerProvider provider = new NativePlanCheckerProvider(new TestNodeManager(), PLAN_FRAGMENT_JSON_CODEC, config); + assertTrue(provider.getPlanCheckersIntermediate().isEmpty()); + assertTrue(provider.getPlanCheckersFinal().isEmpty()); + assertFalse(provider.getPlanCheckersFragment().isEmpty()); + + // Disable the native plan checker entirely + config.setPlanValidationEnabled(false); + assertTrue(provider.getPlanCheckersFragment().isEmpty()); + } + + @Test + public void testNativePlanMockValidate() + throws IOException + { + PlanNode nodeMock = mock(PlanNode.class); + when(nodeMock.accept(any(), any())).thenReturn(false); + PartitioningHandle handleMock = mock(PartitioningHandle.class); + when(handleMock.isCoordinatorOnly()).thenReturn(false); + SimplePlanFragment fragmentMock = mock(SimplePlanFragment.class); + when(fragmentMock.getRoot()).thenReturn(nodeMock); + when(fragmentMock.getPartitioning()).thenReturn(handleMock); + + JsonCodec jsonCodecMock = spy(PLAN_FRAGMENT_JSON_CODEC); + when(jsonCodecMock.toJson(any(SimplePlanFragment.class))).thenReturn("{}"); + + try (MockWebServer server = new MockWebServer()) { + server.start(); + TestNodeManager nodeManager = new TestNodeManager(server.url(NativePlanChecker.PLAN_CONVERSION_ENDPOINT).uri()); + NativePlanCheckerConfig config = new NativePlanCheckerConfig(); + NativePlanChecker checker = new NativePlanChecker(nodeManager, jsonCodecMock, config); + + server.enqueue(new MockResponse().setBody("{ \"status\": \"ok\" }")); + checker.validateFragment(fragmentMock, null); + + config.setQueryFailOnError(true); + server.enqueue(new MockResponse().setResponseCode(500).setBody("{ \"error\": \"fubar\" }")); + assertThrows(PrestoException.class, + () -> checker.validateFragment(fragmentMock, null)); + } + } + + private static class TestNodeManager + implements NodeManager + { + private final URI sidecarUri; + + public TestNodeManager(URI sidecarUri) + { + this.sidecarUri = sidecarUri; + } + + public TestNodeManager() + { + this(null); + } + + @Override + public Set getAllNodes() + { + return sidecarUri != null ? Collections.singleton(new TestSidecarNode(sidecarUri)) : Collections.emptySet(); + } + + @Override + public Set getWorkerNodes() + { + return Collections.emptySet(); + } + + @Override + public Node getCurrentNode() + { + return null; + } + + @Override + public String getEnvironment() + { + return null; + } + } + + private static class TestSidecarNode + implements Node + { + private final URI sidecarUri; + + public TestSidecarNode(URI sidecarUri) + { + this.sidecarUri = sidecarUri; + } + + @Override + public String getHost() + { + return ""; + } + + @Override + public HostAddress getHostAndPort() + { + return null; + } + + @Override + public URI getHttpUri() + { + return sidecarUri; + } + + @Override + public String getNodeIdentifier() + { + return ""; + } + + @Override + public String getVersion() + { + return ""; + } + + @Override + public boolean isCoordinator() + { + return false; + } + + @Override + public boolean isResourceManager() + { + return false; + } + + @Override + public boolean isCatalogServer() + { + return false; + } + + @Override + public boolean isCoordinatorSidecar() + { + return true; + } + + @Override + public NodePoolType getPoolType() + { + return null; + } + } +} diff --git a/presto-server/pom.xml b/presto-server/pom.xml index a638792efa73..fdaad6ad5ee3 100644 --- a/presto-server/pom.xml +++ b/presto-server/pom.xml @@ -380,6 +380,13 @@ provided + + com.facebook.presto + presto-plan-checker-providers + ${project.version} + zip + provided + diff --git a/presto-server/src/main/assembly/presto.xml b/presto-server/src/main/assembly/presto.xml index c81bd9e5b10f..9d568380bad1 100644 --- a/presto-server/src/main/assembly/presto.xml +++ b/presto-server/src/main/assembly/presto.xml @@ -216,5 +216,9 @@ ${project.build.directory}/dependency/presto-clickhouse-${project.version} plugin/clickhouse + + ${project.build.directory}/dependency/presto-plan-checker-providers-${project.version} + plugin/plan-checker-providers + diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java index 8f797aa42c7e..6cc8af63c9b8 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java @@ -157,7 +157,7 @@ public void setUp() new SimpleTtlNodeSelectorConfig()); PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager, new NodeSelectionStats()); - planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig()); + planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig(), null); } @AfterClass(alwaysRun = true) @@ -224,7 +224,7 @@ private Void runTestIterativePlanFragmenter(PlanNode node, Plan plan, SubPlan fu plan, testingFragmentTracker::isFragmentFinished, metadata, - new PlanChecker(new FeaturesConfig()), + new PlanChecker(new FeaturesConfig(), false), new PlanNodeIdAllocator(), nodePartitioningManager, new QueryManagerConfig(), diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java index 81e8f55b0a66..aa976b80cac5 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -142,4 +143,9 @@ default Iterable getNodeStatusNotificatio { return emptyList(); } + + default Iterable getPlanCheckerProviderFactories() + { + return emptyList(); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java new file mode 100644 index 000000000000..16789510c62e --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.spi.plan; + +import com.facebook.presto.spi.WarningCollector; + +public interface PlanChecker +{ + void validate(PlanNode planNode, WarningCollector warningCollector); + + default void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector) + { + validate(planFragment.getRoot(), warningCollector); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java new file mode 100644 index 000000000000..f9546677a91b --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.spi.plan; + +import java.util.Collections; +import java.util.List; + +public interface PlanCheckerProvider +{ + default List getPlanCheckersIntermediate() + { + return Collections.emptyList(); + } + + default List getPlanCheckersFinal() + { + return Collections.emptyList(); + } + + default List getPlanCheckersFragment() + { + return Collections.emptyList(); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java new file mode 100644 index 000000000000..05808ac0c3ef --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.plan; + +import com.facebook.presto.spi.NodeManager; + +import java.util.Map; + +public interface PlanCheckerProviderFactory +{ + String getName(); + + PlanCheckerProvider create(Map config, NodeManager nodeManager, SimplePlanFragmentSerde simplePlanFragmentSerde); +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java new file mode 100644 index 000000000000..09e9f4443de4 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.plan; + +public interface SimplePlanFragmentSerde +{ + String serialize(SimplePlanFragment planFragment); + + SimplePlanFragment deserialize(String value); +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java index 1757ba65e8d5..f8dd5654a3cf 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java @@ -572,7 +572,7 @@ private QueryExplainer getQueryExplainer() .getPlanningTimeOptimizers(); return new QueryExplainer( optimizers, - new PlanFragmenter(metadata, queryRunner.getNodePartitioningManager(), new QueryManagerConfig(), featuresConfig), + new PlanFragmenter(metadata, queryRunner.getNodePartitioningManager(), new QueryManagerConfig(), featuresConfig, null), metadata, queryRunner.getAccessControl(), sqlParser,