main.odin
package oasync
import "core:container/queue"
import "core:fmt"
import "core:log"
import "core:math/rand"
import vmem "core:mem/virtual"
import "core:sync"
import "core:sync/chan"
import "core:thread"
import "core:time"
Task :: struct {
	effect: proc(),
}
// assigned to each thread
Worker :: struct {
	localq:       queue.Queue(Task),
	run_next:     Task,
	timestamp:    time.Tick, // acts as an identifier for each worker; should never collide
	coordinator:  ^Coordinator,
	localq_mutex: sync.Mutex,
	arena:        vmem.Arena,
}
// heart of the async scheduler
Coordinator :: struct {
	workers:      [dynamic]Worker, // could be statically sized, but that needs too much parapoly to be worth it
	worker_count: u8,
	globalq:      chan.Chan(Task), // TODO: replace with a queue
	search_count: u8, // ATOMIC ACCESS ONLY!
}
Config :: struct {
	worker_count:    u8,
	use_main_thread: bool,
}
// injected into context.user_ptr, overriding its previous content
// fear not: the user_ptr field below is reserved for you, so you can
// still carry your own pointer through the context
Ref_Carrier :: struct {
	worker:   ^Worker,
	user_ptr: rawptr,
}
// fetch the current worker from the context
get_worker :: proc() -> ^Worker {
	carrier := cast(^Ref_Carrier)context.user_ptr
	return carrier.worker
}
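// A minimal sketch (not part of the library) of how user code could keep its
// own pointer alongside the worker via the carrier's user_ptr field; the two
// helper names below are hypothetical and only illustrate the pattern.
set_user_data :: proc(ptr: rawptr) {
	carrier := cast(^Ref_Carrier)context.user_ptr
	carrier.user_ptr = ptr
}
get_user_data :: proc() -> rawptr {
	carrier := cast(^Ref_Carrier)context.user_ptr
	return carrier.user_ptr
}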
steal :: proc(this: ^Worker) {
	// pick a random worker to steal from; take a pointer so we mutate
	// the stored worker, not a copy of it
	victim := &this.coordinator.workers[rand.int_max(len(this.coordinator.workers))]
	if victim.timestamp == this.timestamp {
		// same id means we picked ourselves; never steal from self
		return
	}
	sync.mutex_lock(&victim.localq_mutex)
	defer sync.mutex_unlock(&victim.localq_mutex) // make sure the mutex never stays locked
	// don't steal from queues that have no items
	queue_length := queue.len(victim.localq)
	if queue_length == 0 {
		return
	}
	sync.mutex_lock(&this.localq_mutex)
	defer sync.mutex_unlock(&this.localq_mutex)
	// steal half of the victim's tasks
	for i in 1 ..= u64(queue_length / 2) { // TODO: needs further testing
		elem, ok := queue.pop_front_safe(&victim.localq)
		if !ok {
			log.error("failed to steal")
			return
		}
		queue.push(&this.localq, elem)
	}
}
// internal helper: runs a task's effect directly on the current thread;
// not meant to be called from user code
run_task :: proc(t: Task) {
	t.effect()
}
// event loop that every worker runs
worker_runloop :: proc(t: ^thread.Thread) {
	log.debug("runloop started")
	worker := get_worker()
	for {
		// wipe this worker's arena at the end of every iteration
		defer vmem.arena_free_all(&worker.arena)
		// tasks in the local queue get scheduled first
		sync.mutex_lock(&worker.localq_mutex)
		tsk, exist := queue.pop_front_safe(&worker.localq)
		sync.mutex_unlock(&worker.localq_mutex)
		if exist {
			log.debug("pulled from local queue, running")
			run_task(tsk)
			continue
		}
		// the local queue is empty at this point, take a look
		// at the global channel
		tsk, exist = chan.try_recv(worker.coordinator.globalq)
		if exist {
			log.debug("got item from global channel")
			run_task(tsk)
			continue
		}
		// the global queue is empty too, so enter stealing mode and
		// register ourselves in the search count
		// TODO: this throttling logic needs a lot more work
		scount := sync.atomic_load(&worker.coordinator.search_count)
		if scount < (worker.coordinator.worker_count / 2) { // throttle stealing to half the total thread count
			sync.atomic_add(&worker.coordinator.search_count, 1) // register the steal attempt
			steal(worker) // start stealing
			sync.atomic_sub(&worker.coordinator.search_count, 1) // deregister the steal attempt
		}
	}
}
// grabs the current worker from the context and pushes the task
// onto its local queue
spawn_task :: proc(task: Task) {
	worker := get_worker()
	sync.mutex_lock(&worker.localq_mutex)
	defer sync.mutex_unlock(&worker.localq_mutex)
	queue.append_elem(&worker.localq, task)
}
setup_thread :: proc(worker: ^Worker) -> ^thread.Thread {
	worker.timestamp = time.tick_now()
	log.debug("setting up thread for", worker.timestamp)
	log.debug("init queue")
	queue.init(&worker.localq)
	// "thrd" instead of "thread" to avoid colliding with the package name
	thrd := thread.create(worker_runloop) // make a worker thread
	ctx := context
	log.debug("creating arena allocator")
	arena_alloc := vmem.arena_allocator(&worker.arena)
	ctx.allocator = arena_alloc
	// note: new_clone uses the caller's allocator here, so the carrier
	// survives the per-iteration arena wipes in the runloop
	ref_carrier := new_clone(Ref_Carrier{worker = worker, user_ptr = nil})
	ctx.user_ptr = ref_carrier
	thrd.init_context = ctx
	log.debug("built thread")
	return thrd
}
make_task :: proc(p: proc()) -> Task {
	return Task{effect = p}
}
// spawn a procedure as a task on the current worker
go :: proc(p: proc()) {
	spawn_task(make_task(p))
}
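// A minimal usage sketch (an illustration, not part of the library): any task
// already running on a worker can fan out further tasks with go. The proc
// name below is hypothetical.
example_fan_out :: proc() {
	for _ in 0 ..< 4 {
		go(proc() {
			log.info("hello from a spawned task")
		})
	}
}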
init :: proc(coord: ^Coordinator, cfg: Config, init_task: Task) {
	log.debug("starting worker system")
	coord.worker_count = cfg.worker_count
	// set up the global channel
	log.debug("setting up global channel")
	ch, aerr := chan.create(chan.Chan(Task), context.allocator)
	if aerr != nil {
		panic("failed to create channel")
	}
	coord.globalq = ch
	log.debug("setting up workers")
	// reserve up front so pointers into coord.workers stay valid while we append
	reserve(&coord.workers, int(cfg.worker_count) + 1)
	for i in 1 ..= coord.worker_count {
		append(&coord.workers, Worker{coordinator = coord})
		// hand the thread a pointer to the stored element, not to a local copy
		thrd := setup_thread(&coord.workers[len(coord.workers) - 1])
		thread.start(thrd)
		log.debug("started worker", i)
	}
	// an unbuffered channel send blocks until somebody is receiving,
	// so the first task is sent only after the workers are running
	log.debug("sending first task")
	if chan.send(coord.globalq, init_task) {
		log.debug("first task sent")
	} else {
		panic("failed to fire off the first task")
	}
	// treat the main thread as a worker too
	if cfg.use_main_thread {
		append(&coord.workers, Worker{coordinator = coord})
		coord.worker_count += 1
		main_worker := &coord.workers[len(coord.workers) - 1]
		main_worker.timestamp = time.tick_now()
		queue.init(&main_worker.localq)
		// allocate the carrier before swapping in the arena allocator so it
		// is not wiped by the runloop's per-iteration arena_free_all
		ref_carrier := new_clone(Ref_Carrier{worker = main_worker, user_ptr = nil})
		context.allocator = vmem.arena_allocator(&main_worker.arena)
		context.user_ptr = ref_carrier
		worker_runloop(nil) // no thread handle for the main thread
	}
}
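// A minimal end-to-end sketch of how this scheduler might be driven; the
// procedure name and the task bodies are hypothetical and only illustrate the
// intended call order: build a Config, then hand init a first task.
example_main :: proc() {
	context.logger = log.create_console_logger()
	coord: Coordinator
	cfg := Config {
		worker_count    = 4,
		use_main_thread = true, // init never returns in this mode; the main thread becomes a worker
	}
	first := make_task(proc() {
		log.info("scheduler is up")
		go(proc() {log.info("a child task")})
	})
	init(&coord, cfg, first)
}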