Skip to content

Commit

Permalink
Fix #70 Add DLQ rebumitter tool
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Nov 21, 2019
1 parent fab09f3 commit f045498
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 325 deletions.
81 changes: 58 additions & 23 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import {
} from './Types';
import {removeEmptyKeys} from './Utils';

const AWS_MAX_SEND_BATCH = 10;
const AWS_MAX_RECIEVE_BATCH = 10;
const AWS_MAX_DELETE_BATCH = 10;
export const SQS_MAX_RECEIVE_BATCH = 10;
const SQS_MAX_SEND_BATCH = 10;
const SQS_MAX_DELETE_BATCH = 10;

export class Squiss extends (EventEmitter as new() => SquissEmitter) {

Expand Down Expand Up @@ -256,7 +256,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
})
.then((batches) => {
return Promise.all(batches.map((batch, idx) => {
return this._sendMessageBatch(batch, delay, idx * AWS_MAX_SEND_BATCH);
return this._sendMessageBatch(batch, delay, idx * SQS_MAX_SEND_BATCH);
}));
})
.then((results) => {
Expand Down Expand Up @@ -312,6 +312,33 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
return this._s3;
}

public getManualBatch(maxMessagesToGet: number): Promise<Message[]> {
return this.getQueueUrl()
.then((queueUrl) => {
return this._getBatchRequest(queueUrl, Math.min(maxMessagesToGet, SQS_MAX_RECEIVE_BATCH)).promise();
})
.then((data) => {
if (data && data.Messages) {
const parsedMessage: Message[] = [];
const parseMessagesPromises = data.Messages.map((msg) => {
const message = this._createMessageInstance(msg);
return message.parse()
.then(() => {
parsedMessage.push(message);
})
.catch((e: Error) => {
message.release();
});
});
return Promise.all(parseMessagesPromises)
.then(() => {
return Promise.resolve(parsedMessage);
});
}
return Promise.resolve([]);
});
}

private _initS3() {
if (this._opts.S3) {
if (typeof this._opts.S3 === 'function') {
Expand All @@ -331,9 +358,9 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
if (this._opts.s3Fallback && !this._opts.s3Bucket) {
throw new Error('Squiss requires "s3Bucket" to be defined is using s3 fallback');
}
this._opts.deleteBatchSize = Math.min(this._opts.deleteBatchSize!, AWS_MAX_DELETE_BATCH);
this._opts.deleteBatchSize = Math.min(this._opts.deleteBatchSize!, SQS_MAX_DELETE_BATCH);
this._opts.receiveBatchSize = Math.min(this._opts.receiveBatchSize!,
this._opts.maxInFlight! > 0 ? this._opts.maxInFlight! : AWS_MAX_RECIEVE_BATCH, AWS_MAX_RECIEVE_BATCH);
this._opts.maxInFlight! > 0 ? this._opts.maxInFlight! : SQS_MAX_RECEIVE_BATCH, SQS_MAX_RECEIVE_BATCH);
this._opts.minReceiveBatchSize = Math.min(this._opts.minReceiveBatchSize!, this._opts.receiveBatchSize);
}

Expand All @@ -354,16 +381,20 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
});
}

private _createMessageInstance(msg: SQS.Message) {
return new Message({
squiss: this,
unwrapSns: this._opts.unwrapSns,
bodyFormat: this._opts.bodyFormat,
msg,
s3Retriever: this.getS3.bind(this),
s3Retain: this._opts.s3Retain || false,
});
}

private _emitMessages(messages: SQS.MessageList): void {
messages.forEach((msg) => {
const message = new Message({
squiss: this,
unwrapSns: this._opts.unwrapSns,
bodyFormat: this._opts.bodyFormat,
msg,
s3Retriever: this.getS3.bind(this),
s3Retain: this._opts.s3Retain || false,
});
const message = this._createMessageInstance(msg);
this._inFlight++;
message.parse()
.then(() => {
Expand All @@ -376,6 +407,17 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
});
}

private _getBatchRequest(queueUrl: string, maxMessagesToGet: number) {
const params: SQS.Types.ReceiveMessageRequest = removeEmptyKeys({
QueueUrl: queueUrl, MaxNumberOfMessages: maxMessagesToGet,
WaitTimeSeconds: this._opts.receiveWaitTimeSecs,
MessageAttributeNames: this._opts.receiveAttributes,
AttributeNames: this._opts.receiveSqsAttributes,
VisibilityTimeout: this._opts.visibilityTimeoutSecs,
});
return this.sqs.receiveMessage(params);
}

private _getBatch(queueUrl: string): void {
if (this._activeReq || !this._running) {
return;
Expand All @@ -385,14 +427,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
this._paused = true;
return;
}
const params: SQS.Types.ReceiveMessageRequest = removeEmptyKeys({
QueueUrl: queueUrl, MaxNumberOfMessages: maxMessagesToGet,
WaitTimeSeconds: this._opts.receiveWaitTimeSecs,
MessageAttributeNames: this._opts.receiveAttributes,
AttributeNames: this._opts.receiveSqsAttributes,
VisibilityTimeout: this._opts.visibilityTimeoutSecs,
});
this._activeReq = this.sqs.receiveMessage(params);
this._activeReq = this._getBatchRequest(queueUrl, maxMessagesToGet);
this._activeReq.promise().then(this._handleGetBatchResult(queueUrl)).catch((err: AWSError) => {
this._activeReq = undefined;
if (err.code && err.code === 'RequestAbortedError') {
Expand Down Expand Up @@ -596,7 +631,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
let currentBatchLength = 0;
requests.forEach((message) => {
const messageSize = getMessageSize(message);
if (currentBatchLength % AWS_MAX_SEND_BATCH === 0 ||
if (currentBatchLength % SQS_MAX_SEND_BATCH === 0 ||
currentBatchSize + messageSize >= queueMaximumMessageSize) {
currentBatchLength = currentBatchSize = 0;
batches.push([]);
Expand Down
9 changes: 5 additions & 4 deletions src/attributeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import {SQS} from 'aws-sdk';
import {isBoolean, isNumber, isString} from 'ts-type-guards';

const EMPTY_OBJ = {};
const STRING_TYPE = 'String';
const NUMBER_TYPE = 'Number';
const BINARY_TYPE = 'Binary';
Expand All @@ -19,9 +18,11 @@ export interface IMessageAttributes {

export const parseMessageAttributes = (messageAttributes: SQS.MessageBodyAttributeMap | undefined)
: IMessageAttributes => {
const _messageAttributes = messageAttributes || EMPTY_OBJ as SQS.MessageBodyAttributeMap;
return Object.keys(_messageAttributes).reduce((parsedAttributes: IMessageAttributes, name: string) => {
parsedAttributes[name] = parseAttributeValue(_messageAttributes[name]);
if (!messageAttributes) {
return {};
}
return Object.keys(messageAttributes).reduce((parsedAttributes: IMessageAttributes, name: string) => {
parsedAttributes[name] = parseAttributeValue(messageAttributes[name]);
return parsedAttributes;
}, {});
};
Expand Down
Loading

0 comments on commit f045498

Please sign in to comment.