diff --git a/packages/queue/package.json b/packages/queue/package.json index 03b1f129..c7e99be7 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -59,6 +59,10 @@ "@h3ravel/core": "workspace:^1.22.0-alpha.10", "@h3ravel/contracts": "workspace:^0.29.0-alpha.10" }, + "dependencies": { + "bullmq": "^5.0.0", + "ioredis": "^5.3.2" + }, "devDependencies": { "typescript": "^5.4.0" } diff --git a/packages/queue/src/Drivers/BullMQDriver.ts b/packages/queue/src/Drivers/BullMQDriver.ts new file mode 100644 index 00000000..61755893 --- /dev/null +++ b/packages/queue/src/Drivers/BullMQDriver.ts @@ -0,0 +1,258 @@ +import { Queue, Worker, ConnectionOptions, JobsOptions } from 'bullmq' +import { IQueueDriver, IJob, JobPayload } from '@h3ravel/contracts' +import { Container } from '@h3ravel/core' +import { BullMQJob } from '../Jobs/BullMQJob' + +/** + * Redis connection configuration for BullMQ. + */ +export interface BullMQRedisConfig { + host?: string + port?: number + password?: string + db?: number | string + username?: string + url?: string +} + +/** + * BullMQ queue driver implementation. + */ +export class BullMQDriver extends IQueueDriver { + protected queues: Map = new Map() + protected connections: Map = new Map() + protected workers: Map = new Map() + protected defaultConnection: string + protected container: Container + + /** + * @param redisConfig Single config object or Record of connection configs + * @param defaultConnection Default connection name + * @param container Service container + */ + constructor( + redisConfig: BullMQRedisConfig | Record, + defaultConnection: string = 'default', + container: Container, + ) { + super() + this.defaultConnection = defaultConnection + this.container = container + + if ('host' in redisConfig || 'url' in redisConfig) { + this.connections.set(defaultConnection, this.buildConnectionOptions(redisConfig as BullMQRedisConfig)) + } else { + for (const [name, config] of Object.entries(redisConfig)) { + this.connections.set(name, this.buildConnectionOptions(config)) + } + } + } + + /** + * Build BullMQ connection options from Redis config. + * Parses Redis URL format: redis://[username]:[password]@host:port/db + */ + protected buildConnectionOptions(config: BullMQRedisConfig): ConnectionOptions { + if (config.url) { + try { + const url = new URL(config.url) + return { + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : 6379, + username: url.username || undefined, + password: url.password || undefined, + db: url.pathname ? parseInt(url.pathname.slice(1), 10) : undefined, + } as ConnectionOptions + } catch { + return { host: config.url } as ConnectionOptions + } + } + + const options: ConnectionOptions = { + host: config.host || '127.0.0.1', + port: config.port || 6379, + } + + if (config.password) options.password = config.password + if (config.username) options.username = config.username + if (config.db !== undefined) { + options.db = typeof config.db === 'string' ? parseInt(config.db, 10) : config.db + } + + return options + } + + protected getQueue(queue: string, connection?: string): Queue { + const connectionName = connection || this.defaultConnection + const queueKey = `${connectionName}:${queue}` + const connOptions = this.connections.get(connectionName) + + if (!connOptions) { + throw new Error(`Redis connection "${connectionName}" not found`) + } + + if (!this.queues.has(queueKey)) { + const bullQueue = new Queue(queue, { + connection: connOptions, + }) + this.queues.set(queueKey, bullQueue) + } + + return this.queues.get(queueKey)! + } + + /** + * Get or create a paused Worker instance for job state transitions. + * Worker is paused to prevent automatic processing; used only for state management. + */ + protected getWorker(queue: string, connection?: string): Worker { + const connectionName = connection || this.defaultConnection + const queueKey = `${connectionName}:${queue}` + const connOptions = this.connections.get(connectionName) + + if (!connOptions) { + throw new Error(`Redis connection "${connectionName}" not found`) + } + + if (!this.workers.has(queueKey)) { + const worker = new Worker( + queue, + async () => {}, + { + connection: connOptions, + concurrency: 1, + }, + ) + worker.pause() + this.workers.set(queueKey, worker) + } + + return this.workers.get(queueKey)! + } + + protected getWorkerToken(queue: string, connection?: string): string { + const connectionName = connection || this.defaultConnection + return `bullmq-worker-${connectionName}-${queue}` + } + + /** + * Convert JobPayload options to BullMQ JobsOptions. + * Converts time values from seconds to milliseconds. + */ + protected buildJobOptions(payload: JobPayload): JobsOptions { + const options: JobsOptions = {} + + if (payload.maxTries !== undefined) { + options.attempts = payload.maxTries + } + + if (payload.backoff !== undefined) { + if (typeof payload.backoff === 'number') { + options.backoff = { + type: 'exponential', + delay: payload.backoff * 1000, + } + } else if (Array.isArray(payload.backoff)) { + options.backoff = { + type: 'exponential', + delay: payload.backoff.map((delay) => delay * 1000), + } + } + } + + if (payload.timeout !== undefined) { + options.timeout = payload.timeout * 1000 + } + + if (payload.delay !== undefined) { + options.delay = payload.delay * 1000 + } + + if (payload.priority !== undefined) { + options.priority = payload.priority + } + + if (payload.tags !== undefined && payload.tags.length > 0) { + options.tags = payload.tags + } + + if (payload.retryUntil !== undefined) { + options.jobId = payload.uuid + } + + if (payload.uuid) { + options.jobId = payload.uuid + } + + return options + } + + async push(queue: string, payload: JobPayload, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const options = this.buildJobOptions(payload) + const job = await bullQueue.add('job', payload, options) + return job.id! + } + + async later(queue: string, payload: JobPayload, delay: number, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const options = this.buildJobOptions(payload) + options.delay = delay * 1000 + const job = await bullQueue.add('job', payload, options) + return job.id! + } + + async bulk(queue: string, payloads: JobPayload[], connection?: string): Promise<(string | number | void)[]> { + const bullQueue = this.getQueue(queue, connection) + const jobs = payloads.map((payload) => ({ + name: 'job', + data: payload, + opts: this.buildJobOptions(payload), + })) + const addedJobs = await bullQueue.addBulk(jobs) + return addedJobs.map((job) => job.id!) + } + + /** + * Pop a job from the queue using Queue.getWaiting() for manual processing. + */ + async pop(queue: string, connection?: string): Promise { + const connectionName = connection || this.defaultConnection + const bullQueue = this.getQueue(queue, connection) + + try { + const waitingJobs = await bullQueue.getWaiting(0, 1) + if (waitingJobs.length === 0 || !waitingJobs[0]) { + return null + } + + const workerToken = this.getWorkerToken(queue, connection) + return new BullMQJob(waitingJobs[0], connectionName, queue, this.container, workerToken) + } catch { + return null + } + } + + async size(queue: string, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + const counts = await bullQueue.getJobCounts('waiting', 'active', 'delayed') + return (counts.waiting || 0) + (counts.active || 0) + (counts.delayed || 0) + } + + async clear(queue: string, connection?: string): Promise { + const bullQueue = this.getQueue(queue, connection) + await bullQueue.obliterate({ force: true }) + } + + async close(): Promise { + for (const queue of this.queues.values()) { + await queue.close() + } + this.queues.clear() + + for (const worker of this.workers.values()) { + await worker.close() + } + this.workers.clear() + } +} diff --git a/packages/queue/src/Jobs/BullMQJob.ts b/packages/queue/src/Jobs/BullMQJob.ts new file mode 100644 index 00000000..c05573d4 --- /dev/null +++ b/packages/queue/src/Jobs/BullMQJob.ts @@ -0,0 +1,68 @@ +import { Job as BullMQJobType } from 'bullmq' +import { Container } from '@h3ravel/core' +import { JobPayload } from '@h3ravel/contracts' +import { Job } from './Job' + +/** + * BullMQ job wrapper that implements the IJob contract. + */ +export class BullMQJob extends Job { + protected bullMQJob: BullMQJobType + protected rawPayload: string + protected workerToken: string + + /** + * @param workerToken Required for moveToFailed() state transitions + */ + constructor( + bullMQJob: BullMQJobType, + connectionName: string, + queue: string, + container: Container, + workerToken: string, + ) { + super() + this.bullMQJob = bullMQJob + this.connectionName = connectionName + this.queue = queue + this.container = container + this.workerToken = workerToken + + const jobData = bullMQJob.data as JobPayload + this.rawPayload = JSON.stringify(jobData) + } + + public getJobId(): string | number | undefined { + return this.bullMQJob.id + } + + public getRawBody(): string { + return this.rawPayload + } + + public delete(): void { + this.deleted = true + this.bullMQJob.remove().catch(() => {}) + } + + /** + * @param delay Delay in seconds before releasing the job + */ + public release(delay = 0): void { + this.released = true + + if (delay > 0) { + this.bullMQJob.moveToDelayed(Date.now() + delay * 1000).catch(() => {}) + } else { + this.bullMQJob.moveToWaiting().catch(() => {}) + } + } + + /** + * moveToFailed requires worker token as second parameter. + */ + public fail(e: Error): void { + super.fail(e) + this.bullMQJob.moveToFailed(e, this.workerToken).catch(() => {}) + } +} diff --git a/packages/queue/src/Providers/QueueServiceProvider.ts b/packages/queue/src/Providers/QueueServiceProvider.ts index 57e6a5c7..29965e54 100644 --- a/packages/queue/src/Providers/QueueServiceProvider.ts +++ b/packages/queue/src/Providers/QueueServiceProvider.ts @@ -1,10 +1,13 @@ import { ServiceProvider } from '@h3ravel/core' +import { IQueueManager } from '@h3ravel/contracts' +import { QueueManager } from '../QueueManager' +import { BullMQDriver, BullMQRedisConfig } from '../Drivers/BullMQDriver' /** * Queues and workers. * * Register QueueManager. - * Load drivers (Redis, in-memory). + * Load drivers (Redis, in-memory, BullMQ). * Register job dispatcher and workers. * * Auto-Registered if @h3ravel/queue is installed @@ -13,6 +16,54 @@ export class QueueServiceProvider extends ServiceProvider { public static priority = 991 register () { - // Core bindings + // Register QueueManager as singleton + this.app.singleton('queue.manager', () => { + return new QueueManager() + }) + + // Register BullMQ driver if Redis configuration is available + const config = this.app.make('config') + const redisConfig = config.get('database.redis') + + if (redisConfig) { + // Extract Redis connection configurations + const redisConnections: Record = {} + + // Process each Redis connection (default, cache, etc.) + for (const [name, connectionConfig] of Object.entries(redisConfig)) { + if (name !== 'client' && name !== 'options' && typeof connectionConfig === 'object') { + const conn = connectionConfig as any + redisConnections[name] = { + url: conn.url, + host: conn.host, + port: typeof conn.port === 'string' ? parseInt(conn.port, 10) : conn.port, + password: conn.password, + username: conn.username, + db: conn.database || conn.db, + } + } + } + + // Get default connection name from config or use 'default' + const defaultConnection = config.get('queue.connection') || config.get('queue.default') || 'default' + const redisConnectionName = config.get('queue.redis_connection') || 'default' + + // Create BullMQ driver instance + const bullMQDriver = new BullMQDriver( + redisConnections, + redisConnectionName, + this.app, + ) + + // Register BullMQ driver + const queueManager = this.app.make('queue.manager') + queueManager.extend('bullmq', bullMQDriver) + queueManager.extend('redis', bullMQDriver) // Also register as 'redis' alias + + // Set default connection if configured + if (defaultConnection) { + queueManager.setDefaultConnection(defaultConnection) + } + } } } diff --git a/packages/queue/src/QueueManager.ts b/packages/queue/src/QueueManager.ts index 0b1f950e..a34bd84a 100644 --- a/packages/queue/src/QueueManager.ts +++ b/packages/queue/src/QueueManager.ts @@ -1 +1,76 @@ -export default class { } +import { IQueueManager, IQueueDriver } from '@h3ravel/contracts' + +/** + * Queue manager for managing drivers and connections. + */ +export class QueueManager extends IQueueManager { + /** + * Map of driver names to driver instances. + */ + protected drivers: Map = new Map() + + /** + * Map of connection names to driver names. + */ + protected connections: Map = new Map() + + /** + * The default connection name. + */ + protected defaultConnection: string = 'default' + + /** + * Get a queue driver for the given connection. + */ + connection(name?: string): IQueueDriver { + const connectionName = name || this.defaultConnection + const driverName = this.connections.get(connectionName) || connectionName + + const driver = this.drivers.get(driverName) + if (!driver) { + throw new Error(`Queue driver "${driverName}" is not registered`) + } + + return driver + } + + /** + * Get a queue driver by name. + */ + driver(name: string): IQueueDriver { + const driver = this.drivers.get(name) + if (!driver) { + throw new Error(`Queue driver "${name}" is not registered`) + } + + return driver + } + + /** + * Register a new driver. + */ + extend(name: string, driver: IQueueDriver): void { + this.drivers.set(name, driver) + } + + /** + * Get the default connection name. + */ + getDefaultConnection(): string { + return this.defaultConnection + } + + /** + * Set the default connection name. + */ + setDefaultConnection(name: string): void { + this.defaultConnection = name + } + + /** + * Register a connection mapping. + */ + addConnection(name: string, driver: string): void { + this.connections.set(name, driver) + } +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index d087ad52..2fc54ffe 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -1,10 +1,12 @@ export * from './Contracts/JobContract' +export * from './Drivers/BullMQDriver' export * from './Drivers/MemoryDriver' export * from './Drivers/RedisDriver' export * from './Events/JobFailed' export * from './Exceptions/ManuallyFailedException' export * from './Exceptions/MaxAttemptsExceededException' export * from './Exceptions/TimeoutExceededException' +export * from './Jobs/BullMQJob' export * from './Jobs/Job' export * from './Jobs/JobName' export * from './Providers/QueueServiceProvider'