The interface to the library and the approach to fault tolerance are similar to those described in the original MapReduce paper.
This Map/Reduce implementation has support for two modes of operation, sequential and distributed. In the former, the map and reduce tasks are executed one at a time: first, the first map task is executed to completion, then the second, then the third, etc. When all the map tasks have finished, the first reduce task is run, then the second, etc. This mode, while not very fast, is useful for debugging. The distributed mode runs many worker threads that first execute map tasks in parallel, and then reduce tasks. This is much faster, but also harder to implement and debug.
The Map/Reduce implementation you are given is missing some pieces. Before you can write your first Map/Reduce function pair, you will need to fix the sequential implementation. In particular, the code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in common_map.go and the doReduce() function in common_reduce.go, respectively. The comments in those files should point you in the right direction.
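If you get stuck, here is a minimal sketch of one way the two functions could fit together. It assumes the skeleton's KeyValue type and its scheme of one JSON-encoded intermediate file per (map task, reduce task) pair; the reduceName and ihash helpers below are illustrative stand-ins for the ones the skeleton provides, and the exact signatures in your copies of common_map.go and common_reduce.go take precedence.

package mapreduce

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"os"
	"sort"
)

type KeyValue struct {
	Key, Value string
}

// ihash picks the reduce partition for a key; every occurrence of a
// key must land in the same partition.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// reduceName is a stand-in for the skeleton's intermediate-file naming.
func reduceName(jobName string, mapTask, reduceTask int) string {
	return fmt.Sprintf("mrtmp.%s-%d-%d", jobName, mapTask, reduceTask)
}

// doMap reads the input file, runs mapF on its contents, and divides the
// emitted key/value pairs into nReduce intermediate files, one per
// reduce task, encoded as JSON so doReduce can decode them back.
func doMap(jobName string, mapTask int, inFile string, nReduce int,
	mapF func(file string, contents string) []KeyValue) {
	contents, err := os.ReadFile(inFile)
	if err != nil {
		panic(err)
	}
	files := make([]*os.File, nReduce)
	encs := make([]*json.Encoder, nReduce)
	for r := range encs {
		f, err := os.Create(reduceName(jobName, mapTask, r))
		if err != nil {
			panic(err)
		}
		files[r] = f
		encs[r] = json.NewEncoder(f)
	}
	for _, kv := range mapF(inFile, string(contents)) {
		encs[ihash(kv.Key)%nReduce].Encode(&kv)
	}
	for _, f := range files {
		f.Close()
	}
}

// doReduce gathers this reduce task's partition from every map task's
// output, groups the values by key, and writes reduceF's result for
// each key (in sorted order) to the output file.
func doReduce(jobName string, reduceTask int, outFile string, nMap int,
	reduceF func(key string, values []string) string) {
	byKey := make(map[string][]string)
	for m := 0; m < nMap; m++ {
		f, err := os.Open(reduceName(jobName, m, reduceTask))
		if err != nil {
			panic(err)
		}
		dec := json.NewDecoder(f)
		var kv KeyValue
		for dec.Decode(&kv) == nil {
			byKey[kv.Key] = append(byKey[kv.Key], kv.Value)
		}
		f.Close()
	}
	keys := make([]string, 0, len(byKey))
	for k := range byKey {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out, err := os.Create(outFile)
	if err != nil {
		panic(err)
	}
	defer out.Close()
	enc := json.NewEncoder(out)
	for _, k := range keys {
		enc.Encode(KeyValue{k, reduceF(k, byKey[k])})
	}
}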
To help you determine whether you have correctly implemented doMap() and doReduce(), we have provided a Go test suite that checks the correctness of your implementation. The tests live in test_test.go. To run the tests against the sequential implementation you have now fixed, run:
$ cd <your-repo-directory>
$ export "GOPATH=$PWD" # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
Now you will implement word count, a simple Map/Reduce example. Look in main/wc.go; you'll find empty mapF() and reduceF() functions. Your job is to insert code so that wc.go reports the number of occurrences of each word in its input. A word is any contiguous sequence of letters, as determined by unicode.IsLetter.
There are some input files with pathnames of the form pg-*.txt in ./src/main, downloaded from Project Gutenberg. Here's how to run wc with the input files:
$ cd <your-repo-directory>
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
Review Section 2 of the MapReduce paper. Your mapF() and reduceF() functions will differ a bit from those in the paper's Section 2.1. Your mapF() will be passed the name of a file, as well as that file's contents; it should split the contents into words and return a Go slice of mapreduce.KeyValue. While you can choose what to put in the keys and values of the mapF() output, for word count it only makes sense to use the words as keys. Your reduceF() will be called once for each key, with a slice of all the values generated by mapF() for that key. It must return a string containing the total number of occurrences of the key.
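Here is a minimal sketch of what the pair could look like, assuming the skeleton's signatures in main/wc.go (file name plus contents in, a slice of mapreduce.KeyValue out, and one string back per key):

package main

import (
	"strconv"
	"strings"
	"unicode"

	"mapreduce"
)

// mapF splits the file's contents into words and emits a count of "1"
// for each occurrence.
func mapF(filename string, contents string) []mapreduce.KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r) // a word is a run of letters
	})
	kvs := make([]mapreduce.KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, mapreduce.KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reduceF receives every value emitted for one word; since each value
// is "1", the occurrence count is just the number of values.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}

Emitting "1" per occurrence keeps mapF simple; because every value is the same, reduceF only needs to count how many it received.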
You can test your solution with:
$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
The code in mapreduce/master.go does most of the work of managing a MapReduce job. We also supply you with the complete code for a worker thread, in mapreduce/worker.go, as well as some code to deal with RPC in mapreduce/common_rpc.go.
Your job is to implement schedule() in mapreduce/schedule.go. The master calls schedule() twice during a MapReduce job, once for the Map phase and once for the Reduce phase. schedule()'s job is to hand out tasks to the available workers. There will usually be more tasks than worker threads, so schedule() must give each worker a sequence of tasks, one at a time. schedule() should wait until all tasks have completed, and then return.
schedule() learns about the set of workers by reading its registerChan argument. That channel yields a string for each worker, containing the worker's RPC address. Some workers may exist before schedule() is called, and some may start while schedule() is running; all will appear on registerChan. schedule() should use all the workers, including ones that appear after it starts.
schedule() tells a worker to execute a task by sending a Worker.DoTask RPC to the worker. This RPC's arguments are defined by DoTaskArgs in mapreduce/common_rpc.go. The File element is only used by Map tasks, and is the name of the file to read; schedule() can find these file names in mapFiles.
Use the call() function in mapreduce/common_rpc.go to send an RPC to a worker. The first argument is the worker's address, as read from registerChan. The second argument should be Worker.DoTask. The third argument should be the DoTaskArgs structure, and the last argument should be nil.
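Putting these pieces together, here is a sketch of one possible schedule(). It assumes the skeleton's signature for schedule() and the jobPhase, DoTaskArgs, and call() declarations in the mapreduce package; the field names below are taken from the skeleton, but check your copy of common_rpc.go, as they may differ.

package mapreduce

import "sync"

// A sketch of schedule() for Part III; it does not yet handle worker
// failures.
func schedule(jobName string, mapFiles []string, nReduce int,
	phase jobPhase, registerChan chan string) {
	var ntasks, nOther int
	switch phase {
	case mapPhase:
		ntasks, nOther = len(mapFiles), nReduce
	case reducePhase:
		ntasks, nOther = nReduce, len(mapFiles)
	}

	var wg sync.WaitGroup
	for t := 0; t < ntasks; t++ {
		args := DoTaskArgs{
			JobName:       jobName,
			Phase:         phase,
			TaskNumber:    t,
			NumOtherPhase: nOther,
		}
		if phase == mapPhase {
			args.File = mapFiles[t] // only map tasks read an input file
		}
		wg.Add(1)
		go func(args DoTaskArgs) {
			defer wg.Done()
			// Take an idle worker; registerChan also delivers workers
			// that register while schedule() is running.
			worker := <-registerChan
			call(worker, "Worker.DoTask", &args, nil)
			// Hand the worker back for the next task. Doing the send
			// in its own goroutine keeps the last tasks from blocking
			// forever once no one is left reading registerChan.
			go func() { registerChan <- worker }()
		}(args)
	}
	wg.Wait() // return only once every task has completed
}

Recycling workers through registerChan itself means new and already-registered workers are drawn from a single place; a separate buffered channel of idle workers works equally well.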
Your solution to Part III should only involve modifications to schedule.go.
Your implementation must pass the two remaining test cases in test_test.go. The first tests the failure of a single worker; the second tests the handling of many worker failures. Periodically, the test cases start new workers that the master can use to make forward progress, but these workers fail after handling a few tasks. To run these tests:
$ go test -run Failure
Your solution to Part IV should only involve modifications to schedule.go.
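One way to handle failures, building on the Part III sketch above: since call() returns false when an RPC fails, keep reassigning the task to fresh workers from registerChan until some call succeeds, and simply never hand a failed worker back. A sketch of the modified per-task goroutine (your structure may differ):

go func(args DoTaskArgs) {
	defer wg.Done()
	for {
		worker := <-registerChan
		if call(worker, "Worker.DoTask", &args, nil) {
			// Success: recycle the worker and finish this task.
			go func() { registerChan <- worker }()
			return
		}
		// The RPC failed; treat the worker as dead, drop it, and
		// retry the same task on the next available worker.
	}
}(args)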
We will build Map and Reduce functions for generating an inverted index. Inverted indices are widely used in computer science, and are particularly useful in document searching. Broadly speaking, an inverted index is a map from interesting facts about the underlying data to the original location of that data. For example, in the context of search, it might be a map from keywords to the documents that contain those words. We have created a second binary in main/ii.go that is very similar to the wc.go you built earlier. You should modify mapF() and reduceF() in main/ii.go so that they together produce an inverted index. Running ii.go should output a list of tuples, one per line, in the following format:
word: #documents documents,sorted,and,separated,by,commas
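Here is a minimal sketch of the pair, again assuming wc.go-style signatures, and assuming the skeleton's merge step prints each output line as "key: value", so that reduceF only needs to return the "#documents documents,..." portion:

package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"

	"mapreduce"
)

// mapF emits one (word, document) pair per word occurrence; duplicate
// documents for a word are collapsed in reduceF.
func mapF(document string, value string) []mapreduce.KeyValue {
	words := strings.FieldsFunc(value, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]mapreduce.KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, mapreduce.KeyValue{Key: w, Value: document})
	}
	return kvs
}

// reduceF deduplicates and sorts the documents containing the word,
// then renders the "#documents doc1,doc2,..." half of the output line.
func reduceF(key string, values []string) string {
	seen := make(map[string]bool)
	var docs []string
	for _, d := range values {
		if !seen[d] {
			seen[d] = true
			docs = append(docs, d)
		}
	}
	sort.Strings(docs)
	return fmt.Sprintf("%d %s", len(docs), strings.Join(docs, ","))
}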
You can run all the tests by running the script src/main/test-mr.sh. With a correct solution, your output should resemble:
$ bash ./test-mr.sh
==> Part I
ok mapreduce 2.053s
==> Part II
Passed test
==> Part III
ok mapreduce 1.851s
==> Part IV
ok mapreduce 10.650s
==> Part V (inverted index)
Passed test