diff --git a/mapreduce/master.go b/mapreduce/master.go index e02f953..5807f23 100644 --- a/mapreduce/master.go +++ b/mapreduce/master.go @@ -28,11 +28,6 @@ type Master struct { idleWorkerChan chan *RemoteWorker failedWorkerChan chan *RemoteWorker - - /////////////////////////////// - // ADD EXTRA PROPERTIES HERE // - /////////////////////////////// - // Fault Tolerance } type Operation struct { @@ -77,9 +72,15 @@ func (master *Master) acceptMultipleConnections() { // handleFailingWorkers will handle workers that fails during an operation. func (master *Master) handleFailingWorkers() { - ///////////////////////// - // YOUR CODE GOES HERE // - ///////////////////////// + var ( + failedWorker *RemoteWorker + ) + for failedWorker = range master.failedWorkerChan { + master.workersMutex.Lock() + delete(master.workers, failedWorker.id) + master.workersMutex.Unlock() + log.Println("Removing worker with id ", failedWorker.id) + } } // Handle a single connection until it's done, then closes it. diff --git a/mapreduce/master_scheduler.go b/mapreduce/master_scheduler.go index e081083..f557b39 100644 --- a/mapreduce/master_scheduler.go +++ b/mapreduce/master_scheduler.go @@ -8,10 +8,6 @@ import ( // Schedules map operations on remote workers. This will run until InputFilePathChan // is closed. If there is no worker available, it'll block. func (master *Master) schedule(task *Task, proc string, filePathChan chan string) int { - ////////////////////////////////// - // YOU WANT TO MODIFY THIS CODE // - ////////////////////////////////// - var ( wg sync.WaitGroup filePath string @@ -31,22 +27,18 @@ func (master *Master) schedule(task *Task, proc string, filePathChan chan string wg.Add(1) go master.runOperation(worker, operation, &wg) } - wg.Wait() - log.Printf("%vx %v operations completed\n", counter, proc) return counter } // runOperation start a single operation on a RemoteWorker and wait for it to return or fail. func (master *Master) runOperation(remoteWorker *RemoteWorker, operation *Operation, wg *sync.WaitGroup) { - ////////////////////////////////// - // YOU WANT TO MODIFY THIS CODE // - ////////////////////////////////// var ( err error args *RunArgs + ) log.Printf("Running %v (ID: '%v' File: '%v' Worker: '%v')\n", operation.proc, operation.id, operation.filePath, remoteWorker.id) @@ -56,8 +48,12 @@ func (master *Master) runOperation(remoteWorker *RemoteWorker, operation *Operat if err != nil { log.Printf("Operation %v '%v' Failed. Error: %v\n", operation.proc, operation.id, err) - wg.Done() + //Em caso de falha, mandamos o worker para o failedWorkerChan e procuramos um novo worker para realizar a op. nao feita master.failedWorkerChan <- remoteWorker + remoteWorker = <-master.idleWorkerChan + wg.Add(1) + go master.runOperation(remoteWorker, operation, wg) + wg.Done() } else { wg.Done() master.idleWorkerChan <- remoteWorker