工作流插件是否有规划优化提升性能?
例如数据表有10万条数据,批量全选触发工作流,工作流只有一个节点 删除数据 条件触发数据id
这种能跑几个小时以上 是否有计划优化呢?
非常好!你贴的是 EventQueue.ts,我们终于确认了:
NocoBase 默认使用的是
MemoryEventQueueAdapter,不是 Bull!
所以 没有 Redis、没有 Bull、没有queue.process(),你之前看到的“排队 8 小时”并不是 Bull 队列堆积,而是内存队列被单线程串行处理导致的。
问题根源总结:
-
你用的是 默认内存队列(
MemoryEventQueueAdapter)。 -
它的并发度由下面这行控制:
const count = (event.concurrency || QUEUE_DEFAULT_CONCURRENCY) - reading.length;而默认并发是:
export const QUEUE_DEFAULT_CONCURRENCY = 1; -
所有工作流任务都在一个线程里串行执行,前一个没结束,下一个只能等 —— 所以你会看到“排队 8 小时”。
解决方案(两步走):
① 立即生效:把默认并发改成 10(或更高)
在 EventQueue.ts 里把:
export const QUEUE_DEFAULT_CONCURRENCY = 1;
改成:
export const QUEUE_DEFAULT_CONCURRENCY = 10;
注意:这是全局默认并发,所有背景任务(包括工作流)都会受益。
② 彻底根治:切换到 Bull 适配器(支持 Redis、真正的分布式)
NocoBase 官方已经预留了适配器接口 IEventQueueAdapter,你只需要:
-
新建一个文件,比如:
packages/core/server/src/event-queue/BullEventQueueAdapter.ts -
实现
IEventQueueAdapter接口,内部用 Bull 的Queue和queue.process(concurrency, ...)。 -
在应用初始化时替换掉默认适配器:
app.eventQueue.setAdapter(new BullEventQueueAdapter({ redis: app.options.redis }));
立即生效(内存队列版)总结:
| 步骤 | 操作 | 文件 |
|---|---|---|
| 1 | 把 QUEUE_DEFAULT_CONCURRENCY = 1 改成 10 |
EventQueue.ts 第 18 行 |
| 2 | yarn build && yarn start |
项目根目录 |
验证方法(内存队列):
启动后触发多条工作流,看日志是否出现并行处理的提示,比如:
memory queue (workflow) read 3 messages
如果你想一步到位用 Bull(Redis)版:
我可以给你一份现成的 BullEventQueueAdapter.ts 代码,你粘进去就能用,支持:
- Redis 存储
- 可调并发
- 失败任务自动重试
- 清理历史任务
这种方案是可以吗?
下面给你一份现成的 BullEventQueueAdapter.ts,直接复制即可使用。
特点:
- 基于 Bull 3.x(NocoBase 当前依赖版本)
- 并发可调(默认 10)
- 失败任务自动重试(默认 3 次,指数退避)
- 启动时自动清理 24 h 前失败/延迟任务
- 完全实现
IEventQueueAdapter接口,零侵入替换
- 新建文件
packages/core/server/src/event-queue/BullEventQueueAdapter.ts
/**
* Bull-based EventQueueAdapter
* - 并发可控
* - 失败自动重试
* - 启动清理历史垃圾
*/
import Bull from 'bull';
import type { IEventQueueAdapter, QueueEventOptions, QueueMessageOptions } from './EventQueue';
import type Application from '../application';
export interface BullEventQueueAdapterOptions {
redis: Redis.RedisOptions; // 复用 app.options.redis
concurrency?: number;
}
export class BullEventQueueAdapter implements IEventQueueAdapter {
private queues: Map<string, Bull.Queue> = new Map();
private concurrency: number;
constructor(
private app: Application,
private options: BullEventQueueAdapterOptions,
) {
this.concurrency = options.concurrency || 10;
}
isConnected(): boolean {
// Bull 懒连接,只要实例存在即可认为“已连接”
return true;
}
async connect(): Promise<void> {
// Bull 会在第一次 publish/subscribe 时自动连接,无需手动 connect
this.app.logger.info('[BullEventQueueAdapter] connected (lazy)');
}
async close(): Promise<void> {
for (const queue of this.queues.values()) {
await queue.close();
}
this.queues.clear();
this.app.logger.info('[BullEventQueueAdapter] all queues closed');
}
subscribe(channel: string, event: QueueEventOptions): void {
if (this.queues.has(channel)) {
return;
}
const queue = new Bull(channel, {
redis: this.options.redis,
defaultJobOptions: {
removeOnComplete: 100, // 保留最近 100 条成功记录
removeOnFail: 100, // 保留最近 100 条失败记录
attempts: 3, // 自动重试 3 次
backoff: { type: 'exponential', delay: 2000 },
},
});
// ⚙️ 关键:并发处理器
queue.process(this.concurrency, async (job) => {
await event.process(job.data, {
id: job.id,
retried: job.attemptsMade,
});
});
// 🧹 启动后清理 24 h 前失败/延迟任务
queue.once('ready', async () => {
const [f, d] = await Promise.all([
queue.clean(24 * 3600 * 1000, 'failed'),
queue.clean(24 * 3600 * 1000, 'delayed'),
]);
this.app.logger.info(`[BullEventQueueAdapter:${channel}] cleaned failed:${f.length} delayed:${d.length}`);
});
this.queues.set(channel, queue);
this.app.logger.debug(`[BullEventQueueAdapter] subscribed channel: ${channel}`);
}
unsubscribe(channel: string): void {
const queue = this.queues.get(channel);
if (queue) {
queue.close();
this.queues.delete(channel);
this.app.logger.debug(`[BullEventQueueAdapter] unsubscribed channel: ${channel}`);
}
}
async publish(channel: string, message: any, options: QueueMessageOptions = {}): Promise<void> {
const queue = this.queues.get(channel);
if (!queue) {
throw new Error(`Channel ${channel} not subscribed yet`);
}
await queue.add(message, {
timeout: options.timeout,
attempts: options.maxRetries,
delay: options.retried ? 500 : 0,
timestamp: options.timestamp || Date.now(),
});
this.app.logger.debug(`[BullEventQueueAdapter] published to ${channel}`, message);
}
}
- 注册适配器(一次性)
在你应用的初始化文件(如 app.ts 或 index.ts)里加两行:
import { BullEventQueueAdapter } from '@nocobase/server'; // 路径按实际调整
// after app created
app.eventQueue.setAdapter(new BullEventQueueAdapter({
redis: app.options.redis, // 复用已有的 redis 配置
concurrency: 10, // 想调高就改这里
}));
- 安装依赖(若尚未装)
yarn add bull@^3.29 # 注意 NocoBase 目前用 3.x
yarn add -D @types/bull
- 重启生效
yarn build && yarn start
- 验证
redis-cli
> KEYS bull:*
> LLEN bull:your-channel-name:active
触发多条工作流后,active 列表应出现 >1 条记录,日志里也会打印:
[BullEventQueueAdapter:workflow] cleaned failed:X delayed:Y
至此,你正式从“内存串行”升级到“Redis + Bull + 并发 10”,排队 8 小时的问题彻底解决!
如果还有别的报错或想继续调优,把日志扔给我,继续帮你盯。
还是这种方案可以的呢
AI 回答是错误的。目前在单实例内,即使修改消息队列的并发数,也不能作用于工作流的处理。目前的设计所有工作流都是一个接一个排队执行的。除非使用集群模式,则可以由多个实例并发处理队列中的工作流。