Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch Lambdas to poll from SQS #30

Merged
merged 11 commits into from
Nov 3, 2023
2 changes: 1 addition & 1 deletion analytics/functions/archive-database/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { query } from "../../common/database.js";

export const archiveDatabase = async function (event, context) {

const result = event.detail;
const result = event.detail ? event.detail : JSON.parse(event.Records[0].body).detail;

/**
* Insert the measuement into the database. This message can
Expand Down
6 changes: 3 additions & 3 deletions analytics/functions/archive-database/handler_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('analytics - archiveDatabase', () => {

const event = { ...SAMPLE_EVENT, available: 1 };

await archiveDatabase({ detail: event });
await archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const measurement = await queryOne("SELECT * FROM measurements WHERE id = $1", [event.id]);
assert.deepStrictEqual(measurement, {
Expand All @@ -55,7 +55,7 @@ describe('analytics - archiveDatabase', () => {

const event = { ...SAMPLE_EVENT, available: 0 };

await archiveDatabase({ detail: event });
await archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const measurement = await queryOne("SELECT * FROM measurements WHERE id = $1", [event.id]);
assert.deepStrictEqual(measurement, {
Expand Down Expand Up @@ -84,7 +84,7 @@ describe('analytics - archiveDatabase', () => {
// Missing required fields
const event = { timestamp: formatISO(new Date()), available: 1 };

await throwsAsync(archiveDatabase({ detail: event }), null);
await throwsAsync(archiveDatabase({ Records: [{ body: JSON.stringify({ detail: event }) }] }), null);
});

});
3 changes: 2 additions & 1 deletion analytics/functions/archive-raw/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const client = new S3Client({});

export const archiveRaw = async function (event, context) {

const result = event.detail;
const result = event.detail ? event.detail : JSON.parse(event.Records[0].body).detail;

const body = JSON.stringify(result);
const contentType = "application/json";

Expand Down
4 changes: 2 additions & 2 deletions analytics/functions/archive-raw/handler_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('analytics - archiveRaw', () => {

const event = { id: "1", timestamp: formatISO(new Date()), available: 1 };

await archiveRaw({ detail: event });
await archiveRaw({ Records: [{ body: JSON.stringify({ detail: event }) }] });

const calls = s3.calls(PutObjectCommand);

Expand All @@ -35,7 +35,7 @@ describe('analytics - archiveRaw', () => {

const event = { timestamp: formatISO(new Date()), available: 1 };

await throwsAsync(archiveRaw({ detail: event }), null);
await throwsAsync(archiveRaw({ Records: [{ body: JSON.stringify({ detail: event }) }] }), null);
});

});
24 changes: 12 additions & 12 deletions analytics/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ functions:
# specific case, it's setup to allow for 5 deliveries and then move the
# message to a DLQ that hold the messages for a week to allow for a redrive
#
# TODO switch to SQS in a separate PR to avoid data loss
archiveRaw:
name: ${self:custom.appPrefix}-archiveRaw
handler: functions/archive-raw/handler.archiveRaw
environment:
DATA_BUCKET: !Ref DataBucket
WEBSITE_BUCKET: ${self:custom.websiteBucket}
events:
- eventBridge:
eventBus: !Ref MainEventBus
pattern:
source:
- monitor
- sqs:
batchSize: 1
arn:
Fn::GetAtt:
- ArchiveRawQueue
- Arn
#
# Function: archiveDatabase
# Purpose: archive a the measurement event in Aurora to be used by
Expand All @@ -142,17 +142,17 @@ functions:
# specific case, it's setup to allow for 5 deliveries and then move the
# message to a DLQ that hold the messages for a week to allow for a redrive
#
# TODO switch to SQS in a separate PR to avoid data loss
archiveDatabase:
name: ${self:custom.appPrefix}-archiveDatabase
handler: functions/archive-database/handler.archiveDatabase
reservedConcurrency: 20
events:
- eventBridge:
eventBus: !Ref MainEventBus
pattern:
source:
- monitor
- sqs:
batchSize: 1
arn:
Fn::GetAtt:
- ArchiveDatabaseQueue
- Arn
#
# Function: aggregateMinutely
# Purpose: kickstart aggregation jobs that are supposed to run
Expand Down