-
Notifications
You must be signed in to change notification settings - Fork 982
EVF Tutorial
This tutorial shows how to use the Extended Vector Framework to create a simple format plugin. The EVF framework has also been called the "row set framework" and the "new scan framework". Here we focus on using the framework. Other pages in this section provide background information for when you need features beyond those shown here.
The Drill log plugin is the focus of this tutorial. A simplified version of this plugin is explained in the Learning Apache Drill book. The version used here is the one which ships with Drill.
Most format plugins are based on the "easy" framework. EVF still uses the "easy" framework, but the implementation differs.
"Legacy" plugins are based on the idea of a "record reader" (a concept borrowed from Hive.) Unlike the hive record readers, Drill's never read a single record: they all read a batch of records. In EVF, the reader changes to be a "row batch reader" which implements a new interface.
In Drill 1.16 and earlier, the LogRecordReader
uses a typical method to write to value vectors using the associated Mutator
class.
Other readers are more clever: the "V2" text reader (Drill 1.16 and earlier) worked with direct memory itself, handling its own buffer allocation, offset vector calculations and so on.
With the EVF, we'll replace the Mutator
with a ColumnWriter
. We'll first do the simplest possible conversion, then look at how to use advanced features, such as type conversions, schema and table properties.
Let's work though the needed changes one-by-one.
Prior to the EVF, Easy format plugins were based on the original ScanBatch
. At the start of execution, the plugin creates a set of record readers which are passed to the scan batch. With EVF, we use the new scan framework. The new framework focuses on batches, and uses a new type of reader called a "batch reader." We provide a factory method to create batch readers on the fly. The batch reader itself does what the old record reader used to do, but using the EVF.
Without the EVF, plugins must pass a bunch of options to the EasyFormatPlugin
base class, and must define a number of method to further define behavior. Here is the Drill 1.16 LogFormatPlugin
version:
public LogFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig storageConfig,
LogFormatConfig formatConfig) {
super(name, context, fsConf, storageConfig, formatConfig,
true, // readable
false, // writable
true, // blockSplittable
true, // compressible
Lists.newArrayList(formatConfig.getExtension()),
DEFAULT_NAME);
this.formatConfig = formatConfig;
}
@Override
public boolean supportsPushDown() {
return true;
}
@Override
public int getReaderOperatorType() {
return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
}
@Override
public int getWriterOperatorType() {
throw new UnsupportedOperationException("unimplemented");
}
@Override
public boolean supportsStatistics() {
return false;
}
So, first step is to convert this to the EVF version:
public LogFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig storageConfig,
LogFormatConfig formatConfig) {
super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
}
private static EasyFormatConfig easyConfig(Configuration fsConf, LogFormatConfig pluginConfig) {
EasyFormatConfig config = new EasyFormatConfig();
config.readable = true;
config.writable = false;
config.blockSplittable = true;
config.compressible = true;
config.supportsProjectPushdown = true;
config.extensions = Lists.newArrayList(pluginConfig.getExtension());
config.fsConf = fsConf;
config.defaultName = DEFAULT_NAME;
config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
return config;
}
The other methods shown earlier (and a few more "boilerplate methods") can be removed as they're now handled by the base class based on the "config" created above. The only other method that should still exist is getRecordReader()
.
This change is orthogonal to the other changes: we should now be able to compile and run the unit tests for the plugin.
For this plugin, we can also remove the formatConfig
member, since the base class already tracks that for us.
This is also a good time to tidy up the plugin name. In LogFormatPlugin
change
public static final String DEFAULT_NAME = "logRegex";
to
public static final String PLUGIN_NAME = "logRegex";
Then, in LogFormatConfig
, change
@JsonTypeName("logRegex")
to
@JsonTypeName(LogFormatPlugin.PLUGIN_NAME)
The next step is to create the actual new reader. We'll do so as an "orphan" class so that the plugin continues to use the old record reader. We'll then swap in the new one when we're read. Let's first review how the existing reader works.
Create a new class:
public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
Now might be a good time to check out the ManagedReader
interface. There are only three methods to implement and doing so will be easier if we are familiar with the interface.
Copy the existing constructor:
public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
FileWork fileWork, List<SchemaPath> columns, String userName,
LogFormatConfig formatConfig) {
Modify it to remove the arguments not needed with the EVF:
public LogBatchReader(DrillFileSystem dfs,
FileWork fileWork, String userName,
LogFormatConfig formatConfig) {
this.dfs = dfs;
this.fileWork = fileWork;
this.userName = userName;
this.formatConfig = formatConfig;
this.unmatchedColumnIndex = -1;
this.maxErrors = formatConfig.getMaxErrors();
if (maxErrors < 0) {
throw UserException
.validationError()
.message("Max Errors must be a positive integer greater than zero.")
.build(logger);
}
}
The context
and columns
are handled by EVF for us.
Next, let's implement the open()
method. Create a stub:
@Override
public boolean open(FileSchemaNegotiator negotiator) {
return false;
}
The caller gives us a "schema negotiator" which we'll discuss later. The method returns true
if there is data to read, false
if we can tell immediately that there is no data available. Errors are indicated by unchecked exceptions, preferably in the form of a UserException
that Drill can forward to the user with a clear description of the issue.
To figure out what we need to do, let's look at the existing LogRecordReader
implementation:
@Override
public void setup(final OperatorContext context, final OutputMutator output) {
this.outputMutator = output;
setupPattern();
openFile();
setupProjection();
defineVectors();
}
Of the above, we only need the first two methods, so let's copy them over, along with the variables that they require.
@Override
public boolean open(FileSchemaNegotiator negotiator) {
setupPattern();
openFile();
return true;
}
We omitted the setupProjection()
and defineVectors()
methods from the old version. This gets us to the key difference between "old school" and EVF: EVF handles projection and vector creation for us; we just have to tell it what we want. Here'w how the old code handled the wildcard (*, project all) case:
private void projectAll() {
List<String> fields = formatConfig.getFieldNames();
for (int i = fields.size(); i < capturingGroups; i++) {
fields.add("field_" + i);
}
columns = new ColumnDefn[capturingGroups];
for (int i = 0; i < capturingGroups; i++) {
columns[i] = makeColumn(fields.get(i), i);
}
}
Under EVF, the framework itself handles projection, but we need to tell it the columns we can provide. We can reuse the above logic to help us. We must:
- Create a
TupleMetadata
schema of the available columns. We can use theSchemaBuilder
to help. (There are two classes of that name, be sure to use the correct one.) - Adjust the
ColumnDefn
to use the EVFColumnWriter
objects.
The log reader uses a ColumnDefn
class to define a column. We'll reuse these classes with minor changes. Let's start by changing how we define columns:
private abstract static class ColumnDefn {
...
public abstract void define(OutputMutator outputMutator) throws SchemaChangeException;
}
Replace the above method with:
public abstract void define(SchemaBuilder builder);
Here is how we define the "reader schema" (the schema that the reader can provide):
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
negotiator.setTableSchema(defineSchema(), true);
return true;
}
private TupleMetadata defineSchema() {
...
columns = new ColumnDefn[capturingGroups];
SchemaBuilder builder = new SchemaBuilder();
for (int i = 0; i < capturingGroups; i++) {
columns[i] = makeColumn(fields.get(i), i);
columns[i].define(builder);
}
return builder.buildSchema();
}
The true
argument to setTableSchema()
says that the schema is "complete": we won't be discovering any new columns as the read proceeds. (Other readers might know of no columns at open time and discover all at read time, others may do a combination.)
We can reuse the existing makeColumn()
method as-is, so let's just copy it across. The IDE will complain that none of the column definition classes exist; we'll fix that later.
We've told the schema negotiator our schema. We can now as the EVF to build the result set loader that will handle all the grunt tasks of creating batches:
private ResultSetLoader loader;
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
negotiator.setTableSchema(defineSchema(), true);
loader = negotiator.build();
Now we can modify the column definition to hold onto the column writer instead of a mutator:
private abstract static class ColumnDefn {
...
private ScalarWriter colWriter;
...
public void bind(TupleWriter rowWriter) {
colWriter = rowWriter.scalar(index);
}
The EVF uses a set of JSON-like writers. TupleWriter
manages the entire row. (The same writer handles maps, which is why it is called a "tuple" writer.)
From a tuple we can reference column writers by either name or position. Since the ColumnDefn
knows its column index, we just use that value.
The log writer only works with "scalars": strings, numbers, dates. We store the scalar column writer on the ColumnDefn
base class for convenience.
We now need something to call our new bind()
method:
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
loader = negotiator.build();
bindColumns(loader.writer());
...
private void bindColumns(RowSetLoader writer) {
for (int i = 0; i < capturingGroups; i++) {
columns[i].bind(writer);
}
}
We are now ready to tackle the changes to the concrete column definition classes. Let's start with the VarCharDefn
class. Here is the existing implementation:
private static class VarCharDefn extends ColumnDefn {
private NullableVarCharVector.Mutator mutator;
public VarCharDefn(String name, int index) {
super(name, index);
}
@Override
public void define(OutputMutator outputMutator) throws SchemaChangeException {
MaterializedField field = MaterializedField.create(getName(),
Types.optional(MinorType.VARCHAR));
mutator = outputMutator.addField(field, NullableVarCharVector.class).getMutator();
}
@Override
public void load(int rowIndex, String value) {
byte[] bytes = value.getBytes();
mutator.setSafe(rowIndex, bytes, 0, bytes.length);
}
}
Changes:
- Remove the
define()
method. - Add the new
getSchema()
method. - Change
load()
to use the column writer. - Remove the unused mutator.
Here is the new version:
private static class VarCharDefn extends ColumnDefn {
public VarCharDefn(String name, int index) {
super(name, index);
}
@Override
public void define(SchemaBuilder builder) {
builder.addNullable(getName(), MinorType.VARCHAR);
}
@Override
public void load(int rowIndex, String value) {
colWriter.setString(value);
}
}
We use the setString()
method of the writer to set the string. Note that we no longer need to specify the row position; the EVF tracks that for us. Later, we can go ahead and remove the rowIndex
argument.
The class is getting to be pretty light-weight. We could even remove it completely. But, to keep things simple, let's just keep it for now.
Next, we make the same changes for the other column defns; not shown here for brevity. You can see the new form in [this branch](need link) NEED LINK. Some things to note:
- The
TINYINT
,SMALLINT
andINT
types all use thesetInt()
method to set values. - The
FLOAT4
andFLOAT8
types use thesetDouble()
method to set values. -
DATE
,TIME
andTIMESTAMP
can use Joda (not Java 8 date/time) objects orsetLong()
to set values.
Regardless of the Java type used to set the value, the underlying vector type is the one you request.
We'll now start making some breaking changes; the code won't build during the next several steps.
The Drill 1.16 plugin creates record readers as follows:
@Override
public RecordReader getRecordReader(FragmentContext context,
DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns,
String userName) throws ExecutionSetupException {
return new LogRecordReader(context, dfs, fileWork,
columns, userName, formatConfig);
}
The EVF uses batch readers, created using a batch reader creator:
((Insert code))
The LogBatchReader
doesn't exist yet; so our next step is to create it from our record reader.
Rename the existing record reader from LogRecordReader
to LogBatchReader
. Then, change the base class:
public class LogRecordReader extends AbstractRecordReader {
To
public class LogBatchReader extends ManagedReader<FileSchemaNegotiator> {
The FileSchemaNegotiator
is a new concept: we will use it to define the schema of our file as we learn the schema on read.