Paul Rogers edited this page May 17, 2019 · 16 revisions

This tutorial shows how to use the Extended Vector Framework (EVF) to create a simple format plugin. The EVF has also been called the "row set framework" and the "new scan framework". Here we focus on using the framework. Other pages in this section provide background information for when you need features beyond those shown here.

The Log Plugin

The Drill log plugin is the focus of this tutorial. A simplified version of this plugin is explained in the Learning Apache Drill book. The version used here is the one which ships with Drill.

Plugin Design

Most format plugins are based on the "easy" framework. EVF still uses the "easy" framework, but the implementation differs.

"Legacy" plugins are based on the idea of a "record reader" (a concept borrowed from Hive.) Unlike the hive record readers, Drill's never read a single record: they all read a batch of records. In EVF, the reader changes to be a "row batch reader" which implements a new interface.

In Drill 1.16 and earlier, the LogRecordReader uses a typical method to write to value vectors using the associated Mutator class.

Other readers are more clever: the "V2" text reader (Drill 1.16 and earlier) worked with direct memory itself, handling its own buffer allocation, offset vector calculations and so on.

With the EVF, we'll replace the Mutator with a ColumnWriter. We'll first do the simplest possible conversion, then look at how to use advanced features, such as type conversions, schema and table properties.

Let's work through the needed changes one by one.

Simplify the Plugin Definition

Without the EVF, plugins must pass a bunch of options to the EasyFormatPlugin base class, and must define a number of methods to further define behavior. Here is the Drill 1.16 LogFormatPlugin version:

  public LogFormatPlugin(String name, DrillbitContext context,
                         Configuration fsConf, StoragePluginConfig storageConfig,
                         LogFormatConfig formatConfig) {
    super(name, context, fsConf, storageConfig, formatConfig,
        true,  // readable
        false, // writable
        true, // blockSplittable
        true,  // compressible
        Lists.newArrayList(formatConfig.getExtension()),
        DEFAULT_NAME);
    this.formatConfig = formatConfig;
  }

  @Override
  public boolean supportsPushDown() {
    return true;
  }

  @Override
  public int getReaderOperatorType() {
    return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
  }

  @Override
  public int getWriterOperatorType() {
    throw new UnsupportedOperationException("unimplemented");
  }

  @Override
  public boolean supportsStatistics() {
    return false;
  }

So, the first step is to convert this to the EVF version, which uses a "config" object (different from the JSON-serialized format config) to specify the properties of the Easy format plugin:

  public LogFormatPlugin(String name, DrillbitContext context,
                         Configuration fsConf, StoragePluginConfig storageConfig,
                         LogFormatConfig formatConfig) {
    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
  }

  private static EasyFormatConfig easyConfig(Configuration fsConf, LogFormatConfig pluginConfig) {
    EasyFormatConfig config = new EasyFormatConfig();
    config.readable = true;
    config.writable = false;
    config.blockSplittable = true;
    config.compressible = true;
    config.supportsProjectPushdown = true;
    config.extensions = Lists.newArrayList(pluginConfig.getExtension());
    config.fsConf = fsConf;
    config.defaultName = DEFAULT_NAME;
    config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
    return config;
  }

The other methods shown earlier (and a few more "boilerplate methods") can be removed as they're now handled by the base class based on the "config" created above. The only other method that should still exist is getRecordReader().
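To see why the overrides can disappear, here is a minimal, stand-alone sketch of the idea: a base class that answers the boilerplate questions from a config object. It does not use Drill's classes. SketchEasyConfig, SketchEasyPlugin, SketchLogPlugin and the operator type value are all hypothetical names invented for illustration, and the real EasyFormatPlugin may be organized differently.

```java
/** Hypothetical stand-in for the "config" object; not Drill's EasyFormatConfig. */
final class SketchEasyConfig {
  boolean writable;
  boolean supportsProjectPushdown;
  int readerOperatorType;
}

/** Hypothetical base class that answers the boilerplate questions itself. */
class SketchEasyPlugin {
  private final SketchEasyConfig config;

  SketchEasyPlugin(SketchEasyConfig config) {
    this.config = config;
  }

  // Each former override becomes a simple lookup in the config.
  boolean supportsPushDown() {
    return config.supportsProjectPushdown;
  }

  boolean supportsWrite() {
    return config.writable;
  }

  int getReaderOperatorType() {
    return config.readerOperatorType;
  }
}

/** A concrete plugin now only fills in the config; it needs no overrides. */
final class SketchLogPlugin extends SketchEasyPlugin {
  static final int SKETCH_OPERATOR_TYPE = 7; // arbitrary value for illustration

  SketchLogPlugin() {
    super(sketchConfig());
  }

  private static SketchEasyConfig sketchConfig() {
    SketchEasyConfig config = new SketchEasyConfig();
    config.writable = false;
    config.supportsProjectPushdown = true;
    config.readerOperatorType = SKETCH_OPERATOR_TYPE;
    return config;
  }
}
```

The static helper mirrors the easyConfig() method above: the config must be built before the super() call, so it cannot be an instance method.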

This change is orthogonal to the other changes: we should now be able to compile and run the unit tests for the plugin.

Create the Row Batch Reader

Prior to the EVF, Easy format plugins were based on the original ScanBatch. At the start of execution, the plugin creates a set of record readers which are passed to the scan batch. With EVF, we use the new scan framework. The new framework focuses on batches, and uses a new type of reader called a "batch reader."

The next step is to create the new batch reader. We'll do so as an "orphan" class so that the plugin continues to use the old record reader. We'll then swap in the new one when we're ready. Let's first review how the existing reader works. The tutorial assumes you create a new file and copy over methods and fields as needed. Of course, you could also simply copy the old reader to create the new one, though that can be a bit more confusing.

Create a new class:

public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
}

Now might be a good time to check out the ManagedReader interface. There are only three methods to implement and doing so will be easier if we are familiar with the interface.

The FileSchemaNegotiator is a new concept: we will use it to define the schema of our file as we learn the schema on read.

Create a Constructor

Copy the existing constructor:

  public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
                         FileWork fileWork, List<SchemaPath> columns, String userName,
                         LogFormatConfig formatConfig) {

Modify it to remove the arguments not needed with the EVF:

  public LogBatchReader(FileSplit split,
                         LogFormatConfig formatConfig) {
    this.split = split;
    this.formatConfig = formatConfig;
    this.maxErrors = formatConfig.getMaxErrors();

    if (maxErrors < 0) {
      throw UserException
          .validationError()
          .message("Max Errors must be a positive integer greater than zero.")
          .build(logger);
    }
  }

The context, columns, dfs and userName are handled by EVF for us. EVF gives us a FileSplit instead of a FileWork object.

The Open Method

Next, let's implement the open() method. Create a stub:

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    return false;
  }

The caller gives us a "schema negotiator" which we'll discuss later. The method returns true if there is data to read, false if we can tell immediately that there is no data available. Errors are indicated by unchecked exceptions, preferably in the form of a UserException that Drill can forward to the user with a clear description of the issue.

To figure out what we need to do, let's look at the existing LogRecordReader implementation:

  @Override
  public void setup(final OperatorContext context, final OutputMutator output) {
    this.outputMutator = output;

    setupPattern();
    openFile();
    setupProjection();
    defineVectors();
  }

Of the above, we only need the first two methods, so let's copy them over, along with the variables that they require.

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    setupPattern();
    openFile(negotiator);
    return true;
  }

The openFile() method is adjusted to get context information (the Drill File System, the user name) from the schema negotiator:

  private void openFile(FileSchemaNegotiator negotiator) {
    InputStream in;
    try {
      // The EVF constructor stores a FileSplit, not a FileWork.
      in = negotiator.fileSystem().open(split.getPath());
    } catch (Exception e) {
      throw UserException
          .dataReadError(e)
          .message("Failed to open input file: %s", split.getPath())
          .addContext("User name", negotiator.userName())
          .build(logger);
    }
    reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
  }

Define the Reader Schema

We omitted the setupProjection() and defineVectors() methods from the old version. This gets us to the key difference between "old school" and EVF: EVF handles projection and vector creation for us; we just have to tell it what we want. Here's how the old code handled the wildcard (*, project all) case:

  private void projectAll() {
    List<String> fields = formatConfig.getFieldNames();
    for (int i = fields.size(); i < capturingGroups; i++) {
      fields.add("field_" + i);
    }
    columns = new ColumnDefn[capturingGroups];

    for (int i = 0; i < capturingGroups; i++) {
      columns[i] = makeColumn(fields.get(i), i);
    }
  }

Under EVF, the framework itself handles projection, but we need to tell it the columns we can provide. We can reuse the above logic to help us. We must:

  • Create a TupleMetadata schema of the available columns. We can use the SchemaBuilder to help. (There are two classes of that name; be sure to use the correct one.)
  • Adjust the ColumnDefn to use the EVF ColumnWriter objects.

The log reader uses a ColumnDefn class to define a column. We'll reuse these classes with minor changes. Let's start by changing how we define columns:

  private abstract static class ColumnDefn {
    ...
    public abstract void define(OutputMutator outputMutator) throws SchemaChangeException;
  }

Replace the above method with:

    public abstract void define(SchemaBuilder builder);

Here is how we define the "reader schema" (the schema that the reader can provide):

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    ...
    negotiator.setTableSchema(defineSchema(), true);
    openFile(negotiator);
    return true;
  }

  private TupleMetadata defineSchema() {
    ...
    columns = new ColumnDefn[capturingGroups];
    SchemaBuilder builder = new SchemaBuilder();
    for (int i = 0; i < capturingGroups; i++) {
      columns[i] = makeColumn(fields.get(i), i);
      columns[i].define(builder);
    }
    return builder.buildSchema();
  }

The true argument to setTableSchema() says that the schema is "complete": we won't be discovering any new columns as the read proceeds. (Other readers might know of no columns at open time and discover all at read time, others may do a combination.)
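The column-naming rule behind the reader schema (configured names first, then generated field_N names for any remaining capturing groups) can be sketched on its own. This is a stand-alone illustration using only the JDK: ColumnNameSketch and its methods are hypothetical names, not part of Drill, and the way the real reader counts groups may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Stand-alone sketch of the column-naming rule; not part of Drill. */
final class ColumnNameSketch {

  /** Counts the capturing groups declared in a regex. */
  static int capturingGroups(String regex) {
    return Pattern.compile(regex).matcher("").groupCount();
  }

  /**
   * Returns exactly one column name per capturing group: the configured
   * name if one exists at that position, else a generated "field_N" name.
   * The configured list is not modified.
   */
  static List<String> columnNames(List<String> configured, int groups) {
    List<String> names = new ArrayList<>();
    for (int i = 0; i < groups; i++) {
      names.add(i < configured.size() ? configured.get(i) : "field_" + i);
    }
    return names;
  }
}
```

For example, a pattern with three groups and the configured names year and month yields the columns year, month and field_2. Unlike the old projectAll() shown earlier, this sketch builds a new list rather than appending to the list held by the format config.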

We can reuse the existing makeColumn() method as-is, so let's just copy it across. The IDE will complain that none of the column definition classes exist; we'll fix that later.

Bind the Column Definitions to Column Writers

We've told the schema negotiator our schema. We can now ask the EVF to build the result set loader that will handle all the grunt tasks of creating batches:

  private ResultSetLoader loader;

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    ...
    negotiator.setTableSchema(defineSchema(), true);
    loader = negotiator.build();

Now we can modify the column definition to hold onto the column writer instead of a mutator:

  private abstract static class ColumnDefn {
    ...
    protected ScalarWriter colWriter;  // protected: subclasses use it in load()
    ...
    
    public void bind(TupleWriter rowWriter) {
      colWriter = rowWriter.scalar(index);
    }

The EVF uses a set of JSON-like writers. TupleWriter manages the entire row. (The same writer handles maps, which is why it is called a "tuple" writer.)

From a tuple we can reference column writers by either name or position. Since the ColumnDefn knows its column index, we just use that value.

The log reader only works with "scalars": strings, numbers, dates. We store the scalar column writer on the ColumnDefn base class for convenience.

We now need something to call our new bind() method:

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    ...
    loader = negotiator.build();
    bindColumns(loader.writer());
    ...

  private void bindColumns(RowSetLoader writer) {
    for (int i = 0; i < capturingGroups; i++) {
      columns[i].bind(writer);
    }
  }

Revised Column Definition Implementation

We are now ready to tackle the changes to the concrete column definition classes. Let's start with the VarCharDefn class. Here is the existing implementation:

  private static class VarCharDefn extends ColumnDefn {

    private NullableVarCharVector.Mutator mutator;

    public VarCharDefn(String name, int index) {
      super(name, index);
    }

    @Override
    public void define(OutputMutator outputMutator) throws SchemaChangeException {
      MaterializedField field = MaterializedField.create(getName(),
          Types.optional(MinorType.VARCHAR));
      mutator = outputMutator.addField(field, NullableVarCharVector.class).getMutator();
    }

    @Override
    public void load(int rowIndex, String value) {
      byte[] bytes = value.getBytes();
      mutator.setSafe(rowIndex, bytes, 0, bytes.length);
    }
  }

Changes:

  • Remove the old define(OutputMutator) method.
  • Add the new define(SchemaBuilder) method.
  • Change load() to use the column writer.
  • Remove the unused mutator.

Here is the new version:

  private static class VarCharDefn extends ColumnDefn {

    public VarCharDefn(String name, int index) {
      super(name, index);
    }

    @Override
    public void define(SchemaBuilder builder) {
      builder.addNullable(getName(), MinorType.VARCHAR);
    }

    @Override
    public void load(int rowIndex, String value) {
      colWriter.setString(value);
    }
  }

We use the setString() method of the writer to set the string. Note that we no longer need to specify the row position; the EVF tracks that for us. Later, we can go ahead and remove the rowIndex argument.

The class is getting to be pretty light-weight. We could even remove it completely. But, to keep things simple, let's just keep it for now.

Next, we make the same changes for the other column defns; not shown here for brevity. You can see the new form in [this branch](need link) NEED LINK. Some things to note:

  • The TINYINT, SMALLINT and INT types all use the setInt() method to set values.
  • The FLOAT4 and FLOAT8 types use the setDouble() method to set values.
  • DATE, TIME and TIMESTAMP can use Joda (not Java 8 date/time) objects or setLong() to set values.

Regardless of the Java type used to set the value, the underlying vector type is the one you request.
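As a rough picture of what the other column definitions look like, here is a stand-alone sketch in which each definition parses the captured string and calls the matching setter. RecordingWriter is a hypothetical stand-in for the EVF ScalarWriter that simply records what it was given, and the class names are invented for illustration. Error handling for malformed numbers is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a scalar column writer; records each value set. */
final class RecordingWriter {
  final List<Object> values = new ArrayList<>();

  void setString(String value) { values.add(value); }
  void setInt(int value) { values.add(value); }
  void setDouble(double value) { values.add(value); }
}

/** Sketch of the column definition base class, holding the bound writer. */
abstract class TypedDefnSketch {
  protected final RecordingWriter colWriter;

  TypedDefnSketch(RecordingWriter colWriter) {
    this.colWriter = colWriter;
  }

  /** Converts the captured text and writes it; no row index is needed. */
  abstract void load(String value);
}

/** Integer-like columns parse the text and call setInt(). */
final class IntDefnSketch extends TypedDefnSketch {
  IntDefnSketch(RecordingWriter colWriter) { super(colWriter); }

  @Override
  void load(String value) {
    colWriter.setInt(Integer.parseInt(value));
  }
}

/** Floating-point columns parse the text and call setDouble(). */
final class DoubleDefnSketch extends TypedDefnSketch {
  DoubleDefnSketch(RecordingWriter colWriter) { super(colWriter); }

  @Override
  void load(String value) {
    colWriter.setDouble(Double.parseDouble(value));
  }
}
```

In a real reader, a NumberFormatException thrown here would normally be caught and reported with enough context (column name, line number) for the user to find the offending line.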

With this, we've handled the tasks needed to open our batch reader (and have jumped ahead on a few others.)

Define the next() Method

Our next step is to convert the code that handles each batch. The next() method on the batch reader asks us to read the next batch.

Here is the old version:

  @Override
  public int next() {
    rowIndex = 0;
    while (nextLine()) {
    }
    return rowIndex;
  }

The new version must add a number of items:

  • Use the row writer to start and end each row (so that the column writers know where to write their values within the underlying vectors.)
  • Ask the result set loader if we can add more rows. (The result set loader will detect when the batch is full, so we don't have to maintain our own count-based batch size.)
  • Return true if there is more to read, false if no more rows are available. The return value is forward looking: it tells the EVF whether to call the next() method again. On the last batch, we'll typically load rows into the batch, then return false to indicate that, after this batch, we have nothing more to offer.

Here is the revised version:

  @Override
  public boolean next() {
    RowSetLoader rowWriter = loader.writer();
    while (! rowWriter.isFull()) {
      rowWriter.start();
      if (! nextLine()) {
        return false;
      }
      rowWriter.save();
    }
    return true;
  }

The existing nextLine() method is reused, with the key change that the method returns true if it read a row, false if it hit EOF. We remove the existing logic to count rows against a maximum. That is, change logic like this:

        rowIndex++;
        return rowIndex < BATCH_SIZE;

to

        return true;

There is no need to maintain a row index; EVF does that for us. The log reader benefits from a lineNumber counter, however, for use in error messages since the presence of non-matching lines means that the row count, maintained by EVF, is not the same as the line number.
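The forward-looking return value is easy to get wrong, so here is a stand-alone simulation of the loop. SketchRowWriter is a hypothetical stand-in for the EVF row writer that declares itself full after a fixed number of rows. Its harvest() method plays the role of the framework collecting a finished batch. None of these classes are Drill's.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the row writer: "full" after a fixed row count. */
final class SketchRowWriter {
  private final int limit;
  private final List<String> saved = new ArrayList<>();
  private String pending;

  SketchRowWriter(int limit) { this.limit = limit; }

  boolean isFull() { return saved.size() >= limit; }
  void start() { pending = null; }
  void setString(String value) { pending = value; }
  void save() { saved.add(pending); }

  /** Hands the finished batch to the caller and starts a new, empty one. */
  List<String> harvest() {
    List<String> batch = new ArrayList<>(saved);
    saved.clear();
    return batch;
  }
}

/** Reads lines from an in-memory string using the same loop as next() above. */
final class SketchBatchReader {
  private final BufferedReader reader;
  private final SketchRowWriter rowWriter;

  SketchBatchReader(String text, SketchRowWriter rowWriter) {
    this.reader = new BufferedReader(new StringReader(text));
    this.rowWriter = rowWriter;
  }

  boolean next() {
    while (!rowWriter.isFull()) {
      rowWriter.start();
      if (!nextLine()) {
        return false; // EOF: rows saved so far still belong to this batch
      }
      rowWriter.save();
    }
    return true; // batch is full; there may be more to read
  }

  /** Returns true if a row was read, false at EOF. */
  private boolean nextLine() {
    try {
      String line = reader.readLine();
      if (line == null) {
        return false;
      }
      rowWriter.setString(line);
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

With five input lines and a limit of two rows, the calls to next() return true, true, false, and the batches hold two, two and one rows. The final false accompanies a batch that still contains data.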

The existing loadVectors() method is also reused, but is simplified. Previously, the record reader had to handle projection: create vectors only for the projected columns, and save values only to those columns. With EVF, all this is hidden: the EVF creates column writers for all of the columns we declare in the schema (and only for those columns.) If a column is unprojected, the EVF gives us a "dummy" writer that we use just as if it were a real writer. The EVF also handles any projected columns that are not defined in our schema.

Here is the revised implementation:

  private void loadVectors(Matcher m) {
    for (int i = 0; i < columns.length; i++) {
      String value = m.group(columns[i].index + 1);
      if (value != null) {
        columns[i].load(0, value);
      }
    }
  }

The log reader has a number of special columns; we'll handle those later.

Define the close() Method

Finally, we round out the core of our batch reader by copying the close method from the record reader. As in prior versions, ensure that all resources are released in close. After this call, the EVF will forget about your batch reader, allowing the JVM to garbage collect it.

Handling Special Columns

The log reader handles two special columns:

  • _unmatched_rows: Value of any unmatched row.
  • _raw: The entire unparsed row.

Prior logic had to fit these into the array of columns, which was a bit awkward. We can revise this logic to exploit the EVF. We simply always define our "special" columns, storing their writers directly:

  private static final String RAW_LINE_COL_NAME = "_raw";
  private static final String UNMATCHED_LINE_COL_NAME = "_unmatched_rows";
  ...
  private ScalarWriter rawColWriter;
  private ScalarWriter unmatchedColWriter;
  ...
  private TupleMetadata defineSchema() {
    ...
    SchemaBuilder builder = new SchemaBuilder();
    ...
    builder.addNullable(RAW_LINE_COL_NAME, MinorType.VARCHAR);
    builder.addNullable(UNMATCHED_LINE_COL_NAME, MinorType.VARCHAR);
    return builder.buildSchema();
  }

  private void bindColumns(RowSetLoader writer) {
    ...
    rawColWriter = writer.scalar(RAW_LINE_COL_NAME);
    unmatchedColWriter = writer.scalar(UNMATCHED_LINE_COL_NAME);
  }

Some things to note:

  • We add the special columns to the schema builder after the regex columns. This ensures that the column indexes for those columns are the same for both the columns array and the schema.
  • We define the special columns all the time, relying on EVF projection to materialize them only when needed.
  • We obtain the column writers for the special columns by name. We could do this on every row, but caching the writers is slightly faster.

Now, we need only modify the per-row logic to use the new writers. Recall that EVF will give us dummy writers if the columns are not projected, so we don't need any special logic:

  private boolean nextLine() {
    String line;
    try {
      line = reader.readLine();
    }...
    rawColWriter.setString(line);
    lineNumber++;
    Matcher lineMatcher = pattern.matcher(line);
    if (lineMatcher.matches()) {
      loadVectors(lineMatcher);
      return true;
    }
    ...    
    unmatchedColWriter.setString(line);
    return true;
  }

Again, we could have written the following instead:

    writer.scalar(UNMATCHED_LINE_COL_NAME).setString(line);

The above form is handy if you must work with columns by name rather than position, for example if working with JSON.
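The "dummy writer" idea is what lets nextLine() write to the special columns unconditionally. Here is a stand-alone sketch of that pattern: a lookup by name returns either a writer that stores the value or one that silently discards it. SketchColWriter and SketchProjectedRow are hypothetical names. The EVF's own implementation is more involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a scalar column writer. */
interface SketchColWriter {
  void setString(String value);
}

/** Sketch of a row whose writers honor projection on the reader's behalf. */
final class SketchProjectedRow {
  private final Map<String, String> row = new LinkedHashMap<>();
  private final Set<String> projected;

  SketchProjectedRow(Set<String> projected) {
    this.projected = projected;
  }

  /** Looks up a writer by column name, as writer.scalar(name) does above. */
  SketchColWriter scalar(String name) {
    if (projected.contains(name)) {
      return value -> row.put(name, value); // real writer: stores the value
    }
    return value -> { };                    // dummy writer: discards it
  }

  Map<String, String> values() {
    return row;
  }
}
```

The reader code calls setString() the same way in both cases, so it never needs to ask whether a column was projected.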

Define the Record Batch Creator

The above should give us a working, converted log batch reader. There are many improvements we can make. But, first let's actually try out our creation. Recall that the batch reader has thus far been an "orphan": nothing calls it. Let's fix that.

As described earlier, the EVF replaces the ScanBatch-based approach, in which the plugin created all of its record readers at the start of execution, with the new scan framework and its "batch readers." We provide a factory method to create batch readers on the fly. The batch reader itself does what the old record reader used to do, but using the EVF.

EVF is a general-purpose framework: it handles many kinds of scans. We must customize it for each specific reader. Rather than doing so by creating subclasses, we instead assemble the pieces we need through composition by providing a framework builder class. This builder class is what allows the Easy framework to operate with both "legacy" and EVF-based readers.

In fact, if you are especially cautious, you can leverage the framework builder mechanism to offer both the new and old versions of your reader, as was done with the "v2" and "v3" versions of the text (CSV) reader in Drill 1.16.

  private static class LogScanBatchCreator extends ScanFrameworkCreator {

    private final LogFormatPlugin logPlugin;

    public LogScanBatchCreator(LogFormatPlugin plugin) {
      super(plugin);
      logPlugin = plugin;
    }

    @Override
    protected FileScanBuilder frameworkBuilder(
        EasySubScan scan) throws ExecutionSetupException {
      FileScanBuilder builder = new FileScanBuilder();
      builder.setReaderFactory(new LogReaderFactory(logPlugin));

      // The default type of regex columns is nullable VarChar,
      // so let's use that as the missing column type.

      builder.setNullType(Types.optional(MinorType.VARCHAR));

      // Pass along the output schema, if any

      builder.setOutputSchema(scan.getSchema());
      return builder;
    }
  }

Here's what's happening:

  • Our class extends ScanFrameworkCreator which integrates with the Easy plugin framework.
  • We hold onto the log format plugin for later use.
  • The main show is the frameworkBuilder() method which allows us to configure our preferred framework options.
  • The log reader reads from a file, so we use the FileScanBuilder class. We could support the columns column to read into an array, like CSV, if we wanted.
  • We specify the builder for our batch readers by calling setReaderFactory(). We'll define the actual class shortly.
  • Next we call setNullType() to define a type to use for missing columns rather than the traditional nullable INT. We observe that the native type of a regex column is nullable VarChar. So, if the user asked for a column that we don't have, we should use that same type so that types remain unchanged when the user later decides to define that column.
  • Finally, jumping ahead a bit, we pass along a "provided schema" if the user defined one. Drill 1.16 added provided schema support to the EasySubScan, so we get it for free. We'll use it in the advanced tutorial.

Define the Row Batch Reader Creator

Next in the chain is a class to create your batch readers. Recall that EVF creates readers on the fly rather than up-front as in the legacy implementation. So, we need to provide the class that does the batch reader creation:

  private static class LogReaderFactory extends FileReaderFactory {

    private final LogFormatPlugin plugin;

    public LogReaderFactory(LogFormatPlugin plugin) {
      this.plugin = plugin;
    }

    @Override
    public ManagedReader<? extends FileSchemaNegotiator> newReader(
        FileSplit split) {
      return new LogBatchReader(split, plugin.getConfig());
    }
  }
  

This is simple enough: EVF calls the newReader() method when it is ready to read the next file split. (The split names a file and, if we said the plugin is block splittable, it also names a block offset and length.)

We are free to create the log batch reader any way we like: the constructor is up to us. We already trimmed it down earlier, so we simply use that constructor here.
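The "on the fly" behavior can be pictured with a small stand-alone sketch: the scan holds the list of splits and asks the factory for a reader only when it is ready for the next one. SketchLazyScan is a hypothetical class written for this page. It uses a plain java.util.function.Function in place of the reader factory and is not how the EVF is actually structured internally.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Sketch of a scan that creates one reader per split, only when needed. */
final class SketchLazyScan<S, R> {
  private final Iterator<S> splits;
  private final Function<S, R> factory;
  private int created;

  SketchLazyScan(List<S> splits, Function<S, R> factory) {
    this.splits = splits.iterator();
    this.factory = factory;
  }

  /** Creates the reader for the next split, or returns null when none remain. */
  R nextReader() {
    if (!splits.hasNext()) {
      return null;
    }
    created++;
    return factory.apply(splits.next());
  }

  /** Number of readers created so far; zero until nextReader() is called. */
  int readersCreated() {
    return created;
  }
}
```

Because a reader exists only while its split is being read, resources such as open files are held for one split at a time rather than for every split in the scan.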

Return our Scan Batch Creator

All the work we've done thus far is still an "orphan": nothing calls it. We're finally ready to change that. By default, the Easy plugin creates the scan operator the legacy way; that's why the old plugin worked. To trigger an EVF-based scan, we return our scan batch creator:

  @Override
  protected ScanBatchCreator scanBatchCreator(OptionManager options) {
    return new LogScanBatchCreator(this);
  }

We mentioned before that we could be cautious and let the user choose between the old and new versions. Here's how that would look. This is for the text reader:

  @Override
  protected ScanBatchCreator scanBatchCreator(OptionManager options) {
    // Create the "legacy", "V2" reader or the new "V3" version based on
    // the result set loader. This code should be temporary: the two
    // readers provide identical functionality for the user; only the
    // internals differ.
    if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) {
      return new TextScanBatchCreator(this);
    } else {
      return new ClassicScanBatchCreator(this);
    }
  }

Here TextScanBatchCreator is the text version of the one we just created. ClassicScanBatchCreator is a generic one that does things the old-fashioned way.

If we decided to be conservative, we'd retain our old record reader in addition to the new row batch reader.

Test

With this method in place, our new version is "live". You should use your unit tests to step through the new code to make sure it works -- and to ensure you understand the EVF, or at least the parts you need.

Next Steps

We've now completed a "bare bones" conversion to the new framework. We'd be fine if we stopped here.

The new framework offers additional features that can further simplify the log format plugin. We'll look at those topics in the next section.
