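At this point, however, worker-1 has only just finished the map-1 task and reports it to the coordinator. The coordinator sees that the system is already in the reduce phase while the completion RPC it received is of map type, so it ignores the report.
type CallTaskDoneArgs struct {
	TaskID int
	Tp     taskType // must be exported, otherwise net/rpc will not transmit it
}
type CallTaskDoneReply struct {
}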
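The stale-report rule described above can be sketched in isolation. This is a minimal illustration, not the post's actual code: the helper `acceptReport` and the constant name `reduceType` are assumptions introduced here, and the enum values are chosen only for the example.

```go
package main

import "fmt"

// taskType mirrors the phase/type enum used in this post.
// The name reduceType and the concrete values are assumptions.
type taskType int

const (
	mapType taskType = iota
	reduceType
	done
)

// CallTaskDoneArgs is the completion report sent by a worker.
type CallTaskDoneArgs struct {
	TaskID int
	Tp     taskType
}

// acceptReport (hypothetical helper) reports whether a completion report
// should be applied in the given phase. Reports whose type does not match
// the current phase, or that arrive after everything is done, are dropped.
func acceptReport(phase taskType, args CallTaskDoneArgs) bool {
	return phase != done && phase == args.Tp
}

func main() {
	late := CallTaskDoneArgs{TaskID: 1, Tp: mapType}
	fmt.Println(acceptReport(reduceType, late)) // late map report during reduce
	fmt.Println(acceptReport(mapType, late))    // same report during map
}
```

Because the check depends only on the current phase and the report type, a slow worker cannot corrupt the counters of a later phase.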
3.2 Coordinator
3.2.1 Struct design
type taskState int
const (
	spare     taskState = iota // idle, can be assigned to a worker
	executing                  // assigned and being monitored for timeout
	finish                     // completed
)
type task struct {
	fileName string
	id       int
	state    taskState
	start    time.Time
}
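First we define a task struct, which represents a single task:
fileName: in the map phase, the input file that the coordinator tells the worker to read
id: the ID of the task, passed to the worker; its purpose is covered in the RPC design section
state: a task is in one of three states: idle, executing, or finished. An idle task can be assigned to a worker; an executing task is monitored for timeout
start: the time at which the task started executing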
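To illustrate how state and start work together, here is a small sketch of a timeout check. It is not the post's watch() implementation: the 10-second limit and the helpers `reclaim` and `reclaimedAfter` are assumptions made for this example, and the sketch only resets the state without re-queuing the task.

```go
package main

import (
	"fmt"
	"time"
)

type taskState int

const (
	spare taskState = iota
	executing
	finish
)

type task struct {
	fileName string
	id       int
	state    taskState
	start    time.Time
}

// timeout is an assumed limit for this sketch, not a value from the post.
const timeout = 10 * time.Second

// reclaim (hypothetical helper) puts t back into the idle state if it has
// been executing for longer than timeout as of now, and reports whether
// it did so.
func reclaim(t *task, now time.Time) bool {
	if t.state == executing && now.Sub(t.start) > timeout {
		t.state = spare
		return true
	}
	return false
}

// reclaimedAfter builds a task that has been executing for elapsed and
// reports whether reclaim would make it idle again.
func reclaimedAfter(elapsed time.Duration) bool {
	now := time.Now()
	t := &task{fileName: "input.txt", id: 0, state: executing, start: now.Add(-elapsed)}
	return reclaim(t, now)
}

func main() {
	fmt.Println(reclaimedAfter(11 * time.Second)) // ran past the limit
	fmt.Println(reclaimedAfter(3 * time.Second))  // still within the limit
}
```

Only executing tasks are considered, so idle and finished tasks are never touched by the check.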
type Coordinator struct {
	// Your definitions here.
	mu         sync.Mutex
	state      taskType
	tasks      []*task
	mapChan    chan *task
	reduceChan chan *task
	nReduce    int
	nFiles     int
	finished   int
}
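Next comes the main Coordinator struct:
state: the current state of the system: the map phase (map tasks are handed out), the reduce phase (reduce tasks are handed out), or done (everything is complete and the system can shut down)
tasks: a slice of *task that holds the set of tasks
mapChan, reduceChan: channels used to hand out map and reduce tasks. In the map phase, an idle map task is put into the channel and taken out when a worker requests a task. The reduce phase works the same way
finished: the number of tasks completed so far. In the map phase, finished == nFiles means that all map tasks are complete and the system can enter the reduce phase. The reduce phase works the same way and leads to done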
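The role of finished in driving the phase changes can be sketched as a tiny state machine. This is an illustration under stated assumptions rather than the post's code: the type `coordinator`, the method `taskFinished`, the name `reduceType`, the comparison against nReduce in the reduce phase, and the reset of the counter between phases are all introduced here, and locking is omitted because the sketch runs on a single goroutine.

```go
package main

import "fmt"

type taskType int

const (
	mapType taskType = iota
	reduceType // assumed name for the reduce phase
	done
)

// coordinator keeps only the fields needed to show the phase transitions.
type coordinator struct {
	state    taskType
	nFiles   int
	nReduce  int
	finished int
}

// taskFinished (hypothetical method) records one completed task and
// advances the phase once every task of the current phase is complete.
func (c *coordinator) taskFinished() {
	if c.state == done {
		return
	}
	c.finished++
	switch {
	case c.state == mapType && c.finished == c.nFiles:
		c.state = reduceType
		c.finished = 0 // assumption: the counter restarts for the reduce phase
	case c.state == reduceType && c.finished == c.nReduce:
		c.state = done
	}
}

func main() {
	c := &coordinator{state: mapType, nFiles: 2, nReduce: 3}
	for i := 0; i < 5; i++ {
		c.taskFinished()
		fmt.Println(c.state, c.finished)
	}
}
```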
3.2.2 Initialization
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	// Your code here.
	c.mapPhase(files, nReduce)
	go c.watch()
	c.server()
	return &c
}
func (c *Coordinator) mapPhase(files []string, nReduce int) {
	c.state = mapType // the system starts in the map phase
	c.nReduce = nReduce
	c.nFiles = len(files)
	c.tasks = make([]*task, c.nFiles)
	c.mapChan = make(chan *task, c.nFiles) // buffered map channel with capacity c.nFiles
	for i := 0; i < c.nFiles; i++ {
		c.tasks[i] = &task{fileName: files[i], id: i}
		c.mapChan <- c.tasks[i] // every task starts out idle, so queue all of them
	}
}
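The system starts in the map phase, and mapPhase initializes the Coordinator struct. MakeCoordinator then starts the c.watch() goroutine, which monitors tasks for timeouts and is covered later.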
3.2.3 Assigning tasks
func (c *Coordinator) CallTask(args *CallTaskArgs, reply *CallTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state == done {
		reply.Tp = done // tell the worker that it can exit
	} else if c.state == mapType {
		if len(c.mapChan) > 0 {
			task := <-c.mapChan
			c.setReply(task, reply)
		} else {
			reply.Tp = waitting // no idle map task right now
		}
	} else {
		if len(c.reduceChan) > 0 {
			task := <-c.reduceChan
			c.setReply(task, reply)
		} else {
			reply.Tp = waitting // no idle reduce task right now
		}
	}
	return nil
}
func (c *Coordinator) setReply(t *task, reply *CallTaskReply) {
	// Do not hand out a task that is already finished.
	if t.state == finish {
		reply.Tp = waitting
		return
	}
	reply.Tp = c.state
	reply.TaskID = t.id
	reply.NReduce = c.nReduce
	reply.NFiles = c.nFiles
	reply.FileName = t.fileName
	t.state = executing
	t.start = time.Now()
}
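This is the main task-assignment function. The worker invokes it through call("Coordinator.CallTask", &args, &reply).
- If the system state is done, it replies done to tell the worker that it can exit
- If the system is in the map phase: when a task is available (len(c.mapChan) > 0), it takes one task out and calls c.setReply(task, reply), which fills the reply with the task's information, marks the task as executing, and sets its start time to time.Now(). When no task is available, it sets reply.Tp = waitting so that the worker waits a while before asking again
- If the system is in the reduce phase: same as above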
3.2.4 Task completion
func (c *Coordinator) CallTaskDone(args *CallTaskDoneArgs, reply *CallTaskDoneReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state != args.Tp || c.state == done {
		return nil
	}
	if c.tasks[args.TaskID].state != finish {
		c.tasks[args.TaskID].state = finish
		c.finished++
		//fmt.Printf("task %v done\n", args.TaskID)
		if c.state == mapType && c.f