Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
Yan0613 committed Oct 23, 2023
1 parent 4d2d1e0 commit 4c658e3
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 97 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/MIT6.824.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 0 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,5 @@ Reduce则合并相同键的值,以word count为例,Reduce是将相同单词
- 获得reduce任务的Worker,通过远程调用请求数据,数据加载完毕后,对数据进行排序,之后遍历数据,将相同key的数据进行合并,最终输出结果;
- 当所有的 map 和 reduce 任务都完成后,整个 MapReduce 程序就处理完毕了;Worker 处理后的数据通常保存在不同的小文件中,合并这些文件之后就是最终的结果。

——转自知乎划拳小匠,网站:<https://zhuanlan.zhihu.com/p/187507617>

Tips: Go语言的通道的用法

1. **通道的创建:**
你可以使用内置的 `make` 函数来创建一个通道。通道有两种类型:带缓冲和不带缓冲。不带缓冲的通道要求发送方和接收方的 goroutine 同时就绪,否则先到的一方会阻塞等待;带缓冲的通道可以暂存一定数量的数据项,只有在缓冲区已满时发送才会阻塞,在缓冲区为空时接收才会阻塞。

```go
// 创建一个不带缓冲的通道
ch := make(chan int)

// 创建一个带缓冲的通道,容量为 10
ch := make(chan int, 10)
```

2. **通道的发送和接收:**
使用通道的 `<-` 操作符进行数据的发送和接收。发送操作将数据发送到通道,接收操作从通道中接收数据。

```go
// 发送数据到通道
ch <- 42

// 从通道中接收数据
data := <-ch
```

3. **通道的关闭:**
通道可以被关闭,以表示不会再有数据被发送到该通道。通常由发送者在确定不再发送数据时关闭通道;接收者可以用 `v, ok := <-ch` 判断:当通道已关闭且其中的数据已被取完时,`ok` 为 `false`。也可以用 `for v := range ch` 循环接收,通道关闭并取空后循环会自动结束。

```go
close(ch)
```

成功输出中间文件:
![Alt text](image-1.png)
# UIUC2014-Mini-Program
Empty file added src/main/mr-out-8
Empty file.
Empty file added src/main/mr-out-9
Empty file.
96 changes: 44 additions & 52 deletions src/mr/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Coordinator struct {
State int //map stage or reduce stage? 0 start ,1 map , 2 reduce
MapTask chan Task//很显然,这个地方是需要worker来取任务,因此此处必须保证线程的安全,go里面没有自己的队列实现,所以用channel
MapTaskNum int
ReduceTaskNum int
AllReduceTaask int
ReduceTask chan Task
Files []string
NumReduceTask int
Expand All @@ -29,22 +29,26 @@ type Coordinator struct {
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) AssignTask(args *TaskArgs, reply *TaskReply) error {
if len(c.MapTaskFin)!= c.MapTaskNum {
// mu.Lock()
// defer mu.Unlock()
if c.State == 0{
maptask, ok := <-c.MapTask
if ok{
reply.TaskAddr = &maptask
reply.MapTaskNum = c.MapTaskNum
reply.ReduceTaskNum = c.ReduceTaskNum
reply.ReduceTaskNum = c.NumReduceTask
reply.State = c.State
}
}else {
}else if c.State == 1{
reducetask, ok := <-c.ReduceTask
if ok{
reply.TaskAddr = &reducetask
reply.MapTaskNum = c.MapTaskNum
reply.ReduceTaskNum = c.ReduceTaskNum
reply.ReduceTaskNum = c.NumReduceTask
reply.State = c.State
}
}else if c.State == 2{

}

return nil
Expand Down Expand Up @@ -74,7 +78,7 @@ func (c *Coordinator) server() {
func (c *Coordinator) Done() bool {
ret := false

if len(c.ReduceTaskFin )== c.NumReduceTask{
if len(c.ReduceTaskFin)== c.AllReduceTaask{
ret = true
}

Expand All @@ -91,45 +95,41 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
State:0,
MapTask: make(chan Task,len(files)),
ReduceTask: make(chan Task,len(files)*nReduce),
ReduceTask: make(chan Task,nReduce),
AllReduceTaask: nReduce,
Files: files,
MapTaskNum:0,
ReduceTaskNum:0,
MapTaskNum:len(files),
NumReduceTask:nReduce,
MapTaskFin: make(chan bool,len(files)),
ReduceTaskFin: make(chan bool, nReduce),
}
if c.State == 0{

//make map task
for id, file := range (files) {
// fmt.Println("%v", file)
maptask:=Task{
Filename: file,
TaskType: 0,
TaskId:id,
ReduceNum: nReduce, //reduce的数量
}

c.MapTask <- maptask
c.MapTaskNum++
fmt.Println("sucessefully make a map task!")
for id, file := range (files) {
// fmt.Println("%v", file)
maptask:=Task{
Filename: file,
TaskType: 0,
TaskId:id,
ReduceNum: nReduce, //reduce的数量
}
// }else if c.State == 1{

// //make reduce tasks
// for id, file := range (files){
// reducetask:=Task{
// Filename: file,
// TaskType: 1,
// TaskId: id,
// ReduceNum: nReduce, //reduce的数量
// }

// c.ReduceTask <- reducetask
// fmt.Println("sucessefully make a reduce task!")
// }

c.MapTask <- maptask
}
fmt.Println("sucessefully make all map tasks!")

//make reduce tasks

for i:=0;i<nReduce;i++{
reducetask:=Task{
TaskType: 1,
TaskId: i,
ReduceNum: nReduce, //reduce的数量
}
c.ReduceTask <- reducetask
}

fmt.Println("sucessefully make all reduce tasks!")
c.server()
return &c
}
Expand All @@ -140,25 +140,17 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator {
func (c *Coordinator)MarkDoneTask(args *TaskArgs, reply *TaskReply) error{
if len(c.MapTaskFin)!= c.MapTaskNum {
c.MapTaskFin <- true
}else if len(c.MapTaskFin)== c.MapTaskNum && len(c.ReduceTaskFin) != c.NumReduceTask{
c.State = 1
nReduce := c.NumReduceTask
files := c.Files
//make reduce tasks
for id, file := range (files){
reducetask:=Task{
Filename: file,
TaskType: 1,
TaskId: id,
ReduceNum: nReduce, //reduce的数量
}

c.ReduceTask <- reducetask
fmt.Println("sucessefully make a reduce task!")
if len(c.MapTaskFin) == c.MapTaskNum{
c.State = 1
fmt.Println("all map tasks are done, Start reduce stage")
}
}else if len(c.ReduceTaskFin) == c.NumReduceTask{
}else if len(c.MapTaskFin) == c.MapTaskNum&&len(c.ReduceTaskFin) != c.AllReduceTaask{
c.ReduceTaskFin <- true
if len(c.ReduceTaskFin) == c.AllReduceTaask{
c.State = 2
fmt.Println("all reduce tasks are done!")
}
}

return nil
}
24 changes: 13 additions & 11 deletions src/mr/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"strconv"
"sort"
// "time"
)


Expand Down Expand Up @@ -52,6 +53,7 @@ func Worker(mapf func(string, string) []KeyValue,
// Your worker implementation here.
reply:= CallTask()
task:= reply.TaskAddr
fmt.Println("task:",task)
switch task.TaskType {
case 0: {
// uncomment to send the Task RPC to the coordinator.
Expand All @@ -61,11 +63,12 @@ func Worker(mapf func(string, string) []KeyValue,
case 1: {
DoReduceTask(reducef,reply)
TaskDone(reply)
if reply.State == 2{
break
}
}
}
if reply.State == 2{
break
}
// time.Sleep(time.Second)

}

Expand Down Expand Up @@ -93,7 +96,7 @@ func CallTask() TaskReply{
ok := call("Coordinator.AssignTask", &args, &reply)
if ok {
// reply.Y should be 100.
fmt.Printf("HERE IS CALL TASK,THE FILE NAME IS : %v\n", reply.TaskAddr.Filename)
fmt.Printf("Call sucessfully : %v\n", reply.TaskAddr.TaskType)
} else {
fmt.Printf("call failed!\n")
}
Expand Down Expand Up @@ -124,7 +127,7 @@ func call(rpcname string, args interface{}, reply interface{}) bool {


func DoMapTask(mapf func(string, string) []KeyValue, reply TaskReply){
// intermediate := []KeyValue{}
var intermediate = []KeyValue{}
task:=reply.TaskAddr
filename:= task.Filename
file, err := os.Open(filename)
Expand All @@ -136,7 +139,7 @@ func DoMapTask(mapf func(string, string) []KeyValue, reply TaskReply){
log.Fatalf("cannot read %v", filename)
}
file.Close()
intermediate := mapf(filename, string(content))
intermediate = mapf(filename, string(content))
// NOW we got pairs of kv, we need to store and write them in temp files
reduceNum := task.ReduceNum
HashKv := make([][]KeyValue, reduceNum)
Expand Down Expand Up @@ -172,19 +175,19 @@ func TaskDone(donereply TaskReply){

ok := call("Coordinator.MarkDoneTask", &args, &reply)
if ok {
fmt.Printf("HERE IS TASK DONE!")
fmt.Printf("task done!")
} else {
fmt.Printf("call failed!\n")
}
}

func DoReduceTask(reducef func(string, []string) string, reply TaskReply){
task := reply.TaskAddr
num_reduce := task.ReduceNum
num_map := reply.MapTaskNum
intermediate := []KeyValue{}
id := task.TaskId
for i:=0; i<num_reduce; i++{
map_filename := "mr-" + strconv.Itoa(id)+ "-" + strconv.Itoa(i)
for i:=0; i<num_map; i++{
map_filename := "mr-" + strconv.Itoa(i)+ "-" + strconv.Itoa(id)
inputfile,err := os.OpenFile(map_filename, os.O_RDONLY, 0777)
if err != nil{
log.Fatalf("OPEN MAP TEMP FILE '%v FAILED!", map_filename)
Expand Down Expand Up @@ -220,7 +223,6 @@ func DoReduceTask(reducef func(string, []string) string, reply TaskReply){

// this is the correct format for each line of Reduce output.
fmt.Fprintf(tmp_file, "%v %v\n", intermediate[i].Key, output)

i = j
}

Expand Down

0 comments on commit 4c658e3

Please sign in to comment.