diff --git a/packages/queue/package.json b/packages/queue/package.json index c7e99be7..03b1f129 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -59,10 +59,6 @@ "@h3ravel/core": "workspace:^1.22.0-alpha.10", "@h3ravel/contracts": "workspace:^0.29.0-alpha.10" }, - "dependencies": { - "bullmq": "^5.0.0", - "ioredis": "^5.3.2" - }, "devDependencies": { "typescript": "^5.4.0" } diff --git a/packages/queue/src/Drivers/BullMQDriver.ts b/packages/queue/src/Drivers/BullMQDriver.ts deleted file mode 100644 index 61755893..00000000 --- a/packages/queue/src/Drivers/BullMQDriver.ts +++ /dev/null @@ -1,258 +0,0 @@ -import { Queue, Worker, ConnectionOptions, JobsOptions } from 'bullmq' -import { IQueueDriver, IJob, JobPayload } from '@h3ravel/contracts' -import { Container } from '@h3ravel/core' -import { BullMQJob } from '../Jobs/BullMQJob' - -/** - * Redis connection configuration for BullMQ. - */ -export interface BullMQRedisConfig { - host?: string - port?: number - password?: string - db?: number | string - username?: string - url?: string -} - -/** - * BullMQ queue driver implementation. - */ -export class BullMQDriver extends IQueueDriver { - protected queues: Map = new Map() - protected connections: Map = new Map() - protected workers: Map = new Map() - protected defaultConnection: string - protected container: Container - - /** - * @param redisConfig Single config object or Record of connection configs - * @param defaultConnection Default connection name - * @param container Service container - */ - constructor( - redisConfig: BullMQRedisConfig | Record, - defaultConnection: string = 'default', - container: Container, - ) { - super() - this.defaultConnection = defaultConnection - this.container = container - - if ('host' in redisConfig || 'url' in redisConfig) { - this.connections.set(defaultConnection, this.buildConnectionOptions(redisConfig as BullMQRedisConfig)) - } else { - for (const [name, config] of Object.entries(redisConfig)) { - this.connections.set(name, this.buildConnectionOptions(config)) - } - } - } - - /** - * Build BullMQ connection options from Redis config. - * Parses Redis URL format: redis://[username]:[password]@host:port/db - */ - protected buildConnectionOptions(config: BullMQRedisConfig): ConnectionOptions { - if (config.url) { - try { - const url = new URL(config.url) - return { - host: url.hostname, - port: url.port ? parseInt(url.port, 10) : 6379, - username: url.username || undefined, - password: url.password || undefined, - db: url.pathname ? parseInt(url.pathname.slice(1), 10) : undefined, - } as ConnectionOptions - } catch { - return { host: config.url } as ConnectionOptions - } - } - - const options: ConnectionOptions = { - host: config.host || '127.0.0.1', - port: config.port || 6379, - } - - if (config.password) options.password = config.password - if (config.username) options.username = config.username - if (config.db !== undefined) { - options.db = typeof config.db === 'string' ? parseInt(config.db, 10) : config.db - } - - return options - } - - protected getQueue(queue: string, connection?: string): Queue { - const connectionName = connection || this.defaultConnection - const queueKey = `${connectionName}:${queue}` - const connOptions = this.connections.get(connectionName) - - if (!connOptions) { - throw new Error(`Redis connection "${connectionName}" not found`) - } - - if (!this.queues.has(queueKey)) { - const bullQueue = new Queue(queue, { - connection: connOptions, - }) - this.queues.set(queueKey, bullQueue) - } - - return this.queues.get(queueKey)! - } - - /** - * Get or create a paused Worker instance for job state transitions. - * Worker is paused to prevent automatic processing; used only for state management. - */ - protected getWorker(queue: string, connection?: string): Worker { - const connectionName = connection || this.defaultConnection - const queueKey = `${connectionName}:${queue}` - const connOptions = this.connections.get(connectionName) - - if (!connOptions) { - throw new Error(`Redis connection "${connectionName}" not found`) - } - - if (!this.workers.has(queueKey)) { - const worker = new Worker( - queue, - async () => {}, - { - connection: connOptions, - concurrency: 1, - }, - ) - worker.pause() - this.workers.set(queueKey, worker) - } - - return this.workers.get(queueKey)! - } - - protected getWorkerToken(queue: string, connection?: string): string { - const connectionName = connection || this.defaultConnection - return `bullmq-worker-${connectionName}-${queue}` - } - - /** - * Convert JobPayload options to BullMQ JobsOptions. - * Converts time values from seconds to milliseconds. - */ - protected buildJobOptions(payload: JobPayload): JobsOptions { - const options: JobsOptions = {} - - if (payload.maxTries !== undefined) { - options.attempts = payload.maxTries - } - - if (payload.backoff !== undefined) { - if (typeof payload.backoff === 'number') { - options.backoff = { - type: 'exponential', - delay: payload.backoff * 1000, - } - } else if (Array.isArray(payload.backoff)) { - options.backoff = { - type: 'exponential', - delay: payload.backoff.map((delay) => delay * 1000), - } - } - } - - if (payload.timeout !== undefined) { - options.timeout = payload.timeout * 1000 - } - - if (payload.delay !== undefined) { - options.delay = payload.delay * 1000 - } - - if (payload.priority !== undefined) { - options.priority = payload.priority - } - - if (payload.tags !== undefined && payload.tags.length > 0) { - options.tags = payload.tags - } - - if (payload.retryUntil !== undefined) { - options.jobId = payload.uuid - } - - if (payload.uuid) { - options.jobId = payload.uuid - } - - return options - } - - async push(queue: string, payload: JobPayload, connection?: string): Promise { - const bullQueue = this.getQueue(queue, connection) - const options = this.buildJobOptions(payload) - const job = await bullQueue.add('job', payload, options) - return job.id! - } - - async later(queue: string, payload: JobPayload, delay: number, connection?: string): Promise { - const bullQueue = this.getQueue(queue, connection) - const options = this.buildJobOptions(payload) - options.delay = delay * 1000 - const job = await bullQueue.add('job', payload, options) - return job.id! - } - - async bulk(queue: string, payloads: JobPayload[], connection?: string): Promise<(string | number | void)[]> { - const bullQueue = this.getQueue(queue, connection) - const jobs = payloads.map((payload) => ({ - name: 'job', - data: payload, - opts: this.buildJobOptions(payload), - })) - const addedJobs = await bullQueue.addBulk(jobs) - return addedJobs.map((job) => job.id!) - } - - /** - * Pop a job from the queue using Queue.getWaiting() for manual processing. - */ - async pop(queue: string, connection?: string): Promise { - const connectionName = connection || this.defaultConnection - const bullQueue = this.getQueue(queue, connection) - - try { - const waitingJobs = await bullQueue.getWaiting(0, 1) - if (waitingJobs.length === 0 || !waitingJobs[0]) { - return null - } - - const workerToken = this.getWorkerToken(queue, connection) - return new BullMQJob(waitingJobs[0], connectionName, queue, this.container, workerToken) - } catch { - return null - } - } - - async size(queue: string, connection?: string): Promise { - const bullQueue = this.getQueue(queue, connection) - const counts = await bullQueue.getJobCounts('waiting', 'active', 'delayed') - return (counts.waiting || 0) + (counts.active || 0) + (counts.delayed || 0) - } - - async clear(queue: string, connection?: string): Promise { - const bullQueue = this.getQueue(queue, connection) - await bullQueue.obliterate({ force: true }) - } - - async close(): Promise { - for (const queue of this.queues.values()) { - await queue.close() - } - this.queues.clear() - - for (const worker of this.workers.values()) { - await worker.close() - } - this.workers.clear() - } -} diff --git a/packages/queue/src/Jobs/BullMQJob.ts b/packages/queue/src/Jobs/BullMQJob.ts deleted file mode 100644 index c05573d4..00000000 --- a/packages/queue/src/Jobs/BullMQJob.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { Job as BullMQJobType } from 'bullmq' -import { Container } from '@h3ravel/core' -import { JobPayload } from '@h3ravel/contracts' -import { Job } from './Job' - -/** - * BullMQ job wrapper that implements the IJob contract. - */ -export class BullMQJob extends Job { - protected bullMQJob: BullMQJobType - protected rawPayload: string - protected workerToken: string - - /** - * @param workerToken Required for moveToFailed() state transitions - */ - constructor( - bullMQJob: BullMQJobType, - connectionName: string, - queue: string, - container: Container, - workerToken: string, - ) { - super() - this.bullMQJob = bullMQJob - this.connectionName = connectionName - this.queue = queue - this.container = container - this.workerToken = workerToken - - const jobData = bullMQJob.data as JobPayload - this.rawPayload = JSON.stringify(jobData) - } - - public getJobId(): string | number | undefined { - return this.bullMQJob.id - } - - public getRawBody(): string { - return this.rawPayload - } - - public delete(): void { - this.deleted = true - this.bullMQJob.remove().catch(() => {}) - } - - /** - * @param delay Delay in seconds before releasing the job - */ - public release(delay = 0): void { - this.released = true - - if (delay > 0) { - this.bullMQJob.moveToDelayed(Date.now() + delay * 1000).catch(() => {}) - } else { - this.bullMQJob.moveToWaiting().catch(() => {}) - } - } - - /** - * moveToFailed requires worker token as second parameter. - */ - public fail(e: Error): void { - super.fail(e) - this.bullMQJob.moveToFailed(e, this.workerToken).catch(() => {}) - } -} diff --git a/packages/queue/src/Providers/QueueServiceProvider.ts b/packages/queue/src/Providers/QueueServiceProvider.ts index 29965e54..57e6a5c7 100644 --- a/packages/queue/src/Providers/QueueServiceProvider.ts +++ b/packages/queue/src/Providers/QueueServiceProvider.ts @@ -1,13 +1,10 @@ import { ServiceProvider } from '@h3ravel/core' -import { IQueueManager } from '@h3ravel/contracts' -import { QueueManager } from '../QueueManager' -import { BullMQDriver, BullMQRedisConfig } from '../Drivers/BullMQDriver' /** * Queues and workers. * * Register QueueManager. - * Load drivers (Redis, in-memory, BullMQ). + * Load drivers (Redis, in-memory). * Register job dispatcher and workers. * * Auto-Registered if @h3ravel/queue is installed @@ -16,54 +13,6 @@ export class QueueServiceProvider extends ServiceProvider { public static priority = 991 register () { - // Register QueueManager as singleton - this.app.singleton('queue.manager', () => { - return new QueueManager() - }) - - // Register BullMQ driver if Redis configuration is available - const config = this.app.make('config') - const redisConfig = config.get('database.redis') - - if (redisConfig) { - // Extract Redis connection configurations - const redisConnections: Record = {} - - // Process each Redis connection (default, cache, etc.) - for (const [name, connectionConfig] of Object.entries(redisConfig)) { - if (name !== 'client' && name !== 'options' && typeof connectionConfig === 'object') { - const conn = connectionConfig as any - redisConnections[name] = { - url: conn.url, - host: conn.host, - port: typeof conn.port === 'string' ? parseInt(conn.port, 10) : conn.port, - password: conn.password, - username: conn.username, - db: conn.database || conn.db, - } - } - } - - // Get default connection name from config or use 'default' - const defaultConnection = config.get('queue.connection') || config.get('queue.default') || 'default' - const redisConnectionName = config.get('queue.redis_connection') || 'default' - - // Create BullMQ driver instance - const bullMQDriver = new BullMQDriver( - redisConnections, - redisConnectionName, - this.app, - ) - - // Register BullMQ driver - const queueManager = this.app.make('queue.manager') - queueManager.extend('bullmq', bullMQDriver) - queueManager.extend('redis', bullMQDriver) // Also register as 'redis' alias - - // Set default connection if configured - if (defaultConnection) { - queueManager.setDefaultConnection(defaultConnection) - } - } + // Core bindings } } diff --git a/packages/queue/src/QueueManager.ts b/packages/queue/src/QueueManager.ts index a34bd84a..0b1f950e 100644 --- a/packages/queue/src/QueueManager.ts +++ b/packages/queue/src/QueueManager.ts @@ -1,76 +1 @@ -import { IQueueManager, IQueueDriver } from '@h3ravel/contracts' - -/** - * Queue manager for managing drivers and connections. - */ -export class QueueManager extends IQueueManager { - /** - * Map of driver names to driver instances. - */ - protected drivers: Map = new Map() - - /** - * Map of connection names to driver names. - */ - protected connections: Map = new Map() - - /** - * The default connection name. - */ - protected defaultConnection: string = 'default' - - /** - * Get a queue driver for the given connection. - */ - connection(name?: string): IQueueDriver { - const connectionName = name || this.defaultConnection - const driverName = this.connections.get(connectionName) || connectionName - - const driver = this.drivers.get(driverName) - if (!driver) { - throw new Error(`Queue driver "${driverName}" is not registered`) - } - - return driver - } - - /** - * Get a queue driver by name. - */ - driver(name: string): IQueueDriver { - const driver = this.drivers.get(name) - if (!driver) { - throw new Error(`Queue driver "${name}" is not registered`) - } - - return driver - } - - /** - * Register a new driver. - */ - extend(name: string, driver: IQueueDriver): void { - this.drivers.set(name, driver) - } - - /** - * Get the default connection name. - */ - getDefaultConnection(): string { - return this.defaultConnection - } - - /** - * Set the default connection name. - */ - setDefaultConnection(name: string): void { - this.defaultConnection = name - } - - /** - * Register a connection mapping. - */ - addConnection(name: string, driver: string): void { - this.connections.set(name, driver) - } -} +export default class { } diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 2fc54ffe..d087ad52 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -1,12 +1,10 @@ export * from './Contracts/JobContract' -export * from './Drivers/BullMQDriver' export * from './Drivers/MemoryDriver' export * from './Drivers/RedisDriver' export * from './Events/JobFailed' export * from './Exceptions/ManuallyFailedException' export * from './Exceptions/MaxAttemptsExceededException' export * from './Exceptions/TimeoutExceededException' -export * from './Jobs/BullMQJob' export * from './Jobs/Job' export * from './Jobs/JobName' export * from './Providers/QueueServiceProvider'