1+ import { Transform } from 'node:stream'
12import type {
23 AnyObject ,
4+ AsyncFunction ,
35 CommonLogger ,
46 NullableBuffer ,
57 NullableString ,
@@ -10,7 +12,6 @@ import type {
1012import { _stringMapEntries } from '@naturalcycles/js-lib'
1113import type { ReadableTyped } from '@naturalcycles/nodejs-lib'
1214import type { Redis , RedisOptions } from 'ioredis'
13- import type * as RedisLib from 'ioredis'
1415import type { ScanStreamOptions } from 'ioredis/built/types.js'
1516import type { ChainableCommander } from 'ioredis/built/utils/RedisCommander.js'
1617
@@ -57,11 +58,11 @@ export class RedisClient implements CommonClient {
5758
5859 private _redis ?: Redis
5960
60- redis ( ) : Redis {
61+ async redis ( ) : Promise < Redis > {
6162 if ( this . _redis ) return this . _redis
6263
6364 // lazy-load the library
64- const redisLib = require ( 'ioredis' ) as typeof RedisLib
65+ const { default : redisLib } = await import ( 'ioredis' )
6566 const redis = new redisLib . Redis ( this . cfg . redisOptions )
6667
6768 const { logger } = this . cfg
@@ -82,14 +83,16 @@ export class RedisClient implements CommonClient {
8283
8384 async connect ( ) : Promise < void > {
8485 if ( ! this . connected ) {
85- await this . redis ( ) . connect ( )
86+ const redis = await this . redis ( )
87+ await redis . connect ( )
8688 this . connected = true
8789 }
8890 }
8991
9092 async disconnect ( ) : Promise < void > {
93+ const redis = await this . redis ( )
9194 this . log ( 'redis: quit...' )
92- this . log ( `redis: quit` , await this . redis ( ) . quit ( ) )
95+ this . log ( `redis: quit` , await redis . quit ( ) )
9396 this . connected = false
9497 }
9598
@@ -98,63 +101,77 @@ export class RedisClient implements CommonClient {
98101 }
99102
100103 async ping ( ) : Promise < void > {
101- await this . redis ( ) . ping ( )
104+ const redis = await this . redis ( )
105+ await redis . ping ( )
102106 }
103107
104108 async del ( keys : string [ ] ) : Promise < number > {
105- return await this . redis ( ) . del ( keys )
109+ const redis = await this . redis ( )
110+ return await redis . del ( keys )
106111 }
107112
108113 async get ( key : string ) : Promise < NullableString > {
109- return await this . redis ( ) . get ( key )
114+ const redis = await this . redis ( )
115+ return await redis . get ( key )
110116 }
111117
112118 async getBuffer ( key : string ) : Promise < NullableBuffer > {
113- return await this . redis ( ) . getBuffer ( key )
119+ const redis = await this . redis ( )
120+ return await redis . getBuffer ( key )
114121 }
115122
116123 async mget ( keys : string [ ] ) : Promise < NullableString [ ] > {
117- return await this . redis ( ) . mget ( keys )
124+ const redis = await this . redis ( )
125+ return await redis . mget ( keys )
118126 }
119127
120128 async mgetBuffer ( keys : string [ ] ) : Promise < NullableBuffer [ ] > {
121- return await this . redis ( ) . mgetBuffer ( keys )
129+ const redis = await this . redis ( )
130+ return await redis . mgetBuffer ( keys )
122131 }
123132
124133 async set ( key : string , value : string | number | Buffer ) : Promise < void > {
125- await this . redis ( ) . set ( key , value )
134+ const redis = await this . redis ( )
135+ await redis . set ( key , value )
126136 }
127137
128138 async hgetall < T extends Record < string , string > = Record < string , string > > (
129139 key : string ,
130140 ) : Promise < T | null > {
131- const result = await this . redis ( ) . hgetall ( key )
141+ const redis = await this . redis ( )
142+ const result = await redis . hgetall ( key )
132143 if ( Object . keys ( result ) . length === 0 ) return null
133144 return result as T
134145 }
135146
136147 async hget ( key : string , field : string ) : Promise < NullableString > {
137- return await this . redis ( ) . hget ( key , field )
148+ const redis = await this . redis ( )
149+ return await redis . hget ( key , field )
138150 }
139151
140152 async hset ( key : string , value : AnyObject ) : Promise < void > {
141- await this . redis ( ) . hset ( key , value )
153+ const redis = await this . redis ( )
154+ await redis . hset ( key , value )
142155 }
143156
144157 async hdel ( key : string , fields : string [ ] ) : Promise < void > {
145- await this . redis ( ) . hdel ( key , ...fields )
158+ const redis = await this . redis ( )
159+ await redis . hdel ( key , ...fields )
146160 }
147161
148162 async hmget ( key : string , fields : string [ ] ) : Promise < NullableString [ ] > {
149- return await this . redis ( ) . hmget ( key , ...fields )
163+ const redis = await this . redis ( )
164+ return await redis . hmget ( key , ...fields )
150165 }
151166
152167 async hmgetBuffer ( key : string , fields : string [ ] ) : Promise < NullableBuffer [ ] > {
153- return await this . redis ( ) . hmgetBuffer ( key , ...fields )
168+ const redis = await this . redis ( )
169+ return await redis . hmgetBuffer ( key , ...fields )
154170 }
155171
156172 async hincr ( key : string , field : string , increment = 1 ) : Promise < number > {
157- return await this . redis ( ) . hincrby ( key , field , increment )
173+ const redis = await this . redis ( )
174+ return await redis . hincrby ( key , field , increment )
158175 }
159176
160177 async hincrBatch ( key : string , incrementTuples : [ string , number ] [ ] ) : Promise < [ string , number ] [ ] > {
@@ -181,7 +198,8 @@ export class RedisClient implements CommonClient {
181198 value : string | number | Buffer ,
182199 expireAt : UnixTimestamp ,
183200 ) : Promise < void > {
184- await this . redis ( ) . set ( key , value , 'EXAT' , expireAt )
201+ const redis = await this . redis ( )
202+ await redis . set ( key , value , 'EXAT' , expireAt )
185203 }
186204
187205 async hsetWithTTL ( _key : string , _value : AnyObject , _expireAt : UnixTimestamp ) : Promise < void > {
@@ -191,20 +209,23 @@ export class RedisClient implements CommonClient {
191209 // const keyList = valueKeys.join(' ')
192210 // const commandString = `HEXPIREAT ${key} ${expireAt} FIELDS ${numberOfKeys} ${keyList}`
193211 // const [command, ...args] = commandString.split(' ')
194- // await this. redis() .hset(key, value)
195- // await this. redis() .call(command!, args)
212+ // await redis.hset(key, value)
213+ // await redis.call(command!, args)
196214 }
197215
198216 async mset ( obj : Record < string , string | number > ) : Promise < void > {
199- await this . redis ( ) . mset ( obj )
217+ const redis = await this . redis ( )
218+ await redis . mset ( obj )
200219 }
201220
202221 async msetBuffer ( obj : Record < string , Buffer > ) : Promise < void > {
203- await this . redis ( ) . mset ( obj )
222+ const redis = await this . redis ( )
223+ await redis . mset ( obj )
204224 }
205225
206226 async incr ( key : string , by = 1 ) : Promise < number > {
207- return await this . redis ( ) . incrby ( key , by )
227+ const redis = await this . redis ( )
228+ return await redis . incrby ( key , by )
208229 }
209230
210231 async incrBatch ( incrementTuples : [ string , number ] [ ] ) : Promise < [ string , number ] [ ] > {
@@ -227,7 +248,8 @@ export class RedisClient implements CommonClient {
227248 }
228249
229250 async ttl ( key : string ) : Promise < number > {
230- return await this . redis ( ) . ttl ( key )
251+ const redis = await this . redis ( )
252+ return await redis . ttl ( key )
231253 }
232254
233255 async dropTable ( table : string ) : Promise < void > {
@@ -266,36 +288,44 @@ export class RedisClient implements CommonClient {
266288 Returns BATCHES of keys in each iteration (as-is).
267289 */
268290 scanStream ( opt ?: ScanStreamOptions ) : ReadableTyped < string [ ] > {
269- return this . redis ( ) . scanStream ( opt )
291+ return createReadableFromAsync ( async ( ) => {
292+ const redis = await this . redis ( )
293+ return redis . scanStream ( opt )
294+ } )
270295 }
271296
272297 /**
273298 * Like scanStream, but flattens the stream of keys.
274299 */
275300 scanStreamFlat ( opt : ScanStreamOptions ) : ReadableTyped < string > {
276301 // biome-ignore lint/correctness/noFlatMapIdentity: ok
277- return ( this . redis ( ) . scanStream ( opt ) as ReadableTyped < string [ ] > ) . flatMap ( keys => keys )
302+ return this . scanStream ( opt ) . flatMap ( keys => keys )
278303 }
279304
280305 async scanCount ( opt : ScanStreamOptions ) : Promise < number > {
306+ const redis = await this . redis ( )
281307 // todo: implement more efficiently, e.g via LUA?
282308 let count = 0
283309
284- await ( this . redis ( ) . scanStream ( opt ) as ReadableTyped < string [ ] > ) . forEach ( keys => {
310+ await ( redis . scanStream ( opt ) as ReadableTyped < string [ ] > ) . forEach ( keys => {
285311 count += keys . length
286312 } )
287313
288314 return count
289315 }
290316
291317 hscanStream ( key : string , opt ?: ScanStreamOptions ) : ReadableTyped < string [ ] > {
292- return this . redis ( ) . hscanStream ( key , opt )
318+ return createReadableFromAsync ( async ( ) => {
319+ const redis = await this . redis ( )
320+ return redis . hscanStream ( key , opt )
321+ } )
293322 }
294323
295324 async hscanCount ( key : string , opt ?: ScanStreamOptions ) : Promise < number > {
296325 let count = 0
297326
298- const stream = this . redis ( ) . hscanStream ( key , opt )
327+ const redis = await this . redis ( )
328+ const stream = redis . hscanStream ( key , opt )
299329
300330 await stream . forEach ( ( keyValueList : string [ ] ) => {
301331 count += keyValueList . length / 2
@@ -305,7 +335,8 @@ export class RedisClient implements CommonClient {
305335 }
306336
307337 async withPipeline ( fn : ( pipeline : ChainableCommander ) => Promisable < void > ) : Promise < void > {
308- const pipeline = this . redis ( ) . pipeline ( )
338+ const redis = await this . redis ( )
339+ const pipeline = redis . pipeline ( )
309340 await fn ( pipeline )
310341 await pipeline . exec ( )
311342 }
@@ -314,3 +345,23 @@ export class RedisClient implements CommonClient {
314345 this . cfg . logger . log ( ...args )
315346 }
316347}
348+
349+ /**
350+ * Turn async function into Readable.
351+ */
352+ function createReadableFromAsync < T > ( fn : AsyncFunction < ReadableTyped < T > > ) : ReadableTyped < T > {
353+ const transform = new Transform ( {
354+ objectMode : true ,
355+ transform : ( chunk , _encoding , cb ) => {
356+ cb ( null , chunk )
357+ } ,
358+ } )
359+
360+ void fn ( )
361+ . then ( readable => {
362+ readable . on ( 'error' , err => transform . emit ( 'error' , err ) ) . pipe ( transform )
363+ } )
364+ . catch ( err => transform . emit ( 'error' , err ) )
365+
366+ return transform
367+ }
0 commit comments