golang[99]-mapReduce

Introduction to mapReduce

MapReduce is a software architecture proposed by Google for parallel computation over large data sets (larger than 1 TB). The concepts "Map" and "Reduce", together with their main ideas, are borrowed from functional programming languages, along with features taken from vector programming languages.
Current implementations specify a Map function that maps a set of key/value pairs to a new set of key/value pairs, and a concurrent Reduce function that guarantees that all of the mapped key/value pairs sharing the same key are grouped together.

The description above comes from Wikipedia. As a mature, reliable, simple, and efficient distributed architecture, mapReduce delivers very strong performance for parallel computation.

Abstract representation

map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
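
Expressed in Go, the two signatures might look like this. This is only a sketch: KeyValue matches the pair type used by the worker code later in this article, while the MapFunc and ReduceFunc type names are just for illustration.

// KeyValue is the intermediate key/value pair emitted by the map function.
type KeyValue struct {
    Key   string
    Value string
}

type (
    // MapFunc turns one input split (k1, v1) into a list of intermediate pairs (k2, v2).
    MapFunc func(filename string, contents string) []KeyValue
    // ReduceFunc folds all values that share one intermediate key k2 into a single result.
    ReduceFunc func(key string, values []string) string
)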

Schematic diagram

The figure above is a schematic of the mapReduce model: Input files are the raw data, and Output files are the final output.
A worker can execute either map or reduce, depending on what the master assigns to it.
Between input and output the data goes through two stages, one map and one reduce.
The map stage transforms a complex problem into many small files; for example, map might classify the raw data according to some rule, producing one file for class a, one file for class b, and so on.
The reduce stage then performs the computation over one such class.
The master is the central scheduler; for example, it hands the class-a files produced by map to a reduce worker, and the reduce function might then aggregate all the data in those class-a files.

Example pseudocode

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
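
For reference, the same word-count logic can be written in Go against the map/reduce signatures used by the worker code below. This is only a sketch relying on the standard strconv, strings and unicode packages, not part of the lab framework.

import (
    "strconv"
    "strings"
    "unicode"
)

// wordCountMap splits the document contents into words and emits
// the pair (word, "1") for every occurrence.
func wordCountMap(document string, contents string) []KeyValue {
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    kvs := make([]KeyValue, 0, len(words))
    for _, w := range words {
        kvs = append(kvs, KeyValue{Key: w, Value: "1"})
    }
    return kvs
}

// wordCountReduce adds up the counts collected for one word.
func wordCountReduce(key string, values []string) string {
    sum := 0
    for _, v := range values {
        n, _ := strconv.Atoi(v)
        sum += n
    }
    return strconv.Itoa(sum)
}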

A simple golang implementation

master

// Master holds all the state that the master needs to keep track of.
type Master struct {
    sync.Mutex

    address     string
    doneChannel chan bool

    // protected by the mutex
    newCond *sync.Cond // signals when Register() adds to workers[]
    workers []string   // each worker's UNIX-domain socket name -- its RPC address

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int
}
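
The framework builds a Master roughly as follows. This is only a sketch of the initialization; the real constructor also starts the RPC server and the registration-forwarding loop, which are omitted here.

// newMaster is a sketch of how a Master could be initialized; the RPC
// server and the scheduling loop are started elsewhere.
func newMaster(address string) *Master {
    mr := &Master{
        address:     address,
        doneChannel: make(chan bool),
        shutdown:    make(chan struct{}),
    }
    // newCond uses the Master's embedded mutex, so Register() can signal
    // the scheduler whenever workers[] grows.
    mr.newCond = sync.NewCond(mr)
    return mr
}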

The master's RPC method for registering a worker

// Register is an RPC method that is called by workers after they have started
// up to report that they are ready to receive tasks.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
    mr.Lock()
    defer mr.Unlock()
    debug("Register: worker %s\n", args.Worker)
    mr.workers = append(mr.workers, args.Worker)

    // tell forwardRegistrations() that there's a new workers[] entry.
    mr.newCond.Broadcast()

    return nil
}
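
On the worker side, registering is a single RPC to this method. A minimal sketch, assuming the lab's call() helper (also used by schedule below) and a RegisterArgs struct that carries only the worker's own address:

// RegisterArgs carries the worker's own RPC address to Master.Register.
type RegisterArgs struct {
    Worker string
}

// register announces this worker to the master so that schedule()
// can start handing it tasks.
func register(master string, me string) {
    args := &RegisterArgs{Worker: me}
    if ok := call(master, "Master.Register", args, new(struct{})); !ok {
        fmt.Printf("Register: RPC %s register error\n", master)
    }
}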

The master's schedule function:

1. It keeps track of which tasks are still pending and which workers are idle.
2. If an RPC to a worker fails, the task is handed to another worker.

// schedule() starts and waits for all tasks in the given phase (Map
// or Reduce). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers, and only once all of
    // them have been completed successfully should the function return.
    // Remember that workers may fail, and that any given worker may finish
    // multiple tasks.

    // schedule waits until all workers have finished their tasks
    var wg sync.WaitGroup

    // RPC call parameters shared by every task in this phase
    var task DoTaskArgs
    task.JobName = jobName
    task.NumOtherPhase = n_other
    task.Phase = phase

    // task ids are handed out through this channel
    var taskChan = make(chan int)
    go func() {
        for i := 0; i < ntasks; i++ {
            wg.Add(1)
            taskChan <- i
        }
        // wait until every task has completed, then close taskChan
        wg.Wait()
        close(taskChan)
    }()

    // assign every task to a worker
    for i := range taskChan {
        // get a worker from the register channel
        worker := <-registerChan

        task.TaskNumber = i
        if phase == mapPhase {
            task.File = mapFiles[i]
        }

        // Note: worker and task must be passed as parameters so that each
        // goroutine gets its own copy instead of sharing the loop variables.
        go func(worker string, task DoTaskArgs) {
            if call(worker, "Worker.DoTask", &task, nil) {
                // only a successful call results in wg.Done()
                wg.Done()

                // put the now-idle worker back into the register channel
                registerChan <- worker
            } else {
                log.Printf("Schedule: assign %s task %v to %s failed", phase,
                    task.TaskNumber, worker)

                // put the failed task back into the task channel
                taskChan <- task.TaskNumber
            }
        }(worker, task)
    }
    fmt.Printf("Schedule: %v phase done\n", phase)
}
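
With schedule in place, the master's driver simply runs the two phases in order and then merges the reduce outputs. A rough sketch of what that sequence might look like; the real framework wraps this in a run() method together with cleanup and the final merge.

// runDistributed is a sketch of the master's main loop: run the map
// phase, then the reduce phase, over the registered workers.
func runDistributed(jobName string, files []string, nReduce int, registerChan chan string) {
    schedule(jobName, files, nReduce, mapPhase, registerChan)
    schedule(jobName, files, nReduce, reducePhase, registerChan)
    // finally, combine the per-reduce-task outputs mrtmp.<jobName>-res-<i>
    // into a single result file (see the merge sketch at the end).
}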

worker

The worker is an RPC service; DoTask is called by the master when a new task is being scheduled on this worker.
A worker can act either as a map worker or as a reduce worker.

// Worker holds the state for a server waiting for DoTask or Shutdown RPCs
type Worker struct {
    sync.Mutex

    name       string
    Map        func(string, string) []KeyValue
    Reduce     func(string, []string) string
    nRPC       int // quit after this many RPCs; protected by mutex
    nTasks     int // total tasks executed; protected by mutex
    concurrent int // number of parallel DoTasks in this worker; mutex
    l          net.Listener
}

// DoTask is called by the master when a new task is being scheduled on this
// worker.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
    fmt.Printf("%s: given %v task #%d on file %s (nios: %d)\n",
        wk.name, arg.Phase, arg.TaskNumber, arg.File, arg.NumOtherPhase)

    wk.Lock()
    wk.nTasks += 1
    wk.concurrent += 1
    nc := wk.concurrent
    wk.Unlock()

    if nc > 1 {
        // schedule() should never issue more than one RPC at a
        // time to a given worker.
        log.Fatal("Worker.DoTask: more than one DoTask sent concurrently to a single worker\n")
    }

    switch arg.Phase {
    case mapPhase:
        doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
    case reducePhase:
        doReduce(arg.JobName, arg.TaskNumber, mergeName(arg.JobName, arg.TaskNumber), arg.NumOtherPhase, wk.Reduce)
    }

    wk.Lock()
    wk.concurrent -= 1
    wk.Unlock()

    fmt.Printf("%s: %v task #%d done\n", wk.name, arg.Phase, arg.TaskNumber)
    return nil
}
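
For DoTask to be reachable, the worker has to expose itself as an RPC service and then announce itself to the master. A minimal sketch over a unix-domain socket, assuming the register() helper sketched earlier; the lab's real RunWorker additionally handles the nRPC budget and Shutdown.

// runWorker is a sketch of the worker's startup path: expose Worker over
// net/rpc on a unix-domain socket, register with the master, then serve
// incoming DoTask calls until the listener is closed.
func runWorker(masterAddr string, me string,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string) {

    wk := &Worker{name: me, Map: mapF, Reduce: reduceF}

    rpcs := rpc.NewServer()
    rpcs.Register(wk)

    os.Remove(me) // remove a stale socket from a previous run, if any
    l, err := net.Listen("unix", me)
    if err != nil {
        log.Fatal("runWorker: listen error: ", err)
    }
    wk.l = l

    register(masterAddr, me) // the register() sketch shown earlier

    for {
        conn, err := l.Accept()
        if err != nil {
            break
        }
        go rpcs.ServeConn(conn)
    }
}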

A simple mapreduce implementation

// reduceName constructs the name of the intermediate file which map task
// <mapTask> produces for reduce task <reduceTask>.
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName constructs the name of the output file of reduce task <reduceTask>
func mergeName(jobName string, reduceTask int) string {
    return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}
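
For example, with job name "test", map task 2 writes its partition for reduce task 0 to the first file below, and reduce task 0 writes its final output to the second:

reduceName("test", 2, 0) // "mrtmp.test-2-0"
mergeName("test", 0)     // "mrtmp.test-res-0"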

map

// doMap manages one map task: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    //
    // You will need to write this function.
    //
    // The intermediate output of a map task is stored as multiple
    // files, one per destination reduce task. The file name includes
    // both the map task number and the reduce task number. Use the
    // filename generated by reduceName(jobName, mapTaskNumber, r) as
    // the intermediate file for reduce task r. Call ihash() (see below)
    // on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    contents, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Printf("read file %s failed", inFile)
        return
    }
    kvs := mapF(inFile, string(contents))

    var imm = make([]*os.File, nReduce)
    var enc = make([]*json.Encoder, nReduce)
    for i := 0; i < nReduce; i++ {
        if f, err := os.Create(reduceName(jobName, mapTaskNumber, i)); err != nil {
            log.Printf("create file %s failed", reduceName(jobName, mapTaskNumber, i))
        } else {
            imm[i] = f
            enc[i] = json.NewEncoder(f)
        }
    }

    for _, kv := range kvs {
        r := ihash(kv.Key) % nReduce
        if enc[r] != nil {
            if err := enc[r].Encode(&kv); err != nil {
                log.Printf("write %v to file %s failed", kv, reduceName(jobName, mapTaskNumber, r))
            }
        }
    }

    // close the intermediate files
    for i := 0; i < nReduce; i++ {
        if imm[i] != nil {
            imm[i].Close()
        }
    }
}

func ihash(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32() & 0x7fffffff)
}
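
Putting the pieces together: ihash picks which of the nReduce intermediate files a key goes to, and each line of that file is one JSON-encoded KeyValue. A small illustration; the job name, task number and nReduce value here are made up for the example.

// partitionExample shows where a single intermediate pair lands on disk.
func partitionExample() {
    kv := KeyValue{Key: "hello", Value: "1"}
    r := ihash(kv.Key) % 5              // partition index in [0, 5)
    fn := reduceName("wordcount", 3, r) // "mrtmp.wordcount-3-" + strconv.Itoa(r)
    fmt.Println(fn)

    // The line doMap writes into that file via json.Encoder is:
    //   {"Key":"hello","Value":"1"}
}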

reduce

// doReduce manages one reduce task: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // You will need to write this function.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTaskNumber) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    //   enc := json.NewEncoder(file)
    //   for key := ... {
    //     enc.Encode(KeyValue{key, reduceF(...)})
    //   }
    //   file.Close()
    //
    var keys []string                   // all keys seen in this partition
    var kvs = make(map[string][]string) // all key/value pairs from the nMap intermediate files

    // read the nMap intermediate files written by the map workers
    for i := 0; i < nMap; i++ {
        fn := reduceName(jobName, i, reduceTaskNumber)
        fmt.Println("reduce fn", fn)
        imm, err := os.Open(fn)
        if err != nil {
            log.Printf("open intermediate file %s failed", fn)
            continue
        }

        var kv KeyValue
        dec := json.NewDecoder(imm)
        err = dec.Decode(&kv)
        for err == nil {
            // is this key seen?
            if _, ok := kvs[kv.Key]; !ok {
                keys = append(keys, kv.Key)
            }
            kvs[kv.Key] = append(kvs[kv.Key], kv.Value)

            // decode repeatedly until an error
            err = dec.Decode(&kv)
        }
        imm.Close()
    }

    // Original MapReduce Paper 4.2 Ordering Guarantees
    // Keys in one partition are processed in increasing key order
    sort.Strings(keys)
    out, err := os.Create(outFile)
    if err != nil {
        log.Printf("create output file %s failed", outFile)
        return
    }
    enc := json.NewEncoder(out)
    for _, key := range keys {
        if err = enc.Encode(KeyValue{key, reduceF(key, kvs[key])}); err != nil {
            log.Printf("write [key: %s] to file %s failed", key, outFile)
        }
    }
    out.Close()
}
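
Each reduce task produces its own output file, so a final merge step is still needed to combine the nReduce result files into one. The lab framework ships its own merge(); the sketch below only illustrates the idea, using the mergeName files and the standard bufio, encoding/json, fmt, os and sort packages.

// mergeOutputs is a sketch of the final merge: read every reduce task's
// JSON output file, collect all key/value pairs, sort them by key and
// write one plain-text result file.
func mergeOutputs(jobName string, nReduce int, resultFile string) error {
    kvs := make(map[string]string)
    for i := 0; i < nReduce; i++ {
        f, err := os.Open(mergeName(jobName, i))
        if err != nil {
            return err
        }
        dec := json.NewDecoder(f)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err != nil {
                break // io.EOF ends this file
            }
            kvs[kv.Key] = kv.Value
        }
        f.Close()
    }

    keys := make([]string, 0, len(kvs))
    for k := range kvs {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    out, err := os.Create(resultFile)
    if err != nil {
        return err
    }
    defer out.Close()
    w := bufio.NewWriter(out)
    defer w.Flush()
    for _, k := range keys {
        fmt.Fprintf(w, "%s: %s\n", k, kvs[k])
    }
    return nil
}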

Complete example

https://github.com/dreamerjackson/theWayToGolang

Original paper and references

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf
https://pdos.csail.mit.edu/6.824/labs/lab-1.html