Storage Plugin Planning Interface
Once configuration is complete, the plugin is available for users to reference in queries. The first step in each query is for the planner to resolve the table name. We first describe the process, then describe the classes that the storage plugin must provide.
Drill looks for the schema name space portion of the table name: the `myPlugin` in the earlier example. (The name may also be set using a `USE` statement.)
Drill uses that name to look up the storage plugin configuration in the schema name space. The schema name space is formed from the storage plugin configurations, workspace configurations, and a few pre-defined items.
Drill uses the class of the storage plugin configuration as a key to locate the constructor for the storage plugin itself using a mapping from storage plugin configuration class to the storage plugin constructor as described above.
Drill creates an instance of the storage plugin (double-check) to use for the query, passing the storage plugin instance an instance of the plugin configuration. Actually, Drill will create multiple instances as planning proceeds.
In addition to the schema defined by the storage plugin configuration, the storage plugin can define an additional schema. (Or, perhaps it must redefine the schema given by the configuration?) To do this, Drill calls the `registerSchemas` method of the plugin. This method creates an instance of a schema which implements the Calcite `Schema` interface and adds it to the schemas registered for this plugin. (The schemas are registered in Calcite, not in the plugin itself.)
Next, the planner must resolve the actual table name within the schema defined by the storage plugin configuration. (This seems redundant since we had to resolve the schema to get the storage plugin (configuration) in the first place...) Calcite calls the `getTable()` method on the schema object to resolve the table name from the query into a Calcite `Table` object, typically using the `DynamicDrillTable` class which extends `Table`. `DynamicDrillTable` holds a list of Jackson-serializable objects which the plugin can retrieve later by deserializing the serialized form of the table data.
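The name resolution steps above can be sketched with a small, self-contained model. Every class here (`SketchTable`, `SketchSchema`, `SketchSchemaRegistry`) is a hypothetical stand-in written for illustration, not a Drill or Calcite class, and the plugin-specific table data is reduced to a plain list of objects rather than Jackson-serialized ones.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a resolved table; NOT Drill's DynamicDrillTable.
final class SketchTable {
    final String pluginName;
    final List<Object> specs; // plugin-specific data carried along with the table

    SketchTable(String pluginName, List<Object> specs) {
        this.pluginName = pluginName;
        this.specs = specs;
    }
}

// Hypothetical stand-in for the schema object a plugin registers.
final class SketchSchema {
    private final String pluginName;

    SketchSchema(String pluginName) {
        this.pluginName = pluginName;
    }

    // Plays the role of getTable(): turns a table name into a table object.
    SketchTable getTable(String tableName) {
        return new SketchTable(pluginName, List.<Object>of(tableName));
    }
}

// Hypothetical stand-in for the schema name space.
final class SketchSchemaRegistry {
    private final Map<String, SketchSchema> schemas = new HashMap<>();

    // Plays the role of registerSchemas(): makes a schema visible by name.
    void registerSchema(String name, SketchSchema schema) {
        schemas.put(name, schema);
    }

    // Resolves "schema.table": first the schema, then the table within it.
    SketchTable resolve(String qualifiedName) {
        int dot = qualifiedName.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Expected schema.table: " + qualifiedName);
        }
        SketchSchema schema = schemas.get(qualifiedName.substring(0, dot));
        if (schema == null) {
            throw new IllegalArgumentException("Unknown schema in: " + qualifiedName);
        }
        return schema.getTable(qualifiedName.substring(dot + 1));
    }
}
```

Resolving `myPlugin.myTable` in this model first finds the schema registered under `myPlugin`, then asks that schema for `myTable`, which mirrors the two lookups described above.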
The planner now must move from a table definition to a scan operator (definition) for a scan of that table. Tables in Drill usually resolve to a directory of files, or a large file with distributed blocks. To handle this, the planner defines two kinds of scans:
- A group (or logical) scan that operates on the logical table referenced in the query.
- A group of sub (or physical) scans that read partitions (chunks, pages, blocks, groups) of the logical table.
The terminology is a bit confusing. A "group scan" is the logical scan of the whole table. A "sub-scan" is a scan of a partition. In some contexts, the "group scan" and "sub scan" are both called physical scans. This is confusing because the logical scan is not a physical scan (the logical table is often just a concept, not an actual file...)
In any event, the first step is to get the group (logical) scan. The planner resolves the table into a group scan plan (definition) by calling the `getPhysicalScan` method in the plugin class, providing it with the user name (for security checks), a place to obtain the deserialized table hints created above, and a list of columns that the user selected from the table. (Though, strangely, for a SQL query, the list of columns is always just `*`, even if the `SELECT` statement names specific columns.) The `getPhysicalScan()` method returns a "group scan" which is a definition of the scan of the table as a whole. (Presumably the "group" refers to the fact that this object represents a group of scans...)
The actual list of `SELECT` clause columns is provided using a two-part process:
- First, the planner calls `canPushdownProjects` on the group scan, asking if the scan handles projection (that is, whether it can do anything with the list of columns).
- If the return value is `true`, the planner calls `clone`, including the list of columns, to produce a new copy of the group scan operator that includes the column set. (The scan operator is not required to return just the projected columns.)
The clone operation may occur multiple times.
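The two-step projection exchange can be illustrated with a minimal sketch. `ProjectingGroupScan` and `ProjectionNegotiation` are hypothetical stand-ins, not Drill's group scan classes, and columns are plain strings here; the method signatures are assumptions chosen to keep the example self-contained.

```java
import java.util.List;

// Hypothetical stand-in for a group scan; NOT Drill's AbstractGroupScan.
final class ProjectingGroupScan {
    private final String tableName;
    private final List<String> columns;

    ProjectingGroupScan(String tableName, List<String> columns) {
        this.tableName = tableName;
        this.columns = List.copyOf(columns);
    }

    // Tells the planner whether this scan can make use of a column list.
    boolean canPushdownProjects(List<String> requested) {
        return true;
    }

    // Returns a new scan carrying the projected columns; the receiver is unchanged.
    ProjectingGroupScan clone(List<String> projected) {
        return new ProjectingGroupScan(tableName, projected);
    }

    List<String> columns() {
        return columns;
    }
}

final class ProjectionNegotiation {
    // Mimics the planner's side of the exchange: ask first, then copy.
    static ProjectingGroupScan pushProjection(ProjectingGroupScan scan, List<String> selected) {
        return scan.canPushdownProjects(selected) ? scan.clone(selected) : scan;
    }
}
```

Because the copy is a fresh object, the sketch stays correct however many times the planner repeats the call.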
The planner calls `getNewWithChildren` to make a copy of the node (for reasons not yet clear). For a scan, the provided child list will always be empty. Again, this may occur multiple times.
Once the group scan is negotiated, the next step is to negotiate the sub-scans. This starts with a call to `getMaxParallelizationWidth` to discover the maximum number of scanners. The planner then determines the actual number and provides the scan Drillbits (decided upon by a process that needs research) by calling `applyAssignments` with the Drillbit list.
Next, for each minor fragment (thread of execution), the planner calls `getSpecificScan` with the minor fragment ID. The method returns the definition of the sub scan that will implement each of the actual physical scans for that minor fragment. Each scan operator may scan multiple table partitions if necessary to scan the entire table in the number of fragments available. (Again, it is a bit vague at present how the number of fragments is calculated, and how that information is passed to the group scan.)
Drill uses the Calcite "Volcano" planner (so called because it is based on a paper that describes the Volcano research project). Volcano is a rules-based engine that works on immutable trees of operators. Scans are the leaf nodes. As Volcano works, it recreates the planning tree, including the leaf scans. Hence, the group and sub scan nodes can expect to be copied many times during a single planning session.
Volcano is a cost-based planner. The group scan must report its estimated cost via the following:
    public ScanStats getScanStats() {
      return ScanStats.TRIVIAL_TABLE;
    }
It seems that some (all?) scans use the `TRIVIAL_TABLE` constant. (Need more info about how to provide a real estimate.)
The move from a group scan to a sub scan represents a move from a logical scan of a logical table to a set of physical scans on partitions of the physical table. The question arises: how many sub scans are possible and available? Factors considered are:
- How many sub scans are possible for a group scan: `getMaxParallelizationWidth()`.
- Which nodes are available: `applyAssignments()`.
- How many threads of execution (minor fragments) are available per node: `applyAssignments()`.
- Given the above, which sub scan(s) should be done per minor fragment: `getSpecificScan()`.
Some of the work is done by the planner (determining the set of nodes and the maximum parallelization per node). Some is done by the group scan (reporting the maximum possible parallelization and deciding how to schedule sub scans on the available nodes).
Two key methods control this process. `applyAssignments()` provides a table of (minor fragment ID, Drillbit endpoint) pairs. Since minor fragment IDs start from 0 and increase by one, the minor fragment ID is implicit as the index into a `List<DrillbitEndpoint> endpoints`. That is, you may see the list contain (Endpoint1, Endpoint1, Endpoint2, Endpoint3). This means that minor fragments 0 and 1 are both on Endpoint1, minor fragment 2 is on Endpoint2 and minor fragment 3 is on Endpoint3.
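To make the implicit indexing concrete, here is a minimal sketch that groups minor fragment IDs by endpoint. It uses plain strings in place of `DrillbitEndpoint`, and the `EndpointAssignments` helper is a hypothetical name introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; endpoints are plain strings standing in for DrillbitEndpoint.
final class EndpointAssignments {
    // Groups minor fragment IDs by endpoint. The ID is simply the list index.
    static Map<String, List<Integer>> fragmentsByEndpoint(List<String> endpoints) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int minorFragmentId = 0; minorFragmentId < endpoints.size(); minorFragmentId++) {
            result.computeIfAbsent(endpoints.get(minorFragmentId), key -> new ArrayList<>())
                  .add(minorFragmentId);
        }
        return result;
    }
}
```

For the list (Endpoint1, Endpoint1, Endpoint2, Endpoint3), the result maps Endpoint1 to fragments 0 and 1, Endpoint2 to fragment 2, and Endpoint3 to fragment 3.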
The group scan must decide how to distribute table partitions to the available minor fragments given the node on which they run. The group scan must consider affinity (which node has which data) as well as load balancing (don't schedule all scans on the same node, even if that is where the data resides; it may be better to do network reads than overload the node.)
The group scan has some number of table partitions, n, as reported by `getMaxParallelizationWidth()`. In the ideal case, `applyAssignments()` will provide this number of threads (minor fragments) to scan those partitions. In practice, however, the number of threads may be smaller than the number of partitions. In this case, the group scan must run multiple scans within a single thread. In terms of Drill, this means scheduling multiple physical scans in a single sub scan.
This decision is communicated to the planner via the `getSpecificScan()` method. Assuming that the group scan did the necessary work in response to `applyAssignments()`, the call to `getSpecificScan()` simply reads off the decisions already made. `getSpecificScan()` returns a sub scan. The sub scan can contain a plugin-specific description of multiple physical scans to be done in the sub scan. (For example, for a table given by a directory of files, a sub scan may include scans of 10 of the files.) Each partition scan will give rise (if using the standard framework) to a `RecordReader` instance as described below.
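The division of labor between `applyAssignments()` and `getSpecificScan()` can be sketched as follows. `PartitionedGroupScan` is a hypothetical stand-in, not a Drill class. It assigns partitions round-robin and deliberately ignores affinity and load balancing, which a real plugin would need to consider; the sub scan is reduced to a list of partition names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a group scan; NOT Drill's AbstractGroupScan.
final class PartitionedGroupScan {
    private final List<String> partitions;
    private List<List<String>> assignments = List.of();

    PartitionedGroupScan(List<String> partitions) {
        this.partitions = List.copyOf(partitions);
    }

    // One possible scan per partition is the upper bound on parallelism.
    int getMaxParallelizationWidth() {
        return partitions.size();
    }

    // Decides which partitions each minor fragment reads.
    // Simplification: round-robin by index, with no regard for data locality.
    void applyAssignments(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("At least one endpoint is required");
        }
        List<List<String>> plan = new ArrayList<>();
        for (int i = 0; i < endpoints.size(); i++) {
            plan.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions.size(); p++) {
            plan.get(p % endpoints.size()).add(partitions.get(p));
        }
        assignments = plan;
    }

    // Reads off the decision already made for one minor fragment.
    List<String> getSpecificScan(int minorFragmentId) {
        return List.copyOf(assignments.get(minorFragmentId));
    }
}
```

With five partitions and two minor fragments, fragment 0 receives three partitions and fragment 1 receives two, so one sub scan covers several physical scans.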
Planning starts by resolving the table name within the plugin. This involves three storage plugin classes:
- The storage plugin class implements `registerSchemas` to register its schema(s).
- The schema class is an extension-specific subclass of `AbstractSchema`.
- The schema class implements `getTable` to return an extension-specific representation of the table.
- The table class decorates a Drill subclass of `Table` to hold extension-specific information about the table.
Because the planner serializes, then deserializes, the table information within the planning process, the extension cannot simply add fields to the `Table` class. Instead, the extension creates a Jackson-serializable class to hold the information. A list of these objects (usually just one) is attached to the `DynamicDrillTable`.
Planning then moves on to the scan.
The planner serializes the table information, then calls `getPhysicalScan` on the storage plugin to get a group scan operator. The `getPhysicalScan` implementation uses Jackson to deserialize the JSON table data, recovering the information provided in the `getTable` method above.
The planner then uses that table information to create an extension-specific, Jackson-serializable extension of the `AbstractGroupScan` class. This class may hold the same table information from above, or may hold additional (or different) scan-specific information.
As noted above, the planner calls a number of methods on the group scan to negotiate aspects of the scan. The group scan can accumulate information as work proceeds and (so it seems) that information need not be stored in Jackson-serializable form if used just within the planner. The state information must be copied, however, in the various calls to `clone` or `getNewWithChildren`.
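One common way to carry accumulated state across such copies is a copy constructor. The sketch below assumes that approach; `StatefulGroupScan` is a hypothetical stand-in rather than a Drill class, and the only planner state it tracks is the endpoint list from `applyAssignments`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a group scan; NOT Drill's AbstractGroupScan.
final class StatefulGroupScan {
    private final String tableName;
    private List<String> endpoints = new ArrayList<>(); // planner-only state

    StatefulGroupScan(String tableName) {
        this.tableName = tableName;
    }

    // Copy constructor: carries every piece of accumulated state to the new node.
    private StatefulGroupScan(StatefulGroupScan from) {
        this.tableName = from.tableName;
        this.endpoints = new ArrayList<>(from.endpoints);
    }

    void applyAssignments(List<String> endpoints) {
        this.endpoints = new ArrayList<>(endpoints);
    }

    // A scan is a leaf, so the child list is expected to be empty.
    StatefulGroupScan getNewWithChildren(List<StatefulGroupScan> children) {
        if (!children.isEmpty()) {
            throw new IllegalArgumentException("A scan has no children");
        }
        return new StatefulGroupScan(this);
    }

    String tableName() {
        return tableName;
    }

    List<String> endpoints() {
        return List.copyOf(endpoints);
    }
}
```

Keeping all copying in one constructor means a newly added field only has to be handled in one place, which matters when the planner copies the node many times.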
The planner finally calls `getSpecificScan` to get a sub scan. This method returns another extension-specific class, this one derived from `AbstractSubScan` and also Jackson-serializable. Since the sub scan is serialized and sent to each Drillbit, all of its state must be serializable. Think of this class as the description of the sub scan sent from the Foreman to the execution Drillbit, leaving all other intermediate state behind.
Drill uses Jackson serialization to serialize a plan into JSON for transmission to each Drillbit. This means that each sub scan node must be serializable: it should hold no pointers to non-serialized objects. Any objects referenced in the sub scan must, themselves, be Jackson serializable. For example, the sub scan will likely identify the table, the partition within the table and possibly the list of selected columns. All these must be serializable.
Less obviously, Drill also uses Jackson serialization within the planning process, though no planning nodes are transferred between processes. For example, the group scan is entirely a planning concept, yet it is defined (for some? many? one?) plugin as Jackson serializable. Note, however, that the group node, in general, contains structures that are not easily serialized such as the computations to allocate table partitions to sub scans. Debugging sessions have not encountered a case where the group scan is, in fact, serialized, so perhaps the use of serialization is an artifact of an earlier design.
In addition, Drill uses Jackson serialization to describe tables. When the planner calls `AbstractSchema.getTable`, the result is a `DynamicDrillTable` which holds onto a list of plugin-specific data in the form of a Jackson-serialized list of objects. The plugin, in the group scan, must deserialize these objects in the `getPhysicalScan` method. Presumably this is done because the table information will also appear in a sub scan where it must be serializable, so the same mechanism is used in the planner.