3.2 Example Code
The source tree includes an application in the demo sub-directory, called "MapReduce Made Easy", which demonstrates the operation and use of Google App Engine's MapReduce feature.
You can run the MapReduce Made Easy application without modification in your development web server to see MapReduce in action, or use it as a basis for developing your own MapReduce applications. The program illustrates how to use the MapreducePipeline class to start a MapReduce job and run the associated mapper and reducer functions on the input data.
The MapReduce Made Easy application and MapReduce in general are covered in detail in Mike Aizatsky's 2011 Google I/O conference presentation.
MapReduce Made Easy takes a user-supplied `.zip` file containing text files and runs three MapReduce jobs on it, each controlled by a UI button:

- Word Count: For each word in the specified text files, count how many times the word appears.
- Index: For each word in the text files, determine which of the files contain that word.
- Phrases: For each text file, determine which phrases occur in that file and not in any of the others.
There are three equivalent ways to run the application:

- Run the provided build script from the Python source directory:

  ```sh
  ./build.sh run_demo
  ```

- Locate the main App Engine SDK directory, which contains the development application server (`dev_appserver.py`). Navigate to that directory or supply its path when invoking the application server, then start the sample application, providing any required path for both the application server and the sample application:

  ```sh
  dev_appserver.py demo/app.yaml
  ```

- If you have installed the Google Cloud SDK with the gcloud preview app component, run:

  ```sh
  gcloud preview app run demo/app.yaml
  ```
Once the demo application is running:

- In your browser, open the web client for the sample application at the development server's address (http://localhost:8080 by default).
- When prompted, check "Sign in as Administrator" and log in to the application (Google Account login). If you do not sign in as administrator, you will receive a "Forbidden" message when trying to view pipelines in a later step.
- Click Choose File and select the `.zip` file you want to process. For best results, it should be filled with plain text files. Provide an optional name for the input files, then click Upload. You will need to refresh the page to see an updated list of the files that have been uploaded.
- Click Word Count and wait for it to finish.
- Click Index and wait for it to finish.
- Click Phrases and wait for it to finish.
- Click the wordcount, index, and phrases links to see the results of the MapReduce operations.
The MapReduce Made Easy application defines mapper and reducer functions for each of the three possible operations (Word Count, Index, and Phrases) described above. After checking an incoming request from the web client, the application uses the Pipeline API to construct and run a pipeline for the requested operation:
```python
MapreducePipeline.run(job_name,
                      mapper_spec,
                      reducer_spec,
                      input_reader_spec,
                      output_writer_spec=None,
                      mapper_params=None,
                      reducer_params=None,
                      shards=None)
```
This function constructs a pipeline to do the following:
1. Call the user-supplied mapper function on the input data.
2. Perform a shuffle on the output of step 1.
3. Call the user-supplied reducer function on the output of step 2.
4. Clean up temporary files emitted by steps 1-3.
The shuffle and cleanup functions (steps 2 and 4) are automatically provided by the MapReduce API; the following sections describe the mapper and reducer functions for each of the three operations.
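The overall flow is easiest to see in miniature. The following standalone sketch (plain Python, not part of the demo or the MapReduce API) simulates the same map-shuffle-reduce sequence in memory:

```python
from collections import defaultdict

def simulate_mapreduce(mapper, reducer, inputs):
    """Toy in-memory stand-in for the pipeline's map/shuffle/reduce steps."""
    # Step 1: map each input record to zero or more (key, value) pairs.
    mapped = []
    for record in inputs:
        mapped.extend(mapper(record))
    # Step 2: shuffle - group all mapped values by key.
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Step 3: reduce each key together with its list of values.
    # (Step 4, cleanup, has no equivalent here: there are no temp files.)
    results = []
    for key, values in sorted(groups.items()):
        results.extend(reducer(key, values))
    return results

# A trivial word count over two in-memory "files":
mapper = lambda text: [(w, "") for w in text.split()]
reducer = lambda key, values: ["%s: %d\n" % (key, len(values))]
print(simulate_mapreduce(mapper, reducer, ["a b a", "b c"]))
# ['a: 2\n', 'b: 2\n', 'c: 1\n']
```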
The MapReduce job for the Word Count operation is invoked as follows:
```python
yield mapreduce_pipeline.MapreducePipeline(
    "word_count",
    "main.word_count_map",
    "main.word_count_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={"blob_key": blobkey},
    reducer_params={"mime_type": "text/plain"},
    shards=16)
```
The parameters specify the following:

- The name of the MapReduce job is `word_count`.
- The mapper function is `main.word_count_map`.
- The reducer function is `main.word_count_reduce`.
- The input reader is `mapreduce.input_readers.BlobstoreZipInputReader`, which will read the input from a `.zip` file stored in the Blobstore.
- The output writer is `mapreduce.output_writers.BlobstoreOutputWriter`, which will write the output back to the Blobstore.
- The location of the input file in the Blobstore (the blob key) is given by `blobkey`.
- The MIME type of the final output will be `text/plain`.
- The MapReduce job will use 16 shards (workers).
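Before looking at the mapper, it helps to know what the input reader hands it: for each member of the `.zip` archive, the mapper receives a pair consisting of a `zipfile.ZipInfo` entry and a zero-argument function that returns the member's contents. The following rough stand-in, built on the standard `zipfile` module, illustrates that shape rather than the reader's actual implementation (the local path `sample.zip` is hypothetical):

```python
import zipfile

def zip_input_records(path):
    """Yield (entry, text_fn) pairs shaped like the records
    BlobstoreZipInputReader feeds to the mapper (illustration only)."""
    archive = zipfile.ZipFile(path)
    for entry in archive.infolist():
        # Bind the member name now; the mapper calls text_fn() lazily.
        yield entry, (lambda name=entry.filename: archive.read(name))

for entry, text_fn in zip_input_records("sample.zip"):
    print(entry.filename, len(text_fn()))
```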
Here is the mapper function for Word Count:
```python
def word_count_map(data):
    """Word Count map function."""
    (entry, text_fn) = data
    text = text_fn()
    logging.debug("Got %s", entry.filename)
    for s in split_into_sentences(text):
        for w in split_into_words(s.lower()):
            yield (w, "")
```
The function splits the text of each input file into sentences and each sentence into words. For each word it finds, it emits the key-value pair `(word, "")`. The specific value associated with the word doesn't matter, as the reducer function will not use it.
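To see what the mapper emits, you can run the same splitting logic by hand. The helpers `split_into_sentences` and `split_into_words` are defined elsewhere in the demo; the regex versions below are plausible stand-ins, not the demo's exact code:

```python
import re

def split_into_sentences(text):
    # Stand-in: split on sentence-ending punctuation.
    return re.split(r"[.!?]+", text)

def split_into_words(sentence):
    # Stand-in: keep runs of letters only.
    return re.findall(r"[a-z]+", sentence)

text = "The cat sat. The cat ran!"
pairs = [(w, "") for s in split_into_sentences(text)
         for w in split_into_words(s.lower())]
print(pairs)
# [('the', ''), ('cat', ''), ('sat', ''), ('the', ''), ('cat', ''), ('ran', '')]
```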
The Word Count reducer function consists of the following code:
```python
def word_count_reduce(key, values):
    """Word Count reduce function."""
    yield "%s: %d\n" % (key, len(values))
```
As noted above, the function doesn't care what specific values are associated with a word—only how many of them there are, which indicates how many times the word was encountered in the input. For each word, the function emits a pair consisting of the word itself and the number of times it occurred.
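After the shuffle, the reducer receives each word together with every value that was emitted for it. Continuing the sample above, the word "the" appears twice, so its shuffled group is `("the", ["", ""])`:

```python
def word_count_reduce(key, values):
    """Word Count reduce function."""
    yield "%s: %d\n" % (key, len(values))

print("".join(word_count_reduce("the", ["", ""])))  # prints: the: 2
print("".join(word_count_reduce("sat", [""])))      # prints: sat: 1
```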
The MapReduce job for the Index operation is invoked as follows:
```python
yield mapreduce_pipeline.MapreducePipeline(
    "index",
    "main.index_map",
    "main.index_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={"blob_key": blobkey},
    reducer_params={"mime_type": "text/plain"},
    shards=16)
```
This is essentially the same as the invocation for the Word Count operation, with the job name changed to `index` and the mapper and reducer functions changed to `main.index_map` and `main.index_reduce`.
This is the mapper function for the Index operation:
```python
def index_map(data):
    """Index demo map function."""
    (entry, text_fn) = data
    text = text_fn()
    logging.debug("Got %s", entry.filename)
    for s in split_into_sentences(text):
        for w in split_into_words(s.lower()):
            yield (w, entry.filename)
```
Just as for Word Count, the function splits the input into sentences and the sentences into words, and emits a key-value pair for each word. The difference is that this time the value associated with each word is the name of the file in which it was found, rather than just an empty placeholder.
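A small standalone snippet shows the shape of the mapper's output (the filenames and contents are hypothetical):

```python
# Two tiny in-memory "files" stand in for the unzipped input.
files = {"a.txt": "the cat sat", "b.txt": "the cat ran"}
pairs = [(w, name) for name, text in files.items() for w in text.split()]
print(sorted(pairs))
# [('cat', 'a.txt'), ('cat', 'b.txt'), ('ran', 'b.txt'),
#  ('sat', 'a.txt'), ('the', 'a.txt'), ('the', 'b.txt')]
# After the shuffle, the reducer for 'cat' receives ['a.txt', 'b.txt'].
```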
The reducer function for Index is as follows:
```python
def index_reduce(key, values):
    """Index demo reduce function."""
    yield "%s: %s\n" % (key, list(set(values)))
```
This time, the key-value pairs consist of a word and the name of an input file in which the word was encountered. The function removes duplicate filenames by converting the values to a set, then emits the word along with the list of files that contain it.
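Because a word usually occurs many times in the same file, the values list arrives with duplicates, and the `set()` removes them. For example (hypothetical filenames):

```python
def index_reduce(key, values):
    """Index demo reduce function."""
    yield "%s: %s\n" % (key, list(set(values)))

print(next(index_reduce("cat", ["a.txt", "b.txt", "a.txt"])))
# cat: ['a.txt', 'b.txt']   (the order of the list is not guaranteed)
```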
Again, the job invocation code for the Phrases operation is similar to that for Word Count and Index, but with the job name and the mapper and reducer functions changed accordingly:
```python
yield mapreduce_pipeline.MapreducePipeline(
    "phrases",
    "main.phrases_map",
    "main.phrases_reduce",
    "mapreduce.input_readers.BlobstoreZipInputReader",
    "mapreduce.output_writers.BlobstoreOutputWriter",
    mapper_params={"blob_key": blobkey},
    reducer_params={"mime_type": "text/plain"},
    shards=16)
```
The following is the mapper function for the Phrases operation:
```python
PHRASE_LENGTH = 4

def phrases_map(data):
    """Phrases demo map function."""
    (entry, text_fn) = data
    text = text_fn()
    filename = entry.filename
    logging.debug("Got %s", filename)
    for s in split_into_sentences(text):
        words = split_into_words(s.lower())
        if len(words) < PHRASE_LENGTH:
            yield (":".join(words), filename)
            continue
        # The +1 keeps the final window in range, so "a b c d e"
        # yields both "a:b:c:d" and "b:c:d:e".
        for i in range(0, len(words) - PHRASE_LENGTH + 1):
            yield (":".join(words[i:i + PHRASE_LENGTH]), filename)
```
As before, the mapper function splits the input into sentences and each sentence into words. The constant `PHRASE_LENGTH` (set to 4 in the example) defines the number of consecutive words that constitute a phrase. If a sentence contains fewer words than this, the function emits the entire sentence as a single phrase; otherwise, it emits every run of `PHRASE_LENGTH` consecutive words. (For example, the sentence A B C D E will emit two phrases, A B C D and B C D E.) For each phrase, the function emits a pair consisting of the phrase itself and the name of the file in which it occurred.
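The sliding window is easy to verify in isolation; this standalone snippet reproduces the A B C D E example from the prose:

```python
PHRASE_LENGTH = 4

words = "a b c d e".split()
if len(words) < PHRASE_LENGTH:
    print(":".join(words))
else:
    for i in range(len(words) - PHRASE_LENGTH + 1):
        print(":".join(words[i:i + PHRASE_LENGTH]))
# a:b:c:d
# b:c:d:e
```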
Here is the reducer function for Phrases:
```python
def phrases_reduce(key, values):
    """Phrases demo reduce function."""
    if len(values) < 10:
        return
    counts = {}
    for filename in values:
        counts[filename] = counts.get(filename, 0) + 1
    words = re.sub(r":", " ", key)
    threshold = len(values) / 2
    for filename, count in counts.items():
        if count > threshold:
            yield "%s:%s\n" % (words, filename)
```
The function receives a phrase along with the list of files in which it was found, one entry per occurrence. If the phrase occurred fewer than ten times in the input, the reducer ignores it and simply returns. Otherwise, the function counts the occurrences per file; if more than half of all occurrences fall in a single file, the function considers the phrase characteristic of that file and emits a key-value pair consisting of the phrase and the filename.
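A worked example with hypothetical inputs makes the threshold concrete. Suppose a phrase occurred 12 times in total, 9 of them in one file:

```python
# Hypothetical shuffled values: one filename per occurrence of the phrase.
values = ["melville.txt"] * 9 + ["austen.txt"] * 3

counts = {}
for filename in values:
    counts[filename] = counts.get(filename, 0) + 1

threshold = len(values) // 2  # 6: half of the 12 total occurrences
print([f for f, c in counts.items() if c > threshold])
# ['melville.txt'] -- 9 of 12 occurrences, so the phrase is emitted for it
```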