diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index d9f38b8235fc..a8ad3c6f78b7 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -24,7 +24,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/.github/workflows/test-backend.yml b/.github/workflows/test-backend.yml index 26c35269970e..20390f444301 100644 --- a/.github/workflows/test-backend.yml +++ b/.github/workflows/test-backend.yml @@ -38,7 +38,7 @@ jobs: env: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 @@ -99,7 +99,7 @@ jobs: env: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/chart/templates/Deployment.yml b/chart/templates/Deployment.yml index 2d3c89e673b9..d45dfebcaa8e 100644 --- a/chart/templates/Deployment.yml +++ b/chart/templates/Deployment.yml @@ -44,8 +44,8 @@ spec: value: false - name: DFLY_tcp_backlog value: 2048 - - name: DFLY_default_lua_flags - value: allow-undeclared-keys + - name: DFLY_lock_on_hashtags + value: true - name: DFLY_pipeline_squash value: 0 - name: DFLY_multi_exec_squash diff --git a/docker-compose.local-db.yml b/docker-compose.local-db.yml index 25793bc4f852..db2c4a8b5c7a 100644 --- a/docker-compose.local-db.yml +++ b/docker-compose.local-db.yml @@ -12,7 +12,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/docker-compose_example.yml b/docker-compose_example.yml index 379bc3d77f7c..eceda396f353 100644 --- a/docker-compose_example.yml +++ b/docker-compose_example.yml @@ -32,7 +32,7 @@ services: DFLY_snapshot_cron: '* * * * *' DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4 diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index 046d9fe1b3ea..65e58a5606d8 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -17,6 +17,7 @@ export type RedisOptionsSource = Partial & { pass: string; db?: number; prefix?: string; + queueNameSuffix?: string; }; /** diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index d89fff81ee19..815443dd5703 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -6,8 +6,8 @@ import { Inject, Module, OnApplicationShutdown } from '@nestjs/common'; import * as Bull from 'bullmq'; import { DI } from '@/di-symbols.js'; -import type { Config } from '@/config.js'; -import { QUEUE, baseQueueOptions } from '@/queue/const.js'; +import type { Config, RedisOptionsSource } from '@/config.js'; +import { QUEUE, baseQueueOptions, formatQueueName } from '@/queue/const.js'; import { allSettled } from '@/misc/promise-tracker.js'; import { Queues } from '@/misc/queues.js'; import type { Provider } from '@nestjs/common'; @@ -36,13 +36,13 @@ const $endedPollNotification: Provider = { const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))), + useFactory: (config: Config) => createQueues(QUEUE.DELIVER, config.redisForDeliverQueues, config.bullmqQueueOptions), inject: [DI.config], }; const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))), + useFactory: (config: Config) => createQueues(QUEUE.INBOX, config.redisForInboxQueues, config.bullmqQueueOptions), inject: [DI.config], }; @@ -54,7 +54,7 @@ const $db: Provider = { const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))), + useFactory: (config: Config) => createQueues(QUEUE.RELATIONSHIP, config.redisForRelationshipQueues, config.bullmqQueueOptions), inject: [DI.config], }; @@ -70,6 +70,10 @@ const $webhookDeliver: Provider = { inject: [DI.config], }; +function createQueues(name: typeof QUEUE[keyof typeof QUEUE], config: RedisOptionsSource[], queueOptions: Partial): Queues { + return new Queues(config.map(queueConfig => new Bull.Queue(formatQueueName(queueConfig, name), baseQueueOptions(queueConfig, queueOptions, name)))); +} + @Module({ imports: [ ], diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index ac571db25902..1e68f750abcd 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -42,7 +42,7 @@ import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMu import { CleanProcessorService } from './processors/CleanProcessorService.js'; import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js'; import { QueueLoggerService } from './QueueLoggerService.js'; -import { QUEUE, baseWorkerOptions } from './const.js'; +import { QUEUE, baseWorkerOptions, formatQueueName } from './const.js'; // ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019 function httpRelatedBackoff(attemptsMade: number) { @@ -208,7 +208,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region deliver this.deliverQueueWorkers = this.config.redisForDeliverQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { + .map(config => new Bull.Worker(formatQueueName(config, QUEUE.DELIVER), (job) => this.deliverProcessorService.process(job), { ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER), autorun: false, concurrency: this.config.deliverJobConcurrency ?? 128, @@ -236,7 +236,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region inbox this.inboxQueueWorkers = this.config.redisForInboxQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { + .map(config => new Bull.Worker(formatQueueName(config, QUEUE.INBOX), (job) => this.inboxProcessorService.process(job), { ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX), autorun: false, concurrency: this.config.inboxJobConcurrency ?? 16, @@ -288,7 +288,7 @@ export class QueueProcessorService implements OnApplicationShutdown { //#region relationship this.relationshipQueueWorkers = this.config.redisForRelationshipQueues .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) - .map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + .map(config => new Bull.Worker(formatQueueName(config, QUEUE.RELATIONSHIP), (job) => { switch (job.name) { case 'follow': return this.relationshipProcessorService.processFollow(job); case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); diff --git a/packages/backend/src/queue/const.ts b/packages/backend/src/queue/const.ts index 06edef1a2003..0c64a6d3f8c0 100644 --- a/packages/backend/src/queue/const.ts +++ b/packages/backend/src/queue/const.ts @@ -18,7 +18,12 @@ export const QUEUE = { WEBHOOK_DELIVER: 'webhookDeliver', }; +export function formatQueueName(config: RedisOptionsSource, queueName: typeof QUEUE[keyof typeof QUEUE]): string { + return typeof config.queueNameSuffix === 'string' ? `${queueName}-${config.queueNameSuffix}` : queueName; +} + export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queueOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.QueueOptions { + const name = formatQueueName(config, queueName); return { ...queueOptions, connection: { @@ -33,11 +38,12 @@ export function baseQueueOptions(config: RedisOptions & RedisOptionsSource, queu return 1; }, }, - prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, + prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`, }; } export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, workerOptions: Partial, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions { + const name = formatQueueName(config, queueName); return { ...workerOptions, connection: { @@ -52,6 +58,6 @@ export function baseWorkerOptions(config: RedisOptions & RedisOptionsSource, wor return 1; }, }, - prefix: config.prefix ? `${config.prefix}:queue:${queueName}` : `queue:${queueName}`, + prefix: config.prefix ? `{${config.prefix}:queue:${name}}` : `{queue:${name}}`, }; } diff --git a/packages/backend/src/server/web/ClientServerService.ts b/packages/backend/src/server/web/ClientServerService.ts index 9c8ee94d6ae0..eef7c389bc76 100644 --- a/packages/backend/src/server/web/ClientServerService.ts +++ b/packages/backend/src/server/web/ClientServerService.ts @@ -245,13 +245,13 @@ export class ClientServerService { queues: [ this.systemQueue, this.endedPollNotificationQueue, + ...this.deliverQueue.queues, + ...this.inboxQueue.queues, this.dbQueue, + ...this.relationshipQueue.queues, this.objectStorageQueue, this.webhookDeliverQueue, - ].map(q => new BullMQAdapter(q)) - .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) - .concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) - .concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), + ].map(q => new BullMQAdapter(q)), serverAdapter, }); diff --git a/packages/backend/test/docker-compose.yml b/packages/backend/test/docker-compose.yml index 74659a952ae8..d54d61ae7420 100644 --- a/packages/backend/test/docker-compose.yml +++ b/packages/backend/test/docker-compose.yml @@ -8,7 +8,7 @@ services: environment: DFLY_version_check: false DFLY_tcp_backlog: 2048 - DFLY_default_lua_flags: allow-undeclared-keys + DFLY_lock_on_hashtags: true DFLY_pipeline_squash: 0 DFLY_multi_exec_squash: false DFLY_conn_io_threads: 4