Skip to content

Redis Queue Adapter

The @zintrust/queue-redis package provides a Redis driver for ZinTrust's queue system, enabling high-performance message queuing using Redis's data structures.

Installation

bash
zin add  @zintrust/queue-redis

Configuration

Add the Redis queue configuration to your environment:

typescript
// config/queue.ts
import { QueueConfig } from '@zintrust/core';

export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '1'),
    keyPrefix: 'zintrust:queue:',
    maxRetriesPerRequest: 3,
    retryDelayOnFailover: 100,
    lazyConnect: true,
    keepAlive: 30000,
    family: 4,
  },
};

Environment Variables

bash
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
REDIS_DB=1

Usage

typescript
import { Queue } from '@zintrust/core';

// Define a job
const SendEmailJob = Queue.define({
  queue: 'email-queue',
  handler: async (job) => {
    const { to, subject, content } = job.data;
    await sendEmail(to, subject, content);
  },
  options: {
    attempts: 3,
    backoff: 'exponential',
    delay: 1000,
  },
});

// Dispatch a job
await SendEmailJob.dispatch({
  to: 'user@example.com',
  subject: 'Welcome!',
  content: 'Welcome to our service!',
});

// Process jobs
await Queue.process('email-queue', async (job) => {
  await SendEmailJob.handler(job);
});

Features

  • Redis Lists: Uses Redis LIST data structure for queues
  • Blocking Operations: Efficient blocking pop operations
  • Priority Queues: Support for priority-based job processing
  • Delayed Jobs: Built-in support for delayed job execution
  • Job Retries: Configurable retry strategies
  • Dead Letter Queues: Automatic handling of failed jobs
  • Monitoring: Built-in queue monitoring and metrics
  • Cluster Support: Redis Cluster support
  • Pub/Sub: Redis pub/sub for real-time notifications

Deduplication Collision Behavior

The BullMQ-backed Redis adapter supports both duplicate suppression and serialized overlap queueing through payload.deduplication.collisionBehavior.

typescript
await Queue.enqueue('ledger', {
  entryId: 'entry-123',
  deduplication: {
    id: 'account-42',
    ttl: 30000,
    collisionBehavior: 'enqueue',
  },
});
  • suppress remains the default and returns the deduplication id when a live lock already exists
  • enqueue keeps later same-key jobs in Redis so they wait behind the active lock instead of disappearing as hidden suppression

Use enqueue when jobs must execute sequentially for the same logical resource key but every job still needs a real queue record.

Advanced Configuration

Redis Cluster

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    cluster: [
      { host: 'redis-1', port: 6379 },
      { host: 'redis-2', port: 6379 },
      { host: 'redis-3', port: 6379 },
    ],
    options: {
      redisOptions: {
        password: 'your-password',
      },
      maxRedirections: 16,
      retryDelayOnFailover: 100,
    },
  },
};

Sentinel Configuration

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    sentinels: [
      { host: 'sentinel-1', port: 26379 },
      { host: 'sentinel-2', port: 26379 },
      { host: 'sentinel-3', port: 26379 },
    ],
    name: 'mymaster',
    password: 'your-password',
  },
};

Connection Pooling

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    pool: {
      max: 10,
      min: 2,
      acquireTimeoutMillis: 30000,
      idleTimeoutMillis: 30000,
    },
  },
};

Queue Types

Standard Queue

typescript
const StandardJob = Queue.define({
  queue: 'standard-queue',
  handler: async (job) => {
    console.log('Processing standard job:', job.data);
  },
});

Priority Queue

typescript
const PriorityJob = Queue.define({
  queue: 'priority-queue',
  priority: true, // Enable priority queue
  handler: async (job) => {
    console.log('Processing priority job:', job.data);
  },
});

// Dispatch with priority
await PriorityJob.dispatch(
  {
    message: 'High priority task',
  },
  {
    priority: 10, // 0-10 priority range
  }
);

Delayed Queue

typescript
const DelayedJob = Queue.define({
  queue: 'delayed-queue',
  delayed: true, // Enable delayed processing
  handler: async (job) => {
    console.log('Processing delayed job:', job.data);
  },
});

// Dispatch with delay
await DelayedJob.dispatch(
  {
    message: 'Process in 5 minutes',
  },
  {
    delay: 5 * 60 * 1000, // 5 minutes delay
  }
);

Job Options

Retry Configuration

typescript
const RetryJob = Queue.define({
  queue: 'retry-queue',
  options: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000,
      maxDelay: 60000,
    },
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Job Timeout

typescript
const TimeoutJob = Queue.define({
  queue: 'timeout-queue',
  options: {
    timeout: 30000, // 30 seconds timeout
  },
  handler: async (job) => {
    await longRunningOperation(job.data);
  },
});

Job Dependencies

typescript
const ParentJob = Queue.define({
  queue: 'parent-queue',
  handler: async (job) => {
    await processParentTask(job.data);
  },
});

const ChildJob = Queue.define({
  queue: 'child-queue',
  dependencies: ['parent-queue'], // Wait for parent jobs
  handler: async (job) => {
    await processChildTask(job.data);
  },
});

Advanced Features

Batch Processing

typescript
const BatchJob = Queue.define({
  queue: 'batch-queue',
  batchSize: 10,
  batchTimeout: 5000,
  handler: async (jobs) => {
    // Process multiple jobs at once
    const results = await processBatch(jobs.map((job) => job.data));
    return results;
  },
});

Rate Limiting

typescript
const RateLimitedJob = Queue.define({
  queue: 'rate-limited-queue',
  rateLimit: {
    max: 100, // Max 100 jobs
    period: 60000, // Per minute
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Job Chaining

typescript
const Step1Job = Queue.define({
  queue: 'step1-queue',
  handler: async (job) => {
    const result = await processStep1(job.data);

    // Chain to next step
    await Step2Job.dispatch({
      ...job.data,
      step1Result: result,
    });
  },
});

const Step2Job = Queue.define({
  queue: 'step2-queue',
  handler: async (job) => {
    await processStep2(job.data);
  },
});

Dead Letter Queues

Configure DLQ

typescript
const JobWithDLQ = Queue.define({
  queue: 'processing-queue',
  deadLetterQueue: {
    queue: 'dlq-processing',
    ttl: 7 * 24 * 60 * 60 * 1000, // 7 days TTL
  },
  options: {
    attempts: 3,
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Process Dead Letters

typescript
const DLQJob = Queue.define({
  queue: 'dlq-processing',
  handler: async (job) => {
    console.log('Dead letter job:', job.data);
    console.log('Failed attempts:', job.attempts);
    console.log('Error:', job.error);

    // Decide whether to retry or archive
    if (shouldRetry(job)) {
      await JobWithDLQ.dispatch(job.data);
    } else {
      await archiveFailedJob(job);
    }
  },
});

Monitoring and Metrics

Queue Statistics

typescript
const stats = await Queue.getStats('email-queue');
// Returns:
{
  queue: 'email-queue',
  waiting: 25,
  active: 3,
  completed: 1000,
  failed: 5,
  delayed: 10,
  paused: false,
  processing: 3,
}

const allStats = await Queue.getAllStats();
// Returns stats for all queues

Job Information

typescript
const jobInfo = await Queue.getJob('job-id');
// Returns:
{
  id: 'job-id',
  queue: 'email-queue',
  data: { /* job data */ },
  opts: { /* job options */ },
  progress: 50,
  attempts: 2,
  maxAttempts: 3,
  processedOn: 1640995200000,
  finishedOn: null,
  failedOn: null,
}

Queue Health

typescript
const health = await Queue.getHealth();
// Returns:
{
  connected: true,
  redis: {
    connected: true,
    host: 'localhost',
    port: 6379,
    db: 1,
  },
  queues: {
    total: 5,
    active: 3,
    paused: 0,
  },
}

Real-time Notifications

Redis Pub/Sub

typescript
// Subscribe to queue events
Queue.on('job:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed:`, result);
});

Queue.on('job:failed', (jobId, error) => {
  console.log(`Job ${jobId} failed:`, error);
});

Queue.on('queue:drained', (queueName) => {
  console.log(`Queue ${queueName} is empty`);
});

Queue.on('queue:stalled', (queueName) => {
  console.log(`Queue ${queueName} has stalled jobs`);
});

Custom Events

typescript
// Emit custom events
await Queue.emit('custom:event', {
  queue: 'email-queue',
  data: {
    /* custom data */
  },
});

// Listen for custom events
Queue.on('custom:event', (data) => {
  console.log('Custom event:', data);
});

Performance Optimization

Lua Scripts

typescript
// Use Lua scripts for atomic operations
const script = `
  local queue = KEYS[1]
  local job = ARGV[1]
  local priority = tonumber(ARGV[2])

  if priority > 0 then
    redis.call('ZADD', queue .. ':priority', priority, job)
  else
    redis.call('RPUSH', queue, job)
  end

  return 1
`;

await Queue.eval(script, 1, 'email-queue', JSON.stringify(jobData), 5);

Pipeline Operations

typescript
// Use Redis pipelines for bulk operations
const pipeline = Queue.pipeline();

for (let i = 0; i < 100; i++) {
  pipeline.rpush('bulk-queue', JSON.stringify(jobData[i]));
}

await pipeline.exec();

Memory Optimization

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    memory: {
      maxMemoryPolicy: 'allkeys-lru',
      maxMemory: '2gb',
    },
  },
};

Error Handling

Global Error Handler

typescript
Queue.setErrorHandler(async (job, error) => {
  console.log('Job failed:', job.id, error.message);

  // Log to external monitoring
  await logError(job, error);

  // Send alert for critical errors
  if (error.severity === 'critical') {
    await sendAlert(error);
  }
});

Queue-Specific Error Handler

typescript
const ErrorHandlingJob = Queue.define({
  queue: 'error-handling-queue',
  errorHandler: async (job, error) => {
    console.log('Specific error handler for job:', job.id);

    // Custom error handling logic
    if (error.code === 'RETRYABLE') {
      return true; // Retry the job
    } else {
      return false; // Don't retry
    }
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Testing

Mock Redis

typescript
import { RedisMock } from '@zintrust/queue-redis';

// Use mock for testing
const mockQueue = new RedisMock();

// Mock Redis operations
mockQueue.on('rpush', (queue, data) => {
  console.log('Mock push:', queue, data);
});

// Test job processing
await mockQueue.process('test-queue', async (job) => {
  expect(job.data).toEqual({ test: 'data' });
});

Integration Testing

typescript
import { TestRedis } from '@zintrust/queue-redis';

// Use test Redis instance
const testQueue = new TestRedis({
  host: 'localhost',
  port: 6380, // Different port for testing
});

// Setup test data
await testQueue.clearQueue('test-queue');
await testQueue.addJob('test-queue', { test: 'data' });

// Run test
const result = await processTestJob();
expect(result).toBeTruthy();

Security

Authentication

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'your-secure-password',
    tls: {
      host: 'redis.example.com',
      port: 6380,
    },
  },
};

Access Control

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    acl: {
      username: 'queue-user',
      password: 'queue-password',
      commands: ['GET', 'SET', 'LPUSH', 'RPOP', 'ZADD'],
      keys: ['zintrust:queue:*'],
    },
  },
};

Limitations

  • Memory Usage: Redis is in-memory, large queues consume significant memory
  • Persistence: Depends on Redis persistence configuration
  • Network Latency: Network issues can affect performance
  • Single Point of Failure: Single Redis instance can be a SPOF (mitigated with clustering)
  • Message Size: Redis has limitations on individual value sizes

Released under the MIT License.