
Merge pull request #1 from aspecto-io/feat/init
feat: initial working version
mzahor authored Mar 24, 2020
2 parents 805c1fe + 5a543b8 commit 73e0d7e
Showing 18 changed files with 4,619 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .editorconfig
@@ -0,0 +1,7 @@
[*.yml]
indent_size = 2
indent_style = space

[*]
indent_size = 4
indent_style = space
3 changes: 3 additions & 0 deletions .github/.prettierrc
@@ -0,0 +1,3 @@
printWidth: 120
tabWidth: 2
singleQuote: true
22 changes: 22 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,22 @@
name: Build PR
on: [push]

jobs:
  build:
    name: Build
    runs-on: ubuntu-latest
    env:
      AWS_ACCESS_KEY_ID: foo
      AWS_SECRET_ACCESS_KEY: bar
    steps:
      - uses: actions/checkout@v2
      - name: install
        run: yarn
      - name: build
        run: yarn run build
      - name: run localstack
        run: docker-compose up -d
      - name: wait for localstack
        run: ./scripts/wait-for-url.js http://localhost:4575 && sleep 5
      - name: test
        run: yarn test
21 changes: 21 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,21 @@
# https://help.github.com/en/actions/language-and-framework-guides/publishing-nodejs-packages#publishing-packages-using-yarn
name: Publish to NPM
on:
  release:
    types: [created]
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      # Setup .npmrc file to publish to npm
      - uses: actions/setup-node@v1
        with:
          node-version: '12.x'
          registry-url: 'https://registry.npmjs.org'
          scope: '@octocat' # Defaults to the user or organization that owns the workflow file
      - run: yarn
      - run: yarn build
      - run: yarn publish
        env:
          NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
7 changes: 7 additions & 0 deletions .gitignore
@@ -0,0 +1,7 @@
node_modules
dist
.DS_Store
.idea
.vscode
yarn-error.log
package-lock.json
7 changes: 7 additions & 0 deletions .prettierrc
@@ -0,0 +1,7 @@
trailingComma: "es5"
semi: true
printWidth: 120
tabWidth: 4
singleQuote: true
arrowParens: "always"
jsxSingleQuote: true
169 changes: 169 additions & 0 deletions README.md
@@ -0,0 +1,169 @@
# sns-sqs-big-payload

SQS/SNS producer/consumer library. Provides the ability to pass large payloads through S3.

## Installation

```
npm install sns-sqs-big-payload
```

## Usage

The library exports 3 clients:

- SNS producer
- SQS producer
- SQS consumer

The reason they belong to the same repository and npm package
is that they all share a contract for sending payloads through S3.
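
The exact on-the-wire format of that contract isn't documented here; purely as an illustration (the field names below are hypothetical), a message whose payload was offloaded to S3 carries a small pointer instead of the payload itself:

```ts
// Hypothetical illustration of the S3-pointer message the producers and consumer agree on.
// The real field names used by the library may differ.
const s3PointerMessage = {
    S3Payload: {
        Bucket: 'my-bucket', // the configured s3Bucket
        Key: 'some-generated-uuid.json', // object key the producer uploaded the payload under
    },
};
// The producer uploads the original payload to S3 and sends this pointer instead;
// the consumer recognizes the shape, downloads the object, and hands the body
// to `parsePayload` / `handleMessage` as if it had arrived inline.
```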

### SNS Producer

```ts
import { SnsProducer } from 'sns-sqs-big-payload';

const snsProducer = SnsProducer.create({
    topicArn: '<topic-arn>',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    s3Bucket: '...',
});

await snsProducer.sendJSON({
    // ...
});
```

### SQS Producer

```ts
import { SqsProducer } from 'sns-sqs-big-payload';

const sqsProducer = SqsProducer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable sending large payloads (>256KiB) through S3
    largePayloadThoughS3: true,
    s3Bucket: '...',
});

await sqsProducer.sendJSON({
    // ...
});
```

### SQS Consumer

```ts
import { SqsConsumer } from 'sns-sqs-big-payload';

const sqsConsumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    // to enable loading payloads from S3 automatically
    getPayloadFromS3: true,
    s3Bucket: '...',
    // if the queue is subscribed to SNS,
    // the message will arrive wrapped in an SNS envelope,
    // so we need to unwrap it first
    transformMessageBody: (body) => {
        const snsMessage = JSON.parse(body);
        return snsMessage.Message;
    },
    // if you expect a JSON payload, use the `parsePayload` hook to parse it
    parsePayload: (raw) => JSON.parse(raw),
    // message handler; the payload is already parsed at this point
    handleMessage: async ({ payload }) => {
        // ...
    },
});

sqsConsumer.start();

// to stop processing
sqsConsumer.stop();
```

- The queue is polled continuously for messages using long polling.
- Messages are deleted from the queue once the handler function has completed successfully.
- Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An SQS redrive policy can be used to move messages that cannot be processed to a dead letter queue.
- By default, messages are processed 10 at a time; a new batch won't be received until the previous one has been processed. To adjust the number of messages processed in parallel, use the `batchSize` option (see the sketch below).
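
A minimal sketch of adjusting this, using only the options already shown above plus `batchSize`:

```ts
import { SqsConsumer } from 'sns-sqs-big-payload';

const consumer = SqsConsumer.create({
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
    region: 'us-east-1',
    // receive and process up to 5 messages in parallel instead of the default 10
    batchSize: 5,
    handleMessage: async ({ payload }) => {
        // ...
    },
});

consumer.start();
```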

## Credentials

By default the consumer will look for AWS credentials in the places [specified by the AWS SDK](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/configuring-the-jssdk.html#Setting_AWS_Credentials). The simplest option is to export your credentials as environment variables:

```sh
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
```

If you need to specify your credentials manually, you can use a pre-configured instance of the [AWS SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) client:

```ts
import { SqsConsumer } from 'sns-sqs-big-payload';
import * as aws from 'aws-sdk';

aws.config.update({
    region: 'us-east-1',
    accessKeyId: '...',
    secretAccessKey: '...',
});

const consumer = SqsConsumer.create({
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/account-id/queue-name',
    handleMessage: async (message) => {
        // ...
    },
    sqs: new aws.SQS(),
});

consumer.start();
```

## Events and logging

SqsConsumer has an [EventEmitter](https://nodejs.org/api/events.html) and emits the following events:

| Event               | Params           | Description                                                                              |
| ------------------- | ---------------- | ---------------------------------------------------------------------------------------- |
| started             | None             | Fires when the polling is started                                                        |
| message-received    | `message`        | Fires when a message is received (once per message, not per batch)                       |
| message-processed   | `message`        | Fires after a message is successfully processed and removed from the queue               |
| stopped             | None             | Fires when the polling stops                                                             |
| error               | `{err, message}` | Fires in case of a processing error                                                      |
| s3-payload-error    | `{err, message}` | Fires when an error occurs while downloading the payload from S3                         |
| processing-error    | `{err, message}` | Fires when an error occurs during processing (only inside the `handleMessage` function)  |
| connection-error    | `err`            | Fires when a connection error occurs during polling (retriable)                          |
| payload-parse-error | `err`            | Fires when an error occurs during payload parsing                                        |

You can also use this enum if you're using TypeScript:

```ts
enum SqsConsumerEvents {
    started = 'started',
    messageReceived = 'message-received',
    messageProcessed = 'message-processed',
    stopped = 'stopped',
    error = 'error',
    s3PayloadError = 's3-payload-error',
    processingError = 'processing-error',
    connectionError = 'connection-error',
    payloadParseError = 'payload-parse-error',
}
```

You may subscribe to these events, for example to add logging.
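
For instance, a minimal logging sketch. It assumes the `SqsConsumerEvents` enum is exported by the package and that the consumer forwards `on` to its internal EventEmitter; adjust to however the emitter is actually exposed:

```ts
import { SqsConsumer, SqsConsumerEvents } from 'sns-sqs-big-payload';

const consumer = SqsConsumer.create({
    queueUrl: '...',
    region: 'us-east-1',
    handleMessage: async ({ payload }) => {
        // ...
    },
});

// Assumption: `on` is forwarded to the consumer's internal EventEmitter.
consumer.on(SqsConsumerEvents.started, () => console.log('polling started'));
consumer.on(SqsConsumerEvents.processingError, ({ err, message }) => {
    console.error('failed to process message', err, message.MessageId);
});

consumer.start();
```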

## Testing

Since this library relies heavily on the AWS API, there is little value in testing it in isolation with mocks.
So, in order to run the tests, you either need to run localstack or use real SQS queues and SNS topics.

To run localstack on a Mac:
```sh
TMPDIR=/private$TMPDIR docker-compose up
```
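
To run the test suite against localstack locally, a sketch that mirrors the steps in `.github/workflows/build.yml` above (dummy credentials are enough for localstack):

```sh
export AWS_ACCESS_KEY_ID=foo
export AWS_SECRET_ACCESS_KEY=bar

# start localstack in the background and wait for the SNS endpoint to come up
docker-compose up -d
./scripts/wait-for-url.js http://localhost:4575 && sleep 5

yarn test
```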
20 changes: 20 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,20 @@
version: '2.1'

services:
  localstack:
    container_name: '${LOCALSTACK_DOCKER_NAME-localstack_main}'
    image: localstack/localstack
    ports:
      - '4567-4599:4567-4599'
      - '${PORT_WEB_UI-8080}:${PORT_WEB_UI-8080}'
    environment:
      - SERVICES=${SERVICES- }
      - DEBUG=${DEBUG- }
      - DATA_DIR=${DATA_DIR- }
      - PORT_WEB_UI=${PORT_WEB_UI- }
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
      - KINESIS_ERROR_PROBABILITY=${KINESIS_ERROR_PROBABILITY- }
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - '${TMPDIR:-/tmp/localstack}:/tmp/localstack'
      - '/var/run/docker.sock:/var/run/docker.sock'
62 changes: 62 additions & 0 deletions package.json
@@ -0,0 +1,62 @@
{
    "name": "sns-sqs-big-payload",
    "version": "0.0.1",
    "license": "MIT",
    "scripts": {
        "test": "jest",
        "build": "npm run clean && tsc",
        "clean": "rm -rf ./dist/*"
    },
    "dependencies": {
        "aws-sdk": "^2.644.0",
        "uuid": "^7.0.2"
    },
    "devDependencies": {
        "@types/jest": "^25.1.4",
        "@types/node": "^13.9.2",
        "jest": "^25.1.0",
        "ts-jest": "^25.2.1",
        "typescript": "^3.8.3",
        "wait-on": "^4.0.1"
    },
    "repository": {
        "type": "git",
        "url": "https://github.com/aspecto-io/sns-sqs-big-payload"
    },
    "bugs": {
        "url": "https://github.com/aspecto-io/sns-sqs-big-payload/issues"
    },
    "homepage": "https://github.com/aspecto-io/sns-sqs-big-payload",
    "prepublish": "tsc",
    "main": "./build/index.js",
    "types": "./build/index.d.ts",
    "jest": {
        "preset": "ts-jest",
        "testMatch": [
            "**/tests/**/*.spec.+(ts|tsx|js)"
        ],
        "globals": {
            "ts-jest": {
                "diagnostics": false
            }
        },
        "moduleFileExtensions": [
            "ts",
            "js"
        ],
        "transform": {
            "^.+\\.(ts)$": "ts-jest"
        },
        "testEnvironment": "node",
        "maxConcurrency": 1
    },
    "keywords": [
        "sqs",
        "sns",
        "queue",
        "consumer",
        "large",
        "big",
        "payload"
    ]
}
34 changes: 34 additions & 0 deletions scripts/wait-for-url.js
@@ -0,0 +1,34 @@
#!/usr/bin/env node
// Polls the URL given as the first CLI argument until it responds,
// or exits with an error after a 60 second timeout.
const http = require('http');

let timeoutFired = false;

const urlReady = new Promise((resolve, reject) => {
    const timeout = setTimeout(() => {
        timeoutFired = true;
        reject(new Error('Timeout'));
    }, 60000);

    getUrl(timeout, resolve, reject);
});

function getUrl(timeoutId, resolve, reject) {
    const url = process.argv[2];
    http.get(url, (resp) => {
        resp.on('data', () => {
            clearTimeout(timeoutId);
            resolve();
        });
    }).on('error', (err) => {
        if (timeoutFired) {
            reject();
            return;
        }
        // the URL is not up yet - retry once a second until the timeout fires
        setTimeout(() => getUrl(timeoutId, resolve, reject), 1000);
    });
}

urlReady.catch((err) => {
    console.log(err);
    process.exit(1);
});
3 changes: 3 additions & 0 deletions src/index.ts
@@ -0,0 +1,3 @@
export * from './sns-producer';
export * from './sqs-producer';
export * from './sqs-consumer';
