Skip to content

3.4 Readers and Writers

Neil Patel edited this page Feb 5, 2017 · 8 revisions

Using Readers and Writers

You don't instantiate, invoke, or write to input readers or output writers; all of the interaction with the readers and writers is done for you by the [MapreducePipeline] (https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/pipeline_base.py) object. You simply tell your MapreducePipeline object what reader to use and what output writer to use, and you also provide the MapreducePipeline with the reader and writer parameters.

The illustration below is a representation of a MapreducePipeline object with its constructor specifying a word count job, corresponding mapper and reducer functions, and the input reader and output writer to be used. Notice the "mapper_params" and "reducer_params". Those parameters are actually for the reader and writer, respectively. Notice also how the reader and writer are specified, using the Mapreduce library.

mapreduce_pipeline.MapreducePipeline(
  "word_count",
  "main.work_count_map",
  "main.word_count_reduce",
  "mapreduce.input_readers.BlobstoreZipInputReader",
  "mapreduce.output_writers.BlobstoreOutputWriter",
  mapper_params={
    "blob_key":"blobkey",
  },
  reducer_params={
    "mime_type":"text/plain",
  },
  shards=16)
```

## Standard Input Readers

The standard data input readers are designed to read in data from specific storage, such as blobstore or datastore and then supply the data to the mapper function. The summary below describes each reader and its mappreduce.yaml parameter settings.

###### [*BlobstoreLineInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L1358-L1509)
Reads a newline (\n) delimited text file one line at a time from Blobstore. It calls the mapper function once with each line, passing to the mapper a tuple comprised of the byte offset in the file of the first character in the line and the line as a string, not including the trailing newline. For example: (byte_offset, line_value).

*Parameters*
* <code>blob_keys</code> Either a string containing the blob key, or a list containing multiple blob keys, specifying the data to be read by the reader.

###### [*BlobstoreZipInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L1512-L1673)
Iterates over all of the compressed files within the specified zipfile in Blobstore. It calls the mapper function once for each file, passing it the tuple comprised of the zipfile.ZipInfo entry for the file, and a parameterless function that your mapper calls to return the complete body of the file as a string. For example, (zipinfo, file_callable). The following snippet shows how your mapper might extract each file's data in each iteration:

    def word_count_map(data):
      """Word count map function."""
      (entry, text_fn) = data
      text = text_fn()
      
*Parameters*
* <code>blob_key</code> A string containing the blob key specifying the zip file data to be read by the reader.

###### [*BlobstoreZipLineInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L1676-L1905)
Iterates over all of the compressed files, each of which must contain newline (\n) delimited data, within the specified zipfile in Blobstore. It calls the mapper function once for each line in each file, passing a tuple consisting of the byte offset in the file of the first character in the line and the line as a string, not including the trailing newline. For example, (byte_offset, line_value).
      
*Parameters*
* <code>blob_keys</code> Either a string containing the blob key, or a list containing multiple blob keys, specifying the zip file data to be read by the reader.


###### [*DatastoreInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L607-L785)
Iterates and returns all instances of the specified entity (entity_kind) from the datastore, automatically advancing to the next unread entities. Each iteration returns the number of entities specified by the batch_size parameter. This reader does no filtering: you would need to do any required filtering in your mapper.
      
*Parameters*
* <code>entity_kind</code> The datastore kind to map over.
* <code>namespace</code> The namespace that will be searched for entity_kinds.
* <code>batch_size</code> The number of entities to read from the datastore with each batch get. Default is 50.

###### [*DatastoreKeyInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L860-L864)
Iterates and returns all keys of the entities in the datastore of the specified entity_kind, automatically advancing to the next unread keys. Each iteration returns the number of keys specified by the batch_size parameter. This reader does no filtering: you would need to do any required filtering in your mapper.
      
*Parameters*
* <code>entity_kind</code> The datastore kind whose keys are to be returned.
* <code>namespace</code> The namespace that will be searched for entity_kinds.
* <code>batch_size</code> The number of keys to read from the datastore with each batch get. Default is 50.


###### [*NamespaceInputReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L2001-L2091)
Iterates over and returns the available namespaces.
      
*Parameters*
* <code>namespace_range</code> The range of namespaces that will be iterated over.
* <code>batch_size</code> The number of namespaces to return in each iteration. Default is 10.



###### [*RecordsReader*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py#L2094-L2228)
Reads a list of files obtained via the Files API in records format, yielding each record as a string in each iteration.
      
*Parameters*
* <code>files</code> Either a string containing the file to be read or a list containing multiple strings of files to be read.</td>


## Standard Output Writers

The standard output writers write data from the reducer function to a Google Cloud Storage location. The summary below describes each writer. Each writer uses the following mappreduce.yaml parameter settings:

*Required Parameters for all output writers*
* <code>bucket_name</code> Name of the bucket to use (with no extra delimiters or suffixes such as directories. Directories/prefixes can be specified as part of the <code>naming_format</code>

*Optional Parameters for all output writers*
* <code>acl</code> acl to apply to new files, else bucket default used.
* <code>naming_format</code> prefix format string for the new files (there is no required starting slash, expected formats would look like <code>directory/basename...</code>, any starting slash will be treated as part of the file name) that should use the following substitutions:
  * $name - the name of the job
  * $id - the id assigned to the job
  * $num - the shard number. If there is more than one shard $num must be used. An arbitrary suffix may be applied by the writer.
* <code>content_type</code> mime type to apply on the files. If not provided, Google Cloud Storage will apply its default.
* <code>tmp_bucket_name</code> name of the bucket used for writing tmp files by consistent GCS output writers. Defaults to <code>bucket_name</code> if not set.

###### [*GoogleCloudStorageOutputWriter*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py#L669)
This version is known to create inconsistent outputs if the input changes during slice retries. Consider using GoogleCloudStorageConsistentOutputWriter instead.

*Parameters*
* <code>no_duplicate</code>  if True, slice recovery logic will be used to ensure output files has no duplicates. Every shard should have only one final output in user specified location. But it may produce many smaller
      files (named "seg") due to slice recovery. These segs live in a tmp directory and should be combined and renamed to the final location. In current impl, they are not combined.

###### [*GoogleCloudStorageConsistentOutputWriter*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py#L863)
This version ensures that the output written to GCS is consistent.

###### [*GoogleCloudStorageKeyValueOutputWriter*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py#L1140)
Write key/values to Google Cloud Storage files in LevelDB format.
      
###### [*GoogleCloudStorageRecordOutputWriter*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py#L1126)
Lightweight record format. This format implements the log file format from levelDB: https://raw.githubusercontent.com/google/leveldb/master/doc/log_format.txt. The main advantages of this format are:

1. to detect corruption. Every record has a crc32c checksum.
2. to quickly skip corrupted record to the next valid record.

###### [*GoogleCloudStorageConsistentRecordOutputWriter*](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py#L1134)
Same as GoogleCloudStorageRecordOutputWriter but ensures output written to Google Cloud Storage is consistent


## Customized Readers and Writers

The standard input readers and output writers should suffice for most use cases. If you need a reader that handles a different input source and format or a writer that writes to a different location and output format than the standard ones, contact Google to determine whether Google can add these to the standard readers and writers.

Alternatively, for those who want to write their own reader or writer, you can take a look at the open source code for [readers](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/input_readers.py) and [writers](https://github.com/GoogleCloudPlatform/appengine-mapreduce/blob/master/python/src/mapreduce/output_writers.py) to see how to do this.