Skip to content

Async Queue (Redis) — Quickstart

This module provides an abstraction for job queues and drivers. A simple in-memory driver is included for testing and local development.

API (Queue):

  • Queue.register(name, driver) — register a new driver
  • Queue.get(name?) — get a driver (defaults to process.env.QUEUE_DRIVER or inmemory)
  • Queue.enqueue(queue, payload) — enqueue a job
  • Queue.dequeue(queue) — dequeue next job
  • Queue.ack(queue, id) — acknowledge a job
  • Queue.length(queue) — get pending job count
  • Queue.drain(queue) — clear all jobs

Driver interface:

  • enqueue(queue, payload): Promise\<string>
  • dequeue(queue): Promise\<QueueMessage | undefined>
  • ack(queue, id): Promise\<void>
  • length(queue): Promise\<number>
  • drain(queue): Promise\<void>

Notes:

  • The in-memory driver is NOT suitable for production — use Redis or another durable backend for PHASE 1.5 production work.
  • Drivers should be registered with Queue.register('redis', RedisDriver).

Sync Driver

When QUEUE_DRIVER=sync, jobs are processed synchronously and immediately. This is useful for testing and development but requires explicit worker execution after enqueuing.

Important: Manual Worker Execution Required

With the sync driver, you must manually call the worker runner to process enqueued jobs:

typescript
import { EmailQueue } from '@app/Workers/EmailWorker';

// Enqueue a job
await EmailJobService.sendWelcome('test@zintrust.com', 'Test User', 'example-mysql1');

// IMPORTANT: Process the job immediately (required for sync driver)
await EmailQueue.processOne('example-mysql1');

// Or process all jobs
await EmailQueue.processAll('example-mysql1');

When to Use Sync Driver

  • Development & Testing - Immediate feedback and debugging
  • Simple Applications - No background processing needed
  • Unit Tests - Predictable, synchronous behavior

Limitations

  • ⚠️ Blocking - Jobs block the request/response cycle
  • ⚠️ No Persistence - Jobs are lost if application crashes
  • ⚠️ No Retry - Failed jobs are not retried automatically
  • ⚠️ Manual Processing - Must explicitly call worker methods

Migration to Production

For production use, switch to Redis driver:

bash
# Development (sync)
QUEUE_DRIVER=sync

# Production (Redis)
QUEUE_DRIVER=redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password

Redis Driver (BullMQ-Powered)

  • The Redis queue driver now uses BullMQ for enterprise-grade job processing with auto-scaling, circuit breaker, dead letter queue, and advanced monitoring.
  • Configure via standard Redis environment variables (REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_QUEUE_DB).
  • You can register the driver with Queue.register('redis', RedisDriver) and then call Queue.enqueue('my-queue', payload, 'redis').
  • For Cloudflare Workers, prefer Redis RPC unless you intentionally use Workers TCP sockets. Set both USE_REDIS_PROXY=true and REDIS_RPC_URL so the Worker calls a Node.js Redis RPC backend instead of opening direct Redis/BullMQ connections.

Architecture: Producer vs Consumer (Cloudflare)

If you deploy your API to Cloudflare Workers, you cannot run Queue Consumers (Workers) in the same process because BullMQ/Redis consumers require Node.js primitives not available in the Edge runtime.

The Solution: Split your deployment into two services:

  1. Producer (Cloudflare Worker): Handles API requests, validates inputs, and enqueues jobs to Redis.
  2. Consumer (Node.js backend): A separate service (for example Docker, Railway, Fly.io, EC2, or a process manager) that connects to the same Redis instance, consumes jobs, and processes them.

See Architecture: Producer-Consumer Model for setup details.

Redis RPC (Cloudflare without Redis TCP)

When Cloudflare Workers cannot open Redis TCP sockets in your environment, ZinTrust can proxy queue commands over HTTP to @zintrust/redis-rpc. The Worker sends queue intent over HTTP; Redis, BullMQ, and Lua scripts run in the backend process.

Worker-side env:

bash
QUEUE_DRIVER=redis
USE_REDIS_PROXY=true
REDIS_RPC_URL=https://queues.example.com
REDIS_RPC_SECRET=change-me

Backend:

bash
npm i @zintrust/redis-rpc
zin redis-rpc

The older queue HTTP gateway remains available for custom deployments, but Redis RPC is the maintained queue-aware path for ZinTrust packages.

BullMQ Environment Variables

When QUEUE_DRIVER=redis, the system uses BullMQ with these customizable settings:

Environment VariableDefaultDescriptionExample
BULLMQ_REMOVE_ON_COMPLETE100Number of completed jobs to keep in Redis200
BULLMQ_REMOVE_ON_FAIL50Number of failed jobs to keep in Redis25
BULLMQ_DEFAULT_ATTEMPTS3Default retry attempts for jobs5
BULLMQ_BACKOFF_DELAY2000Delay between retries (milliseconds)5000
BULLMQ_BACKOFF_TYPEexponentialBackoff strategy: 'exponential', 'fixed', 'custom'fixed

Environment-Specific Examples

Development Environment:

bash
BULLMQ_REMOVE_ON_COMPLETE=500
BULLMQ_REMOVE_ON_FAIL=100
BULLMQ_DEFAULT_ATTEMPTS=2
BULLMQ_BACKOFF_DELAY=10000

Production Environment:

bash
BULLMQ_REMOVE_ON_COMPLETE=50
BULLMQ_REMOVE_ON_FAIL=20
BULLMQ_DEFAULT_ATTEMPTS=5
BULLMQ_BACKOFF_DELAY=1000

High-Volume Environment:

bash
BULLMQ_REMOVE_ON_COMPLETE=10
BULLMQ_REMOVE_ON_FAIL=5
BULLMQ_BACKOFF_DELAY=500

Deduplication Collision Behavior

BullMQ-backed queue payloads support a deduplication collision policy:

typescript
await Queue.enqueue('balance-updates', {
  accountId: 'acct-123',
  delta: -25,
  deduplication: {
    id: 'acct-123',
    ttl: 30000,
    collisionBehavior: 'enqueue',
  },
});
  • collisionBehavior: 'suppress' is the default and drops later matching work as a deduplicated success
  • collisionBehavior: 'enqueue' keeps later same-key jobs queued so workers can process them one-by-one behind the existing overlap lock

Use enqueue for ordered mutation workloads such as balance-affecting debit/credit operations where backlog visibility matters.

Install Redis driver

bash
zin add queue:redis

When to use queue-redis vs queue-monitor

  • ✅ Use @zintrust/queue-redis if you only need to enqueue jobs and another service will process them
  • ✅✅ Use @zintrust/queue-monitor if you need full queue management (enqueue + process + monitor + retry)

Note: The monitor package can do everything queue-redis does, plus much more. So if you install @zintrust/queue-monitor, there's no need for @zintrust/queue-redis.

RabbitMQ Driver

Install:

bash
zin add queue:rabbitmq

Cloudflare Workers (HTTP Gateway)

RabbitMQ AMQP TCP connections are not available in Workers without an HTTP gateway. Use a gateway service that exposes the following endpoints and configure the gateway URL:

  • POST /enqueue{ id: string }
  • POST /dequeue{ message?: { id: string; payload: unknown; attempts: number } | null }
  • POST /ack{ ok: true }
  • POST /length{ length: number }
  • POST /drain{ ok: true }

Set environment variables:

  • RABBITMQ_HTTP_GATEWAY_URL
  • RABBITMQ_HTTP_GATEWAY_TOKEN (optional)
  • RABBITMQ_HTTP_GATEWAY_TIMEOUT_MS (optional, default 15000)

AWS SQS Driver

Install:

bash
zin add queue:sqs

Note: This driver uses rPush/lPop semantics; ack() is a no-op for this simple implementation. For visibility timeouts and retry mechanics, implement a processing list (BRPOPLPUSH) and message requeueing.

Cloudflare Queues Driver

Install:

bash
zin add queue:cloudflare

@zintrust/queue-cloudflare supports Cloudflare Queue producers/consumers and a BullMQ-like state layer backed by D1, Durable Objects, KV, Cron Triggers, and DLQs. Run its state migration before using job inspection, repeatables, flows, or scheduler features:

bash
zin migrate:queue-cloudflare --database zintrust-queue --local

See docs/package-queue-cloudflare.md for binding configuration and API examples.

CI integration

  • If your application is tested against a real Redis instance in GitHub Actions, you can configure a repository secret (e.g. INTEGRATION_REDIS_URL) and pass it to your workflow.
  • For self-hosted Redis in CI you can use a managed host (Upstash/Redis Cloud) and set the connection URL as the secret.

Integration testing

  • Integration tests that exercise a real Redis instance are included under tests/integration/queue/Redis.integration.test.ts.
  • To run them locally or in CI, set REDIS_URL to a reachable Redis instance (e.g., redis://:password@host:6379).
  • The integration test is skipped automatically when REDIS_URL is not set so CI jobs that don't have a Redis dependency will not fail.

Released under the MIT License.