Skip to content

Commit 2143e09

Browse files
committed
Fix #70 Add DLQ rebumitter tool
1 parent f045498 commit 2143e09

File tree

2 files changed

+137
-167
lines changed

2 files changed

+137
-167
lines changed

src/Types.ts

+10
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,13 @@ export interface ISquissEvents {
144144
}
145145

146146
export type SquissEmitter = StrictEventEmitter<EventEmitter, ISquissEvents>;
147+
148+
export type ResubmitterMutator = (body: any) => any;
149+
150+
export interface ResubmitterConfig {
151+
readonly resubmitFromQueueConfig: ISquissOptions;
152+
readonly resubmitToQueueConfig: ISquissOptions;
153+
readonly limit: number;
154+
readonly customMutator?: ResubmitterMutator;
155+
readonly releaseTimeoutSeconds: number;
156+
}

src/resubmitter/resubmitter.ts

+127-167
Original file line numberDiff line numberDiff line change
@@ -1,167 +1,127 @@
1-
// 'use strict';
2-
//
3-
// import {SQS} from 'aws-sdk';
4-
// import {SQS_MAX_RECEIVE_BATCH, Squiss} from '../Squiss';
5-
// import {ISquissOptions} from '../Types';
6-
//
7-
// const DEFAULT_SQUISS_OPTS = {
8-
// receiveAttributes: ['All'],
9-
// receiveSqsAttributes: ['All'],
10-
// minReceiveBatchSize: 0,
11-
// };
12-
//
13-
// export type Mutator = (body: any) => any;
14-
//
15-
// export interface ResubmitConfig {
16-
// readonly resubmitFromQueueConfig: ISquissOptions;
17-
// readonly resubmitToQueueConfig: ISquissOptions;
18-
// readonly limit: number;
19-
// readonly customMutator?: Mutator;
20-
// readonly releaseTimeoutSeconds: number;
21-
// }
22-
//
23-
// export const resubmit = (config: ResubmitConfig) => {
24-
// const runContext = buildRunContext(config);
25-
// const handledMessages = new Set<string>();
26-
// return iteration({
27-
// handledMessages,
28-
// numHandledMessages: 0,
29-
// runContext,
30-
// limit: config.limit,
31-
// customMutator: config.customMutator,
32-
// });
33-
// };
34-
//
35-
// const iteration = (context: IterationContext): Promise<void> => {
36-
// if (context.numHandledMessages >= context.limit || context.limit <= 0) {
37-
// return Promise.resolve();
38-
// }
39-
// const remaining = Math.max(context.limit - context.numHandledMessages, 0);
40-
// const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining);
41-
// if (numberOfMessageToRead <= 0) {
42-
// return Promise.resolve();
43-
// }
44-
// return readMessages(numberOfMessageToRead, context.runContext)
45-
// .then((messages) => {
46-
// if (!messages.length) {
47-
// // Make sure the iteration stops
48-
// context.numHandledMessages = context.limit;
49-
// return Promise.resolve();
50-
// }
51-
// const promises = messages.map((message) => {
52-
// const msgContext: MessageContext = {
53-
// ...context.runContext,
54-
// message,
55-
// };
56-
// return Promise.resolve().then(() => {
57-
// console.log(`${++context.numHandledMessages} messages handled`);
58-
// if (context.numHandledMessages > context.limit) {
59-
// return releaseMessage(msgContext);
60-
// }
61-
// const location = message.MessageId ?? '';
62-
// if (context.handledMessages.has(location)) {
63-
// return releaseMessage(msgContext);
64-
// }
65-
// context.handledMessages.add(location);
66-
// return handleMessage(context.customMutator, msgContext)
67-
// .catch((err) => {
68-
// releaseMessage(msgContext);
69-
// return Promise.reject(err);
70-
// });
71-
// });
72-
// });
73-
// return Promise.all(promises).then(() => {
74-
// return Promise.resolve();
75-
// });
76-
// })
77-
// .then(() => {
78-
// return iteration(context);
79-
// });
80-
// };
81-
//
82-
// interface IterationContext {
83-
// readonly limit: number;
84-
// readonly runContext: RunContext;
85-
// numHandledMessages: number;
86-
// readonly customMutator?: Mutator;
87-
// readonly handledMessages: Set<string>;
88-
// }
89-
//
90-
// interface RunContext {
91-
// readonly squissFrom: Squiss;
92-
// readonly squissTo: Squiss;
93-
// readonly releaseTimeoutSeconds: number;
94-
// }
95-
//
96-
// interface MessageContext extends RunContext {
97-
// readonly message: SQS.Message;
98-
// }
99-
//
100-
// const buildRunContext = (config: ResubmitConfig): RunContext => {
101-
// const squissFrom = new Squiss({
102-
// ...config.resubmitFromQueueConfig,
103-
// ...DEFAULT_SQUISS_OPTS,
104-
// });
105-
// const squissTo = new Squiss({
106-
// ...config.resubmitToQueueConfig,
107-
// ...DEFAULT_SQUISS_OPTS,
108-
// });
109-
// return {
110-
// squissFrom,
111-
// squissTo,
112-
// releaseTimeoutSeconds: config.releaseTimeoutSeconds,
113-
// };
114-
// };
115-
//
116-
// const readMessages = (numberOfMessageToRead: number, context: RunContext) => {
117-
// return context.squissFrom.getManualBatch(numberOfMessageToRead);
118-
// };
119-
//
120-
// const handleMessage = (customMutator: Mutator | undefined, context: MessageContext): Promise<any> => {
121-
// let getBodyPromise: Promise<string>;
122-
// if (!customMutator) {
123-
// getBodyPromise = Promise.resolve(context.message.Body ?? '');
124-
// } else {
125-
// const mutateResult = mutateMessageToSend(customMutator, context);
126-
// getBodyPromise = mutateResult.mutatePromise;
127-
// }
128-
// return getBodyPromise
129-
// .then((body) => {
130-
// return sendMessage(body, context);
131-
// })
132-
// .then(() => {
133-
// return deleteMessage(context);
134-
// });
135-
// };
136-
//
137-
// const sendMessage = (body: string, context: MessageContext) => {
138-
// return context.sqs
139-
// .sendMessage({
140-
// QueueUrl: context.fullQueueName,
141-
// MessageAttributes: context.message.MessageAttributes,
142-
// MessageBody: body,
143-
// MessageDeduplicationId: context.message.MessageAttributes?.MessageId?.StringValue,
144-
// MessageGroupId: context.message.MessageAttributes?.MessageId?.StringValue,
145-
// })
146-
// .promise();
147-
// };
148-
//
149-
// const mutateMessageToSend = (customMutator: Mutator, context: MessageContext) => {
150-
// const originalBody = context.message.Body ?? '';
151-
// const getMessageBodyFromS3Result = getMessageBodyFromS3(originalBody, context);
152-
// const s3UploadData = getMessageBodyFromS3Result.s3UploadData;
153-
// const mutatePromise = getMessageBodyFromS3Result.promise
154-
// .then((bodyToDigest) => {
155-
// return unzipMessage(bodyToDigest, context);
156-
// })
157-
// .then((bodyStr: string): Promise<string> => {
158-
// const toMutate = context.isJson ? JSON.parse(bodyStr) : bodyStr;
159-
// const mutatedObject = customMutator(toMutate);
160-
// const mutated = context.isJson ? JSON.stringify(mutatedObject) : mutatedObject;
161-
// return zipMessage(mutated, context);
162-
// })
163-
// .then((toSendBody) => {
164-
// return uploadMessageBodyToS3(toSendBody, s3UploadData, context);
165-
// });
166-
// return {mutatePromise, s3UploadData};
167-
// };
1+
'use strict';
2+
3+
import {SQS_MAX_RECEIVE_BATCH, Squiss} from '../Squiss';
4+
import {IMessageToSend, ResubmitterConfig, ResubmitterMutator} from '../Types';
5+
import {Message} from '../Message';
6+
7+
interface IterationContext {
8+
readonly limit: number;
9+
readonly runContext: RunContext;
10+
numHandledMessages: number;
11+
readonly customMutator?: ResubmitterMutator;
12+
readonly handledMessages: Set<string>;
13+
readonly releaseTimeoutSeconds: number;
14+
}
15+
16+
interface RunContext {
17+
readonly squissFrom: Squiss;
18+
readonly squissTo: Squiss;
19+
}
20+
21+
interface MessageContext extends RunContext {
22+
readonly message: Message;
23+
}
24+
25+
const DEFAULT_SQUISS_OPTS = {
26+
receiveAttributes: ['All'],
27+
receiveSqsAttributes: ['All'],
28+
minReceiveBatchSize: 0,
29+
unwrapSns: false,
30+
};
31+
32+
const iteration = (context: IterationContext): Promise<void> => {
33+
if (context.numHandledMessages >= context.limit || context.limit <= 0) {
34+
return Promise.resolve();
35+
}
36+
const remaining = Math.max(context.limit - context.numHandledMessages, 0);
37+
const numberOfMessageToRead = Math.min(SQS_MAX_RECEIVE_BATCH, remaining);
38+
if (numberOfMessageToRead <= 0) {
39+
return Promise.resolve();
40+
}
41+
return readMessages(numberOfMessageToRead, context.runContext)
42+
.then((messages) => {
43+
if (!messages.length) {
44+
// Make sure the iteration stops
45+
context.numHandledMessages = context.limit;
46+
return Promise.resolve();
47+
}
48+
const promises = messages.map((message) => {
49+
const msgContext: MessageContext = {
50+
...context.runContext,
51+
message,
52+
};
53+
return Promise.resolve().then(() => {
54+
console.log(`${++context.numHandledMessages} messages handled`);
55+
if (context.numHandledMessages > context.limit) {
56+
return message.changeVisibility(context.releaseTimeoutSeconds);
57+
}
58+
const location = message.raw.MessageId ?? '';
59+
if (context.handledMessages.has(location)) {
60+
return message.changeVisibility(context.releaseTimeoutSeconds);
61+
}
62+
context.handledMessages.add(location);
63+
return handleMessage(context.customMutator, msgContext)
64+
.then(() => {
65+
return message.del();
66+
})
67+
.catch((err) => {
68+
message.changeVisibility(context.releaseTimeoutSeconds);
69+
return Promise.reject(err);
70+
});
71+
});
72+
});
73+
return Promise.all(promises).then(() => {
74+
return Promise.resolve();
75+
});
76+
})
77+
.then(() => {
78+
return iteration(context);
79+
});
80+
};
81+
82+
const buildRunContext = (config: ResubmitterConfig): RunContext => {
83+
const squissFrom = new Squiss({
84+
...config.resubmitFromQueueConfig,
85+
...DEFAULT_SQUISS_OPTS,
86+
});
87+
const squissTo = new Squiss({
88+
...config.resubmitToQueueConfig,
89+
...DEFAULT_SQUISS_OPTS,
90+
});
91+
return {
92+
squissFrom,
93+
squissTo,
94+
};
95+
};
96+
97+
const readMessages = (numberOfMessageToRead: number, context: RunContext) => {
98+
return context.squissFrom.getManualBatch(numberOfMessageToRead);
99+
};
100+
101+
const sendMessage = (messageToSend: IMessageToSend, context: MessageContext) => {
102+
return context.squissTo.sendMessage(messageToSend, undefined, context.message.attributes);
103+
};
104+
105+
const handleMessage = (customMutator: ResubmitterMutator | undefined, context: MessageContext): Promise<any> => {
106+
return Promise.resolve()
107+
.then(() => {
108+
let body = context.message.body;
109+
if (customMutator) {
110+
body = customMutator(body);
111+
}
112+
return sendMessage(body, context);
113+
});
114+
};
115+
116+
export const resubmit = (config: ResubmitterConfig) => {
117+
const runContext = buildRunContext(config);
118+
const handledMessages = new Set<string>();
119+
return iteration({
120+
releaseTimeoutSeconds: config.releaseTimeoutSeconds,
121+
handledMessages,
122+
numHandledMessages: 0,
123+
runContext,
124+
limit: config.limit,
125+
customMutator: config.customMutator,
126+
});
127+
};

0 commit comments

Comments
 (0)