Skip to content

Commit

Permalink
update dependecies
Browse files Browse the repository at this point in the history
update package google-cloud to @google-cloud/pubsub.
update socket.io-adapter version.

Closes #6
  • Loading branch information
idoshamun authored May 13, 2017
1 parent 6a85680 commit 68e05f4
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 86 deletions.
Empty file modified .eslintrc
100644 → 100755
Empty file.
Empty file modified .gitignore
100644 → 100755
Empty file.
Empty file modified .travis.yml
100644 → 100755
Empty file.
Empty file modified LICENSE
100644 → 100755
Empty file.
6 changes: 3 additions & 3 deletions README.md
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
## How to use

```js
const gcloud = require('google-cloud')();
const pubsub = gcloud.pubsub();
const pubsub = require('@google-cloud/pubsub');
const pubsubClient = pubsub();
const io = require('socket.io')(3000);
const pubsubAdapter = require('socket.io-pubsub');
io.adapter(pubsubAdapter(pubsub));
io.adapter(pubsubAdapter(pubsubClient));
```

By running socket.io with the `socket.io-pubsub` adapter you can run
Expand Down
Empty file modified key.json.enc
100644 → 100755
Empty file.
127 changes: 63 additions & 64 deletions lib/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

const shortid = require('shortid');
const Adapter = require('socket.io-adapter');
const Emitter = require('events').EventEmitter;
const debug = require('debug')('socket.io-pubsub');
const async = require('async');

Expand Down Expand Up @@ -87,7 +86,8 @@ function adapter(pubsub, opts) {
this.topics = {};
this.subscriptions = {};

this.subscribe(this.prefix);
this.subscribe(this.prefix, () => {
});
}

/**
Expand All @@ -102,7 +102,7 @@ function adapter(pubsub, opts) {
*/
PubsubAdapter.prototype.waitForInitialization = function (callback) {
if (this.initializing) {
setTimeout(() => this.waitForInitialization(callback), 10);
setTimeout(this.waitForInitialization.bind(this, callback));
}
else {
callback();
Expand Down Expand Up @@ -186,15 +186,15 @@ function adapter(pubsub, opts) {

const data = msg.data;

if (this.uid == data.senderId) return debug('ignore same uid');
if (this.uid === data.senderId) return debug('ignore same uid');

const packet = data.packet;

if (packet && packet.nsp === undefined) {
packet.nsp = '/';
}

if (!packet || packet.nsp != this.nsp.name) {
if (!packet || packet.nsp !== this.nsp.name) {
return debug('ignore different namespace');
}

Expand Down Expand Up @@ -225,7 +225,7 @@ function adapter(pubsub, opts) {
opts
};

if (opts.rooms) {
if (opts.rooms && opts.rooms.length) {
opts.rooms.forEach(room => {
const chnRoom = chn + room + '#';
this.publish(this.prefix, chnRoom, msg);
Expand All @@ -239,88 +239,87 @@ function adapter(pubsub, opts) {
};

/**
* Subscribe client to room messages.
* Adds a socket to a list of rooms.
*
* @param {String} id Socket id
* @param {String} room Room id
* @param {String} id socket id
* @param {String} rooms list of room names
* @param {Function} callback
* @api public
*/

PubsubAdapter.prototype.add = function (id, room) {
debug(`adding ${id} to ${room}`);
Adapter.prototype.add.call(this, id, room);
if (!this.subscriptions[this.prefix] && !this.initializing) {
this.subscribe(this.prefix);
PubsubAdapter.prototype.addAll = function (id, rooms, callback) {
async.waterfall([
this.waitForInitialization.bind(this),
(callback) => Adapter.prototype.addAll.call(this, id, rooms, callback),
(callback) => {
if (!this.subscriptions[this.prefix]) {
this.subscribe(this.prefix, callback);
}
else {
callback();
}
}
], callback);
};

/**
* Deletes the pubsub subscription if no roo
* @param callback
*/
PubsubAdapter.prototype.lazyDeleteSubscription = function (callback) {
if (!Object.keys(this.rooms || {}).length && !this.deleting) {
debug('deleting subscriptions %s of topic %s', this.uid, this.prefix);
this.deleting = true;
this.subscriptions[this.prefix].delete(err => {
if (err && err.code !== 404) {
this.emit('error', err);
}
else {
err = null;
delete this.subscriptions[this.prefix];
}
callback(err);
});
}
else {
callback();
}
};

/**
* Unsubscribe client from room messages.
* Removes a socket from a room.
*
* @param {String} id Socket id
* @param {String} room Room id
* @param {Function} [callback] Callback function
* @param {String} id socket id
* @param {String} room name of the room
* @param {Function} callback
* @api public
*/

PubsubAdapter.prototype.del = function (id, room, callback) {
debug('removing %s from %s', id, room);

Adapter.prototype.del.call(this, id, room);

this.waitForInitialization(() => {
if (!Object.keys(this.rooms || {}).length && !this.deleting) {
debug('deleting subscriptions %s of topic %s', this.uid, this.prefix);
this.deleting = true;
this.subscriptions[this.prefix].delete(err => {
if (err && err.code !== 404) {
this.emit('error', err);
}
else {
err = null;
delete this.subscriptions[this.prefix];
}

if (callback) {
callback(err);
}
});
} else if (callback) {
callback();
}
});
async.waterfall([
this.waitForInitialization.bind(this),
(callback) => Adapter.prototype.del.call(this, id, room, callback),
this.lazyDeleteSubscription.bind(this),
], callback);
};

/**
* Unsubscribe client completely.
* Removes a socket from all rooms it's joined.
*
* @param {String} id Socket id
* @param {Function} [callback] Callback function
* @param {String} id socket id
* @param {Function} callback
* @api public
*/

PubsubAdapter.prototype.delAll = function (id, callback) {
debug('removing %s from all rooms', id);

const rooms = this.sids[id];

if (!rooms) {
return callback();
}

async.forEach(Object.keys(rooms), this.del.bind(this, id), err => {
if (err) {
self.emit('error', err);

}
else {
delete this.sids[id];
}

if (callback) {
callback(err);
}
});
async.waterfall([
this.waitForInitialization.bind(this),
(callback) => Adapter.prototype.delAll.call(this, id, callback),
this.lazyDeleteSubscription.bind(this),
], callback);
};

return PubsubAdapter;
Expand Down
13 changes: 5 additions & 8 deletions package.json
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,17 @@
},
"homepage": "https://github.com/elegantmonkeys/socket.io-pubsub#readme",
"dependencies": {
"async": "^2.0.0-rc.5",
"async": "^2.4.0",
"debug": "^2.2.0",
"shortid": "^2.2.6",
"socket.io-adapter": "^0.4.0"
"socket.io-adapter": "^1.1.0"
},
"devDependencies": {
"@google-cloud/pubsub": "^0.11.0",
"code": "^3.0.1",
"google-cloud": "^0.48.0",
"lab": "^10.7.1",
"socket.io": "^1.4.6",
"socket.io-client": "^1.4.6",
"socket.io": "^2.0.1",
"socket.io-client": "^2.0.1",
"tape": "^4.5.1"
},
"peerDependencies": {
"google-cloud": "^0.48.0"
}
}
22 changes: 11 additions & 11 deletions test/index.js
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ const Lab = require('lab');
const http = require('http').Server;
const io = require('socket.io');
const ioc = require('socket.io-client');
const gcloud = require('google-cloud')({
const gcloudPubsub = require('@google-cloud/pubsub');
const pubsub = gcloudPubsub({
projectId: 'socketio-pubsub-testing',
keyFilename: 'key.json'
});
const pubsub = gcloud.pubsub();
const adapter = require('..');

const lab = exports.lab = Lab.script();
Expand All @@ -28,7 +28,7 @@ describe('socket.io-pubsub', () => {
expect(b).to.equal({ a: 'b' });
client1.disconnect();
client2.disconnect();
setTimeout(done, 2000);
done();
});
server2.on('connection', c2 => {
c2.broadcast.emit('woot', [], { a: 'b' });
Expand Down Expand Up @@ -61,7 +61,7 @@ describe('socket.io-pubsub', () => {
client1.disconnect();
client2.disconnect();
client3.disconnect();
setTimeout(done, 2000);
done();
});

client2.on('broadcast', () => {
Expand Down Expand Up @@ -93,8 +93,8 @@ describe('socket.io-pubsub', () => {
client1.disconnect();
client2.disconnect();
client3.disconnect();
setTimeout(done, 2000);
}, 1000);
done();
}, 2000);
});
});

Expand All @@ -115,12 +115,12 @@ describe('socket.io-pubsub', () => {
server.on('connection', c => {
c.join('woot');
c.on('disconnect', () => {
expect(c.adapter.sids[c.id]).to.be.empty();
expect(c.adapter.rooms).to.be.empty();
expect(c.adapter.sids[c.id] || {}).to.be.empty();
expect(c.adapter.rooms || []).to.be.empty();
client.disconnect();
setTimeout(done, 2000);
done();
});
setTimeout(c.disconnect.bind(c), 2000);
c.disconnect();
});
});
});
Expand All @@ -146,4 +146,4 @@ describe('socket.io-pubsub', () => {
});
}

});
});

0 comments on commit 68e05f4

Please sign in to comment.