Skip to content

Redis Queue Adapter

The @zintrust/queue-redis package provides a Redis driver for ZinTrust's queue system, enabling high-performance message queuing using Redis's data structures.

Installation

bash
zin add  @zintrust/queue-redis

Configuration

Add the Redis queue configuration to your environment:

typescript
// config/queue.ts
import { QueueConfig } from '@zintrust/core';

export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '1'),
    keyPrefix: 'zintrust:queue:',
    maxRetriesPerRequest: 3,
    retryDelayOnFailover: 100,
    lazyConnect: true,
    keepAlive: 30000,
    family: 4,
  },
};

Environment Variables

bash
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
REDIS_DB=1

Usage

typescript
import { Queue } from '@zintrust/core';

// Define a job
const SendEmailJob = Queue.define({
  queue: 'email-queue',
  handler: async (job) => {
    const { to, subject, content } = job.data;
    await sendEmail(to, subject, content);
  },
  options: {
    attempts: 3,
    backoff: 'exponential',
    delay: 1000,
  },
});

// Dispatch a job
await SendEmailJob.dispatch({
  to: 'user@example.com',
  subject: 'Welcome!',
  content: 'Welcome to our service!',
});

// Process jobs
await Queue.process('email-queue', async (job) => {
  await SendEmailJob.handler(job);
});

Features

  • Redis Lists: Uses Redis LIST data structure for queues
  • Blocking Operations: Efficient blocking pop operations
  • Priority Queues: Support for priority-based job processing
  • Delayed Jobs: Built-in support for delayed job execution
  • Job Retries: Configurable retry strategies
  • Dead Letter Queues: Automatic handling of failed jobs
  • Monitoring: Built-in queue monitoring and metrics
  • Cluster Support: Redis Cluster support
  • Pub/Sub: Redis pub/sub for real-time notifications

Advanced Configuration

Redis Cluster

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    cluster: [
      { host: 'redis-1', port: 6379 },
      { host: 'redis-2', port: 6379 },
      { host: 'redis-3', port: 6379 },
    ],
    options: {
      redisOptions: {
        password: 'your-password',
      },
      maxRedirections: 16,
      retryDelayOnFailover: 100,
    },
  },
};

Sentinel Configuration

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    sentinels: [
      { host: 'sentinel-1', port: 26379 },
      { host: 'sentinel-2', port: 26379 },
      { host: 'sentinel-3', port: 26379 },
    ],
    name: 'mymaster',
    password: 'your-password',
  },
};

Connection Pooling

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    pool: {
      max: 10,
      min: 2,
      acquireTimeoutMillis: 30000,
      idleTimeoutMillis: 30000,
    },
  },
};

Queue Types

Standard Queue

typescript
const StandardJob = Queue.define({
  queue: 'standard-queue',
  handler: async (job) => {
    console.log('Processing standard job:', job.data);
  },
});

Priority Queue

typescript
const PriorityJob = Queue.define({
  queue: 'priority-queue',
  priority: true, // Enable priority queue
  handler: async (job) => {
    console.log('Processing priority job:', job.data);
  },
});

// Dispatch with priority
await PriorityJob.dispatch(
  {
    message: 'High priority task',
  },
  {
    priority: 10, // 0-10 priority range
  }
);

Delayed Queue

typescript
const DelayedJob = Queue.define({
  queue: 'delayed-queue',
  delayed: true, // Enable delayed processing
  handler: async (job) => {
    console.log('Processing delayed job:', job.data);
  },
});

// Dispatch with delay
await DelayedJob.dispatch(
  {
    message: 'Process in 5 minutes',
  },
  {
    delay: 5 * 60 * 1000, // 5 minutes delay
  }
);

Job Options

Retry Configuration

typescript
const RetryJob = Queue.define({
  queue: 'retry-queue',
  options: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 1000,
      maxDelay: 60000,
    },
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Job Timeout

typescript
const TimeoutJob = Queue.define({
  queue: 'timeout-queue',
  options: {
    timeout: 30000, // 30 seconds timeout
  },
  handler: async (job) => {
    await longRunningOperation(job.data);
  },
});

Job Dependencies

typescript
const ParentJob = Queue.define({
  queue: 'parent-queue',
  handler: async (job) => {
    await processParentTask(job.data);
  },
});

const ChildJob = Queue.define({
  queue: 'child-queue',
  dependencies: ['parent-queue'], // Wait for parent jobs
  handler: async (job) => {
    await processChildTask(job.data);
  },
});

Advanced Features

Batch Processing

typescript
const BatchJob = Queue.define({
  queue: 'batch-queue',
  batchSize: 10,
  batchTimeout: 5000,
  handler: async (jobs) => {
    // Process multiple jobs at once
    const results = await processBatch(jobs.map((job) => job.data));
    return results;
  },
});

Rate Limiting

typescript
const RateLimitedJob = Queue.define({
  queue: 'rate-limited-queue',
  rateLimit: {
    max: 100, // Max 100 jobs
    period: 60000, // Per minute
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Job Chaining

typescript
const Step1Job = Queue.define({
  queue: 'step1-queue',
  handler: async (job) => {
    const result = await processStep1(job.data);

    // Chain to next step
    await Step2Job.dispatch({
      ...job.data,
      step1Result: result,
    });
  },
});

const Step2Job = Queue.define({
  queue: 'step2-queue',
  handler: async (job) => {
    await processStep2(job.data);
  },
});

Dead Letter Queues

Configure DLQ

typescript
const JobWithDLQ = Queue.define({
  queue: 'processing-queue',
  deadLetterQueue: {
    queue: 'dlq-processing',
    ttl: 7 * 24 * 60 * 60 * 1000, // 7 days TTL
  },
  options: {
    attempts: 3,
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Process Dead Letters

typescript
const DLQJob = Queue.define({
  queue: 'dlq-processing',
  handler: async (job) => {
    console.log('Dead letter job:', job.data);
    console.log('Failed attempts:', job.attempts);
    console.log('Error:', job.error);

    // Decide whether to retry or archive
    if (shouldRetry(job)) {
      await JobWithDLQ.dispatch(job.data);
    } else {
      await archiveFailedJob(job);
    }
  },
});

Monitoring and Metrics

Queue Statistics

typescript
const stats = await Queue.getStats('email-queue');
// Returns:
{
  queue: 'email-queue',
  waiting: 25,
  active: 3,
  completed: 1000,
  failed: 5,
  delayed: 10,
  paused: false,
  processing: 3,
}

const allStats = await Queue.getAllStats();
// Returns stats for all queues

Job Information

typescript
const jobInfo = await Queue.getJob('job-id');
// Returns:
{
  id: 'job-id',
  queue: 'email-queue',
  data: { /* job data */ },
  opts: { /* job options */ },
  progress: 50,
  attempts: 2,
  maxAttempts: 3,
  processedOn: 1640995200000,
  finishedOn: null,
  failedOn: null,
}

Queue Health

typescript
const health = await Queue.getHealth();
// Returns:
{
  connected: true,
  redis: {
    connected: true,
    host: 'localhost',
    port: 6379,
    db: 1,
  },
  queues: {
    total: 5,
    active: 3,
    paused: 0,
  },
}

Real-time Notifications

Redis Pub/Sub

typescript
// Subscribe to queue events
Queue.on('job:completed', (jobId, result) => {
  console.log(`Job ${jobId} completed:`, result);
});

Queue.on('job:failed', (jobId, error) => {
  console.log(`Job ${jobId} failed:`, error);
});

Queue.on('queue:drained', (queueName) => {
  console.log(`Queue ${queueName} is empty`);
});

Queue.on('queue:stalled', (queueName) => {
  console.log(`Queue ${queueName} has stalled jobs`);
});

Custom Events

typescript
// Emit custom events
await Queue.emit('custom:event', {
  queue: 'email-queue',
  data: {
    /* custom data */
  },
});

// Listen for custom events
Queue.on('custom:event', (data) => {
  console.log('Custom event:', data);
});

Performance Optimization

Lua Scripts

typescript
// Use Lua scripts for atomic operations
const script = `
  local queue = KEYS[1]
  local job = ARGV[1]
  local priority = tonumber(ARGV[2])

  if priority > 0 then
    redis.call('ZADD', queue .. ':priority', priority, job)
  else
    redis.call('RPUSH', queue, job)
  end

  return 1
`;

await Queue.eval(script, 1, 'email-queue', JSON.stringify(jobData), 5);

Pipeline Operations

typescript
// Use Redis pipelines for bulk operations
const pipeline = Queue.pipeline();

for (let i = 0; i < 100; i++) {
  pipeline.rpush('bulk-queue', JSON.stringify(jobData[i]));
}

await pipeline.exec();

Memory Optimization

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    memory: {
      maxMemoryPolicy: 'allkeys-lru',
      maxMemory: '2gb',
    },
  },
};

Error Handling

Global Error Handler

typescript
Queue.setErrorHandler(async (job, error) => {
  console.log('Job failed:', job.id, error.message);

  // Log to external monitoring
  await logError(job, error);

  // Send alert for critical errors
  if (error.severity === 'critical') {
    await sendAlert(error);
  }
});

Queue-Specific Error Handler

typescript
const ErrorHandlingJob = Queue.define({
  queue: 'error-handling-queue',
  errorHandler: async (job, error) => {
    console.log('Specific error handler for job:', job.id);

    // Custom error handling logic
    if (error.code === 'RETRYABLE') {
      return true; // Retry the job
    } else {
      return false; // Don't retry
    }
  },
  handler: async (job) => {
    await processJob(job.data);
  },
});

Testing

Mock Redis

typescript
import { RedisMock } from '@zintrust/queue-redis';

// Use mock for testing
const mockQueue = new RedisMock();

// Mock Redis operations
mockQueue.on('rpush', (queue, data) => {
  console.log('Mock push:', queue, data);
});

// Test job processing
await mockQueue.process('test-queue', async (job) => {
  expect(job.data).toEqual({ test: 'data' });
});

Integration Testing

typescript
import { TestRedis } from '@zintrust/queue-redis';

// Use test Redis instance
const testQueue = new TestRedis({
  host: 'localhost',
  port: 6380, // Different port for testing
});

// Setup test data
await testQueue.clearQueue('test-queue');
await testQueue.addJob('test-queue', { test: 'data' });

// Run test
const result = await processTestJob();
expect(result).toBeTruthy();

Security

Authentication

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    host: 'localhost',
    port: 6379,
    password: 'your-secure-password',
    tls: {
      host: 'redis.example.com',
      port: 6380,
    },
  },
};

Access Control

typescript
export const queue: QueueConfig = {
  driver: 'redis',
  redis: {
    // ... other config
    acl: {
      username: 'queue-user',
      password: 'queue-password',
      commands: ['GET', 'SET', 'LPUSH', 'RPOP', 'ZADD'],
      keys: ['zintrust:queue:*'],
    },
  },
};

Limitations

  • Memory Usage: Redis is in-memory, large queues consume significant memory
  • Persistence: Depends on Redis persistence configuration
  • Network Latency: Network issues can affect performance
  • Single Point of Failure: Single Redis instance can be a SPOF (mitigated with clustering)
  • Message Size: Redis has limitations on individual value sizes

Released under the MIT License.