map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
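As a concrete counterpart in Go, this is roughly what the same word count looks like with the signatures this lab expects (a sketch following the lab's wc exercise; the mapreduce package import path and the word-splitting rule are assumptions):

import (
    "strconv"
    "strings"
    "unicode"

    "mapreduce" // the lab's package; the import path is an assumption
)

// mapF splits the document into words and emits ("word", "1") for each,
// mirroring EmitIntermediate(w, "1") above.
func mapF(filename string, contents string) []mapreduce.KeyValue {
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    kvs := make([]mapreduce.KeyValue, 0, len(words))
    for _, w := range words {
        kvs = append(kvs, mapreduce.KeyValue{Key: w, Value: "1"})
    }
    return kvs
}

// reduceF sums the "1"s emitted for one word, mirroring the reduce above.
func reduceF(key string, values []string) string {
    sum := 0
    for _, v := range values {
        n, _ := strconv.Atoi(v) // every value is a small integer count
        sum += n
    }
    return strconv.Itoa(sum)
}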
// Master holds all the state that the master needs to keep track of.
type Master struct {
    sync.Mutex

    address     string
    doneChannel chan bool

    // protected by the mutex
    newCond *sync.Cond // signals when Register() adds to workers[]
    workers []string   // each worker's UNIX-domain socket name -- its RPC address

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int
}
The master's RPC method for registering workers
// Register is an RPC method that is called by workers after they have started
// up to report that they are ready to receive tasks.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
    mr.Lock()
    defer mr.Unlock()
    debug("Register: worker %s\n", args.Worker)
    mr.workers = append(mr.workers, args.Worker)

    // tell forwardRegistrations() that there's a new workers[] entry.
    mr.newCond.Broadcast()

    return nil
}
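The Broadcast() wakes up forwardRegistrations(), the master-side helper that feeds registerChan for schedule(). For context, a sketch of what that helper can look like (the lab ships its own version in master.go; treat this as illustrative):

// forwardRegistrations sends every worker in workers[] (existing and
// newly registered) to ch, which schedule() consumes as registerChan.
func (mr *Master) forwardRegistrations(ch chan string, phase jobPhase) {
    i := 0
    for {
        mr.Lock()
        if len(mr.workers) > i {
            // a worker that schedule() hasn't been told about yet
            w := mr.workers[i]
            go func() { ch <- w }() // send without holding the lock
            i = i + 1
        } else {
            // wait for Register() to Broadcast() after appending
            mr.newCond.Wait()
        }
        mr.Unlock()
    }
}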
// schedule() starts and waits for all tasks in the given phase (Map
// or Reduce). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }
    // All ntasks tasks have to be scheduled on workers, and only once all of
    // them have been completed successfully should the function return.
    // Remember that workers may fail, and that any given worker may finish
    // multiple tasks.
    // schedule waits here until all tasks have been completed
    var wg sync.WaitGroup
    // task IDs are handed out over this channel
    var taskChan = make(chan int)
    go func() {
        for i := 0; i < ntasks; i++ {
            wg.Add(1)
            taskChan <- i
        }
        // wait until every task has succeeded, then close taskChan
        wg.Wait()
        close(taskChan)
    }()
    // hand every task to a worker
    for i := range taskChan {
        // grab an idle worker from the register channel
        worker := <-registerChan

        // build the RPC arguments for this task (the DoTaskArgs fields
        // come from the lab's common_rpc.go)
        var task DoTaskArgs
        task.JobName = jobName
        task.Phase = phase
        task.NumOtherPhase = n_other
        task.TaskNumber = i
        if phase == mapPhase {
            task.File = mapFiles[i]
        }

        // Note: the goroutine must take worker and task as parameters;
        // otherwise every goroutine would share the loop's variables.
        go func(worker string, task DoTaskArgs) {
            if call(worker, "Worker.DoTask", &task, nil) {
                // only a successful call marks the task as done
                wg.Done()

                // put the now-idle worker back on the register channel
                registerChan <- worker
            } else {
                log.Printf("Schedule: assign %s task %v to %s failed", phase, task.TaskNumber, worker)

                // put the failed task back on the task channel so another
                // worker can retry it
                taskChan <- task.TaskNumber
            }
        }(worker, task)
    }
    fmt.Printf("Schedule: %v phase done\n", phase)
}
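schedule() leans on call(), the lab's RPC helper, which returns true only if the RPC was delivered and executed. A sketch of it (the real one lives in common_rpc.go; the exact error handling here is an assumption):

import (
    "fmt"
    "net/rpc"
)

// call dials srv over a UNIX-domain socket, invokes rpcname with args,
// and reports success. Failed dials and failed calls both return false,
// which schedule() treats as "reschedule this task".
func call(srv string, rpcname string, args interface{}, reply interface{}) bool {
    c, err := rpc.Dial("unix", srv)
    if err != nil {
        return false
    }
    defer c.Close()

    if err := c.Call(rpcname, args, reply); err != nil {
        fmt.Println(err)
        return false
    }
    return true
}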
Worker

The worker is an RPC server; DoTask is called by the master when a new task is being scheduled on this worker. A worker can act as either a map worker or a reduce worker, depending on the phase of the task it is given.
// Worker holds the state for a server waiting for DoTask or Shutdown RPCs
type Worker struct {
    sync.Mutex

    name       string
    Map        func(string, string) []KeyValue
    Reduce     func(string, []string) string
    nRPC       int // quit after this many RPCs; protected by mutex
    nTasks     int // total tasks executed; protected by mutex
    concurrent int // number of parallel DoTasks in this worker; mutex
    l          net.Listener
}
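Before any DoTask arrives, the worker has to announce itself. This is roughly the worker-side counterpart of Master.Register shown earlier (a sketch; a RegisterArgs struct carrying the worker's RPC address in a Worker field follows the lab's common_rpc.go):

// register tells the master that this worker exists and is ready to
// receive tasks; the master appends wk.name to its workers[] list.
func (wk *Worker) register(master string) {
    args := new(RegisterArgs)
    args.Worker = wk.name
    if ok := call(master, "Master.Register", args, new(struct{})); !ok {
        fmt.Printf("Register: RPC %s register error\n", master)
    }
}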
// DoTask is called by the master when a new task is being scheduled on this
// worker.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
    fmt.Printf("%s: given %v task #%d on file %s (nios: %d)\n",
        wk.name, arg.Phase, arg.TaskNumber, arg.File, arg.NumOtherPhase)

    // count how many DoTask RPCs are running at once (nc); the counters
    // are protected by the worker's mutex
    wk.Lock()
    wk.nTasks += 1
    wk.concurrent += 1
    nc := wk.concurrent
    wk.Unlock()

    if nc > 1 {
        // schedule() should never issue more than one RPC at a
        // time to a given worker.
        log.Fatal("Worker.DoTask: more than one DoTask sent concurrently to a single worker\n")
    }

    switch arg.Phase {
    case mapPhase:
        doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
    case reducePhase:
        doReduce(arg.JobName, arg.TaskNumber, mergeName(arg.JobName, arg.TaskNumber), arg.NumOtherPhase, wk.Reduce)
    }

    wk.Lock()
    wk.concurrent -= 1
    wk.Unlock()

    return nil
}
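DoTask hands doReduce its output name via mergeName(); together with reduceName() it pins down the on-disk layout that lets reduce task r find what map task m wrote for it. The naming helpers are simple string builders, roughly like this (a sketch of the scheme in the lab's common.go; the exact prefix is an assumption):

import "strconv"

// reduceName is the intermediate file that map task mapTask writes
// for reduce task reduceTask.
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName is the output file produced by reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
    return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}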
// doMap manages one map task: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    //
    // You will need to write this function.
    //
    // The intermediate output of a map task is stored as multiple
    // files, one per destination reduce task. The file name includes
    // both the map task number and the reduce task number. Use the
    // filename generated by reduceName(jobName, mapTaskNumber, r) as
    // the intermediate file for reduce task r. Call ihash() (see below)
    // on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    contents, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Printf("read file %s failed", inFile)
        return
    }
    kvs := mapF(inFile, string(contents))
    // create one intermediate file (and JSON encoder) per reduce task
    var imm = make([]*os.File, nReduce)
    var enc = make([]*json.Encoder, nReduce)
    for i := 0; i < nReduce; i++ {
        if f, err := os.Create(reduceName(jobName, mapTaskNumber, i)); err != nil {
            log.Printf("create file %s failed", reduceName(jobName, mapTaskNumber, i))
        } else {
            imm[i] = f
            enc[i] = json.NewEncoder(f)
        }
    }
    // partition each key/value pair to reduce task ihash(key) % nReduce
    for _, kv := range kvs {
        r := ihash(kv.Key) % nReduce
        if enc[r] != nil {
            if err := enc[r].Encode(&kv); err != nil {
                log.Printf("write %v to file %s failed", kv, reduceName(jobName, mapTaskNumber, r))
            }
        }
    }
    // close the intermediate files
    for i := 0; i < nReduce; i++ {
        if imm[i] != nil {
            imm[i].Close()
        }
    }
}
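The partitioning step above relies on ihash(); any deterministic hash works, as long as the same key always lands in the same reduce task. The lab's common_map.go uses FNV-1a, roughly:

import "hash/fnv"

// ihash hashes a key to a non-negative int; doMap takes it mod nReduce
// so that all values for one key end up in the same reduce partition.
func ihash(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32() & 0x7fffffff)
}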
// doReduce manages one reduce task: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // You will need to write this function.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTaskNumber) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    //   enc := json.NewEncoder(file)
    //   for key := ... {
    //     enc.Encode(KeyValue{key, reduceF(...)})
    //   }
    //   file.Close()
    //
    var keys []string                   // all distinct keys in this partition
    var kvs = make(map[string][]string) // all key/value pairs from the nMap intermediate files
    // read one intermediate file from each of the nMap map tasks
    for i := 0; i < nMap; i++ {
        fn := reduceName(jobName, i, reduceTaskNumber)
        fmt.Println("reduce fn", fn)
        imm, err := os.Open(fn)
        if err != nil {
            log.Printf("open intermediate file %s failed", fn)
            continue
        }
        var kv KeyValue
        dec := json.NewDecoder(imm)
        err = dec.Decode(&kv)
        for err == nil {
            // first time we see this key?
            if _, ok := kvs[kv.Key]; !ok {
                keys = append(keys, kv.Key)
            }
            kvs[kv.Key] = append(kvs[kv.Key], kv.Value)

            // decode repeatedly until Decode returns an error (EOF)
            err = dec.Decode(&kv)
        }
        imm.Close()
    }
    // Original MapReduce paper, section 4.2 Ordering Guarantees:
    // keys within one partition are processed in increasing key order.
    sort.Strings(keys)

    out, err := os.Create(outFile)
    if err != nil {
        log.Printf("create output file %s failed", outFile)
        return
    }
    enc := json.NewEncoder(out)
    for _, key := range keys {
        if err = enc.Encode(KeyValue{key, reduceF(key, kvs[key])}); err != nil {
            log.Printf("write [key: %s] to file %s failed", key, outFile)
        }
    }
    out.Close()
}
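With doMap and doReduce filled in, the pipeline can be exercised end to end. Using the wc-style mapF/reduceF from the top of the post, a single-process run looks roughly like this (mapreduce.Sequential and mr.Wait() follow the lab's driver API as I recall it; the job name and nReduce value here are arbitrary):

import "os"

func main() {
    // run one word-count job over the files named on the command line,
    // partitioned into 3 reduce tasks
    mr := mapreduce.Sequential("wcseq", os.Args[1:], 3, mapF, reduceF)
    mr.Wait() // returns once every map and reduce task has completed
}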