Switch Lambdas to poll from SQS #30

Merged · 11 commits · Nov 3, 2023
21 changes: 20 additions & 1 deletion analytics/deployment/resources/events.yml
@@ -2,4 +2,23 @@ Resources:
MainEventBus:
Type: AWS::Events::EventBus
Properties:
Name: ${self:custom.appPrefix}-${opt:stage}-main
Name: ${self:custom.appPrefix}-${opt:stage}-main
SendToSQSArchiveEventRule:
Type: AWS::Events::Rule
Properties:
EventBusName: !Ref MainEventBus
EventPattern:
source:
- monitor
State: ENABLED
Targets:
- Id: send-to-sqs-archive-raw
Arn:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
- Id: send-to-sqs-archive-database
Arn:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
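
A note on the wiring above (not part of the diff): when an EventBridge rule targets an SQS queue without an input transformer, the whole matched event is delivered as the message body, and the Lambda/SQS poller then hands it to the function JSON-encoded inside `Records[n].body`. A minimal sketch of the shape the archive handlers below receive; all field values are illustrative:

```js
// Illustrative only: approximate shape of the event a Lambda sees when it
// polls one of the archive queues. The key point is that the EventBridge
// envelope arrives as a JSON string inside Records[n].body.
const sampleSqsEvent = {
  Records: [
    {
      messageId: "059f36b4-87a3-44ab-83d2-661975830a7d", // placeholder
      body: JSON.stringify({
        version: "0",
        "detail-type": "measurement", // assumed detail-type
        source: "monitor",            // matches the rule's EventPattern
        detail: { id: "1", timestamp: "2023-11-03T00:00:00Z", available: 1 }
      })
    }
  ]
};

// The measurement itself is therefore one JSON.parse away:
const detail = JSON.parse(sampleSqsEvent.Records[0].body).detail;
console.log(detail.id); // "1"
```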
70 changes: 69 additions & 1 deletion analytics/deployment/resources/sqs.yml
@@ -1,6 +1,74 @@
Resources:
BlackholeDeadletterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-blackhole
MessageRetentionPeriod: 60
ArchiveDeadletterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-deadletter
MessageRetentionPeriod: 604800
ArchiveRawQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-raw
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 5
deadLetterTargetArn:
Fn::GetAtt:
- ArchiveDeadletterQueue
- Arn
ArchiveRawQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: SQS:SendMessage
Resource:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
Queues:
- Ref: ArchiveRawQueue
ArchiveDatabaseQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-archive-database
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 5
deadLetterTargetArn:
Fn::GetAtt:
- ArchiveDeadletterQueue
- Arn
ArchiveDatabaseQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: SQS:SendMessage
Resource:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
Queues:
- Ref: ArchiveDatabaseQueue
AggregateQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:custom.appPrefix}-${opt:stage}-aggregate
VisibilityTimeout: 60
VisibilityTimeout: 60
RedrivePolicy:
maxReceiveCount: 1
deadLetterTargetArn:
Fn::GetAtt:
- BlackholeDeadletterQueue
- Arn
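
The archive queues dead-letter into ArchiveDeadletterQueue, which retains messages for a week (604800 seconds) precisely so they can be redriven once the failure is fixed. A sketch of such a redrive using the SQS message-move API; the ARNs are placeholders for the deployed queues, and the call assumes the DLQ redrive support in @aws-sdk/client-sqs:

```js
import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs";

const client = new SQSClient({});

// Placeholder ARNs; in this stack they would resolve to ArchiveDeadletterQueue
// and ArchiveRawQueue (or ArchiveDatabaseQueue).
const DLQ_ARN = "arn:aws:sqs:eu-west-1:123456789012:myapp-dev-archive-deadletter";
const SOURCE_ARN = "arn:aws:sqs:eu-west-1:123456789012:myapp-dev-archive-raw";

// Move everything currently sitting in the DLQ back to the original queue,
// throttled so the consumers are not flooded.
await client.send(new StartMessageMoveTaskCommand({
  SourceArn: DLQ_ARN,
  DestinationArn: SOURCE_ARN,
  MaxNumberOfMessagesPerSecond: 10
}));
```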
12 changes: 4 additions & 8 deletions analytics/functions/aggregate-daily/handler.js
@@ -21,13 +21,9 @@ export const aggregateDaily = async function (event, context) {
})).map(buildAggregationMessage);

for (const batch of chunk(messages, 10)) {
try {
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
} catch (err) {
console.error(`Cannot schedule aggregation batch`, batch, err);
}
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
}
}
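
Removing the try/catch means a rejected SendMessageBatchCommand now fails the whole invocation, which is what lets the retry settings added to serverless.yml below (maximumRetryAttempts) take over; the same change is made to aggregate-minutely further down. One caveat the diff does not cover: SendMessageBatch can also report per-entry failures in its Failed field without rejecting. A hedged sketch of surfacing those as errors too:

```js
import { SQSClient, SendMessageBatchCommand } from "@aws-sdk/client-sqs";

const client = new SQSClient({});

// Sketch only, not part of this PR: send one batch and throw if any individual
// entry failed, so partial failures propagate like a rejected send would.
async function sendBatchOrThrow(queueUrl, entries) {
  const result = await client.send(new SendMessageBatchCommand({
    QueueUrl: queueUrl,
    Entries: entries // max 10 entries, each with a unique Id and a MessageBody
  }));

  if (result.Failed && result.Failed.length > 0) {
    const details = result.Failed.map((f) => `${f.Id}: ${f.Message}`).join(", ");
    throw new Error(`${result.Failed.length} batch entries failed: ${details}`);
  }

  return result;
}
```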
8 changes: 3 additions & 5 deletions analytics/functions/aggregate-daily/handler_test.js
@@ -5,6 +5,7 @@ import { aggregateDaily } from "./handler.js"
import config from "../../conf/config.js";
import { flattenDeep } from "lodash-es";
import { use } from "../../common/fixtures.js";
import { throwsAsync } from '../../common/assert.js'

const expectedMessages = flattenDeep(config.regions.map((region) => {
return config.endpoints.map((endpoint) => {
@@ -32,13 +33,10 @@ describe('analytics - aggregateDaily', () => {
assert.deepStrictEqual(expectedMessages, actualMessages);
});

it('should handle SQS errors', async (t) => {
it('should throw on SQS errors', async (t) => {

const errorLogger = t.mock.method(console, 'error', () => { });
sqs.rejects('simulated error');

await aggregateDaily();

assert.equal(errorLogger.mock.calls.length, Math.ceil(expectedMessages.length / 10));
await throwsAsync(aggregateDaily());
});
});
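
throwsAsync comes from the project's common/assert.js, which is not shown in this diff, so its implementation here is assumed. A helper with this calling convention could be a thin wrapper over node:assert's rejects():

```js
import assert from "node:assert";

// Hypothetical implementation of the throwsAsync helper imported above; the
// real one lives in common/assert.js and may differ. assert.rejects() awaits
// the promise and fails the test if it resolves instead of rejecting.
export async function throwsAsync(promise, expected) {
  if (expected == null) {
    return assert.rejects(promise);
  }
  return assert.rejects(promise, expected);
}
```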
12 changes: 4 additions & 8 deletions analytics/functions/aggregate-minutely/handler.js
@@ -25,13 +25,9 @@ export const aggregateMinutely = async function (event, context) {
})).map(buildAggregationMessage);

for (const batch of chunk(messages, 10)) {
try {
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
} catch (err) {
console.error(`Cannot schedule aggregation batch`, batch, err);
}
await client.send(new SendMessageBatchCommand({
QueueUrl: process.env.AGGREGATE_QUEUE_URL,
Entries: batch
}));
}
}
8 changes: 3 additions & 5 deletions analytics/functions/aggregate-minutely/handler_test.js
@@ -5,6 +5,7 @@ import { aggregateMinutely } from "./handler.js"
import config from "../../conf/config.js";
import { flattenDeep } from "lodash-es";
import { use } from "../../common/fixtures.js";
import { throwsAsync } from '../../common/assert.js'

const expectedMessages = flattenDeep(config.regions.map((region) => {
return [
@@ -38,13 +39,10 @@ describe('analytics - aggregateMinutely', () => {
assert.deepStrictEqual(expectedMessages, actualMessages);
});

it('should handle SQS errors', async (t) => {
it('should throw SQS errors', async (t) => {

const errorLogger = t.mock.method(console, 'error', () => { });
sqs.rejects('simulated error');

await aggregateMinutely();

assert.equal(errorLogger.mock.calls.length, Math.ceil(expectedMessages.length / 10));
await throwsAsync(aggregateMinutely());
});
});
2 changes: 1 addition & 1 deletion analytics/functions/archive-database/handler.js
@@ -2,7 +2,7 @@ import { query } from "../../common/database.js";

export const archiveDatabase = async function (event, context) {

const result = event.detail;
const result = event.detail ? event.detail : JSON.parse(event.Records[0].body).detail;

/**
* Insert the measurement into the database. This message can
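
Both archive handlers (this one and archive-raw below) now accept either a direct EventBridge invocation or an SQS record whose body wraps the EventBridge envelope. The same one-liner could be lifted into a shared helper; a sketch, with the function name being illustrative rather than part of the PR:

```js
// Illustrative helper, not part of this PR: extract the EventBridge `detail`
// payload regardless of how the Lambda was invoked.
export function extractDetail(event) {
  // Direct EventBridge invocation: the payload is on the event itself.
  if (event.detail) {
    return event.detail;
  }
  // SQS poller invocation: the envelope is JSON-encoded in the record body.
  // batchSize is 1 in serverless.yml, so a single record is expected.
  return JSON.parse(event.Records[0].body).detail;
}
```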
6 changes: 3 additions & 3 deletions analytics/functions/archive-database/handler_test.js
@@ -34,7 +34,7 @@ describe('analytics - archiveDatabase', () => {

const event = { ...SAMPLE_EVENT, available: 1 };

await archiveDatabase({ detail: event });
await archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const measurement = await queryOne("SELECT * FROM measurements WHERE id = $1", [event.id]);
assert.deepStrictEqual(measurement, {
Expand All @@ -55,7 +55,7 @@ describe('analytics - archiveDatabase', () => {

const event = { ...SAMPLE_EVENT, available: 0 };

await archiveDatabase({ detail: event });
await archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const measurement = await queryOne("SELECT * FROM measurements WHERE id = $1", [event.id]);
assert.deepStrictEqual(measurement, {
@@ -84,7 +84,7 @@
// Missing required fields
const event = { timestamp: formatISO(new Date()), available: 1 };

await throwsAsync(archiveDatabase({ detail: event }), null);
await throwsAsync(archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] }), null);
});

});
3 changes: 2 additions & 1 deletion analytics/functions/archive-raw/handler.js
@@ -5,7 +5,8 @@ const client = new S3Client({});

export const archiveRaw = async function (event, context) {

const result = event.detail;
const result = event.detail ? event.detail : JSON.parse(event.Records[0].body).detail;

const body = JSON.stringify(result);
const contentType = "application/json";

4 changes: 2 additions & 2 deletions analytics/functions/archive-raw/handler_test.js
@@ -14,7 +14,7 @@ describe('analytics - archiveRaw', () => {

const event = { id: "1", timestamp: formatISO(new Date()), available: 1 };

await archiveRaw({ detail: event });
await archiveRaw({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const calls = s3.calls(PutObjectCommand);

@@ -35,7 +35,7 @@

const event = { timestamp: formatISO(new Date()), available: 1 };

await throwsAsync(archiveRaw({ detail: event }), null);
await throwsAsync(archiveRaw({ Records: [{ body: JSON.stringify({ detail: event }) }] }), null);
});

});
89 changes: 73 additions & 16 deletions analytics/serverless.yml
@@ -111,31 +111,59 @@ resources:
- ${file(deployment/resources/sqs.yml)}

functions:

#
# Function: archiveRaw
# Purpose: archive a raw copy of the incoming measurement event
# on S3 for troubleshooting purposes and re-ingestion
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for 5 deliveries and then move the
# message to a DLQ that holds the messages for a week to allow for a redrive
#
archiveRaw:
name: ${self:custom.appPrefix}-archiveRaw
handler: functions/archive-raw/handler.archiveRaw
environment:
DATA_BUCKET: !Ref DataBucket
WEBSITE_BUCKET: ${self:custom.websiteBucket}
events:
- eventBridge:
eventBus: !Ref MainEventBus
pattern:
source:
- monitor

- sqs:
batchSize: 1
arn:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
#
# Function: archiveDatabase
# Purpose: archive the measurement event in Aurora to be used by
# the aggregation tasks
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for 5 deliveries and then move the
# message to a DLQ that holds the messages for a week to allow for a redrive
#
archiveDatabase:
name: ${self:custom.appPrefix}-archiveDatabase
handler: functions/archive-database/handler.archiveDatabase
reservedConcurrency: 20
events:
- eventBridge:
eventBus: !Ref MainEventBus
pattern:
source:
- monitor

- sqs:
batchSize: 1
arn:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
#
# Function: aggregateMinutely
# Purpose: kickstart aggregation jobs that are supposed to run
# on a minutely schedule
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case, since this function is re-triggered every minute by
# a scheduled event, we're NOT going to perform any retries if
# this function fails, and we don't need the initial event to be
# delivered to a DLQ
#
aggregateMinutely:
name: ${self:custom.appPrefix}-aggregateMinutely
handler: functions/aggregate-minutely/handler.aggregateMinutely
@@ -147,7 +175,17 @@
- QueueUrl
events:
- schedule: rate(1 minute)

maximumRetryAttempts: 0
#
# Function: aggregateDaily
# Purpose: kickstart aggregation jobs that are supposed to run
# on a daily schedule
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case, to avoid waiting until the next day for it to be triggered
# again, we allow a few retries, but we don't need the initial event to be
# delivered to a DLQ
#
aggregateDaily:
name: ${self:custom.appPrefix}-aggregateDaily
handler: functions/aggregate-daily/handler.aggregateDaily
@@ -159,7 +197,16 @@
- QueueUrl
events:
- schedule: cron(5 0 * * ? *)

maximumRetryAttempts: 2
#
# Function: aggregateWorker
# Purpose: perform a single aggregation job and save the results
# on S3
# Failure mode: This function is invoked by the Lambda/SQS poller
# so its retry policy is managed by the SQS queue configuration. In this
# specific case, it's set up to allow for a single delivery and then move the
# message to a blackhole DLQ that evicts messages quickly and is never redriven.
#
aggregateWorker:
name: ${self:custom.appPrefix}-aggregateWorker
handler: functions/aggregate-worker/handler.aggregateWorker
@@ -174,9 +221,19 @@
Fn::GetAtt:
- AggregateQueue
- Arn

#
# Function: cleanupDatabase
# Purpose: implement the database retention period by deleting rows falling
# out of the retention window.
# Failure mode: This function is invoked async from EventBridge so its retry
# policy is managed with maximumRetryAttempts/onFailure. In this
# specific case, to avoid waiting until the next day for it to be triggered
# again, we allow a few retries, but we don't need the initial event to be
# delivered to a DLQ
#
cleanupDatabase:
name: ${self:custom.appPrefix}-cleanupDatabase
handler: functions/cleanup-database/handler.cleanupDatabase
events:
- schedule: rate(1 day)
maximumRetryAttempts: 2
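
Not part of the diff, but once deployed, the new EventBridge → SQS → Lambda path can be smoke-tested by publishing a synthetic measurement with source monitor, which SendToSQSArchiveEventRule fans out to both archive queues. A sketch, with the bus name and payload fields assumed:

```js
import { EventBridgeClient, PutEventsCommand } from "@aws-sdk/client-eventbridge";
import { formatISO } from "date-fns";

const client = new EventBridgeClient({});

// The bus name is a placeholder following the ${appPrefix}-${stage}-main
// naming from events.yml; the detail-type and payload fields are assumptions.
await client.send(new PutEventsCommand({
  Entries: [{
    EventBusName: "myapp-dev-main",
    Source: "monitor", // matches the rule's EventPattern
    DetailType: "measurement",
    Detail: JSON.stringify({
      id: "smoke-test-1",
      timestamp: formatISO(new Date()),
      available: 1
    })
  }]
}));
```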