Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 117 additions & 88 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,121 +1,150 @@

module.exports = SQSWorker

function SQSWorker(options, fn) {
if (!(this instanceof SQSWorker)) return new SQSWorker(options, fn)
if (!(this instanceof SQSWorker)) return new SQSWorker(options, fn)

this.client = options.client || new (require('aws-sdk').SQS)(options)

this.client = options.client || new (require('aws-sdk').SQS)(options)
// default to long polling
this.waittime = options.waittime || 20;

// Five seconds more than WaitTimeSeconds
this.client.config.httpOptions.timeout = 25000
// Five seconds more than WaitTimeSeconds
this.client.config.httpOptions.timeout = 25000

this.url = options.url
this.parallel = options.parallel || 1
this.timeout = options.timeout || void 0
this.parse = options.parse
this.log = options.log || console
this.attributes = Array.isArray(options.attributes) ? options.attributes.slice() : []
this.attempts = options.attempts || 3
this.url = options.url
this.parallel = options.parallel || 1
this.timeout = options.timeout || void 0
this.parse = options.parse
this.log = options.log || console
this.attributes = Array.isArray(options.attributes) ? options.attributes.slice() : []
this.attempts = options.attempts || 3

if (this.attempts && !~this.attributes.indexOf('ApproximateReceiveCount')) {
this.attributes.push('ApproximateReceiveCount')
}
if (this.attempts && !~this.attributes.indexOf('ApproximateReceiveCount')) {
this.attributes.push('ApproximateReceiveCount')
}

this.fn = fn
// the worker function to call when a message is received
this.fn = fn

this.handling = 0
this.receiving = 0
this.handling = 0
this.receiving = 0

this.maybeMore()
this.maybeMore()
}

SQSWorker.prototype.maybeMore = function (retries) {
if (this.receiving) return
if (this.handling >= this.parallel) return
if (this.receiving) return
if (this.handling >= this.parallel) return

this.receiving++
this.receiving++

retries = retries || 0
retries++
retries = retries || 0
retries++

var self = this
var self = this

var params =
{ 'QueueUrl': this.url
, 'MaxNumberOfMessages': Math.min(this.parallel, 10)
, 'VisibilityTimeout': this.timeout
, 'WaitTimeSeconds': 20
, 'AttributeNames': this.attributes
var params = {
'QueueUrl': this.url,
'MaxNumberOfMessages': Math.min(this.parallel, 10),
'VisibilityTimeout': this.timeout,
'WaitTimeSeconds': this.waittime,
'AttributeNames': this.attributes
}

this.client.receiveMessage(params, function (err, data) {
self.receiving--
this.client.receiveMessage(params, function (err, data) {
self.receiving--

if (err) {
self.log.error({ err: err, params: params }, 'failed to receive messages')
setTimeout(self.maybeMore.bind(self), Math.min(Math.pow(2, retries), 300) * 1000, retries)
return
}
if (err) {
self.log.error({
err: err,
params: params
}, 'failed to receive messages')
self.log.error('Will try again in ', Math.min(Math.pow(2, retries), 300), 'seconds')

self.log.info({ params: params, response: data }, 'receiveMessage response')
// Progressively increase timout up to a maximum of 300 seconds then call myself
// with the same context and an argument of the number of retries.
setTimeout(self.maybeMore.bind(self), Math.min(Math.pow(2, retries), 300) * 1000, retries)
return
}

if (Array.isArray(data.Messages)) {
data.Messages.map(self.handleMessage, self)
}
// uncomment to log every message
// self.log.info({ params: params, response: data }, 'receiveMessage response')

// data.Messages will be an array if each returned messages, call
// the handler for each message returned. (this is the .map method)
if (Array.isArray(data.Messages)) {
data.Messages.map(self.handleMessage, self)
}

self.maybeMore()
})
self.maybeMore()
})
}

SQSWorker.prototype.handleMessage = function (message) {
this.handling++

var payload = message.Body

if (this.parse) {
try {
payload = this.parse(payload)
} catch (err) {
this.log.error({ err: err, message: message }, 'error parsing message body')
return
this.handling++

var payload = message.Body

// Can choose to pass in a function to call to parse the payload, this function
// should return the properly parsed value.
if (this.parse) {
try {
payload = this.parse(payload)
} catch (err) {
this.log.error({
err: err,
message: message
}, 'error parsing message body')
return
}
}
}

var self = this

if (this.fn.length === 2) {
this.fn(payload, callback)
} else {
this.fn(payload, message, callback)
}
var self = this

function callback(err, del) {
self.handling--

if (err) {
self.log.error({ err: err, message: message, payload: payload }, 'error handling message')
del = (self.attempts && Number(message['Attributes']['ApproximateReceiveCount']) >= self.attempts)
// .length here is the number of artuments; if three arguments, pass the full
// message structure, otherwse just pass the payload
if (this.fn.length === 2) {
this.fn(payload, callback)
} else {
this.fn(payload, message, callback)
}

var params =
{ 'QueueUrl': self.url
, 'ReceiptHandle': message.ReceiptHandle
}

if (!del) {
params['VisibilityTimeout'] = 0
// the callback below is called when the worker calls done(); It handles
// removing the message from the queue or relasing it for other workers
function callback(err, del) {
self.handling--

if (err) {
self.log.error({
err: err,
message: message,
payload: payload
}, 'error handling message')
del = (self.attempts && Number(message['Attributes']['ApproximateReceiveCount']) >= self.attempts)
}

var params = {
'QueueUrl': self.url,
'ReceiptHandle': message.ReceiptHandle
}

// if marked not to delete, set VisibilityTimeout to 0 which immeidately
// releases it for another worker.
if (!del) {
params['VisibilityTimeout'] = 0
}

self.client[del ? 'deleteMessage' : 'changeMessageVisibility'](params, function (err) {
if (err) {
self.log.error({
err: err,
message: message,
payload: payload
}, 'failed to ' + (del ? 'delete message' : 'change visibility timeout'))
return
}

self.maybeMore()
})
}

self.client[del ? 'deleteMessage' : 'changeMessageVisibility'](params, function (err) {
if (err) {
self.log.error(
{ err: err, message: message, payload: payload }
, 'failed to ' + (del ? 'delete message' : 'change visibility timeout')
)
return
}

self.maybeMore()
})
}
}