diff --git a/README.md b/README.md index 56dda2e..985dc71 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,8 @@ If a message is discarded entirely because it does not pass your `shouldRetry` l If the queue is reclaiming events from an abandonded queue, and sees duplicate entries, we will keep the first, and discard the rest, emitting a `duplication` event for each. +If a message is discarded because the queue exceeds `maxItems`, the queue will emit an `overflow` event. + ### `processed` ```javascript @@ -171,6 +173,13 @@ queue.on('discard', function(item, attempts) { ```javascript queue.on('duplication', function(item, attempts) { console.error('discarding message %O due to duplicate entries', item, attempts); +``` + +### `overflow` + +```javascript +queue.on('overflow', function(item, attempts) { + console.error('discarding message %O after %d attempts due to queue overflow', item, attempts); }) ``` diff --git a/lib/index.js b/lib/index.js index 34220e8..373edf4 100644 --- a/lib/index.js +++ b/lib/index.js @@ -174,8 +174,18 @@ Queue.prototype.requeue = function(item, attemptNumber, error, id) { Queue.prototype._enqueue = function(entry) { var queue = this._store.get(this.keys.QUEUE) || []; - queue = queue.slice(-(this.maxItems - 1)); - queue.push(entry); + + // We should remove events from the end of the array as these are generally the oldest events that have been retried. + + // If greater than the length of the array, start will be set to the length of the array. + var sliceStart = this.maxItems - 1; + // If deleteCount is omitted, or if its value is equal to or larger than array.length - start, + // then all the elements from sliceStart to the end of the array will be deleted. + var sliceDeleteCount = queue.length - sliceStart + // More docs https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/splice + var removedEntrys = queue.splice(sliceStart, sliceDeleteCount, entry); + // We can add the new event at the end of the array since we sort the array on the next line + // We keep the deleted events so we can emit events for them below. queue = queue.sort(function(a,b) { return a.time - b.time; }); @@ -185,6 +195,11 @@ Queue.prototype._enqueue = function(entry) { if (this._running) { this._processHead(); } + if (removedEntrys.length > 0) { + for (var i = 0; i < removedEntrys.length; i++) { + this.emit('overflow', removedEntrys[i].item, removedEntrys[i].attemptNumber); + } + } }; Queue.prototype._processHead = function() { diff --git a/test/index.test.js b/test/index.test.js index d19ca88..6c259d7 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -552,7 +552,11 @@ describe('Queue', function() { describe('events', function() { var queue; + var clock; + beforeEach(function() { + clock = lolex.createClock(0); + Schedule.setClock(clock); queue = new Queue('events', function(_, cb) { cb(); }); @@ -560,6 +564,7 @@ describe('events', function() { afterEach(function() { queue.stop(); + Schedule.resetClock(); }); it('should emit processed with response, and item', function(done) { @@ -605,6 +610,30 @@ describe('events', function() { }); queue.start(); queue.addItem({ a: 'b' }); + clock.runAll(); + }); + + it('should emit overflow if the adding a message exceeds queue maxItems', function(done) { + var firstEvent = { a: 'b' }; + var otherEvents = { c: 'd' }; + + queue.fn = function(item, cb) { + cb(new Error('no')); + }; + + queue.maxItems = 5; + queue.on('overflow', function(item, attempts) { + assert.equal(item.a, firstEvent.a); + assert.equal(attempts, 0); + done(); + }); + queue.addItem(firstEvent); + clock.tick(10); + queue.addItem(otherEvents); + queue.addItem(otherEvents); + queue.addItem(otherEvents); + queue.addItem(otherEvents); + queue.addItem(otherEvents); }); });