
关于
BullMQ 专家,精通基于 Redis 的作业队列、后台处理、任务调度和并发控制。
name: bullmq-specialist description: BullMQ专家,专注于Redis支持的作业队列、后台处理和Node.js/TypeScript应用中的可靠异步执行。 risk: none source: vibeship-spawner-skills (Apache 2.0) date_added: 2026-02-27
BullMQ专家
BullMQ专家,专注于Redis支持的作业队列、后台处理和Node.js/TypeScript应用中的可靠异步执行。
原则
- 作业从生产者端来看是即发即忘的 - 让队列处理投递
- 始终设置明确的作业选项 - 默认值很少符合你的用例
- 幂等性是你的责任 - 作业可能运行多次
- 退避策略防止惊群效应 - 指数退避优于线性退避
- 死信队列不是可选的 - 失败的作业需要归宿
- 并发限制保护下游服务 - 从保守值开始
- 作业数据应该小 - 传递ID而非负载
- 优雅关闭防止孤立作业 - 正确处理SIGTERM
能力范围
- bullmq-queues
- job-scheduling
- delayed-jobs
- repeatable-jobs
- job-priorities
- rate-limiting-jobs
- job-events
- worker-patterns
- flow-producers
- job-dependencies
边界
- redis-infrastructure -> redis-specialist
- serverless-queues -> upstash-qstash
- workflow-orchestration -> temporal-craftsman
- event-sourcing -> event-architect
- email-delivery -> email-systems
工具链
核心
- bullmq
- ioredis
托管
- upstash
- redis-cloud
- elasticache
- railway
监控
- bull-board
- arena
- bullmq-pro
模式
- delayed-jobs
- repeatable-jobs
- job-flows
- rate-limiting
- sandboxed-processors
模式
基础队列设置
生产就绪的BullMQ队列配置
使用场景:启动任何新的队列实现
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
// Shared connection for all queues
const connection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false,
});
// Create queue with sensible defaults
const emailQueue = new Queue('emails', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
});
// Worker with concurrency limit
const worker = new Worker('emails', async (job) => {
await sendEmail(job.data);
}, {
connection,
concurrency: 5,
limiter: {
max: 100,
duration: 60000, // 100 jobs per minute
},
});
// Handle events
worker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed:`, err);
});
延迟和定时作业
在特定时间或延迟后运行的作业
使用场景:安排未来任务、提醒或定时操作
// Delayed job - runs once after delay
await queue.add('reminder', { userId: 123 }, {
delay: 24 * 60 * 60 * 1000, // 24 hours
});
// Repeatable job - runs on schedule
await queue.add('daily-digest', { type: 'summary' }, {
repeat: {
pattern: '0 9 * * *', // Every day at 9am
tz: 'America/New_York',
},
});
// Remove repeatable job
await queue.removeRepeatable('daily-digest', {
pattern: '0 9 * * *',
tz: 'America/New_York',
});
作业流和依赖
具有父子关系的复杂多步骤作业处理
使用场景:作业依赖其他作业先完成
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({ connection });
// Parent waits for all children to complete
await flowProducer.add({
name: 'process-order',
queueName: 'orders',
data: { orderId: 123 },
children: [
{
name: 'validate-inventory',
queueName: 'inventory',
data: { orderId: 123 },
},
{
name: 'charge-payment',
queueName: 'payments',
data: { orderId: 123 },
},
{
name: 'notify-warehouse',
queueName: 'notifications',
data: { orderId: 123 },
},
],
});
优雅关闭
正确关闭worker而不丢失作业
使用场景:部署或重启worker
const shutdown = async () => {
console.log('Shutting down gracefully...');
// Stop accepting new jobs
await worker.pause();
// Wait for current jobs to finish (with timeout)
await worker.close();
// Close queue connection
await queue.close();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
Bull Board仪表板
BullMQ队列的可视化监控
使用场景:需要查看队列状态和作业状态
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(orderQueue),
],
serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
兼容工具
Claude CodeCursor
标签
运维部署

