Runtime Operator Protocol

Paul Rogers edited this page Nov 30, 2016 · 6 revisions

Drill's execution model consists of a large variety of runtime operators assembled by the planner to implement a specific SQL query. Operators use the Volcano model: each is an iterator that returns records from a call to a next() method. The devil, as they say, is in the details. We explore those details here.

Each operator implements some relational operator: scan, sort, join, broadcast, etc. The Volcano iterator approach provides a unified iterator model implemented by all operators. No matter whether the operator reads data, sorts, or performs aggregations, it implements the same basic iterator model. Here we focus on that iterator model, not on the unique behavior of each operator.

Operator Creation

Each operator has a Creator class. For example, FilterRecordBatch has a FilterBatchCreator. The creator builds the operator. The operator constructor performs one-time initial setup. The key fact to understand is that the constructor cannot see the schema of the data, so it cannot do any setup that depends on the schema. Most operators therefore keep some internal state (sometimes explicit as an enum, sometimes implicit in some variable) to record that they are in the "have not yet seen a schema" state.
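As an illustration of the "have not yet seen a schema" state, here is a minimal sketch. It is not Drill code: the class LazySetupOperator, its State enum, and the use of a list of column names as a stand-in for a schema are all hypothetical.

```java
import java.util.List;

// Hypothetical sketch, not Drill code. A "schema" here is just a list of column names.
class LazySetupOperator {
  // Explicit state that records whether a schema has been seen yet.
  enum State { NO_SCHEMA_YET, READY }

  private State state = State.NO_SCHEMA_YET;
  private int columnCount;

  LazySetupOperator() {
    // Only setup that does not depend on the schema can happen here.
  }

  // Called once per incoming batch; schema-dependent setup runs on the first call.
  void onBatch(List<String> schema) {
    if (state == State.NO_SCHEMA_YET) {
      columnCount = schema.size();   // setup that needs the schema
      state = State.READY;
    }
  }

  State state() { return state; }
  int columnCount() { return columnCount; }
}
```

The constructor leaves the operator in NO_SCHEMA_YET; the first batch moves it to READY.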

Operator Protocol

As described elsewhere (see Runtime Model), operators are implemented in a somewhat non-intuitive way. The term "operator" in Drill is what we might call an "operator definition": the information that describes the operator. The actual runtime operator is called a "record batch" in Drill. All such runtime operators derive from RecordBatch. The basic (highly simplified) protocol is:

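public interface RecordBatch {
  // Status returned by each call to next().
  enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

  // Advances to the next batch and reports what happened.
  IterOutcome next();
  // Schema of the current batch.
  BatchSchema getSchema();
  // Container for this operator's output vectors.
  VectorContainer getOutgoingContainer();
}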

The actual code is somewhat more complex, but contains thorough comments that you should read for the details.

The next() Method

The heart of the protocol is the next() method. The theory in Volcano is simple: each call returns a record until all records are read. In Drill, the operation is a bit more complex because operators return batches of records (as value vectors), not individual records. Drill also allows the schema to vary as the query runs, and handles error cases. This results in the protocol explained here.

First, note that next() returns a variety of exit codes:

  • OK_NEW_SCHEMA: Returned a schema (and optionally a record batch). Returned (ideally) each time the schema from this call to next() differs from that of previous calls.
  • OK: Returned a record batch (which always includes a schema). The schema is the same as that from the previous call.
  • NONE: No data returned, end of data (the "done" status). Equivalent to an EOF from a file reader.
  • STOP: Error condition: stop processing.
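A consumer typically dispatches on these codes in a loop. The sketch below is illustrative only: it uses the enum values from the simplified RecordBatch interface above (NONE for end of data), while ScriptedInput and BatchConsumer are hypothetical stand-ins rather than Drill classes. It also assumes that a batch accompanies every OK_NEW_SCHEMA.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Values taken from the simplified RecordBatch interface; everything else is hypothetical.
enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

// Stand-in producer that replays a scripted sequence of outcomes.
class ScriptedInput {
  private final Deque<IterOutcome> script;
  ScriptedInput(IterOutcome... outcomes) {
    script = new ArrayDeque<>(Arrays.asList(outcomes));
  }
  IterOutcome next() {
    return script.isEmpty() ? IterOutcome.NONE : script.poll();
  }
}

class BatchConsumer {
  int schemasSeen = 0;
  int batchesSeen = 0;
  boolean failed = false;

  // Calls next() until the input reports end of data or an error.
  void drain(ScriptedInput input) {
    while (true) {
      switch (input.next()) {
        case OK_NEW_SCHEMA:
          schemasSeen++;   // redo schema-dependent setup
          batchesSeen++;   // assumption: data accompanies the schema
          break;
        case OK:
          batchesSeen++;   // same schema as before, just process the batch
          break;
        case NONE:
          return;          // end of data
        case STOP:
          failed = true;   // error: stop processing
          return;
      }
    }
  }
}
```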

Running the Operator Tree

The FragmentExecutor runs the fragment, which consists of a tree of operators, one of which is the root. The fragment executor calls next() on the root operator to start execution, and keeps calling it until the root reports that there is nothing more to do:

          while (shouldContinue() && root.next()) {
            // loop
          }
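Note that the root's next() is used as a boolean in this loop, while RecordBatch.next() returns an IterOutcome. One way a root might bridge the two is sketched below. This is an assumption for illustration, not Drill's implementation: SketchRoot, SketchExecutor and Child are hypothetical names.

```java
// Hypothetical sketch: a root with a boolean next() wrapping a child that returns IterOutcome.
enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

interface Child {
  IterOutcome next();
}

class SketchRoot {
  private final Child child;
  int batchesSent = 0;

  SketchRoot(Child child) { this.child = child; }

  // Returns true while there may be more work to do.
  boolean next() {
    switch (child.next()) {
      case OK_NEW_SCHEMA:
      case OK:
        batchesSent++;   // stand-in for sending the batch downstream
        return true;
      default:           // NONE or STOP: nothing more to do
        return false;
    }
  }
}

class SketchExecutor {
  // Drives the operator tree by repeatedly calling next() on the root.
  static int run(SketchRoot root) {
    while (root.next()) {
      // loop: all the work happens inside next()
    }
    return root.batchesSent;
  }
}
```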

next(): Coming and Going

When discussing the behavior of next(), we have to consider two views:

  • The "consumer": the bit of code (usually an operator) that calls next() and handles the results.
  • The "producer" that implements the next() method.

We discuss both views below.

First Batch

The next() call propagates down the tree (the order is highly dependent on the particular type of operator). For any given operator, it will eventually see a first call to next().

At this point the operator does not know the data schema. Therefore, the operator must call next() on its own input in order to get the first batch. (That call may, in turn, cascade down the operator tree until it reaches a leaf: a scanner or a network receiver.) Once we have a schema, the operator can complete initialization:

  • Call next() on the input to get a first batch.
  • Initialize the present operator based on the returned schema.
  • Process the record batch.

That is, the first next() both initializes and processes records the same way that subsequent next() calls will.

The operator must now decide what to do based on the value returned by its input's next(). For example:

  • OK: Indicates that the child (input) operator returned a batch of records (along with a schema). Since this is the first batch, the present operator must usually do some form of setup, which often involves generating code based on the schema.
  • OK_NEW_SCHEMA: In theory, the input should return the OK_NEW_SCHEMA status each time the schema changes, including the first time. In practice, the first batch seems to be returned (for some operators) as simply OK. Operators contain code to handle this ambiguity.
  • NONE: It could be that the query has no data at all: a scanner read an empty file, a filter removed all records, etc. In this case, the very first call to the input next() can return NONE, indicating that no data is available.
  • STOP: Indicates that an error occurred and that the operator should stop processing and exit.

Each operator processes the first batch differently. A filter will process the one batch; a sort will read all its incoming batches before returning from the first next(). In general, the operator then returns one of the same codes to its own consumer, this time acting as the producer:

  • OK_NEW_SCHEMA: Should be returned from the first next() call for successful results. Note that the actual results may be empty if all rows in the batch were filtered away.
  • NONE: No data from the query. Either no data was received from input, or this operator discarded all the data.
  • STOP: An error occurred.
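To make the sort case concrete, here is a minimal sketch of a blocking operator whose first next() drains its input before returning. It is not Drill's sort: batches are plain lists of integers, schema changes are ignored, and the names IntBatchInput, ListInput and BlockingSortSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

// Hypothetical input: each batch is a list of ints.
interface IntBatchInput {
  IterOutcome next();
  List<Integer> batch();
}

// Replays fixed batches, reporting even the first one as plain OK.
class ListInput implements IntBatchInput {
  private final Iterator<List<Integer>> it;
  private List<Integer> current;
  ListInput(List<List<Integer>> batches) { it = batches.iterator(); }
  public IterOutcome next() {
    if (!it.hasNext()) return IterOutcome.NONE;
    current = it.next();
    return IterOutcome.OK;
  }
  public List<Integer> batch() { return current; }
}

class BlockingSortSketch {
  private final IntBatchInput input;
  private final List<Integer> sorted = new ArrayList<>();
  private boolean returned = false;

  BlockingSortSketch(IntBatchInput input) { this.input = input; }

  IterOutcome next() {
    if (returned) return IterOutcome.NONE;   // this sketch emits a single output batch
    returned = true;
    while (true) {
      IterOutcome o = input.next();
      if (o == IterOutcome.STOP) return IterOutcome.STOP;   // propagate the error
      if (o == IterOutcome.NONE) break;                     // input exhausted
      sorted.addAll(input.batch());                         // OK or OK_NEW_SCHEMA: buffer rows
    }
    if (sorted.isEmpty()) return IterOutcome.NONE;          // no data from the query
    Collections.sort(sorted);
    return IterOutcome.OK_NEW_SCHEMA;                       // first successful result
  }

  List<Integer> output() { return sorted; }
}
```

The input here reports its first batch as plain OK, yet the sketch still returns OK_NEW_SCHEMA from its own first successful next().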

Subsequent next() Calls

Subsequent calls to next() work much like the first. The first call returned a schema and caused setup to occur. Since Drill has late schema binding, schemas may change. Thus any call to next() may return a new schema, requiring new initialization (assuming that the consumer can handle schema changes). Expected return codes are thus:

  • OK: Indicates another batch with the same schema as the previous one.
  • OK_NEW_SCHEMA: Indicates a schema change (with optional data).
  • NONE: End of data.
  • STOP: Error condition.

Again, the operator must return a status as a producer using the same codes as above:

  • OK: Indicates another batch with the same schema as the previous one.
  • OK_NEW_SCHEMA: Indicates this operator encountered a schema change (with optional data).
  • NONE: End of data.
  • STOP: Error condition.
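For a streaming (filter-like) operator, the two roles can be combined roughly as in the sketch below. It is a hypothetical illustration, not Drill code: Upstream and StreamingOperatorSketch are invented names, and "setup" is reduced to a counter. The sketch treats a first batch that arrives as plain OK as if it carried a new schema.

```java
enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

// Hypothetical stand-in for the input operator.
interface Upstream {
  IterOutcome next();
}

class StreamingOperatorSketch {
  private final Upstream input;
  private boolean sawSchema = false;
  int setupCount = 0;

  StreamingOperatorSketch(Upstream input) { this.input = input; }

  IterOutcome next() {
    // Consumer role: ask the input for its next batch.
    IterOutcome upstream = input.next();
    if (upstream == IterOutcome.NONE || upstream == IterOutcome.STOP) {
      return upstream;   // end of data and errors pass straight through
    }
    // Setup is needed for the first batch (even if reported as OK) and on each schema change.
    boolean needsSetup = !sawSchema || upstream == IterOutcome.OK_NEW_SCHEMA;
    sawSchema = true;
    if (needsSetup) {
      setupCount++;                       // stand-in for schema-dependent initialization
      return IterOutcome.OK_NEW_SCHEMA;   // producer role: announce the schema downstream
    }
    return IterOutcome.OK;                // same schema as the previous batch
  }
}
```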

End of Data

Error Handling

Normal Close

Error Close
