Redis Queue Adapter
The @zintrust/queue-redis package provides a Redis driver for ZinTrust's queue system, enabling high-performance message queuing using Redis's data structures.
Installation
bash
zin add @zintrust/queue-redisConfiguration
Add the Redis queue configuration to your environment:
typescript
// config/queue.ts
import { QueueConfig } from '@zintrust/core';
export const queue: QueueConfig = {
driver: 'redis',
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '1'),
keyPrefix: 'zintrust:queue:',
maxRetriesPerRequest: 3,
retryDelayOnFailover: 100,
lazyConnect: true,
keepAlive: 30000,
family: 4,
},
};Environment Variables
bash
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
REDIS_DB=1Usage
typescript
import { Queue } from '@zintrust/core';
// Define a job
const SendEmailJob = Queue.define({
queue: 'email-queue',
handler: async (job) => {
const { to, subject, content } = job.data;
await sendEmail(to, subject, content);
},
options: {
attempts: 3,
backoff: 'exponential',
delay: 1000,
},
});
// Dispatch a job
await SendEmailJob.dispatch({
to: 'user@example.com',
subject: 'Welcome!',
content: 'Welcome to our service!',
});
// Process jobs
await Queue.process('email-queue', async (job) => {
await SendEmailJob.handler(job);
});Features
- Redis Lists: Uses Redis LIST data structure for queues
- Blocking Operations: Efficient blocking pop operations
- Priority Queues: Support for priority-based job processing
- Delayed Jobs: Built-in support for delayed job execution
- Job Retries: Configurable retry strategies
- Dead Letter Queues: Automatic handling of failed jobs
- Monitoring: Built-in queue monitoring and metrics
- Cluster Support: Redis Cluster support
- Pub/Sub: Redis pub/sub for real-time notifications
Advanced Configuration
Redis Cluster
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
cluster: [
{ host: 'redis-1', port: 6379 },
{ host: 'redis-2', port: 6379 },
{ host: 'redis-3', port: 6379 },
],
options: {
redisOptions: {
password: 'your-password',
},
maxRedirections: 16,
retryDelayOnFailover: 100,
},
},
};Sentinel Configuration
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
sentinels: [
{ host: 'sentinel-1', port: 26379 },
{ host: 'sentinel-2', port: 26379 },
{ host: 'sentinel-3', port: 26379 },
],
name: 'mymaster',
password: 'your-password',
},
};Connection Pooling
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
// ... other config
pool: {
max: 10,
min: 2,
acquireTimeoutMillis: 30000,
idleTimeoutMillis: 30000,
},
},
};Queue Types
Standard Queue
typescript
const StandardJob = Queue.define({
queue: 'standard-queue',
handler: async (job) => {
console.log('Processing standard job:', job.data);
},
});Priority Queue
typescript
const PriorityJob = Queue.define({
queue: 'priority-queue',
priority: true, // Enable priority queue
handler: async (job) => {
console.log('Processing priority job:', job.data);
},
});
// Dispatch with priority
await PriorityJob.dispatch(
{
message: 'High priority task',
},
{
priority: 10, // 0-10 priority range
}
);Delayed Queue
typescript
const DelayedJob = Queue.define({
queue: 'delayed-queue',
delayed: true, // Enable delayed processing
handler: async (job) => {
console.log('Processing delayed job:', job.data);
},
});
// Dispatch with delay
await DelayedJob.dispatch(
{
message: 'Process in 5 minutes',
},
{
delay: 5 * 60 * 1000, // 5 minutes delay
}
);Job Options
Retry Configuration
typescript
const RetryJob = Queue.define({
queue: 'retry-queue',
options: {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000,
maxDelay: 60000,
},
},
handler: async (job) => {
await processJob(job.data);
},
});Job Timeout
typescript
const TimeoutJob = Queue.define({
queue: 'timeout-queue',
options: {
timeout: 30000, // 30 seconds timeout
},
handler: async (job) => {
await longRunningOperation(job.data);
},
});Job Dependencies
typescript
const ParentJob = Queue.define({
queue: 'parent-queue',
handler: async (job) => {
await processParentTask(job.data);
},
});
const ChildJob = Queue.define({
queue: 'child-queue',
dependencies: ['parent-queue'], // Wait for parent jobs
handler: async (job) => {
await processChildTask(job.data);
},
});Advanced Features
Batch Processing
typescript
const BatchJob = Queue.define({
queue: 'batch-queue',
batchSize: 10,
batchTimeout: 5000,
handler: async (jobs) => {
// Process multiple jobs at once
const results = await processBatch(jobs.map((job) => job.data));
return results;
},
});Rate Limiting
typescript
const RateLimitedJob = Queue.define({
queue: 'rate-limited-queue',
rateLimit: {
max: 100, // Max 100 jobs
period: 60000, // Per minute
},
handler: async (job) => {
await processJob(job.data);
},
});Job Chaining
typescript
const Step1Job = Queue.define({
queue: 'step1-queue',
handler: async (job) => {
const result = await processStep1(job.data);
// Chain to next step
await Step2Job.dispatch({
...job.data,
step1Result: result,
});
},
});
const Step2Job = Queue.define({
queue: 'step2-queue',
handler: async (job) => {
await processStep2(job.data);
},
});Dead Letter Queues
Configure DLQ
typescript
const JobWithDLQ = Queue.define({
queue: 'processing-queue',
deadLetterQueue: {
queue: 'dlq-processing',
ttl: 7 * 24 * 60 * 60 * 1000, // 7 days TTL
},
options: {
attempts: 3,
},
handler: async (job) => {
await processJob(job.data);
},
});Process Dead Letters
typescript
const DLQJob = Queue.define({
queue: 'dlq-processing',
handler: async (job) => {
console.log('Dead letter job:', job.data);
console.log('Failed attempts:', job.attempts);
console.log('Error:', job.error);
// Decide whether to retry or archive
if (shouldRetry(job)) {
await JobWithDLQ.dispatch(job.data);
} else {
await archiveFailedJob(job);
}
},
});Monitoring and Metrics
Queue Statistics
typescript
const stats = await Queue.getStats('email-queue');
// Returns:
{
queue: 'email-queue',
waiting: 25,
active: 3,
completed: 1000,
failed: 5,
delayed: 10,
paused: false,
processing: 3,
}
const allStats = await Queue.getAllStats();
// Returns stats for all queuesJob Information
typescript
const jobInfo = await Queue.getJob('job-id');
// Returns:
{
id: 'job-id',
queue: 'email-queue',
data: { /* job data */ },
opts: { /* job options */ },
progress: 50,
attempts: 2,
maxAttempts: 3,
processedOn: 1640995200000,
finishedOn: null,
failedOn: null,
}Queue Health
typescript
const health = await Queue.getHealth();
// Returns:
{
connected: true,
redis: {
connected: true,
host: 'localhost',
port: 6379,
db: 1,
},
queues: {
total: 5,
active: 3,
paused: 0,
},
}Real-time Notifications
Redis Pub/Sub
typescript
// Subscribe to queue events
Queue.on('job:completed', (jobId, result) => {
console.log(`Job ${jobId} completed:`, result);
});
Queue.on('job:failed', (jobId, error) => {
console.log(`Job ${jobId} failed:`, error);
});
Queue.on('queue:drained', (queueName) => {
console.log(`Queue ${queueName} is empty`);
});
Queue.on('queue:stalled', (queueName) => {
console.log(`Queue ${queueName} has stalled jobs`);
});Custom Events
typescript
// Emit custom events
await Queue.emit('custom:event', {
queue: 'email-queue',
data: {
/* custom data */
},
});
// Listen for custom events
Queue.on('custom:event', (data) => {
console.log('Custom event:', data);
});Performance Optimization
Lua Scripts
typescript
// Use Lua scripts for atomic operations
const script = `
local queue = KEYS[1]
local job = ARGV[1]
local priority = tonumber(ARGV[2])
if priority > 0 then
redis.call('ZADD', queue .. ':priority', priority, job)
else
redis.call('RPUSH', queue, job)
end
return 1
`;
await Queue.eval(script, 1, 'email-queue', JSON.stringify(jobData), 5);Pipeline Operations
typescript
// Use Redis pipelines for bulk operations
const pipeline = Queue.pipeline();
for (let i = 0; i < 100; i++) {
pipeline.rpush('bulk-queue', JSON.stringify(jobData[i]));
}
await pipeline.exec();Memory Optimization
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
// ... other config
memory: {
maxMemoryPolicy: 'allkeys-lru',
maxMemory: '2gb',
},
},
};Error Handling
Global Error Handler
typescript
Queue.setErrorHandler(async (job, error) => {
console.log('Job failed:', job.id, error.message);
// Log to external monitoring
await logError(job, error);
// Send alert for critical errors
if (error.severity === 'critical') {
await sendAlert(error);
}
});Queue-Specific Error Handler
typescript
const ErrorHandlingJob = Queue.define({
queue: 'error-handling-queue',
errorHandler: async (job, error) => {
console.log('Specific error handler for job:', job.id);
// Custom error handling logic
if (error.code === 'RETRYABLE') {
return true; // Retry the job
} else {
return false; // Don't retry
}
},
handler: async (job) => {
await processJob(job.data);
},
});Testing
Mock Redis
typescript
import { RedisMock } from '@zintrust/queue-redis';
// Use mock for testing
const mockQueue = new RedisMock();
// Mock Redis operations
mockQueue.on('rpush', (queue, data) => {
console.log('Mock push:', queue, data);
});
// Test job processing
await mockQueue.process('test-queue', async (job) => {
expect(job.data).toEqual({ test: 'data' });
});Integration Testing
typescript
import { TestRedis } from '@zintrust/queue-redis';
// Use test Redis instance
const testQueue = new TestRedis({
host: 'localhost',
port: 6380, // Different port for testing
});
// Setup test data
await testQueue.clearQueue('test-queue');
await testQueue.addJob('test-queue', { test: 'data' });
// Run test
const result = await processTestJob();
expect(result).toBeTruthy();Security
Authentication
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
host: 'localhost',
port: 6379,
password: 'your-secure-password',
tls: {
host: 'redis.example.com',
port: 6380,
},
},
};Access Control
typescript
export const queue: QueueConfig = {
driver: 'redis',
redis: {
// ... other config
acl: {
username: 'queue-user',
password: 'queue-password',
commands: ['GET', 'SET', 'LPUSH', 'RPOP', 'ZADD'],
keys: ['zintrust:queue:*'],
},
},
};Limitations
- Memory Usage: Redis is in-memory, large queues consume significant memory
- Persistence: Depends on Redis persistence configuration
- Network Latency: Network issues can affect performance
- Single Point of Failure: Single Redis instance can be a SPOF (mitigated with clustering)
- Message Size: Redis has limitations on individual value sizes