Async Queue (Redis) — Quickstart
This module provides an abstraction for job queues and drivers. A simple in-memory driver is included for testing and local development.
API (Queue):
Queue.register(name, driver)— register a new driverQueue.get(name?)— get a driver (defaults toprocess.env.QUEUE_DRIVERorinmemory)Queue.enqueue(queue, payload)— enqueue a jobQueue.dequeue(queue)— dequeue next jobQueue.ack(queue, id)— acknowledge a jobQueue.length(queue)— get pending job countQueue.drain(queue)— clear all jobs
Driver interface:
enqueue(queue, payload): Promise\<string>dequeue(queue): Promise\<QueueMessage | undefined>ack(queue, id): Promise\<void>length(queue): Promise\<number>drain(queue): Promise\<void>
Notes:
- The in-memory driver is NOT suitable for production — use Redis or another durable backend for PHASE 1.5 production work.
- Drivers should be registered with
Queue.register('redis', RedisDriver).
Sync Driver
When QUEUE_DRIVER=sync, jobs are processed synchronously and immediately. This is useful for testing and development but requires explicit worker execution after enqueuing.
Important: Manual Worker Execution Required
With the sync driver, you must manually call the worker runner to process enqueued jobs:
import { EmailQueue } from '@app/Workers/EmailWorker';
// Enqueue a job
await EmailJobService.sendWelcome('test@zintrust.com', 'Test User', 'example-mysql1');
// IMPORTANT: Process the job immediately (required for sync driver)
await EmailQueue.processOne('example-mysql1');
// Or process all jobs
await EmailQueue.processAll('example-mysql1');When to Use Sync Driver
- ✅ Development & Testing - Immediate feedback and debugging
- ✅ Simple Applications - No background processing needed
- ✅ Unit Tests - Predictable, synchronous behavior
Limitations
- ⚠️ Blocking - Jobs block the request/response cycle
- ⚠️ No Persistence - Jobs are lost if application crashes
- ⚠️ No Retry - Failed jobs are not retried automatically
- ⚠️ Manual Processing - Must explicitly call worker methods
Migration to Production
For production use, switch to Redis driver:
# Development (sync)
QUEUE_DRIVER=sync
# Production (Redis)
QUEUE_DRIVER=redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-passwordRedis Driver (BullMQ-Powered)
- The Redis queue driver now uses BullMQ for enterprise-grade job processing with auto-scaling, circuit breaker, dead letter queue, and advanced monitoring.
- Configure via standard Redis environment variables (
REDIS_HOST,REDIS_PORT,REDIS_PASSWORD,REDIS_QUEUE_DB). - You can register the driver with
Queue.register('redis', RedisDriver)and then callQueue.enqueue('my-queue', payload, 'redis').
BullMQ Environment Variables
When QUEUE_DRIVER=redis, the system uses BullMQ with these customizable settings:
| Environment Variable | Default | Description | Example |
|---|---|---|---|
BULLMQ_REMOVE_ON_COMPLETE | 100 | Number of completed jobs to keep in Redis | 200 |
BULLMQ_REMOVE_ON_FAIL | 50 | Number of failed jobs to keep in Redis | 25 |
BULLMQ_DEFAULT_ATTEMPTS | 3 | Default retry attempts for jobs | 5 |
BULLMQ_BACKOFF_DELAY | 2000 | Delay between retries (milliseconds) | 5000 |
BULLMQ_BACKOFF_TYPE | exponential | Backoff strategy: 'exponential', 'fixed', 'custom' | fixed |
Environment-Specific Examples
Development Environment:
BULLMQ_REMOVE_ON_COMPLETE=500
BULLMQ_REMOVE_ON_FAIL=100
BULLMQ_DEFAULT_ATTEMPTS=2
BULLMQ_BACKOFF_DELAY=10000Production Environment:
BULLMQ_REMOVE_ON_COMPLETE=50
BULLMQ_REMOVE_ON_FAIL=20
BULLMQ_DEFAULT_ATTEMPTS=5
BULLMQ_BACKOFF_DELAY=1000High-Volume Environment:
BULLMQ_REMOVE_ON_COMPLETE=10
BULLMQ_REMOVE_ON_FAIL=5
BULLMQ_BACKOFF_DELAY=500Install Redis driver
zin add queue:redisWhen to use queue-redis vs queue-monitor
- ✅ Use
@zintrust/queue-redisif you only need to enqueue jobs and another service will process them - ✅✅ Use
@zintrust/queue-monitorif you need full queue management (enqueue + process + monitor + retry)
Note: The monitor package can do everything queue-redis does, plus much more. So if you install @zintrust/queue-monitor, there's no need for @zintrust/queue-redis.
RabbitMQ Driver
Install:
zin add queue:rabbitmqAWS SQS Driver
Install:
zin add queue:sqsNote: This driver uses rPush/lPop semantics; ack() is a no-op for this simple implementation. For visibility timeouts and retry mechanics, implement a processing list (BRPOPLPUSH) and message requeueing.
CI integration
- A GitHub Actions workflow is provided at
.github/workflows/redis-integration.ymlto run the Redis integration test when a Redis endpoint is configured in the repository secrets asINTEGRATION_REDIS_URL. - To enable: add a repository secret
INTEGRATION_REDIS_URLwith a value likeredis://:password@host:6379and the workflow will run automatically on push/pull_request, or you can trigger it manually via "Run workflow" in the Actions tab. - For self-hosted Redis in CI you can use a managed host (Upstash/Redis Cloud) and set the connection URL as the secret.
Integration testing
- Integration tests that exercise a real Redis instance are included under
tests/integration/queue/Redis.integration.test.ts. - To run them locally or in CI, set
REDIS_URLto a reachable Redis instance (e.g.,redis://:password@host:6379). - The integration test is skipped automatically when
REDIS_URLis not set so CI jobs that don't have a Redis dependency will not fail.