From 1b470586a63b1f7619530e57c673fb73e397bb46 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 14:01:02 +0100 Subject: [PATCH 01/12] Allow to specify number of concurrent consumers to create --- .../core/lib/queues/AbstractQueueService.ts | 11 +++- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 57 ++++++++++++------ .../consumers/SqsPermissionConsumer.spec.ts | 60 +++++++++++++++++++ .../test/consumers/SqsPermissionConsumer.ts | 8 ++- 4 files changed, 113 insertions(+), 23 deletions(-) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index b5d4010a..fe19e41d 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -165,14 +165,22 @@ export abstract class AbstractQueueService< } protected logProcessedMessage( - _message: MessagePayloadSchemas | null, + message: MessagePayloadSchemas | null, processingResult: MessageProcessingResult, messageId?: string, ) { + const messageTimestamp = message ? this.tryToExtractTimestamp(message) : undefined + const messageProcessingMilliseconds = messageTimestamp ? Date.now() - messageTimestamp.getTime() : undefined + + // @ts-ignore + const messageType = (message && this.messageTypeField in message) ? message[this.messageTypeField] : undefined + this.logger.debug( { processingResult, messageId, + messageProcessingTime: messageProcessingMilliseconds, + messageType, }, `Finished processing message ${messageId ?? `(unknown id)`}`, ) @@ -206,7 +214,6 @@ export abstract class AbstractQueueService< if (this.logMessages) { // @ts-ignore const resolvedMessageId: string | undefined = message?.[this.messageIdField] ?? messageId - this.logProcessedMessage(message, processingResult, resolvedMessageId) } } diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 0d02fdfc..e6a2b14e 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -63,6 +63,7 @@ export type SQSConsumerOptions< ConsumerOptions, 'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout' > + concurrentConsumersAmount?: number } export abstract class AbstractSqsConsumer< @@ -96,7 +97,8 @@ export abstract class AbstractSqsConsumer< > implements QueueConsumer { - private consumer?: Consumer + private consumers: Consumer[] + private readonly concurrentConsumersAmount: number private readonly transactionObservabilityManager?: TransactionObservabilityManager private readonly consumerOptionsOverride: Partial private readonly handlerContainer: HandlerContainer< @@ -129,7 +131,8 @@ export abstract class AbstractSqsConsumer< this.deadLetterQueueOptions = options.deadLetterQueue this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION this.executionContext = executionContext - + this.consumers = [] + this.concurrentConsumersAmount = options.concurrentConsumersAmount ?? 1 this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options) this.handlerContainer = new HandlerContainer< MessagePayloadType, @@ -174,14 +177,39 @@ export abstract class AbstractSqsConsumer< public async start() { await this.init() - if (this.consumer) this.consumer.stop() + this.stopExistingConsumers() const visibilityTimeout = await this.getQueueVisibilityTimeout() - this.consumer = Consumer.create({ + try { + this.consumers = Array.from({ length: this.concurrentConsumersAmount }) + .map((_) => this.createConsumer({ visibilityTimeout })) + } catch (err) { + console.log(err) + console.log(this.consumers) + } + + + for (const consumer of this.consumers) { + consumer.on('error', (err) => { + this.handleError(err, { + queueName: this.queueName, + }) + }) + consumer.start() + } + } + + public override async close(abort?: boolean): Promise { + await super.close() + this.stopExistingConsumers(abort ?? false) + } + + private createConsumer(options: { visibilityTimeout: number | undefined }): Consumer { + return Consumer.create({ sqs: this.sqsClient, queueUrl: this.queueUrl, - visibilityTimeout, + visibilityTimeout: options.visibilityTimeout, messageAttributeNames: [`${PAYLOAD_OFFLOADING_ATTRIBUTE_PREFIX}*`], ...this.consumerOptionsOverride, handleMessage: async (message: SQSMessage) => { @@ -250,21 +278,14 @@ export abstract class AbstractSqsConsumer< return Promise.reject(result.error) }, }) - - this.consumer.on('error', (err) => { - this.handleError(err, { - queueName: this.queueName, - }) - }) - - this.consumer.start() } - public override async close(abort?: boolean): Promise { - await super.close() - this.consumer?.stop({ - abort: abort ?? false, - }) + private stopExistingConsumers(abort?: boolean) { + for (const consumer of this.consumers) { + consumer.stop({ + abort + }) + } } private async internalProcessMessage( diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 1fca15c4..efaa5d64 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -21,6 +21,7 @@ import { SINGLETON_CONFIG, registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import { SqsPermissionConsumer } from './SqsPermissionConsumer' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from "./userConsumerSchemas"; describe('SqsPermissionConsumer', () => { describe('init', () => { @@ -596,6 +597,65 @@ describe('SqsPermissionConsumer', () => { }) }) + describe('multiple consumers', () => { + let diContainer: AwilixContainer + let sqsClient: SQSClient + + let publisher: SqsPermissionPublisher + let consumer: SqsPermissionConsumer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asFunction(dependencies => { + return new SqsPermissionConsumer(dependencies, { + creationConfig: { + queue: { + QueueName: SqsPermissionConsumer.QUEUE_NAME, + }, + }, + concurrentConsumersAmount: 10, + logMessages: true, + }) + }) + }) + sqsClient = diContainer.cradle.sqsClient + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer + + await consumer.start() + + const command = new ReceiveMessageCommand({ + QueueUrl: publisher.queueProps.url, + }) + const reply = await sqsClient.send(command) + expect(reply.Messages).toBeUndefined() + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('process all messages properly', async () => { + const messagesAmount = 100 + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map((_, i) => ({ + id: `${i}`, + messageType: 'add', + timestamp: new Date().toISOString(), + })) + + await Promise.all(messages.map(async (m) => { + await publisher.publish(m) + await consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed') + })) + + // Verifies that each message is executed only once + expect(consumer.addCounter).toBe(messagesAmount) + // Verifies that no message is lost + expect(consumer.processedMessagesIds).toHaveLength(messagesAmount) + }) + }) + describe('visibility timeout', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 1385e8f5..11e43f2b 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -38,6 +38,7 @@ type SqsPermissionConsumerOptions = Pick< preHandlingOutputs: PreHandlingOutputs, ) => Promise> removePreHandlers?: Prehandler[] + concurrentConsumersAmount?: number } type ExecutionContext = { @@ -54,6 +55,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< > { public addCounter = 0 public removeCounter = 0 + public processedMessagesIds: Set = new Set() public static readonly QUEUE_NAME = 'user_permissions_multi' constructor( @@ -97,6 +99,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately }, + concurrentConsumersAmount: options.concurrentConsumersAmount, maxRetryDuration: options.maxRetryDuration, payloadStoreConfig: options.payloadStoreConfig, handlers: new MessageHandlerConfigBuilder< @@ -111,9 +114,8 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< return Promise.resolve({ error: 'retryLater' }) } this.addCounter += context.incrementAmount - return Promise.resolve({ - result: 'success', - }) + this.processedMessagesIds.add(_message.id) + return Promise.resolve({result: 'success'}) }, { preHandlerBarrier: options.addPreHandlerBarrier, From 6d51f4737889b34ae51059f36a037a172b5feb9b Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 14:11:22 +0100 Subject: [PATCH 02/12] linting --- .../core/lib/queues/AbstractQueueService.ts | 11 ++++++-- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 13 +++------ .../consumers/SqsPermissionConsumer.spec.ts | 28 +++++++++++-------- .../test/consumers/SqsPermissionConsumer.ts | 2 +- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index fe19e41d..8f55ac9c 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -170,10 +170,15 @@ export abstract class AbstractQueueService< messageId?: string, ) { const messageTimestamp = message ? this.tryToExtractTimestamp(message) : undefined - const messageProcessingMilliseconds = messageTimestamp ? Date.now() - messageTimestamp.getTime() : undefined + const messageProcessingMilliseconds = messageTimestamp + ? Date.now() - messageTimestamp.getTime() + : undefined - // @ts-ignore - const messageType = (message && this.messageTypeField in message) ? message[this.messageTypeField] : undefined + const messageType = + message && this.messageTypeField in message + ? // @ts-ignore + message[this.messageTypeField] + : undefined this.logger.debug( { diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index e6a2b14e..256efdce 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -181,14 +181,9 @@ export abstract class AbstractSqsConsumer< const visibilityTimeout = await this.getQueueVisibilityTimeout() - try { - this.consumers = Array.from({ length: this.concurrentConsumersAmount }) - .map((_) => this.createConsumer({ visibilityTimeout })) - } catch (err) { - console.log(err) - console.log(this.consumers) - } - + this.consumers = Array.from({ length: this.concurrentConsumersAmount }).map((_) => + this.createConsumer({ visibilityTimeout }), + ) for (const consumer of this.consumers) { consumer.on('error', (err) => { @@ -283,7 +278,7 @@ export abstract class AbstractSqsConsumer< private stopExistingConsumers(abort?: boolean) { for (const consumer of this.consumers) { consumer.stop({ - abort + abort, }) } } diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index efaa5d64..1a47b17e 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -21,7 +21,7 @@ import { SINGLETON_CONFIG, registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import { SqsPermissionConsumer } from './SqsPermissionConsumer' -import type { PERMISSIONS_ADD_MESSAGE_TYPE } from "./userConsumerSchemas"; +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' describe('SqsPermissionConsumer', () => { describe('init', () => { @@ -606,7 +606,7 @@ describe('SqsPermissionConsumer', () => { beforeEach(async () => { diContainer = await registerDependencies({ - permissionConsumer: asFunction(dependencies => { + permissionConsumer: asFunction((dependencies) => { return new SqsPermissionConsumer(dependencies, { creationConfig: { queue: { @@ -616,7 +616,7 @@ describe('SqsPermissionConsumer', () => { concurrentConsumersAmount: 10, logMessages: true, }) - }) + }), }) sqsClient = diContainer.cradle.sqsClient publisher = diContainer.cradle.permissionPublisher @@ -638,16 +638,20 @@ describe('SqsPermissionConsumer', () => { it('process all messages properly', async () => { const messagesAmount = 100 - const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map((_, i) => ({ - id: `${i}`, - messageType: 'add', - timestamp: new Date().toISOString(), - })) + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map( + (_, i) => ({ + id: `${i}`, + messageType: 'add', + timestamp: new Date().toISOString(), + }), + ) - await Promise.all(messages.map(async (m) => { - await publisher.publish(m) - await consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed') - })) + await Promise.all( + messages.map(async (m) => { + await publisher.publish(m) + await consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed') + }), + ) // Verifies that each message is executed only once expect(consumer.addCounter).toBe(messagesAmount) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 11e43f2b..6d926c9a 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -115,7 +115,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< } this.addCounter += context.incrementAmount this.processedMessagesIds.add(_message.id) - return Promise.resolve({result: 'success'}) + return Promise.resolve({ result: 'success' }) }, { preHandlerBarrier: options.addPreHandlerBarrier, From e9243d8a6bdd02dfdffbf0e29bef88446d105c07 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 14:32:35 +0100 Subject: [PATCH 03/12] improved test flow, added concurrentConsumersAmount parameter to README --- README.md | 1 + .../sqs/test/consumers/SqsPermissionConsumer.spec.ts | 11 +++-------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index fd82b538..4d473877 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Multi-schema consumers support multiple message types via handler configs. They * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); * `logMessages` - add logs for processed messages. * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). + * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS consumers * `init()`, prepare consumer for use (e. g. establish all necessary connections); * `close()`, stop listening for messages and disconnect; * `start()`, which invokes `init()`. diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 1a47b17e..b84512a6 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -613,8 +613,7 @@ describe('SqsPermissionConsumer', () => { QueueName: SqsPermissionConsumer.QUEUE_NAME, }, }, - concurrentConsumersAmount: 10, - logMessages: true, + concurrentConsumersAmount: 5, }) }), }) @@ -646,12 +645,8 @@ describe('SqsPermissionConsumer', () => { }), ) - await Promise.all( - messages.map(async (m) => { - await publisher.publish(m) - await consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed') - }), - ) + messages.map(m => publisher.publish(m)) + await Promise.all(messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed'))) // Verifies that each message is executed only once expect(consumer.addCounter).toBe(messagesAmount) From 17aefd6821521d4a62f41020826a695044b15636 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 14:33:54 +0100 Subject: [PATCH 04/12] tests fix --- packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 469d2eb2..4ca37367 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -91,7 +91,7 @@ describe('AmqpPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(logger.loggedMessages.length).toBe(5) - expect(logger.loggedMessages).toEqual([ + expect(logger.loggedMessages).toMatchObject([ 'Propagating new connection across 0 receivers', { id: '1', From fecc054eac6af58e6682f122c32f78433c9894c4 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 14:45:22 +0100 Subject: [PATCH 05/12] linting --- packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index b84512a6..90462946 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -645,8 +645,10 @@ describe('SqsPermissionConsumer', () => { }), ) - messages.map(m => publisher.publish(m)) - await Promise.all(messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed'))) + messages.map((m) => publisher.publish(m)) + await Promise.all( + messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) // Verifies that each message is executed only once expect(consumer.addCounter).toBe(messagesAmount) From dc4c756b21a2d9696926adb2584ac3c2b5ce69d9 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 16:09:32 +0100 Subject: [PATCH 06/12] concurrent consumers test for SnsSqsPermissionConsumer --- .../SnsSqsPermissionConsumer.spec.ts | 64 ++++++++++++++++++- .../consumers/SnsSqsPermissionConsumer.ts | 4 ++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 6fabbcf3..4771febf 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -3,8 +3,8 @@ import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' -import { type AwilixContainer, asValue } from 'awilix' -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { type AwilixContainer, asFunction, asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' @@ -12,6 +12,7 @@ import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import type { STSClient } from '@aws-sdk/client-sts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '@message-queue-toolkit/sqs/dist/test/consumers/userConsumerSchemas' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' describe('SnsSqsPermissionConsumer', () => { @@ -706,6 +707,65 @@ describe('SnsSqsPermissionConsumer', () => { }) }) + describe('multiple consumers', () => { + let diContainer: AwilixContainer + + let publisher: SnsPermissionPublisher + let consumer: SnsSqsPermissionConsumer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asFunction((dependencies) => { + return new SnsSqsPermissionConsumer(dependencies, { + creationConfig: { + topic: { + Name: SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME, + }, + queue: { + QueueName: SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME, + }, + updateAttributesIfExists: true, + }, + deletionConfig: { + deleteIfExists: true, + }, + concurrentConsumersAmount: 10, + }) + }), + }) + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer + + await consumer.start() + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('process all messages properly', async () => { + const messagesAmount = 50 + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map( + (_, i) => ({ + id: `${i}`, + messageType: 'add', + timestamp: new Date().toISOString(), + }), + ) + + messages.map((m) => publisher.publish(m)) + await Promise.all( + messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + + // Verifies that each message is executed only once + expect(consumer.addCounter).toBe(messagesAmount) + // Verifies that no message is lost + expect(consumer.processedMessagesIds).toHaveLength(messagesAmount) + }) + }) + describe('visibility timeout', () => { const topicName = 'myTestTopic' const queueName = 'myTestQueue' diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index 3a24cc51..4487ffb4 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -39,6 +39,7 @@ type SnsSqsPermissionConsumerOptions = Pick< | 'consumerOverrides' | 'maxRetryDuration' | 'payloadStoreConfig' + | 'concurrentConsumersAmount' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -65,6 +66,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< public addBarrierCounter = 0 public removeCounter = 0 public preHandlerCounter = 0 + public processedMessagesIds: Set = new Set() constructor( dependencies: SNSSQSConsumerDependencies, @@ -101,6 +103,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< PERMISSIONS_ADD_MESSAGE_SCHEMA, (_message, context, _preHandlingOutputs) => { this.addCounter += context.incrementAmount + this.processedMessagesIds.add(_message.id) return Promise.resolve({ result: 'success' }) }, { @@ -164,6 +167,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< updateAttributesIfExists: false, }, maxRetryDuration: options.maxRetryDuration, + concurrentConsumersAmount: options.concurrentConsumersAmount, }, { incrementAmount: 1, From f577fd45ba15aee97d21a678eecc2763e24bf1e8 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 16:29:01 +0100 Subject: [PATCH 07/12] Update README.md Co-authored-by: Igor Savin --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4d473877..c917cc60 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ Multi-schema consumers support multiple message types via handler configs. They * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); * `logMessages` - add logs for processed messages. * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). - * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS consumers + * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers * `init()`, prepare consumer for use (e. g. establish all necessary connections); * `close()`, stop listening for messages and disconnect; * `start()`, which invokes `init()`. From d34abc91a789596d1c6d27c82bbabe57b3aeee0e Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 16:31:35 +0100 Subject: [PATCH 08/12] Prepare to release @message-queue-toolkit/core 17.2.3 --- packages/core/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/package.json b/packages/core/package.json index 3ff17bb1..642a35a0 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "17.2.1", + "version": "17.2.3", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently", From bd686bcaf3a17676f52572333c4093803ff82dce Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 16:32:17 +0100 Subject: [PATCH 09/12] Prepare to release @message-queue-toolkit/sqs 17.3.0 --- packages/sqs/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 6f9eb69e..8f8bea5d 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "17.2.0", + "version": "17.3.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", From 1338b01b09a50435686a43b234ce975a3c1c9e01 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 11 Dec 2024 16:38:49 +0100 Subject: [PATCH 10/12] Fixed import --- packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 4771febf..1200dbcf 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -12,8 +12,8 @@ import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import type { STSClient } from '@aws-sdk/client-sts' -import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '@message-queue-toolkit/sqs/dist/test/consumers/userConsumerSchemas' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' describe('SnsSqsPermissionConsumer', () => { describe('init', () => { From 66f32643d42c0a36076264af499360c85a5a59e3 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 12 Dec 2024 14:02:42 +0100 Subject: [PATCH 11/12] Prepare to release @message-queue-toolkit/sns 18.1.1 --- packages/sns/package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 5f428cb5..0e3c0db5 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "18.0.1", + "version": "18.1.1", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -35,7 +35,7 @@ "@aws-sdk/client-sts": "^3.632.0", "@message-queue-toolkit/core": ">=15.0.0", "@message-queue-toolkit/schemas": ">=2.0.0", - "@message-queue-toolkit/sqs": "^17.0.0" + "@message-queue-toolkit/sqs": "^17.3.0" }, "devDependencies": { "@aws-sdk/client-s3": "^3.670.0", From 3eacb3499506e8e9a0aa95246a0d01e2f4fdc003 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 12 Dec 2024 16:00:04 +0100 Subject: [PATCH 12/12] Updated sns package version --- packages/sns/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 0e3c0db5..b9f14a1d 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "18.1.1", + "version": "18.1.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit",