-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstore.mjs
117 lines (92 loc) · 2.65 KB
/
store.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import EventEmitter from 'events'
/*
* Using object properties shorthand for performance.
*
* q - queue
* crc - config requests count
* ctf - config time frame
* arc - actual requests count
* atf - actual time frame
*/
class Store extends EventEmitter {
#available
#data
#redis
constructor (redis) {
super()
this.#available = []
this.#data = []
this.#redis = redis
}
ack (requestId, executionId) {
console.info('queueRemove', executionId)
const dataIdx = this.#queueGetIdx(requestId)
if (this.#data[dataIdx].q[0] === executionId) {
this.#data[dataIdx].q.shift()
this.#available[dataIdx] = true
}
this.#update()
}
async poll (requestId, executionId) {
console.info('queuePush', executionId)
const dataIdx = this.#queueGetIdx(requestId)
this.#data[dataIdx].q.push(executionId)
if (this.#data[dataIdx].q.length > 1) {
console.info('once start', executionId, this.#data[dataIdx].q.length)
await new Promise((resolve) => {
this.once(requestId + '-' + executionId, resolve)
this.#update()
})
console.info('once end', executionId)
}
this.#available[dataIdx] = false
this.#update()
console.info('queueGet', executionId, this.#data[dataIdx].q.length)
if (this.#data[dataIdx].atf + this.#data[dataIdx].ctf > Date.now()) {
this.#data[dataIdx].arc++
} else {
this.#data[dataIdx].arc = 1
this.#data[dataIdx].atf = Date.now()
}
this.#update()
if (this.#data[dataIdx].arc > this.#data[dataIdx].crc) {
const drc = this.#data[dataIdx].atf + this.#data[dataIdx].ctf - Date.now()
console.info('time window start', executionId, drc)
await new Promise((resolve) => {
setTimeout(resolve, drc)
})
this.#data[dataIdx].arc = 1
this.#data[dataIdx].atf = Date.now()
this.#update()
}
}
#queueGetIdx (requestId) {
let dataIdx = this.#data.findIndex(it => it.id === requestId)
if (dataIdx === -1) {
this.#data.push({
id: requestId,
q: [],
crc: 1,
ctf: 1e3,
arc: 0,
atf: 0
})
this.#available.push(true)
dataIdx = this.#data.length - 1
this.#redis.get(requestId).then((info) => {
const [requestsCount, timeFrame] = info.split('/')
this.#data[dataIdx].crc = parseInt(requestsCount)
this.#data[dataIdx].ctf = parseInt(timeFrame)
})
}
return dataIdx
}
#update () {
for (const [dataIdx, it] of this.#data.entries()) {
if (this.#available[dataIdx] && it.q.length > 0) {
this.emit(it.id + '-' + it.q[0])
}
}
}
}
export default Store