
Commit

'framework'
Yan0613 committed Sep 17, 2023
1 parent ccecaff commit 664e493
Showing 3 changed files with 115 additions and 65 deletions.
109 changes: 92 additions & 17 deletions src/mr/coordinator.go
@@ -10,14 +10,35 @@ var mu sync.Mutex

type Coordinator struct {
// Your definitions here.
mapFiles []string // list of input files
nReduce int // number of reduce tasks
mapTasks []Task // state and progress of the map tasks
reduceTasks []Task // state and progress of the reduce tasks
mapFinished int // number of finished map tasks
reduceFinished int // number of finished reduce tasks
ReducerNum int // how many reducers to use, taken from the caller's argument
TaskId int // counter used to generate unique task ids
DistPhase Phase // which phase the whole framework is currently in
TaskChannelReduce chan *Task // channel of reduce tasks; channels keep concurrent access safe
TaskChannelMap chan *Task // channel of map tasks; channels keep concurrent access safe
taskMetaHolder TaskMetaHolder // metadata for all tasks
files []string // the input file list passed in
}

// TaskMetaHolder stores the metadata of every task
type TaskMetaHolder struct {
MetaMap map[int]*TaskMetaInfo // keyed by task id for fast lookup
}

// TaskMetaInfo stores the metadata of a single task
type TaskMetaInfo struct {
state State // current state of the task
TaskAdr *Task // pointer to the task, so that after the task is taken out of the channel it can still be marked as finished through this address
}

const (
MapPhase Phase = iota // map tasks are being handed out
ReducePhase // reduce tasks are being handed out
AllDone // the whole job is finished
)




// Your code here -- RPC handlers for the worker to call.

//
@@ -26,11 +47,44 @@ type Coordinator struct {
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) AssignTask(args *TaskArgs, reply *TaskReply) error {
	// Handing out a task must hold the lock so concurrent workers don't race;
	// defer releases it on every return path.
	mu.Lock()
	defer mu.Unlock()
	switch c.DistPhase {
	case MapPhase:
		if len(c.TaskChannelMap) > 0 {
			reply.Task = <-c.TaskChannelMap
		}
	case ReducePhase:
		if len(c.TaskChannelReduce) > 0 {
			reply.Task = <-c.TaskChannelReduce
		}
	}
	return nil
}
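
On the worker side, GetTask (used in worker.go below) presumably wraps this handler with the lab skeleton's call() helper. A minimal sketch, assuming TaskReply carries the assigned task:

// GetTask asks the coordinator for the next task over RPC (sketch, lives in worker.go).
func GetTask() *Task {
	args := TaskArgs{}
	reply := TaskReply{}
	if ok := call("Coordinator.AssignTask", &args, &reply); !ok {
		fmt.Println("call Coordinator.AssignTask failed")
	}
	return reply.Task
}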


// makeMapTasks creates one map task per input file and queues it for workers
func (c *Coordinator) makeMapTasks(files []string) {
for _, v := range files {
id := c.generateTaskId()
task := Task{
TaskType: MapTask,
TaskId: id,
ReducerNum: c.ReducerNum,
Filename: v,
}

// record the task's initial state
taskMetaInfo := TaskMetaInfo{
state: Waiting, // the task is waiting to be executed
TaskAdr: &task, // keep the task's address so it can be marked finished later
}
c.taskMetaHolder.acceptMeta(&taskMetaInfo)

fmt.Println("make a map task :", &task)
c.TaskChannelMap <- &task
}
}
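
generateTaskId is called above but its body is outside this hunk. A minimal sketch, assuming it simply hands out an incrementing counter under the same lock its callers hold:

// generateTaskId returns the next unique task id (sketch, not shown in this diff).
func (c *Coordinator) generateTaskId() int {
	res := c.TaskId
	c.TaskId++
	return res
}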

func MakeReduceTasks(c *Coordinator){

}
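
MakeReduceTasks is still an empty stub in this commit. A possible shape, mirroring makeMapTasks above; the ReduceTask constant and the idea that a reduce task only needs an id are assumptions, since the reduce-side fields are not shown in this diff:

func MakeReduceTasks(c *Coordinator) {
	for i := 0; i < c.ReducerNum; i++ {
		id := c.generateTaskId()
		task := Task{
			TaskType: ReduceTask, // assumed constant, analogous to MapTask
			TaskId:   id,
		}
		c.taskMetaHolder.acceptMeta(&TaskMetaInfo{
			state:   Waiting,
			TaskAdr: &task,
		})
		c.TaskChannelReduce <- &task
	}
}
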
//
// start a thread that listens for RPCs from worker.go
//
@@ -51,26 +105,47 @@ func (c *Coordinator) server() {
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
// Done is called periodically by main/mrcoordinator.go; once all tasks are finished it lets mr exit.
func (c *Coordinator) Done() bool {
ret := false

// Your code here.


return ret
mu.Lock()
defer mu.Unlock()
if c.DistPhase == AllDone {
fmt.Printf("All tasks are finished, the coordinator will exit!\n")
return true
} else {
return false
}
}
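
For context, main/mrcoordinator.go (unchanged by this commit) polls Done roughly like this, with 10 as the lab skeleton's default reduce count:

// inside main() of main/mrcoordinator.go, with "os", "time", and the mr package imported
c := mr.MakeCoordinator(os.Args[1:], 10)
for c.Done() == false {
	time.Sleep(time.Second)
}
time.Sleep(time.Second)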


//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}

// Your code here.

c := Coordinator{
files: files,
ReducerNum: nReduce,
DistPhase: MapPhase,
TaskChannelMap: make(chan *Task, len(files)),
TaskChannelReduce: make(chan *Task, nReduce),
taskMetaHolder: TaskMetaHolder{
MetaMap: make(map[int]*TaskMetaInfo, len(files)+nReduce), // total number of tasks is the number of input files plus the number of reducers
},
}
c.makeMapTasks(files)

c.server()
return &c

// c := Coordinator{}

// // Your code here.


// c.server()
// return &c
}
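
makeMapTasks above relies on taskMetaHolder.acceptMeta, which is outside this hunk. A minimal sketch of what it likely does, assuming MetaMap is keyed by TaskId and duplicates are rejected:

// acceptMeta registers a task's metadata under its TaskId (sketch, not shown in this diff).
func (t *TaskMetaHolder) acceptMeta(info *TaskMetaInfo) bool {
	id := info.TaskAdr.TaskId
	if _, exists := t.MetaMap[id]; exists {
		return false // this task id is already registered
	}
	t.MetaMap[id] = info
	return true
}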


43 changes: 3 additions & 40 deletions src/mr/rpc.go
@@ -43,6 +43,8 @@ type Task struct {
// TaskArgs are the arguments the RPC should carry; in practice nothing needs to be passed, since the worker is only asking for a task
type TaskArgs struct{}

type TaskReply struct {
Task *Task // the task the coordinator hands back to the worker
}

// TaskType is the parent type for the task kinds enumerated below
type TaskType int

@@ -87,43 +89,4 @@ func coordinatorSock() string {
// coordinatorSock() generates a unique UNIX-domain socket name for the coordinator.
// A UNIX-domain socket is a mechanism for local communication between processes on the same machine.
// The name is made unique by appending the user id (obtained via os.Getuid()) to the file path.
// The return value is a string holding the socket path.



package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type TaskArgs struct {

}

type TaskReply struct {

}

// Add your RPC definitions here.


// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
s := "/var/tmp/5840-mr-"
s += strconv.Itoa(os.Getuid())
return s
}
// The return value is a string holding the socket path.
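
For context, the worker reaches the coordinator through this socket via the lab skeleton's call() helper in worker.go (not changed by this commit), which looks roughly like this:

// call sends an RPC request to the coordinator over the UNIX-domain socket,
// waits for the response, and reports whether the call succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	fmt.Println(err)
	return false
}
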
28 changes: 20 additions & 8 deletions src/mr/worker.go
@@ -31,7 +31,7 @@ func ihash(key string) int {
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
for {
// Request a task from the coordinator.
task := CallTask()
task := GetTask()

switch task.TaskType {
case MapTask:
Expand Down Expand Up @@ -76,13 +76,25 @@ func DoMapTask(mapf func(string, string) []KeyValue, task *Task) {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
// ... is Go's spread (variadic) operator: it expands the elements of the slice kva
// and passes them to append one by one, so each element of kva is appended to
// intermediate individually instead of appending the whole slice as a single element;
// e.g. append(dst, src...) behaves like appending src[0], src[1], ... in turn.

// After processing, call callDone() to notify the coordinator of task completion.
intermediate = mapf(filename, string(content))

// Partition the intermediate key/value pairs into rn buckets by key hash.
rn := task.ReducerNum
// create a slice of rn (i.e. nReduce) buckets
HashedKV := make([][]KeyValue, rn)

for _, kv := range intermediate {
HashedKV[ihash(kv.Key)%rn] = append(HashedKV[ihash(kv.Key)%rn], kv)
}
for i := 0; i < rn; i++ {
oname := "mr-tmp-" + strconv.Itoa(task.TaskId) + "-" + strconv.Itoa(i)
ofile, _ := os.Create(oname)
enc := json.NewEncoder(ofile)
for _, kv := range HashedKV[i] {
enc.Encode(kv)
}
ofile.Close()
}
callDone()
}
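
The reduce side will later need to read these mr-tmp-X-Y files back. A minimal sketch of that decoding loop, following the json.Decoder pattern suggested by the lab; the helper name readIntermediate is hypothetical and not part of this commit:

// readIntermediate decodes one intermediate file produced by DoMapTask
// back into a slice of KeyValue pairs.
func readIntermediate(filename string) []KeyValue {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	defer file.Close()
	var kva []KeyValue
	dec := json.NewDecoder(file)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			break // io.EOF ends the loop
		}
		kva = append(kva, kv)
	}
	return kva
}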

