Skip to content

Commit

Permalink
MSQ controller: Support in-memory shuffles; towards JVM reuse. (apach…
Browse files Browse the repository at this point in the history
…e#16168)

* MSQ controller: Support in-memory shuffles; towards JVM reuse.

This patch contains two controller changes that make progress towards a
lower-latency MSQ.

First, support for in-memory shuffles. The main feature of in-memory shuffles,
as far as the controller is concerned, is that they are not fully buffered. That
means that whenever a producer stage uses in-memory output, its consumer must run
concurrently. The controller determines which stages run concurrently, and when
they start and stop.

"Leapfrogging" allows any chain of sort-based stages to use in-memory shuffles
even if we can only run two stages at once. For example, in a linear chain of
stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use in-memory shuffling
for each one while only running two at once. (When stage 1 is done reading input
and about to start writing its output, we can stop 0 and start 2.)

1) New OutputChannelMode enum attached to WorkOrders that tells workers
   whether stage output should be in memory (MEMORY), or use local or durable
   storage.

2) New logic in the ControllerQueryKernel to determine which stages can use
   in-memory shuffling (ControllerUtils#computeStageGroups) and to launch them
   at the appropriate time (ControllerQueryKernel#createNewKernels).

3) New "doneReadingInput" method on Controller (passed down to the stage kernels)
   which allows stages to transition to POST_READING even if they are not
   gathering statistics. This is important because it enables "leapfrogging"
   for HASH_LOCAL_SORT shuffles, and for GLOBAL_SORT shuffles with 1 partition.

4) Moved result-reading from ControllerContext#writeReports to new QueryListener
   interface, which ControllerImpl feeds results to row-by-row while the query
   is still running. Important so we can read query results from the final
   stage using an in-memory channel.

5) New class ControllerQueryKernelConfig holds configs that control kernel
   behavior (such as whether to pipeline, maximum number of concurrent stages,
   etc). Generated by the ControllerContext.

Second, a refactor towards running workers in persistent JVMs that are able to
cache data across queries. This is helpful because I believe we'll want to reuse
JVMs and cached data for latency reasons.

1) Move creation of WorkerManager and TableInputSpecSlicer to the
   ControllerContext, rather than ControllerImpl. This allows managing workers and
   work assignment differently when JVMs are reusable.

2) Lift the Controller Jersey resource out from ControllerChatHandler to a
   reusable resource.

3) Move memory introspection to a MemoryIntrospector interface, and introduce
   ControllerMemoryParameters that uses it. This makes it easier to run MSQ in
   process types other than Indexer and Peon.

Both of these areas will have follow-ups that make similar changes on the
worker side.

* Address static checks.

* Address static checks.

* Fixes.

* Report writer tests.

* Adjustments.

* Fix reports.

* Review updates.

* Adjust name.

* Small changes.
  • Loading branch information
gianm authored May 1, 2024
1 parent 51104e8 commit 5d1950d
Show file tree
Hide file tree
Showing 137 changed files with 6,981 additions and 2,412 deletions.
12 changes: 12 additions & 0 deletions extensions-core/multi-stage-query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@
<artifactId>datasketches-memory</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
Expand Down Expand Up @@ -288,6 +293,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,95 +19,95 @@

package org.apache.druid.msq.exec;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.util.List;

/**
* Interface for the controller of a multi-stage query.
* Interface for the controller of a multi-stage query. Each Controller is specific to a particular query.
*
* @see WorkerImpl the production implementation
*/
public interface Controller
{
/**
* POJO for capturing the status of a controller task that is currently running.
*/
class RunningControllerStatus
{
private final String id;

@JsonCreator
public RunningControllerStatus(String id)
{
this.id = id;
}

@JsonProperty("id")
public String getId()
{
return id;
}
}

/**
* Unique task/query ID for the batch query run by this controller.
*
* Controller IDs must be globally unique. For tasks, this is the task ID from {@link MSQControllerTask#getId()}.
*/
String id();

/**
* The task which this controller runs.
*/
MSQControllerTask task();
String queryId();

/**
* Runs the controller logic in the current thread. Surrounding classes provide the execution thread.
*/
TaskStatus run() throws Exception;
void run(QueryListener listener) throws Exception;

/**
* Terminate the query DAG upon a cancellation request.
* Terminate the controller upon a cancellation request. Causes a concurrently-running {@link #run} method in
* a separate thread to cancel all outstanding work and exit.
*/
void stopGracefully();
void stop();

// Worker-to-controller messages

/**
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics have been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate partiton boundaries.
* This is intended to be called by the {@link ControllerChatHandler}.
*
* @see ControllerClient#postPartialKeyStatistics(StageId, int, PartialKeyStatisticsInformation)
*/
void updatePartialKeyStatisticsInformation(
int stageNumber,
int workerNumber,
Object partialKeyStatisticsInformationObject
);

/**
* Sent by workers when they finish reading their input, in cases where they would not otherwise be calling
* {@link #updatePartialKeyStatisticsInformation(int, int, Object)}.
*
* @see ControllerClient#postDoneReadingInput(StageId, int)
*/
void updatePartialKeyStatisticsInformation(int stageNumber, int workerNumber, Object partialKeyStatisticsInformationObject);
void doneReadingInput(int stageNumber, int workerNumber);

/**
* System error reported by a subtask. Note that the errors are organized by
* taskId, not by query/stage/worker, because system errors are associated
* with a task rather than a specific query/stage/worker execution context.
*
* @see ControllerClient#postWorkerError(String, MSQErrorReport)
*/
void workerError(MSQErrorReport errorReport);

/**
* System warning reported by a subtask. Indicates that the worker has encountered a non-lethal error. Worker should
* continue its execution in such a case. If the worker wants to report an error and stop its execution,
* please use {@link Controller#workerError}
*
* @see ControllerClient#postWorkerWarning(List)
*/
void workerWarning(List<MSQErrorReport> errorReports);

/**
* Periodic update of {@link CounterSnapshots} from subtasks.
*
* @see ControllerClient#postCounters(String, CounterSnapshotsTree)
*/
void updateCounters(String taskId, CounterSnapshotsTree snapshotsTree);

/**
* Reports that results are ready for a subtask.
*
* @see ControllerClient#postResultsComplete(StageId, int, Object)
*/
void resultsComplete(
String queryId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

Expand All @@ -43,6 +44,21 @@ void postPartialKeyStatistics(
PartialKeyStatisticsInformation partialKeyStatisticsInformation
) throws IOException;

/**
* Client side method to tell the controller that a particular stage and worker is done reading its input.
*
* The main purpose of this call is to let the controller know when it can stop running the input stage. This helps
* execution roll smoothly from stage to stage during pipelined execution. For backwards-compatibility reasons
* (this is a newer method, only really useful when pipelining), this call should be skipped if the query is not
* pipelining stages.
*
* Only used when {@link StageDefinition#doesSortDuringShuffle()} and *not*
* {@link StageDefinition#mustGatherResultKeyStatistics()}. When the stage gathers result key statistics, workers
* call {@link #postPartialKeyStatistics(StageId, int, PartialKeyStatisticsInformation)} instead, which has the same
* effect of telling the controller that the worker is done reading its input.
*/
void postDoneReadingInput(StageId stageId, int workerNumber) throws IOException;

/**
* Client-side method to update the controller with counters for a particular stage and worker. The controller uses
* this to compile live reports, track warnings generated etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,44 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.server.DruidNode;

/**
* Context used by multi-stage query controllers.
*
* Useful because it allows test fixtures to provide their own implementations.
* Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own
* implementations.
*/
public interface ControllerContext
{
ServiceEmitter emitter();
/**
* Configuration for {@link org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
*/
ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef);

/**
* Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
* to set up the task chat web service.
*/
void registerController(Controller controller, Closer closer);

/**
* JSON-enabled object mapper.
*/
ObjectMapper jsonMapper();

/**
* Emit a metric using a {@link ServiceEmitter}.
*/
void emitMetric(String metric, Number value);

/**
* Provides a way for tasks to request injectable objects. Useful because tasks are not able to request injection
* at the time of server startup, because the server doesn't know what tasks it will be running.
Expand All @@ -51,32 +71,33 @@ public interface ControllerContext
DruidNode selfNode();

/**
* Provide access to the Coordinator service.
* Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}.
*/
CoordinatorClient coordinatorClient();
InputSpecSlicer newTableInputSpecSlicer();

/**
* Provide access to segment actions in the Overlord.
* Provide access to segment actions in the Overlord. Only called for ingestion queries, i.e., where
* {@link MSQSpec#getDestination()} is {@link org.apache.druid.msq.indexing.destination.DataSourceMSQDestination}.
*/
TaskActionClient taskActionClient();

/**
* Provides services about workers: starting, canceling, obtaining status.
*
* @param queryId query ID
* @param querySpec query spec
* @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec, QueryDefinition)}
* @param workerFailureListener listener that receives callbacks when workers fail
*/
WorkerManagerClient workerManager();

/**
* Callback from the controller implementation to "register" the controller. Used in the indexing task implementation
* to set up the task chat web service.
*/
void registerController(Controller controller, Closer closer);
WorkerManager newWorkerManager(
String queryId,
MSQSpec querySpec,
ControllerQueryKernelConfig queryKernelConfig,
WorkerFailureListener workerFailureListener
);

/**
* Client for communicating with workers.
*/
WorkerClient taskClientFor(Controller controller);
/**
* Writes controller task report.
*/
void writeReports(String controllerTaskId, TaskReport.ReportMap reports);
WorkerClient newWorkerClient();
}
Loading

0 comments on commit 5d1950d

Please sign in to comment.