BH Operator Framework
Now that we've seen the main event, we are ready to turn to the after-party: the changes made to allow the scan operator to use the result set loader. We start with a revised operator framework.
Like all query engines, Drill has the concept of an "operator". Each operator has several representations: the plan-time "physical" operator, the run-time physical implementation and so on.
When first building a system, it is often not clear exactly how to partition it. Rather than trying to make it perfect the first time, we often take our best guess, knowing we can refactor the system later based on what we learn.
Drill's first cut at the operator implementation turned out to combine the idea of an operator and the record batch on which the operator does its work. The result was the `RecordBatch` concept, which is, in essence, an operator implementation but bears the name of the data that the operator manipulates. As a result, each operator ("record batch") is quite complex because it takes on many tasks.
Another consequence of the existing design is that, to test an operator, it must sit within an operator stack, which must be bound to a fragment context, which in turn is bound to a drillbit context. This means that, to test any one operator, we need a full Drill server (or a bunch of mocks). This arrangement throws sand in the gears when trying to create comprehensive unit tests. (It is very hard to set up a specific test case if one must do so at the level of an entire query.)
The work needed to revise the scan operator provided an opportunity to try a different approach. The operator framework is not absolutely required, but it did turn out to be a very simple way to design, implement and test the revised scan operator. The team can decide if this approach is useful for other operators.
Given the above background, we can identify the requirements that the revised operator framework attempts to satisfy:
- Enable true unit testing (using the `SuboperatorContext` and related mechanisms).
- Divide responsibilities into well-defined units, then combine them via composition.
- Point the way to a simplified Drill iterator protocol.
- Maintain compatibility with the current Drill iterator protocol.
The operator framework divides into three components: two internal components and one wrapper.
- `OperatorRecordBatch` is the wrapper class that implements the Drill iterator protocol.
- `OperatorExec` defines the interface that the "guts" of the operator implement.
- `BatchAccessor` wraps the actual batch of records, tracks schema evolution, and implements the data-related methods of the Drill iterator protocol.
The operator exec interface distills the key tasks that an operator must perform. The `OperatorRecordBatch` handles the translation from Drill iterator protocol events and states into the "core" operator actions. Contents of the interface:
```java
public interface OperatorExec {
  public void bind(OperatorContext context);
  BatchAccessor batchAccessor();
  boolean buildSchema();
  boolean next();
  void cancel();
  void close();
}
```
The code contains detailed comments which were omitted here for brevity. From the above, we can see the lifecycle:
- Construct the operator and bind it to the context with `bind()`.
- Ask the operator to build its schema by calling `buildSchema()`. (This is based on the "fast schema" model: the first batch should contain schema only, no data.)
- Return the (empty) batch with schema from `batchAccessor()`.
- Call `next()` to produce each new batch, again delivered via `batchAccessor()`.
- If the operator should stop what it is doing, call `cancel()`.
- At the end (whether or not the operator was cancelled), call `close()`.
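To make the lifecycle concrete, here is a minimal, self-contained Java sketch that drives a toy operator through the steps above. It is not Drill code: `OperatorContext` and `BatchAccessor` are hypothetical stand-ins reduced to what the sketch needs, and `ToyExec` and `LifecycleDemo` are illustrative names. In the real framework, the `OperatorRecordBatch` wrapper plays the driving role.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Drill's OperatorContext and BatchAccessor,
// reduced to what this sketch needs.
class OperatorContext { }

class BatchAccessor {
  String schema = null;                  // simplified: schema as a string
  List<Integer> rows = new ArrayList<>();
}

// The OperatorExec interface from the text.
interface OperatorExec {
  void bind(OperatorContext context);
  BatchAccessor batchAccessor();
  boolean buildSchema();
  boolean next();
  void cancel();
  void close();
}

// Toy operator (hypothetical): an empty schema batch, then
// `batchCount` batches of two rows each.
class ToyExec implements OperatorExec {
  private final BatchAccessor batch = new BatchAccessor();
  private final int batchCount;
  private int produced = 0;
  boolean closed = false;

  ToyExec(int batchCount) { this.batchCount = batchCount; }

  public void bind(OperatorContext context) { }
  public BatchAccessor batchAccessor() { return batch; }

  public boolean buildSchema() {
    batch.schema = "a INT";              // schema only, no data ("fast schema")
    batch.rows.clear();
    return true;
  }

  public boolean next() {
    if (produced == batchCount) { return false; }   // end of data
    batch.rows.clear();
    batch.rows.add(produced * 2);
    batch.rows.add(produced * 2 + 1);
    produced++;
    return true;
  }

  public void cancel() { }
  public void close() { closed = true; }
}

class LifecycleDemo {
  // Drives one operator through the lifecycle and returns the row count
  // of each batch delivered, starting with the schema-only batch.
  static List<Integer> run(OperatorExec exec) {
    List<Integer> batchSizes = new ArrayList<>();
    exec.bind(new OperatorContext());
    try {
      if (exec.buildSchema()) {
        batchSizes.add(exec.batchAccessor().rows.size());   // empty schema batch
        while (exec.next()) {
          batchSizes.add(exec.batchAccessor().rows.size());
        }
      }
    } finally {
      exec.close();                      // always called, even on failure
    }
    return batchSizes;
  }
}
```

Running `LifecycleDemo.run(new ToyExec(3))` delivers one empty schema batch followed by three two-row batches. Placing `close()` in a `finally` block is one simple way to honor the "call it at the end regardless" rule.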
To drive the point home: it is the job of the `OperatorRecordBatch` to accept calls from the parent, to track internal state, and to translate the return values of the methods above into `IterOutcome` states.
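The following sketch illustrates that translation under simplifying assumptions. It uses only three `IterOutcome` values (Drill's real enum has more), and it assumes the exec exposes a schema version number so the wrapper can tell whether a batch carries a new schema. `SimpleExec`, `WrapperSketch` and `ScriptedExec` are hypothetical names, not Drill classes.

```java
// Simplified subset of Drill's IterOutcome values.
enum IterOutcome { OK_NEW_SCHEMA, OK, NONE }

// Hypothetical reduced exec: the two data-producing calls plus a
// schema version number (an assumption of this sketch).
interface SimpleExec {
  boolean buildSchema();
  boolean next();
  int schemaVersion();
}

// The wrapper owns the state; the exec only answers "did you produce a batch?".
class WrapperSketch {
  private enum State { START, RUN, END }

  private final SimpleExec exec;
  private State state = State.START;
  private int lastVersion = -1;

  WrapperSketch(SimpleExec exec) { this.exec = exec; }

  IterOutcome next() {
    switch (state) {
      case START:
        if (!exec.buildSchema()) { return finish(); }
        state = State.RUN;
        lastVersion = exec.schemaVersion();
        return IterOutcome.OK_NEW_SCHEMA;        // first batch: schema only
      case RUN:
        if (!exec.next()) { return finish(); }
        int version = exec.schemaVersion();
        boolean changed = version != lastVersion;
        lastVersion = version;
        return changed ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
      default:
        return IterOutcome.NONE;                 // already at end of data
    }
  }

  private IterOutcome finish() {
    state = State.END;
    return IterOutcome.NONE;
  }
}

// Test double: replays one schema version per batch (index 0 = schema batch).
class ScriptedExec implements SimpleExec {
  private final int[] versions;
  private int index = 0;

  ScriptedExec(int... versions) { this.versions = versions; }

  public boolean buildSchema() { return versions.length > 0; }
  public boolean next() { return ++index < versions.length; }
  public int schemaVersion() { return versions[index]; }
}
```

With `new ScriptedExec(1, 1, 2)`, successive calls to `WrapperSketch.next()` return `OK_NEW_SCHEMA`, `OK`, `OK_NEW_SCHEMA`, then `NONE` from then on. Keeping the state machine in the wrapper is what lets the exec itself stay free of protocol details.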
This implementation makes some assumptions:
- All errors are reported by throwing a `UserException`. (The wrapper translates other exceptions into a `UserException`.)
- Out-of-memory errors are hard errors. If an operator wants to manage memory, it should do so itself rather than rely on recovering from an out-of-memory condition.
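One way a wrapper could enforce the first assumption is a guard around each call into the operator. This is a sketch only: the `UserException` below is a hypothetical stand-in (Drill's real class has its own construction API), and `ErrorTranslation.guard` is an illustrative helper, not part of the framework.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for Drill's UserException.
class UserException extends RuntimeException {
  UserException(String message, Throwable cause) { super(message, cause); }
}

class ErrorTranslation {
  // Runs one operator call. A UserException passes through unchanged;
  // any other RuntimeException is wrapped so the caller sees one error type.
  // java.lang.Error (e.g. OutOfMemoryError) is deliberately not caught.
  static <T> T guard(String operation, Supplier<T> call) {
    try {
      return call.get();
    } catch (UserException e) {
      throw e;                                   // already in the expected form
    } catch (RuntimeException e) {
      throw new UserException("Error in " + operation + ": " + e.getMessage(), e);
    }
  }
}
```

A wrapper would use it as, for example, `ErrorTranslation.guard("next", exec::next)`. Because only `RuntimeException` is caught, a JVM `Error` propagates untouched, which matches the "hard error" treatment in spirit.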
The current iteration of the code is designed for the scan operator, which reads data from an external source. Internal operators read data from another operator; to make that work, we need to wrap the child operator. The class `OperatorDriver` takes a stab at this task by providing a simple interface on top of the Drill iterator protocol. However, this class has not yet been put to the test in an internal (non-scan) operator, so "some assembly is required" to evolve it to work.
The Drill iterator protocol requires that operators return `OK_NEW_SCHEMA` for the first batch and for any batch whose schema differs from that of the previous batch, and `OK` otherwise. Most operators have rather complex code to work this out. Some use a "schema changed" flag to track schema changes.
There is a simpler solution, implemented in this framework: wrap the outgoing batch in a class that checks each batch's schema against the previous one, then return the correct status based on the answer.
This mechanism, `SchemaTracker`, also maintains a version number to avoid the ambiguity of a "schema changed" flag (changed since when?).
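A minimal sketch of the version-number idea follows. It is not Drill's `SchemaTracker`: a schema is modeled here as an ordered list of `"name TYPE"` strings, and the class name `SchemaTrackerSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of schema tracking by version number (not Drill's actual class).
class SchemaTrackerSketch {
  private List<String> lastSchema = null;
  private int version = 0;

  // Called once per outgoing batch; bumps the version only when the
  // schema differs from that of the previous batch.
  void trackSchema(List<String> schema) {
    if (lastSchema == null || !lastSchema.equals(schema)) {
      version++;
      lastSchema = new ArrayList<>(schema);   // defensive copy
    }
  }

  int schemaVersion() { return version; }

  // "Changed since the version I last saw?", which a bare flag cannot answer.
  boolean changedSince(int seenVersion) { return version != seenVersion; }
}
```

A consumer records `schemaVersion()` after each batch and later asks `changedSince(seen)`; the answer is unambiguous no matter how many batches have passed in between.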
The primary reason for this work is unit testing. The bells and whistles, and the simpler code, are side effects. The goal is that an `OperatorExec` implementation can be tested in a true unit test: simply use the `RowSet` mechanism to build up some input batches, tell the operator to process them, then get an output batch and compare it with the expected results.
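The test pattern can be sketched in plain Java. Plain lists stand in for `RowSet`s (this sketch does not use the real `RowSet` API), and `EvenFilterExec` and `EvenFilterTest` are hypothetical names for an operator that keeps only even values and its test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical operator "guts" under test: keeps only even values.
class EvenFilterExec {
  private final Iterator<List<Integer>> input;
  private List<Integer> output = new ArrayList<>();

  EvenFilterExec(Iterator<List<Integer>> input) { this.input = input; }

  // Produces one output batch per input batch; false at end of input.
  boolean next() {
    if (!input.hasNext()) { return false; }
    output = new ArrayList<>();
    for (int value : input.next()) {
      if (value % 2 == 0) { output.add(value); }
    }
    return true;
  }

  List<Integer> outputBatch() { return output; }
}

class EvenFilterTest {
  // Build input batches, run the operator, compare each output batch
  // with the expected batch. No files, readers or server involved.
  static void run() {
    List<List<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 2, 3, 4),
        Arrays.asList(5, 7),               // edge case: no matching rows
        Arrays.asList(-2, 0));             // edge case: negative and zero
    List<List<Integer>> expected = Arrays.asList(
        Arrays.asList(2, 4),
        new ArrayList<Integer>(),
        Arrays.asList(-2, 0));

    EvenFilterExec exec = new EvenFilterExec(inputs.iterator());
    for (List<Integer> want : expected) {
      if (!exec.next() || !exec.outputBatch().equals(want)) {
        throw new AssertionError("mismatch, expected " + want);
      }
    }
    if (exec.next()) { throw new AssertionError("expected end of data"); }
  }
}
```

Edge cases such as an all-filtered batch are one line each to add here, whereas setting them up through a full query would require crafting input files.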
With this approach, we can create a wide variety of test cases that exercise all types, data patterns and so on. We can do this without the need to create input files, to deal with the limitations of each reader, and so on. For example, JSON cannot create `INT` vectors, and Parquet cannot create decimal types. But with unit tests, we can easily create all these types.
Here it is worth pointing out why we want unit tests. We write them because, once we've tested an operator thoroughly, we can move on to other work. If we can only test at the system level, we tend to skip cases (because we can't easily set them up) and let Drill users do the testing in production. When something then fails, it takes far longer to track down and fix than it would have taken to write the unit tests.