@@ -16,32 +16,49 @@ export interface AsyncQueueOptions {
16
16
}
17
17
18
18
export class AsyncQueue implements Disposable {
19
- /** @internal */
19
+ /**
20
+ * Maximum number of concurrent tasks
21
+ * @internal
22
+ */
20
23
private _concurrency : number ;
21
- /** @internal */
24
+ /**
25
+ * Currently running tasks count
26
+ * @internal
27
+ */
22
28
private _running = 0 ;
23
- /** @internal */
29
+ /**
30
+ * Pending task queue
31
+ * @internal
32
+ */
24
33
private _queue : Task [ ] = [ ] ;
25
- /** @internal */
26
- private __paused = false ;
27
- /** @internal */
34
+ /**
35
+ * Whether the queue is paused
36
+ * @internal
37
+ */
38
+ private _paused = false ;
39
+ /**
40
+ * Completed task count
41
+ * @internal
42
+ */
28
43
private _completed = 0 ;
29
- /** @internal */
30
- private _pendingResolves : Function [ ] = [ ] ;
44
+ /**
45
+ * Resolvers waiting for all tasks to complete
46
+ * @internal
47
+ */
48
+ private _pendingResolves : ( ( ) => void ) [ ] = [ ] ;
31
49
32
50
/**
33
51
* Creates an AsyncQueue instance.
34
- * @param options - The options to create the queue with .
52
+ * @param options - Configuration options for the queue.
35
53
*/
36
54
constructor ( options : AsyncQueueOptions = { } ) {
37
- const { concurrency = 1 } = options ;
38
- this . _concurrency = concurrency ;
55
+ this . setConcurrency ( options . concurrency ?? 1 ) ;
39
56
}
40
57
41
58
/**
42
59
* Adds a task to the queue.
43
- * @param task - The task to add. Should return a Promise.
44
- * @param args - The arguments to call the task with .
60
+ * @param task - The task function (must return a Promise) .
61
+ * @param args - Arguments for the task function .
45
62
*/
46
63
enqueue < T extends ( ...args : any [ ] ) => any > ( task : T , ...args : Parameters < T > ) : Promisify < ReturnType < T > > {
47
64
return new Promise ( ( resolve , reject ) => {
@@ -51,38 +68,26 @@ export class AsyncQueue implements Disposable {
51
68
}
52
69
53
70
/**
54
- * Runs the next task in the queue (if concurrency limit allows) .
71
+ * Runs the next task in the queue, respecting concurrency limits .
55
72
* @internal
56
73
*/
57
74
private _runNext ( ) {
58
- // If the queue is paused, do not run any tasks
59
- if ( this . __paused ) {
60
- return ;
61
- }
62
-
63
- // If the maximum concurrency is reached, do not run any tasks
64
- if ( this . _running >= this . _concurrency ) {
65
- return ;
66
- }
67
-
68
- // If the queue is empty and no tasks are running, resolve all pending promises
69
- if ( this . _running === 0 && this . _queue . length === 0 ) {
70
- while ( this . _pendingResolves . length > 0 ) {
71
- const resolve = this . _pendingResolves . shift ( ) ;
72
- resolve ?.( ) ;
73
- }
74
- return ;
75
- }
75
+ if ( this . _paused || this . _running >= this . _concurrency ) return ;
76
76
77
- // If the queue is empty, return
78
77
if ( this . _queue . length === 0 ) {
78
+ // Queue is empty, resolve pending promises
79
+ if ( this . _running === 0 ) {
80
+ const pending = this . _pendingResolves . splice ( 0 ) ;
81
+ pending . forEach ( ( resolve ) => resolve ( ) ) ;
82
+ }
79
83
return ;
80
84
}
81
85
82
86
const { task, args, resolve, reject } = this . _queue . shift ( ) as Task ;
83
87
this . _running ++ ;
84
88
85
- task ( ...args )
89
+ Promise . resolve ( )
90
+ . then ( ( ) => task ( ...args ) )
86
91
. then ( resolve )
87
92
. catch ( reject )
88
93
. finally ( ( ) => {
@@ -92,99 +97,79 @@ export class AsyncQueue implements Disposable {
92
97
} ) ;
93
98
}
94
99
95
- /**
96
- * Gets the current number of running tasks in the queue.
97
- */
98
- get running ( ) {
100
+ /** Gets the current number of running tasks. */
101
+ get running ( ) : number {
99
102
return this . _running ;
100
103
}
101
104
102
- /**
103
- * Gets the number of completed tasks in the queue.
104
- */
105
- get completed ( ) {
105
+ /** Gets the number of completed tasks. */
106
+ get completed ( ) : number {
106
107
return this . _completed ;
107
108
}
108
109
109
- /**
110
- * Gets the number of pending tasks in the queue.
111
- */
112
- get pending ( ) {
110
+ /** Gets the number of pending tasks in the queue. */
111
+ get pending ( ) : number {
113
112
return this . _queue . length ;
114
113
}
115
114
116
- /**
117
- * Gets the current number of concurrent tasks.
118
- */
115
+ /** Gets the current concurrency limit. */
119
116
get concurrency ( ) : number {
120
117
return this . _concurrency ;
121
118
}
122
119
123
120
/**
124
- * Dynamically updates the number of concurrent tasks.
125
- * @param concurrency - The new number of concurrent tasks .
121
+ * Updates the concurrency limit and attempts to run more tasks.
122
+ * @param concurrency - New concurrency level .
126
123
*/
127
- setConcurrency ( concurrency : number ) {
128
- this . _concurrency = concurrency ;
124
+ setConcurrency ( concurrency : number ) : void {
125
+ if ( concurrency < 1 || Number . isNaN ( concurrency ) || ! Number . isFinite ( concurrency ) || ! Number . isInteger ( concurrency ) ) {
126
+ throw new RangeError ( "Concurrency must be a positive integer." ) ;
127
+ }
128
+ this . _concurrency = Math . max ( 1 , concurrency ) ;
129
129
this . _runNext ( ) ;
130
130
}
131
131
132
- /**
133
- * Clears the remaining queue without rejecting tasks.
134
- */
132
+ /** Clears the remaining tasks in the queue. */
135
133
clear ( ) : void {
136
- this . _queue = [ ] ;
134
+ this . _queue . length = 0 ;
135
+ this . _runNext ( ) ;
137
136
}
138
137
139
- /**
140
- * Pauses the queue processing.
141
- */
138
+ /** Pauses the execution of queued tasks. */
142
139
pause ( ) : void {
143
- this . __paused = true ;
140
+ this . _paused = true ;
144
141
}
145
142
146
- /**
147
- * Resumes the queue processing.
148
- */
143
+ /** Resumes execution of tasks in the queue. */
149
144
resume ( ) : void {
150
- if ( this . __paused ) {
151
- this . __paused = false ;
145
+ if ( this . _paused ) {
146
+ this . _paused = false ;
152
147
this . _runNext ( ) ;
153
148
}
154
149
}
155
150
156
- /**
157
- * Waits for all tasks to complete.
158
- */
151
+ /** Waits for all queued and running tasks to complete. */
159
152
waitForAll ( ) : Promise < void > {
160
- if ( this . _running === 0 && this . _queue . length === 0 ) {
161
- return Promise . resolve ( ) ; // Queue is empty, return immediately
162
- }
163
- return new Promise ( ( resolve ) => {
164
- this . _pendingResolves . push ( resolve ) ;
165
- } ) ;
153
+ return this . _running === 0 && this . _queue . length === 0
154
+ ? Promise . resolve ( )
155
+ : new Promise ( ( resolve ) => this . _pendingResolves . push ( resolve ) ) ;
166
156
}
167
157
168
- /**
169
- * Releases resources, clears the queue, and ensures unexecuted tasks are rejected.
170
- */
158
+ /** Disposes of the queue, rejecting any pending tasks. */
171
159
dispose ( ) : void {
172
- while ( this . _queue . length > 0 ) {
160
+ while ( this . _queue . length ) {
173
161
const { reject } = this . _queue . shift ( ) as Task ;
174
162
reject ( new Error ( "Queue disposed before execution." ) ) ;
175
163
}
164
+ this . _pendingResolves . length = 0 ;
176
165
}
177
166
178
- /**
179
- * Releases resources, clears the queue, and ensures unexecuted tasks are rejected.
180
- */
167
+ /** Implements Disposable interface cleanup. */
181
168
[ Symbol . dispose ] ( ) {
182
169
this . dispose ( ) ;
183
170
}
184
171
185
- /**
186
- * Returns the string representation of the object.
187
- */
172
+ /** Returns a string tag for debugging. */
188
173
get [ Symbol . toStringTag ] ( ) {
189
174
return "AsyncQueue" ;
190
175
}
0 commit comments