Skip to content

Commit

Permalink
Support notification for host shutting down
Browse files Browse the repository at this point in the history
We want to support custom notification mechanism for quicker drain
signal recognition to minimize compute wastage. This mechanism is
currently intended to be used for both the exchange client and
supporting a faster graceful shutdown of worker.
  • Loading branch information
abhiseksaikia committed Aug 22, 2023
1 parent 7edd2c2 commit 1fda136
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.execution.TaskManager;
import io.airlift.units.Duration;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.TimeoutException;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.lang.Thread.currentThread;
Expand All @@ -55,6 +57,8 @@ public class GracefulShutdownHandler
private final boolean isResourceManager;
private final ShutdownAction shutdownAction;
private final Duration gracePeriod;
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private boolean isLoadNodeStatusNotification;

@GuardedBy("this")
private boolean shutdownRequested;
Expand All @@ -65,7 +69,8 @@ public GracefulShutdownHandler(
ServerConfig serverConfig,
ShutdownAction shutdownAction,
LifeCycleManager lifeCycleManager,
QueryManager queryManager)
QueryManager queryManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
{
this.sqlTaskManager = requireNonNull(sqlTaskManager, "sqlTaskManager is null");
this.shutdownAction = requireNonNull(shutdownAction, "shutdownAction is null");
Expand All @@ -74,6 +79,21 @@ public GracefulShutdownHandler(
this.isResourceManager = serverConfig.isResourceManager();
this.gracePeriod = serverConfig.getGracePeriod();
this.queryManager = requireNonNull(queryManager, "queryManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
}

public void loadNodeStatusNotification()
{
log.debug("Loading node status notification");
checkState(!isLoadNodeStatusNotification, "Node status notification can be registered only once");
this.nodeStatusNotificationManager.getNotificationProvider().registerGracefulShutdownEventListener(this::initiateShutdown);
isLoadNodeStatusNotification = true;
}

private void initiateShutdown()
{
log.info("Trigger shutdown from status notification");
requestShutdown();
}

public synchronized void requestShutdown()
Expand All @@ -85,6 +105,7 @@ public synchronized void requestShutdown()
}

if (isShutdownRequested()) {
log.info("Shutdown already requested");
return;
}

Expand Down Expand Up @@ -202,4 +223,10 @@ public synchronized boolean isShutdownRequested()
{
return shutdownRequested;
}

@PreDestroy
public synchronized void destroy()
{
this.nodeStatusNotificationManager.getNotificationProvider().removeGracefulShutdownEventListener(this::initiateShutdown);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.presto.spi.nodestatus.NoOpNodeStatusNotificationProvider;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProvider;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
import com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class NodeStatusNotificationManager
{
private static final File NODE_STATUS_NOTIFICATION_CONFIG = new File("etc/node-status-notification.properties");
private NodeStatusNotificationProviderFactory notificationProviderFactory;
private NodeStatusNotificationProvider notificationProvider = new NoOpNodeStatusNotificationProvider();
private boolean isNotificationProviderAdded;

public void addNodeStatusNotificationProviderFactory(NodeStatusNotificationProviderFactory notificationProviderFactory)
{
this.notificationProviderFactory = requireNonNull(notificationProviderFactory, "notificationProviderFactory is null");
}

public void loadNodeStatusNotificationProvider()
throws IOException
{
if (this.notificationProviderFactory == null) {
return;
}
checkState(!isNotificationProviderAdded, "NotificationProvider can only be set once");
this.notificationProvider = this.notificationProviderFactory.create(getConfig());
this.isNotificationProviderAdded = true;
}

private Map<String, String> getConfig()
throws IOException
{
if (NODE_STATUS_NOTIFICATION_CONFIG.exists()) {
return loadProperties(NODE_STATUS_NOTIFICATION_CONFIG);
}
return ImmutableMap.of();
}

public NodeStatusNotificationProvider getNotificationProvider()
{
return this.notificationProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class PluginManager
private final HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager;
private final TracerProviderManager tracerProviderManager;
private final AnalyzerProviderManager analyzerProviderManager;
private final NodeStatusNotificationManager nodeStatusNotificationManager;

@Inject
public PluginManager(
Expand All @@ -142,7 +144,8 @@ public PluginManager(
NodeTtlFetcherManager nodeTtlFetcherManager,
ClusterTtlProviderManager clusterTtlProviderManager,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
TracerProviderManager tracerProviderManager)
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -172,6 +175,7 @@ public PluginManager(
this.historyBasedPlanStatisticsManager = requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null");
this.tracerProviderManager = requireNonNull(tracerProviderManager, "tracerProviderManager is null");
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -317,6 +321,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering analyzer provider %s", analyzerProvider.getType());
analyzerProviderManager.addAnalyzerProvider(analyzerProvider);
}

for (NodeStatusNotificationProviderFactory nodeStatusNotificationProviderFactory : plugin.getNodeStatusNotificationProviderFactory()) {
log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName());
nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public void run()
injector.getInstance(NodeTtlFetcherManager.class).loadNodeTtlFetcher();
injector.getInstance(ClusterTtlProviderManager.class).loadClusterTtlProvider();
injector.getInstance(TracerProviderManager.class).loadTracerProvider();

injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon

//Optional Status Detector
newOptionalBinder(binder, NodeStatusService.class);
binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.index.IndexJoinLookupStats;
import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde;
import com.facebook.presto.server.NodeStatusNotificationManager;
import com.facebook.presto.server.PluginManager;
import com.facebook.presto.server.PluginManagerConfig;
import com.facebook.presto.server.SessionPropertyDefaults;
Expand Down Expand Up @@ -504,7 +505,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
new ThrowingNodeTtlFetcherManager(),
new ThrowingClusterTtlProviderManager(),
historyBasedPlanStatisticsManager,
new TracerProviderManager(new TracingConfig()));
new TracerProviderManager(new TracingConfig()),
new NodeStatusNotificationManager());

connectorManager.addConnectorFactory(globalSystemConnectorFactory);
connectorManager.createConnection(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ServerConfig.class);
//Bind noop QueryManager similar to the binding done for TaskManager here
binder.bind(QueryManager.class).to(NoOpQueryManager.class).in(Scopes.SINGLETON);
binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);
binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON);
binder.bind(ShutdownAction.class).to(TestingPrestoServer.TestShutdownAction.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import com.facebook.presto.resourcemanager.ResourceGroupService;
import com.facebook.presto.server.ConnectorMetadataUpdateHandleJsonSerde;
import com.facebook.presto.server.ForJsonMetadataUpdateHandle;
import com.facebook.presto.server.NodeStatusNotificationManager;
import com.facebook.presto.server.PluginManager;
import com.facebook.presto.server.PluginManagerConfig;
import com.facebook.presto.server.QuerySessionSupplier;
Expand Down Expand Up @@ -501,6 +502,7 @@ protected void setup(Binder binder)
binder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON);
binder.bind(NodeTtlFetcherManager.class).to(ThrowingNodeTtlFetcherManager.class).in(Scopes.SINGLETON);
binder.bind(ClusterTtlProviderManager.class).to(ThrowingClusterTtlProviderManager.class).in(Scopes.SINGLETON);
binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);

// TODO: Decouple and remove: required by SessionPropertyDefaults, PluginManager, InternalResourceGroupManager, ConnectorManager
configBinder(binder).bindConfig(NodeConfig.class);
Expand Down
6 changes: 6 additions & 0 deletions presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
Expand Down Expand Up @@ -130,4 +131,9 @@ default Iterable<AnalyzerProvider> getAnalyzerProviders()
{
return emptyList();
}

default Iterable<NodeStatusNotificationProviderFactory> getNodeStatusNotificationProviderFactory()
{
return emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.nodestatus;

@FunctionalInterface
public interface GracefulShutdownEventListener
{
void onNodeShuttingDown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.nodestatus;

public class NoOpNodeStatusNotificationProvider
implements NodeStatusNotificationProvider
{
@Override
public void registerGracefulShutdownEventListener(GracefulShutdownEventListener listener)
{
}

@Override
public void removeGracefulShutdownEventListener(GracefulShutdownEventListener listener)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.nodestatus;

/**
* The {@code NodeStatusNotificationProvider} interface provides a registry for node status listeners.
* Implementations of this interface can listen to node status events and notify all registered listeners,
* especially when a node goes down.
*
* <p>It is essential for implementations to ensure proper synchronization if the registry is accessed
* by multiple threads.</p>
*/
public interface NodeStatusNotificationProvider
{
void registerGracefulShutdownEventListener(GracefulShutdownEventListener listener);

void removeGracefulShutdownEventListener(GracefulShutdownEventListener listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi.nodestatus;

import java.util.Map;

public interface NodeStatusNotificationProviderFactory
{
String getName();

NodeStatusNotificationProvider create(Map<String, String> config);
}

0 comments on commit 1fda136

Please sign in to comment.