-
Notifications
You must be signed in to change notification settings - Fork 0
/
MapReduce.java
105 lines (85 loc) · 4.12 KB
/
MapReduce.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package mapreduce;
import static mapreduce.util.Constants.THREAD_COUNT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import mapreduce.datasource.DataSource;
import mapreduce.datasource.FileDataSource;
import mapreduce.mapper.MapperFunc;
import mapreduce.mapper.MapperTask;
import mapreduce.reducer.ReducerFunc;
import mapreduce.reducer.ReducerTask;
import mapreduce.shuffler.Shuffler;
/**
 * In-process MapReduce driver.
 *
 * <p>The driver works in four steps:
 * <ol>
 *   <li>Reads the input data sets from a {@link DataSource}.</li>
 *   <li>Runs one {@link MapperTask} per data set.</li>
 *   <li>Groups the intermediate pairs by key with {@link Shuffler} tasks.</li>
 *   <li>Reduces the groups with {@code THREAD_COUNT} {@link ReducerTask}s, partitioned by key hash.</li>
 * </ol>
 *
 * <p>Every {@link #compute()} call runs its tasks on its own thread pool, which is shut down
 * before the call returns or throws. The configuration fields are not synchronized. Supply the
 * data source, mapper and reducer before calling {@code compute()}.
 *
 * @param <K> key type of the input data sets
 * @param <V> value type of the input data sets
 * @param <InterKey> intermediate key type emitted by the mapper and grouped by the shuffle
 * @param <InterVal> intermediate value type emitted by the mapper
 * @param <OutKey> key type produced by the reducer
 * @param <OutVal> value type produced by the reducer
 */
public class MapReduce<K, V, InterKey, InterVal, OutKey, OutVal> {

    private DataSource<K, V> dataSource;
    private MapperFunc<K, V, InterKey, InterVal> mapperFunc;
    private ReducerFunc<InterKey, InterVal, OutKey, OutVal> reducerFunc;

    /** Creates a job with no data source, mapper or reducer; supply them before {@link #compute()}. */
    public MapReduce() {
    }

    /**
     * Creates a job whose input is read from a {@link FileDataSource}.
     *
     * <p>The cast to {@code DataSource<K, V>} is unchecked. The caller must choose {@code K} and
     * {@code V} to match what {@code FileDataSource} actually produces, or a
     * {@link ClassCastException} will surface later, where the data is used.
     *
     * @param dataSourceFilePath path passed to the {@code FileDataSource} constructor
     * @throws IOException if the {@code FileDataSource} constructor throws it
     */
    public MapReduce(String dataSourceFilePath) throws IOException {
        this();
        // Unchecked: generics are erased, so the K/V match cannot be verified here.
        @SuppressWarnings("unchecked")
        DataSource<K, V> fileSource = (DataSource<K, V>) new FileDataSource(dataSourceFilePath);
        this.dataSource = fileSource;
    }

    /**
     * Sets the source whose {@code read()} result provides the input data sets.
     *
     * @param dataSource the input source; must be non-null when {@link #compute()} runs
     */
    public void supplyDataSource(DataSource<K, V> dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Sets the function applied to every input data set during the map phase.
     *
     * @param mapperFunc the mapper; must be non-null when {@link #compute()} runs
     */
    public void supplyMapper(MapperFunc<K, V, InterKey, InterVal> mapperFunc) {
        this.mapperFunc = mapperFunc;
    }

    /**
     * Sets the function applied to every group of intermediate values during the reduce phase.
     *
     * @param reducerFunc the reducer; must be non-null when {@link #compute()} runs
     */
    public void supplyReducer(ReducerFunc<InterKey, InterVal, OutKey, OutVal> reducerFunc) {
        this.reducerFunc = reducerFunc;
    }

    /**
     * Runs the split, map, shuffle and reduce phases and returns the combined reducer output.
     *
     * <p>A fresh thread pool is created for each call and shut down in a {@code finally} block.
     * The pool therefore never outlives the call, and the method may be invoked more than once.
     *
     * @return the outputs of all reducer tasks, concatenated in reducer-partition order
     * @throws NullPointerException if the data source, mapper or reducer has not been supplied
     * @throws Exception anything thrown by {@code dataSource.read()}, an {@link ExecutionException}
     *     wrapping a mapper, shuffler or reducer failure, or {@link InterruptedException}
     */
    public List<Entry<OutKey, OutVal>> compute() throws Exception {
        Objects.requireNonNull(dataSource, "dataSource must be supplied before compute()");
        Objects.requireNonNull(mapperFunc, "mapperFunc must be supplied before compute()");
        Objects.requireNonNull(reducerFunc, "reducerFunc must be supplied before compute()");

        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // Split phase.
            Map<K, V> dataSets = dataSource.read();
            // Map phase.
            List<Future<List<Entry<InterKey, InterVal>>>> mapperResults =
                    runMappers(executorService, dataSets);
            // Shuffle phase.
            ConcurrentMap<InterKey, List<InterVal>> shuffledData =
                    shuffle(executorService, mapperResults);
            // Reduce phase.
            return runReducers(executorService, createReducerTasks(shuffledData));
        } finally {
            // On success every submitted task has already been awaited. On failure this
            // interrupts whatever is still running instead of leaving the pool's threads alive.
            executorService.shutdownNow();
        }
    }

    /** Submits one mapper task per input data set and returns their futures in submission order. */
    private List<Future<List<Entry<InterKey, InterVal>>>> runMappers(
            ExecutorService executor, Map<K, V> dataSets) {
        var mapperFutures = new ArrayList<Future<List<Entry<InterKey, InterVal>>>>(dataSets.size());
        dataSets.forEach((id, dataset) ->
                mapperFutures.add(executor.submit(new MapperTask<>(mapperFunc, id, dataset))));
        return mapperFutures;
    }

    /**
     * Feeds each mapper's output to a {@link Shuffler} task and waits until all of them finish.
     *
     * @return intermediate values grouped by intermediate key
     */
    private ConcurrentMap<InterKey, List<InterVal>> shuffle(
            ExecutorService executor, List<Future<List<Entry<InterKey, InterVal>>>> mapperResults)
            throws InterruptedException, ExecutionException {
        ConcurrentMap<InterKey, List<InterVal>> shuffledData = new ConcurrentHashMap<>();
        CountDownLatch countDownLatch = new CountDownLatch(mapperResults.size());
        List<Future<?>> shufflerFutures = new ArrayList<>(mapperResults.size());
        for (Future<List<Entry<InterKey, InterVal>>> mapperResult : mapperResults) {
            // Mapper results are awaited in submission order. Each one is shuffled once it is ready.
            shufflerFutures.add(
                    executor.submit(new Shuffler<>(shuffledData, mapperResult.get(), countDownLatch)));
        }
        // Waiting on the futures first surfaces a failed Shuffler as an ExecutionException.
        // Otherwise await() could stay blocked on a latch the failed task never counted down.
        for (Future<?> shufflerFuture : shufflerFutures) {
            shufflerFuture.get();
        }
        // The latch is still honoured because each Shuffler is handed it.
        // NOTE(review): presumably every Shuffler counts it down once; confirm in Shuffler.
        countDownLatch.await();
        return shuffledData;
    }

    /**
     * Creates {@code THREAD_COUNT} reducer tasks and assigns every key group to exactly one of them,
     * chosen by the key's hash.
     */
    private List<ReducerTask<InterKey, InterVal, OutKey, OutVal>> createReducerTasks(
            ConcurrentMap<InterKey, List<InterVal>> groupedData) {
        var reducerTasks = new ArrayList<ReducerTask<InterKey, InterVal, OutKey, OutVal>>(THREAD_COUNT);
        IntStream.range(0, THREAD_COUNT).forEach(i -> reducerTasks.add(new ReducerTask<>(reducerFunc)));
        groupedData.forEach((interKey, values) -> {
            // |h % n| < n for every int h, including Integer.MIN_VALUE, so the index is always in range.
            int reducerTaskPos = Math.abs(interKey.hashCode() % THREAD_COUNT);
            reducerTasks.get(reducerTaskPos).insertEntry(Map.entry(interKey, values));
        });
        return reducerTasks;
    }

    /** Submits every reducer task and concatenates their outputs in task (partition) order. */
    private List<Entry<OutKey, OutVal>> runReducers(
            ExecutorService executor, List<ReducerTask<InterKey, InterVal, OutKey, OutVal>> reducerTasks)
            throws InterruptedException, ExecutionException {
        var reducerFutures = new ArrayList<Future<List<Entry<OutKey, OutVal>>>>(reducerTasks.size());
        for (ReducerTask<InterKey, InterVal, OutKey, OutVal> reducerTask : reducerTasks) {
            reducerFutures.add(executor.submit(reducerTask));
        }
        var joinedOutput = new ArrayList<Entry<OutKey, OutVal>>();
        for (Future<List<Entry<OutKey, OutVal>>> reducerFuture : reducerFutures) {
            joinedOutput.addAll(reducerFuture.get());
        }
        return joinedOutput;
    }
}