coder{}
for i := 0; i < nReduce; i++ {
tempFile, err := ioutil.TempFile("./", "")
if err != nil {
log.Fatalf("cannot open temp file")
}
files = append(files, tempFile)
tmpFileNames = append(tmpFileNames, tempFile.Name())
encoders = append(encoders, json.NewEncoder(tempFile))
}
for _, kv := range kva {
n := ihash(kv.Key) % nReduce
encoders[n].Encode(kv)
}
for i := 0; i < nReduce; i++ {
files[i].Close()
os.Rename(tmpFileNames[i], "./"+intermediateFileName(taskID, i))
}
}
在当前目录用 `ioutil.TempFile("./", "")` 创建 nReduce 个临时文件，并用每个临时文件创建一个 `json.Encoder`（见 hints 第四条）；用 `ihash` 函数决定每个 key 映射到哪个文件，以 JSON 格式写入；最后把每个临时文件重命名为 `mr-x-y` 格式（x 为 map 任务编号，y 为 reduce 分区编号）。先写临时文件再重命名，可以避免其他进程读到只写了一半的文件。
生成中间文件名函数:
// intermediateFileName builds the name "mr-<x>-<y>" of the intermediate file
// in which map task x stores the key/value pairs destined for reduce
// partition y.
func intermediateFileName(x, y int) string {
	name := fmt.Sprintf("mr-%v-%v", x, y)
	return name
}
3.3.3 执行reduce
// executeReduce runs reduce task taskID. It gathers every key/value pair that
// the nFiles map tasks wrote for this partition (files named by
// intermediateFileName(i, taskID)), sorts them by key, and calls reducef once
// per distinct key with all of that key's values. It writes one
// "<key> <result>" line per key to mr-out-<taskID>.
//
// Output goes to a temporary file in the current directory, which is then
// renamed to mr-out-<taskID>, so the final name only ever refers to a fully
// written file. Any open, decode, write, close, or rename failure is fatal
// (log.Fatalf), rather than silently producing missing or partial output.
func executeReduce(nFiles, taskID int, reducef func(string, []string) string) {
	kvs := readReduceInput(nFiles, taskID)
	// Same approach as mrsequential.go: sorting places equal keys next to
	// each other so each key's values can be collected in one pass.
	sort.Sort(ByKey(kvs))

	oname := fmt.Sprintf("mr-out-%v", taskID)
	tempFile, err := ioutil.TempFile("./", "")
	if err != nil {
		log.Fatalf("cannot create temp file for %v: %v", oname, err)
	}
	tempFileName := tempFile.Name()

	for i := 0; i < len(kvs); {
		// kvs[i:j] is the run of pairs sharing kvs[i].Key.
		j := i + 1
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		output := reducef(kvs[i].Key, values)
		if _, err := fmt.Fprintf(tempFile, "%v %v\n", kvs[i].Key, output); err != nil {
			log.Fatalf("cannot write %v: %v", tempFileName, err)
		}
		i = j
	}

	// Close may surface a write error, so check it before publishing the file.
	if err := tempFile.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tempFileName, err)
	}
	if err := os.Rename(tempFileName, "./"+oname); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tempFileName, oname, err)
	}
}

// readReduceInput decodes every JSON-encoded KeyValue from the intermediate
// files intermediateFileName(i, taskID), for i in [0, nFiles). It returns them
// in file order, then stream order within each file.
func readReduceInput(nFiles, taskID int) []KeyValue {
	kvs := []KeyValue{}
	for i := 0; i < nFiles; i++ {
		filename := intermediateFileName(i, taskID)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		// The file is a stream of JSON values (hints item 4). More reports
		// whether another value remains, so a Decode error inside the loop
		// means a malformed or truncated record, not a normal end of file.
		decoder := json.NewDecoder(file)
		for decoder.More() {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				log.Fatalf("cannot decode %v: %v", filename, err)
			}
			kvs = append(kvs, kv)
		}
		file.Close()
	}
	return kvs
}