
queue-job-processor
by patricio0312rev
A comprehensive library of 100+ production-ready development skills covering modern software engineering, from project setup to production deployment and from security hardening to performance optimization.
⭐ 6 · 🍴 0 · 📅 Jan 19, 2026
SKILL.md
name: queue-job-processor
description: Implements background job processing with BullMQ/Redis, including job queues, workers, scheduling, retries, and monitoring. Use when users request "background jobs", "queue processing", "async tasks", "BullMQ", or "job scheduler".
Queue Job Processor
Build robust background job processing with BullMQ and Redis.
Core Workflow
- Setup Redis: Configure connection
- Create queues: Define job queues
- Implement workers: Process jobs
- Add job types: Type-safe job definitions
- Configure retries: Handle failures
- Add monitoring: Dashboard and alerts
Installation
npm install bullmq ioredis
# ioredis ships its own TypeScript definitions, so @types/ioredis is not needed
Redis Connection
// lib/redis.ts
import IORedis from 'ioredis';
export const redis = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false,
});
export const redisSubscriber = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
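The non-null assertion on `process.env.REDIS_URL` postpones a missing variable until the first connection attempt. A minimal sketch of failing fast at startup instead; `requireRedisUrl` is a hypothetical helper and not part of ioredis or BullMQ:

```typescript
// Hypothetical helper: validate REDIS_URL before handing it to IORedis.
function requireRedisUrl(env: Record<string, string | undefined>): string {
  const url = env.REDIS_URL;
  if (!url) {
    throw new Error('REDIS_URL is not set');
  }
  // Accept plain (redis://) and TLS (rediss://) connection URLs only.
  if (!/^rediss?:\/\//.test(url)) {
    throw new Error('REDIS_URL must start with redis:// or rediss://');
  }
  return url;
}
```

It could then be used as `new IORedis(requireRedisUrl(process.env), { maxRetriesPerRequest: null })`.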
Queue Setup
Define Job Types
// jobs/types.ts
export interface EmailJobData {
to: string;
subject: string;
template: string;
variables: Record<string, string>;
}
export interface ImageProcessingJobData {
imageId: string;
userId: string;
operations: Array<{
type: 'resize' | 'crop' | 'watermark';
params: Record<string, any>;
}>;
}
export interface ReportJobData {
reportId: string;
userId: string;
type: 'daily' | 'weekly' | 'monthly';
dateRange: {
start: string;
end: string;
};
}
export interface WebhookJobData {
url: string;
payload: Record<string, any>;
headers?: Record<string, string>;
retryCount?: number;
}
export type JobData =
| { type: 'email'; data: EmailJobData }
| { type: 'image-processing'; data: ImageProcessingJobData }
| { type: 'report'; data: ReportJobData }
| { type: 'webhook'; data: WebhookJobData };
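The `JobData` union is discriminated by its `type` tag, so a `switch` narrows `data` to the matching payload. The sketch below shows that narrowing with a compile-time exhaustiveness check. The payload types are condensed copies of the interfaces above, and `describeJob` is a hypothetical helper:

```typescript
// Condensed copies of the payload types, so the sketch is self-contained.
type EmailJobData = { to: string; subject: string; template: string; variables: Record<string, string> };
type ImageProcessingJobData = { imageId: string; userId: string; operations: unknown[] };
type ReportJobData = { reportId: string; userId: string; type: 'daily' | 'weekly' | 'monthly' };
type WebhookJobData = { url: string; payload: Record<string, unknown> };

type JobData =
  | { type: 'email'; data: EmailJobData }
  | { type: 'image-processing'; data: ImageProcessingJobData }
  | { type: 'report'; data: ReportJobData }
  | { type: 'webhook'; data: WebhookJobData };

// Hypothetical helper: one log line per job, narrowed by the `type` tag.
function describeJob(job: JobData): string {
  switch (job.type) {
    case 'email':
      return `email to ${job.data.to}`;
    case 'image-processing':
      return `image ${job.data.imageId} (${job.data.operations.length} operations)`;
    case 'report':
      return `${job.data.type} report ${job.data.reportId}`;
    case 'webhook':
      return `webhook to ${job.data.url}`;
    default: {
      // Adding a variant to JobData without a case above makes this
      // assignment a type error.
      const unreachable: never = job;
      throw new Error(`Unhandled job: ${JSON.stringify(unreachable)}`);
    }
  }
}
```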
Create Queues
// queues/index.ts
import { Queue, QueueOptions } from 'bullmq';
import { redis } from '../lib/redis';
import {
EmailJobData,
ImageProcessingJobData,
ReportJobData,
WebhookJobData,
} from '../jobs/types';
const defaultOptions: QueueOptions = {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: {
count: 1000, // Keep last 1000 completed jobs
age: 24 * 3600, // Keep for 24 hours
},
removeOnFail: {
count: 5000, // Keep last 5000 failed jobs
},
},
};
export const emailQueue = new Queue<EmailJobData>('email', defaultOptions);
export const imageQueue = new Queue<ImageProcessingJobData>('image-processing', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
// Recent BullMQ versions have no `timeout` job option; enforce the 5-minute limit inside the processor
},
});
export const reportQueue = new Queue<ReportJobData>('reports', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
// Recent BullMQ versions have no `timeout` job option; enforce the 30-minute limit inside the processor
},
});
export const webhookQueue = new Queue<WebhookJobData>('webhooks', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
backoff: {
type: 'exponential',
delay: 5000,
},
},
});
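To see what `attempts` and `backoff` mean in wall-clock terms, the retry delays can be tabulated. This is a sketch under the assumption that BullMQ's built-in `exponential` strategy waits `delay * 2^(attemptsMade - 1)` milliseconds before each retry; `backoffSchedule` is a hypothetical helper:

```typescript
// Assumption: the built-in 'exponential' strategy waits
// baseDelayMs * 2^(attemptsMade - 1) before each retry.
function backoffSchedule(attempts: number, baseDelayMs: number): number[] {
  // The first attempt is not a retry, so there are attempts - 1 waits.
  const retries = Math.max(0, attempts - 1);
  return Array.from({ length: retries }, (_unused, i) => baseDelayMs * 2 ** i);
}

// Default options (attempts: 3, delay: 1000)
const defaultWaits = backoffSchedule(3, 1000); // [1000, 2000]
// Webhook queue (attempts: 5, delay: 5000)
const webhookWaits = backoffSchedule(5, 5000); // [5000, 10000, 20000, 40000]
```

Under that assumption, a webhook that keeps failing is abandoned about 75 seconds after its first attempt, not counting processing time.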
Workers
Email Worker
// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { EmailJobData } from '../jobs/types';
import { sendEmail, renderTemplate } from '../lib/email'; // renderTemplate is assumed to be exported alongside sendEmail (not shown here)
const emailWorker = new Worker<EmailJobData>(
'email',
async (job: Job<EmailJobData>) => {
const { to, subject, template, variables } = job.data;
console.log(`Processing email job ${job.id} to ${to}`);
// Update progress
await job.updateProgress(10);
// Render template
const html = await renderTemplate(template, variables);
await job.updateProgress(50);
// Send email
const result = await sendEmail({
to,
subject,
html,
});
await job.updateProgress(100);
return { messageId: result.messageId, sentAt: new Date() };
},
{
connection: redis,
concurrency: 10, // Process 10 emails at a time
limiter: {
max: 100, // Max 100 jobs
duration: 60000, // Per minute
},
}
);
// Event handlers
emailWorker.on('completed', (job, result) => {
console.log(`Email job ${job.id} completed:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Email job ${job?.id} failed:`, error);
});
emailWorker.on('progress', (job, progress) => {
console.log(`Email job ${job.id} progress: ${progress}%`);
});
export { emailWorker };
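The email worker calls `renderTemplate`, which this skill does not define. As a rough illustration of the rendering step only, the sketch below substitutes `{{name}}` placeholders in a template string and HTML-escapes the values. `renderTemplateString` is hypothetical; it takes the template source rather than a template name, and a real project would more likely use a templating library:

```typescript
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;',
};

function escapeHtml(value: string): string {
  return value.replace(/[&<>"']/g, (ch) => HTML_ESCAPES[ch] ?? ch);
}

// Hypothetical renderer: replaces {{name}} placeholders with escaped values.
function renderTemplateString(source: string, variables: Record<string, string>): string {
  return source.replace(/\{\{\s*(\w+)\s*\}\}/g, (_match, key: string) => {
    if (!Object.prototype.hasOwnProperty.call(variables, key)) {
      // Fail the job instead of sending an email with a hole in it.
      throw new Error(`Missing template variable: ${key}`);
    }
    return escapeHtml(variables[key] ?? '');
  });
}
```

Throwing on a missing variable surfaces the problem in the worker's `failed` event rather than in a customer's inbox.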
Image Processing Worker
// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { ImageProcessingJobData } from '../jobs/types';
import sharp from 'sharp';
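import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
// Assumed application helpers (hypothetical paths; not shown in this skill)
import { downloadImage } from '../lib/storage';
import { notifyUser } from '../lib/notifications';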
const s3 = new S3Client({ region: process.env.AWS_REGION });
const imageWorker = new Worker<ImageProcessingJobData>(
'image-processing',
async (job: Job<ImageProcessingJobData>) => {
const { imageId, userId, operations } = job.data;
console.log(`Processing image ${imageId} for user ${userId}`);
// Download original image
const originalBuffer = await downloadImage(imageId);
let image = sharp(originalBuffer);
// Apply operations
for (let i = 0; i < operations.length; i++) {
const op = operations[i];
switch (op.type) {
case 'resize':
image = image.resize(op.params.width, op.params.height, {
fit: op.params.fit || 'cover',
});
break;
case 'crop':
image = image.extract({
left: op.params.left,
top: op.params.top,
width: op.params.width,
height: op.params.height,
});
break;
case 'watermark':
image = image.composite([
{ input: op.params.watermarkPath, gravity: 'southeast' },
]);
break;
}
// Operations account for the first 80% of progress; round to avoid fractional values
await job.updateProgress(Math.round(((i + 1) / operations.length) * 80));
}
// Convert and upload
const processedBuffer = await image.webp({ quality: 85 }).toBuffer();
const key = `processed/${userId}/${imageId}.webp`;
await s3.send(
new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
Body: processedBuffer,
ContentType: 'image/webp',
})
);
await job.updateProgress(100);
return {
url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${key}`,
size: processedBuffer.length,
};
},
{
connection: redis,
concurrency: 5,
}
);
imageWorker.on('failed', async (job, error) => {
// Notify user of failure
if (job) {
await notifyUser(job.data.userId, {
type: 'image-processing-failed',
imageId: job.data.imageId,
error: error.message,
});
}
});
export { imageWorker };
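Because `params` is typed as `Record<string, any>`, malformed operations only surface once sharp rejects them. One option is to validate the operations before any image is downloaded. The sketch below is a hypothetical validator; requiring both `width` and `height` for a resize is an assumption that mirrors how the worker above passes them:

```typescript
type ImageOperation = {
  type: 'resize' | 'crop' | 'watermark';
  params: Record<string, unknown>;
};

const isPositiveInt = (v: unknown): boolean =>
  typeof v === 'number' && Number.isInteger(v) && v > 0;
const isNonNegativeInt = (v: unknown): boolean =>
  typeof v === 'number' && Number.isInteger(v) && v >= 0;

// Hypothetical validator: returns readable problems, empty when all is well.
function validateOperations(operations: ImageOperation[]): string[] {
  const problems: string[] = [];
  operations.forEach((op, i) => {
    const p = op.params;
    switch (op.type) {
      case 'resize':
        if (!isPositiveInt(p.width) || !isPositiveInt(p.height)) {
          problems.push(`operations[${i}]: resize needs positive integer width and height`);
        }
        break;
      case 'crop':
        if (
          !isNonNegativeInt(p.left) || !isNonNegativeInt(p.top) ||
          !isPositiveInt(p.width) || !isPositiveInt(p.height)
        ) {
          problems.push(`operations[${i}]: crop needs left/top >= 0 and width/height > 0`);
        }
        break;
      case 'watermark': {
        const path = p.watermarkPath;
        if (typeof path !== 'string' || path.length === 0) {
          problems.push(`operations[${i}]: watermark needs a non-empty watermarkPath`);
        }
        break;
      }
    }
  });
  return problems;
}
```

Inside the processor, a non-empty result is a natural case for a non-retryable failure, since retrying cannot fix bad input.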
Webhook Worker with Retries
// workers/webhook.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
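import { WebhookJobData } from '../jobs/types';
import { generateSignature } from '../lib/signature'; // Assumed helper (hypothetical path; not shown in this skill)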
const webhookWorker = new Worker<WebhookJobData>(
'webhooks',
async (job: Job<WebhookJobData>) => {
const { url, payload, headers = {} } = job.data;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Signature': generateSignature(payload),
...headers,
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30000), // 30s timeout
});
if (!response.ok) {
// Retry for server errors
if (response.status >= 500) {
throw new Error(`Webhook failed: ${response.status}`);
}
// Don't retry for client errors
return {
success: false,
status: response.status,
message: 'Client error, not retrying',
};
}
return {
success: true,
status: response.status,
};
},
{
connection: redis,
concurrency: 20,
}
);
export { webhookWorker };
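The worker sends an `X-Webhook-Signature` header, but `generateSignature` is not defined in this skill. A common approach, sketched here as an assumption rather than the author's implementation, is a hex-encoded HMAC-SHA256 over the request body. `signPayload`, `verifySignature`, and the shared secret are hypothetical:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical signing helper: hex-encoded HMAC-SHA256 over the body string.
function signPayload(body: string, secret: string): string {
  return createHmac('sha256', secret).update(body).digest('hex');
}

// Receiver side: recompute the signature and compare in constant time.
function verifySignature(body: string, secret: string, signature: string): boolean {
  const expected = Buffer.from(signPayload(body, secret), 'hex');
  const received = Buffer.from(signature, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Sign the exact string that is sent as the request body (the result of a single `JSON.stringify(payload)` call), otherwise the receiver's recomputed signature may not match.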
Adding Jobs
Service Layer
// services/jobs.service.ts
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues';
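import { JobsOptions } from 'bullmq';
import {
EmailJobData,
ImageProcessingJobData,
ReportJobData,
WebhookJobData,
} from '../jobs/types';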
export class JobService {
// Send email
static async sendEmail(data: EmailJobData, options?: JobsOptions) {
return emailQueue.add('send-email', data, {
...options,
priority: data.template === 'password-reset' ? 1 : 10,
});
}
// Bulk emails
static async sendBulkEmails(emails: EmailJobData[]) {
const jobs = emails.map((data, index) => ({
name: 'send-email',
data,
opts: {
delay: index * 100, // Stagger by 100ms
},
}));
return emailQueue.addBulk(jobs);
}
// Process image
static async processImage(data: ImageProcessingJobData) {
return imageQueue.add('process', data, {
jobId: `image-${data.imageId}`, // Prevent duplicates
});
}
// Schedule report
static async scheduleReport(data: ReportJobData, runAt: Date) {
return reportQueue.add('generate', data, {
delay: Math.max(0, runAt.getTime() - Date.now()), // Clamp so a past date runs immediately
});
}
// Send webhook
static async sendWebhook(data: WebhookJobData) {
return webhookQueue.add('deliver', data);
}
}
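`processImage` passes a `jobId` to prevent duplicates. The same idea can be extended to emails by deriving the ID from the payload, assuming BullMQ ignores an `add` whose job ID already exists in the queue. `emailJobId` is a hypothetical helper, and whether identical emails should really be collapsed is an application decision:

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: same recipient + template + variables => same job ID.
function emailJobId(data: {
  to: string;
  template: string;
  variables: Record<string, string>;
}): string {
  // Sort keys so { a, b } and { b, a } produce the same ID.
  const sortedVariables = Object.keys(data.variables)
    .sort()
    .map((key) => [key, data.variables[key]]);
  const canonical = JSON.stringify([data.to.toLowerCase(), data.template, sortedVariables]);
  const digest = createHash('sha256').update(canonical).digest('hex');
  return `email-${digest.slice(0, 32)}`;
}
```

It could be passed as `emailQueue.add('send-email', data, { jobId: emailJobId(data) })`.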
API Usage
// app/api/users/route.ts
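import { JobService } from '@/services/jobs.service';
import { db } from '@/lib/db'; // Assumed database client (hypothetical path; not shown in this skill)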
export async function POST(req: Request) {
const data = await req.json();
// Create user
const user = await db.user.create({ data });
// Queue welcome email
await JobService.sendEmail({
to: user.email,
subject: 'Welcome!',
template: 'welcome',
variables: { name: user.name },
});
return Response.json(user);
}
Scheduled Jobs (Cron)
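// schedulers/index.ts
import { Queue } from 'bullmq';
import { redis } from '../lib/redis';
import { emailQueue, reportQueue } from '../queues';
import { EmailJobData, ReportJobData } from '../jobs/types';

// QueueScheduler is not needed from BullMQ 2.0 onwards;
// repeatable jobs only require a Queue and a Worker.

// metricsQueue is not defined in queues/index.ts, so it is created here.
const metricsQueue = new Queue('metrics', { connection: redis });

// Register repeatable jobs. Workers should branch on job.name, because these
// payloads do not carry the fields of the regular per-queue job data.
export async function setupSchedulers() {
  // Clean up old jobs daily
  await reportQueue.add(
    'cleanup',
    {} as ReportJobData, // No payload; cast to satisfy the typed queue
    {
      repeat: {
        pattern: '0 0 * * *', // Every day at midnight
      },
    }
  );
  // Hourly metrics aggregation
  await metricsQueue.add(
    'aggregate',
    {},
    {
      repeat: {
        pattern: '0 * * * *', // Every hour
      },
    }
  );
  // Weekly digest
  await emailQueue.add(
    'weekly-digest',
    { template: 'weekly-digest' } as EmailJobData, // Recipients are resolved by the worker
    {
      repeat: {
        pattern: '0 9 * * 1', // Every Monday at 9 AM
      },
    }
  );
}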
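The `pattern` strings above use the standard five-field cron layout (minute, hour, day of month, month, day of week). A small sketch that names the fields can make the schedules easier to review; `parseCronPattern` is a hypothetical helper that only splits the string and does not validate ranges or handle an optional seconds field:

```typescript
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

// Hypothetical helper: split a five-field cron pattern into named fields.
function parseCronPattern(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// '0 9 * * 1' reads as: minute 0, hour 9, any day of month, any month, Monday.
const weeklyDigest = parseCronPattern('0 9 * * 1');
```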
Job Events & Monitoring
Event Listeners
// monitoring/events.ts
import { QueueEvents } from 'bullmq';
import { redis } from '../lib/redis';
// metrics and alertOnFailure are assumed application helpers (not shown in this skill)
const emailQueueEvents = new QueueEvents('email', { connection: redis });
emailQueueEvents.on('completed', ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed with:`, returnvalue);
metrics.increment('email.completed');
});
emailQueueEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed:`, failedReason);
metrics.increment('email.failed');
// Alert on repeated failures
alertOnFailure(jobId, failedReason);
});
emailQueueEvents.on('delayed', ({ jobId, delay }) => {
console.log(`Job ${jobId} delayed by ${delay}ms`);
});
emailQueueEvents.on('progress', ({ jobId, data }) => {
console.log(`Job ${jobId} progress:`, data);
});
emailQueueEvents.on('stalled', ({ jobId }) => {
console.warn(`Job ${jobId} stalled`);
metrics.increment('email.stalled');
});
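The `failed` listener calls `alertOnFailure` to alert on repeated failures, but that helper is not shown. One way to decide when failures count as "repeated" is a sliding time window, sketched below. `FailureWindow`, the threshold, and the window length are all hypothetical:

```typescript
// Hypothetical alert trigger: fires once `threshold` failures fall
// within the last `windowMs` milliseconds.
class FailureWindow {
  private timestamps: number[] = [];
  private readonly threshold: number;
  private readonly windowMs: number;

  constructor(threshold: number, windowMs: number) {
    this.threshold = threshold;
    this.windowMs = windowMs;
  }

  // Record one failure; returns true when the alert threshold is reached.
  record(now: number = Date.now()): boolean {
    this.timestamps.push(now);
    // Drop failures that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => now - t < this.windowMs);
    return this.timestamps.length >= this.threshold;
  }
}
```

For example, `new FailureWindow(5, 60_000)` would signal an alert on the fifth failure within one minute, and `record()` could be called from the `failed` handler.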
Bull Board Dashboard
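// server/bull-board.ts
// @bull-board/express produces an Express router, which cannot be exported as a
// Next.js route handler, so the dashboard is mounted on a small Express app.
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(imageQueue),
    new BullMQAdapter(reportQueue),
    new BullMQAdapter(webhookQueue),
  ],
  serverAdapter,
});

const app = express();
// Put authentication in front of this route before exposing it.
app.use('/admin/queues', serverAdapter.getRouter());
// BULL_BOARD_PORT and the 3001 fallback are assumptions for this example.
app.listen(Number(process.env.BULL_BOARD_PORT ?? 3001));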
Error Handling
// workers/base.worker.ts
import { Worker, Job, UnrecoverableError } from 'bullmq';
import { redis } from '../lib/redis';
// Custom error for non-retryable failures
export class NonRetryableError extends UnrecoverableError {
constructor(message: string) {
super(message);
this.name = 'NonRetryableError';
}
}
// Worker with error handling
const worker = new Worker(
'queue-name',
async (job: Job) => {
try {
// Validate input
if (!job.data.requiredField) {
throw new NonRetryableError('Missing required field');
}
// Process job (processJob is a placeholder for your own job logic)
return await processJob(job.data);
} catch (error) {
if (error instanceof NonRetryableError) {
throw error; // Won't retry
}
// Log and rethrow for retry
console.error(`Job ${job.id} error:`, error);
throw error;
}
},
{
connection: redis,
}
);
// Handle worker errors
worker.on('error', (error) => {
console.error('Worker error:', error);
});
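For HTTP-based jobs, the retryable versus non-retryable decision often comes down to the response status. The sketch below is one possible classification. Treating 408 and 429 as transient is an assumption that goes beyond the webhook worker above, which retries only on 5xx; `classifyHttpStatus` is a hypothetical helper:

```typescript
type RetryDecision = 'ok' | 'retry' | 'fail-permanently';

// Hypothetical classifier for HTTP responses received by a job processor.
function classifyHttpStatus(status: number): RetryDecision {
  if (status >= 200 && status < 300) return 'ok';
  // Assumption: request timeouts (408) and rate limits (429) are transient
  // even though they are 4xx responses.
  if (status >= 500 || status === 408 || status === 429) return 'retry';
  return 'fail-permanently';
}
```

In a processor, `'retry'` would map to throwing a regular `Error` so BullMQ applies the backoff, and `'fail-permanently'` to throwing `NonRetryableError`.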
Graceful Shutdown
// server.ts
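import { emailWorker } from './workers/email.worker';
import { imageWorker } from './workers/image.worker';
import { webhookWorker } from './workers/webhook.worker';
import { redis, redisSubscriber } from './lib/redis';
// Add further workers (for example a report worker) here once they are defined
const workers = [emailWorker, imageWorker, webhookWorker];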
async function gracefulShutdown() {
console.log('Shutting down workers...');
// Close workers gracefully
await Promise.all(
workers.map((worker) =>
worker.close().catch((err) => {
console.error('Error closing worker:', err);
})
)
);
// Close Redis connections
await redis.quit();
await redisSubscriber.quit();
console.log('Workers shut down');
process.exit(0);
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
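Because both `SIGTERM` and `SIGINT` are wired to the same handler, and a signal can arrive more than once, the shutdown routine may be entered twice. A small guard that runs it at most once avoids closing workers and Redis connections a second time. `runOnce` is a hypothetical helper:

```typescript
// Hypothetical guard: run an async routine at most once and hand every
// caller the same promise.
function runOnce<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (pending === undefined) {
      pending = fn();
    }
    return pending;
  };
}
```

With it, the registration could become `const shutdown = runOnce(gracefulShutdown);` followed by `process.on('SIGTERM', shutdown)` and `process.on('SIGINT', shutdown)`.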
Best Practices
- Idempotent jobs: Jobs should be safe to retry
- Unique job IDs: Prevent duplicate processing
- Set timeouts: Prevent stuck jobs
- Use progress updates: For long-running jobs
- Handle failures gracefully: Alert and log
- Clean up old jobs: Remove completed/failed jobs
- Graceful shutdown: Wait for jobs to complete
- Monitor queues: Use Bull Board or similar
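For the "Set timeouts" practice, one option is to enforce the limit inside the processor itself. The sketch below races the work against a timer; `withTimeout` is a hypothetical helper, and note that it only rejects the awaiting code, it does not cancel the underlying work:

```typescript
// Hypothetical wrapper: reject if `work` has not settled within `ms`.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms);
  });
  // Clear the timer either way so it does not keep the process alive.
  return Promise.race([work, deadline]).finally(() => clearTimeout(timer));
}
```

A processor could wrap its slow step as `await withTimeout(generateReport(job.data), 30 * 60 * 1000)`, where `generateReport` stands in for your own logic; the rejection then fails the job and triggers the configured retries.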
Output Checklist
Every queue implementation should include:
- Redis connection with proper config
- Typed job data interfaces
- Queue with default options
- Worker with concurrency limits
- Retry and backoff configuration
- Event handlers for monitoring
- Error handling (retryable vs non-retryable)
- Graceful shutdown handling
- Bull Board or monitoring dashboard
- Scheduled/recurring jobs (if needed)
Score
Total Score
70/100
Based on repository quality metrics
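✓ SKILL.md: a SKILL.md file is included (+20)
✓ LICENSE: a license is set (+10)
✓ Description: the description is at least 100 characters long (+10)
○ Popularity: 100 or more GitHub stars (0/15)
✓ Recent activity: updated within the last month (+10)
○ Forks: forked 10 or more times (0/5)
✓ Issue management: fewer than 50 open issues (+5)
○ Language: a programming language is set (0/5)
✓ Tags: at least one tag is set (+5)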


