Elephant Bird Lucene: Creating Indexes

For the general overview of Elephant-Bird-Lucene, see Elephant Bird Lucene.

First, create a class that extends LuceneIndexOutputFormat. This is where you set up exactly how your index is built, including which features of Lucene to use. The primary functions of this class are to build a Lucene Document from a given key + value pair and to provide an Analyzer.

Here's an example:

package com.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.util.Version;

import com.twitter.elephantbird.mapreduce.output.LuceneIndexOutputFormat;

/**
 * This OutputFormat assumes that the key is a user id, and the value is the text of a tweet.
 * It builds an index that is searchable by tokens in the tweet text, and by user id.
 */
public class TweetIndexOutputFormat extends LuceneIndexOutputFormat<LongWritable, Text> {
  // Create some reusable lucene Fields. These can be any Field types you'd like, such as TextField or LongField.
  public static final String TWEET_TEXT_FIELD = "tweet_text";
  public static final String USER_ID_FIELD = "user_id";
  private final Field tweetTextField = new TextField(TWEET_TEXT_FIELD, "", Field.Store.YES);
  private final Field userIdField = new LongField(USER_ID_FIELD, 0L, Field.Store.YES);
  private final Document doc = new Document();

  public TweetIndexOutputFormat() {
    doc.add(tweetTextField);
    doc.add(userIdField);
  }

  // This is where you convert an MR key value pair into a lucene Document
  // This part is up to you, depending on how you want your data indexed / stored / tokenized / etc.
  @Override
  protected Document buildDocument(LongWritable userId, Text tweetText) throws IOException {
    tweetTextField.setStringValue(tweetText.toString());
    userIdField.setLongValue(userId.get());
    return doc;
  }

  // Provide an Analyzer to use. If you don't want to use an analyzer
  // (perhaps because your data is pre-tokenized), simply don't override this method.
  @Override
  protected Analyzer newAnalyzer(Configuration conf) {
    return new SimpleAnalyzer(Version.LUCENE_40);
  }
}

Now that you have an OutputFormat, you can use it in a MapReduce job. Make sure to call LuceneIndexOutputFormat.setOutputPath(Job job, Path path) to set where the index will be written.
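
Here's a minimal sketch of such a job, assuming tab-separated input lines of the form userId<TAB>tweetText. TweetIndexJob, TweetMapper, and the paths are hypothetical placeholders; the only Elephant-Bird-specific call is LuceneIndexOutputFormat.setOutputPath:

package com.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.twitter.elephantbird.mapreduce.output.LuceneIndexOutputFormat;

public class TweetIndexJob {

  // A hypothetical mapper that parses "userId<TAB>tweetText" lines into
  // the (LongWritable, Text) pairs that TweetIndexOutputFormat expects
  public static class TweetMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split("\t", 2);
      context.write(new LongWritable(Long.parseLong(parts[0])), new Text(parts[1]));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "build-tweet-index");
    job.setJarByClass(TweetIndexJob.class);

    job.setMapperClass(TweetMapper.class);
    job.setNumReduceTasks(0); // map-only job
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TweetIndexOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path("/my/input"));
    // This is the important part: tell LuceneIndexOutputFormat where the index goes
    LuceneIndexOutputFormat.setOutputPath(job, new Path("/my/indexes"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}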

Using pig to create indexes

To use pig to create an index, you'll have to create a PigLuceneIndexOutputFormat, which is a LuceneIndexOutputFormat<NullWritable, Tuple>. Here's our TweetIndexOutputFormat rewritten for use with pig:

package com.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.util.Version;
import org.apache.pig.data.Tuple;

import com.twitter.elephantbird.pig.store.LuceneIndexStorage;

/**
 * This OutputFormat assumes that the values are Tuples whose first element is
 * the text of a tweet and whose second element is a user id.
 *
 * It builds an index that is searchable by tokens in the tweet text, and by user id.
 */
public class TweetIndexOutputFormat extends LuceneIndexStorage.PigLuceneIndexOutputFormat {
  // Create some reusable lucene Fields. These can be any Field types you'd like, such as TextField or LongField.
  public static final String TWEET_TEXT_FIELD = "tweet_text";
  public static final String USER_ID_FIELD = "user_id";
  private final Field tweetTextField = new TextField(TWEET_TEXT_FIELD, "", Field.Store.YES);
  private final Field userIdField = new LongField(USER_ID_FIELD, 0L, Field.Store.YES);
  private final Document doc = new Document();

  public TweetIndexOutputFormat() {
    doc.add(tweetTextField);
    doc.add(userIdField);
  }

  // This is where you convert an MR key value pair into a lucene Document
  // This part is up to you, depending on how you want your data indexed / stored / tokenized / etc.
  // Note that this time the value is a Tuple because we're extending PigLuceneIndexOutputFormat
  @Override
  protected Document buildDocument(NullWritable ignoredKey, Tuple tuple) throws IOException {
    tweetTextField.setStringValue((String) tuple.get(0));
    userIdField.setLongValue((Long) tuple.get(1));
    return doc;
  }

  // Provide an Analyzer to use. If you don't want to use an analyzer
  // (perhaps because your data is pre-tokenized), simply don't override this method.
  @Override
  protected Analyzer newAnalyzer(Configuration conf) {
    return new SimpleAnalyzer(Version.LUCENE_40);
  }
}

Once you've created a PigLuceneIndexOutputFormat, you can use LuceneIndexStorage to wrap it for use in pig. This can be done in two ways. The first way is to use LuceneIndexStorage directly in a pig script:

data_to_index = load '/my/input' using SomeLoadFunc();
store data_to_index into 'my/indexes/' using LuceneIndexStorage('com.example.TweetIndexOutputFormat');

One thing to note: if data_to_index has the wrong schema, this script will fail, but only after loading all of the input data.

The second way is to extend LuceneIndexStorage, which has the benefit of adding some type safety and error checking to our StoreFunc:

package com.example;

import java.io.IOException;

import org.apache.pig.ResourceSchema;
import org.apache.pig.data.DataType;

import com.twitter.elephantbird.pig.store.LuceneIndexStorage;

public class TweetIndexStorage extends LuceneIndexStorage {

  // Tell LuceneIndexStorage that we're wrapping a TweetIndexOutputFormat
  public TweetIndexStorage() {
    super(TweetIndexOutputFormat.class);
  }

  // Add some type safety so that trying to store a bad schema is caught client side
  @Override
  public void checkSchema(ResourceSchema s) throws IOException {
    ResourceSchema.ResourceFieldSchema[] fields = s.getFields();
    if (fields.length != 2) {
      throw new IOException("TweetIndexStorage expects tuples of size 2");
    }

    if (fields[0].getType() != DataType.CHARARRAY) {
      throw new IOException("TweetIndexStorage expects tuples whose first element is a chararray");
    }

    if (fields[1].getType() != DataType.LONG) {
      throw new IOException("TweetIndexStorage expects tuples whose second element is a long");
    }
  }
}

Now you can use TweetIndexStorage from a pig script like this:

data_to_index = load '/my/input' using SomeLoadFunc();
store data_to_index into 'my/indexes/' using TweetIndexStorage();

If data_to_index has the wrong schema this time, the pig client will report an error before even trying to launch the job.
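
One way to be sure the schema matches is to declare it explicitly when loading. For example (SomeLoadFunc and the paths are the same placeholders as above):

data_to_index = load '/my/input' using SomeLoadFunc() as (tweet_text: chararray, user_id: long);
store data_to_index into 'my/indexes/' using TweetIndexStorage();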