-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkerpool.go
107 lines (88 loc) · 1.88 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package workerpool
// WorkerPool is a worker pool
type WorkerPool struct {
size int
workers []*Worker
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool(size int) *WorkerPool {
pool := &WorkerPool{
size: size,
workers: make([]*Worker, size),
}
for i := 0; i < size; i++ {
pool.workers[i] = NewWorker()
}
return pool
}
// Size returns the size of the worker pool
func (pool *WorkerPool) Size() int {
return pool.size
}
// AddTask adds a task to the worker pool
func (pool *WorkerPool) AddTask(task Task) {
worker := pool.getWorker()
worker.AddTask(task)
}
// AddTasks adds tasks to the worker pool
func (pool *WorkerPool) AddTasks(tasks []Task) {
for _, task := range tasks {
pool.AddTask(task)
}
}
// Wait waits for all tasks to be completed
func (pool *WorkerPool) Wait() {
for _, worker := range pool.workers {
worker.Wait()
}
}
// getWorker returns a worker with the least number of tasks
func (pool *WorkerPool) getWorker() *Worker {
worker := pool.workers[0]
for _, w := range pool.workers {
if w.NumTasks() < worker.NumTasks() {
worker = w
}
}
return worker
}
// Task is a task
type Task interface {
Do()
}
// Worker is a worker
type Worker struct {
tasks chan Task
done chan bool
numTasks int
}
// NewWorker creates a new worker
func NewWorker() *Worker {
worker := &Worker{
tasks: make(chan Task),
done: make(chan bool),
}
go worker.run()
return worker
}
// AddTask adds a task to the worker
func (worker *Worker) AddTask(task Task) {
worker.tasks <- task
worker.numTasks++
}
// NumTasks returns the number of tasks in the worker
func (worker *Worker) NumTasks() int {
return worker.numTasks
}
// Wait waits for all tasks to be completed
func (worker *Worker) Wait() {
<-worker.done
}
// run runs the worker
func (worker *Worker) run() {
for task := range worker.tasks {
task.Do()
worker.numTasks--
}
worker.done <- true
}