diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c49c009 --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +Copyright 2010 Bob Potter. All rights reserved. +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to +deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5573695 --- /dev/null +++ b/README.md @@ -0,0 +1,69 @@ +node-gossip implements a gossip protocol w/failure detection, allowing you to create a fault-tolerant, self-managing cluster of node.js processes. Each server in the cluster has it's own set of key-value pairs which are propogated to the others peers in the cluster. The API allows you to make changes to the local state, listen for changes in state, listen for new peers and be notified when a peer appears to be dead or appears to have come back to life. + +The library is currently in 'hey it seems to work for me' state, there are probably some bugs lurking around. The API will probably change and suggestions on how to improve it are very welcome. + +Check out the the scripts in the simulations/ directory for some examples. + +### Usage + + var Gossiper = require('gossiper').Gossiper; + // Create a seed peer. + var seed = new Gossiper(9000, []); + seed.start(); + + // Create 20 new peers and point them at the seed (usually this would happen in 20 separate processes) + // To prevent having a single point of failure you would probably have multiple seeds + for(var i = 9001; i <= 9020;i++) { + var g = new Gossiper(i, ['127.0.0.1:9000']); + g.start(); + + g.on('update', function(peer, k, v) { + console.log("peer " + peer + " set " + k + " to " + v); // peer 127.0.0.1:9999 set somekey to somevalue + }); + } + + // Add another peer which updates it's state after 15 seconds + var updater = new Gossiper(9999, ['127.0.0.1:9000']); + updater.start(); + setTimeout(function() { + updater.setLocalState('somekey', 'somevalue'); + }, 15000); + + +### API + +Gossiper methods: + + allPeeers() + livePeers() + deadPeers() + peerValue(peer, key) + peerKeys(peer) + getLocalState(key) + setLocalSate(key, value) + +Gossiper events: + + on('update', function(peer_name, key, value) {}) + on('new_peer', function(peer_name) {}) + on('peer_alive', function(peer_name) {}) + on('peer_failed', function(peer_name) {}) + +### Tests + + expresso -I lib test/* + +### TODO + +* test edge cases +* Cluster name -- dont allow peers to accidentally join the wrong cluster +The scuttlebutt paper mentions a couple things we don't current do: + * congestion throttling + * make digests only be random subsets + +### Acknowledgements + +Both the gossip protocol and the failure detection algorithms are based off of academic papers and Cassandra's (http://www.cassandra.org/) implementation of those papers. This library is highly indebted to both. + +* ["Efficient reconciliation and flow control for anti-entropy protocols"](http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf) +* ["The Phi accrual failure detector"](http://vsedach.googlepages.com/HDY04.pdf) diff --git a/lib/.gossiper.js.swp b/lib/.gossiper.js.swp new file mode 100644 index 0000000..6007cb6 Binary files /dev/null and b/lib/.gossiper.js.swp differ diff --git a/lib/.peer_state.js.swp b/lib/.peer_state.js.swp new file mode 100644 index 0000000..dd0b8fc Binary files /dev/null and b/lib/.peer_state.js.swp differ diff --git a/lib/accrual_failure_detector.js b/lib/accrual_failure_detector.js new file mode 100644 index 0000000..ce13f03 --- /dev/null +++ b/lib/accrual_failure_detector.js @@ -0,0 +1,34 @@ +var AccrualFailureDetector = exports.AccrualFailureDetector = function() { + this.last_time = undefined; + this.intervals = []; +} + +AccrualFailureDetector.prototype.add = function(arrival_time) { + if(this.last_time == undefined) { + var i = 750; + } else { + var i = arrival_time - this.last_time; + } + + this.last_time = arrival_time; + this.intervals.push(i); + if(this.intervals.length > 1000) { + this.intervals.shift(); + } +}; + +AccrualFailureDetector.prototype.phi = function(current_time) { + var current_interval = current_time - this.last_time; + var exp = -1 * current_interval / this.interval_mean(); + + var p = Math.pow(Math.E, exp); + return -1 * (Math.log(p) / Math.log(10)); +}; + +AccrualFailureDetector.prototype.interval_mean = function(current_time) { + sum = 0; + for(i in this.intervals) { + sum += this.intervals[i]; + } + return sum / this.intervals.length; +}; diff --git a/lib/gossiper.js b/lib/gossiper.js new file mode 100644 index 0000000..bca7b39 --- /dev/null +++ b/lib/gossiper.js @@ -0,0 +1,228 @@ +var PeerState = require('peer_state').PeerState, + Scuttle = require('scuttle').Scuttle, + EventEmitter = require('events').EventEmitter, + PeerState = require('peer_state').PeerState, + net = require('net'), + sys = require('sys'), + child_process = require('child_process'), + dns = require('dns'), + msgpack = require('msgpack'); + +var Gossiper = exports.Gossiper = function(port, seeds, ip_to_bind) { + EventEmitter.call(this); + this.peers = {}; + this.ip_to_bind = ip_to_bind; + this.port = port; + this.my_state = new PeerState(); + this.scuttle = new Scuttle(this.peers, this.my_state); + + for(var i in seeds) { + var s = seeds[i]; + this.peers[s] = new PeerState(s); + } +} + +sys.inherits(Gossiper, EventEmitter); + +Gossiper.prototype.start = function(msg) { + var self = this; + + // Create Server + this.server = net.createServer(function (net_stream) { + var mp_stream = new msgpack.Stream(net_stream); + mp_stream.on('msg', function(msg) { self.handleMessage(net_stream, mp_stream, msg) }); + }); + + // Bind to ip/port + if(this.ip_to_bind) { + this.peer_name = [this.address, this.port.toString()].join(':'); + this.peers[this.peer_name] = this.my_state; + this.server.listen(this.port, this.ip_to_bind); + } else { + child_process.exec('hostname', function(error, stdout, stderr) { + var l = stdout.length; + var hostname = stdout.slice(0, l - 1); + dns.lookup(hostname, 4, function(err,address, family) { + self.peer_name = [address, self.port.toString()].join(':'); + self.peers[self.peer_name] = self.my_state; + self.server.listen(self.port, address); + }); + }); + } + + this.heartBeatTimer = setInterval(function() { self.my_state.beatHeart() }, 1000 ); + this.gossipTimer = setInterval(function() { self.gossip() }, 1000); +} + +Gossiper.prototype.stop = function() { + this.server.close(); + clearInterval(this.heartBeatTimer); + clearInterval(this.gossipTimer); +} + + +// The method of choosing whice peer(s) to gossip to is borrowed from Cassandra. +// They seemed to have worked out all of the edge cases +// http://wiki.apache.org/cassandra/ArchitectureGossip +Gossiper.prototype.gossip = function() { + // Find a live peer to gossip to + if(this.livePeers() > 0) { + var live_peer = this.chooseRandom(this.livePeers()); + this.gossipToPeer(live_peer); + } + + // Possilby gossip to a dead peer + var prob = this.deadPeers().length / (this.livePeers().length + 1) + if(Math.random() < prob) { + var dead_peer = this.chooseRandom(this.deadPeers()); + this.gossipToPeer(dead_peer); + } + + // Gossip to seed under certain conditions + if(live_peer && !this.seeds[live_peer] && this.livePeers().length < this.seeds.length) { + if(Math.random() < (this.seeds / this.allPeers.size())) { + this.gossipToPeer(chooseRandom(this.peers)); + } + } + + // Check health of peers + for(var i in this.peers) { + var peer = this.peers[i]; + if(peer != this.my_state) { + peer.isSuspect(); + } + } +} + +Gossiper.prototype.chooseRandom = function(peers) { + // Choose random peer to gossip to + var i = Math.floor(Math.random()*1000000) % peers.length; + return peers[i]; +} + +Gossiper.prototype.gossipToPeer = function(peer) { + var a = peer.split(":"); + var gosipee = new net.createConnection(a[1], a[0]); + var self = this; + gosipee.on('connect', function(net_stream) { + var mp_stream = new msgpack.Stream(gosipee); + mp_stream.on('msg', function(msg) { self.handleMessage(gosipee, mp_stream, msg) }); + mp_stream.send(self.requestMessage()); + }); + gosipee.on('error', function(exception) { +// console.log(self.peer_name + " received " + sys.inspect(exception)); + }); +} + +Gossiper.REQUEST = 0; +Gossiper.FIRST_RESPONSE = 1; +Gossiper.SECOND_RESPONSE = 2; + +Gossiper.prototype.handleMessage = function(net_stream, mp_stream, msg) { + switch(msg.type) { + case Gossiper.REQUEST: + mp_stream.send(this.firstResponseMessage(msg.digest)); + break; + case Gossiper.FIRST_RESPONSE: + this.scuttle.updateKnownState(msg.updates); + mp_stream.send(this.secondResponseMessage(msg.request_digest)); + net_stream.end(); + break; + case Gossiper.SECOND_RESPONSE: + this.scuttle.updateKnownState(msg.updates); + net_stream.end(); + break; + default: + // shit went bad + break; + } +} + +// MESSSAGES + + +Gossiper.prototype.handleNewPeers = function(new_peers) { + var self = this; + for(var i in new_peers) { + var peer_name = new_peers[i]; + this.peers[peer_name] = new PeerState(peer_name); + this.emit('new_peer', peer_name); + + var peer = this.peers[peer_name]; + this.listenToPeer(peer); + } +} + +Gossiper.prototype.listenToPeer = function(peer) { + var self = this; + peer.on('update', function(k,v) { + self.emit('update', peer.name, k, v); + }); + peer.on('peer_alive', function() { + self.emit('peer_alive', peer.name); + }); + peer.on('peer_failed', function() { + self.emit('peer_failed', peer.name); + }); +} + +Gossiper.prototype.requestMessage = function() { + var m = { + 'type' : Gossiper.REQUEST, + 'digest' : this.scuttle.digest(), + }; + return m; +}; + +Gossiper.prototype.firstResponseMessage = function(peer_digest) { + var sc = this.scuttle.scuttle(peer_digest) + this.handleNewPeers(sc.new_peers); + var m = { + 'type' : Gossiper.FIRST_RESPONSE, + 'request_digest' : sc.requests, + 'updates' : sc.deltas + }; + return m; +}; + +Gossiper.prototype.secondResponseMessage = function(requests) { + var m = { + 'type' : Gossiper.SECOND_RESPONSE, + 'updates' : this.scuttle.fetchDeltas(requests) + }; + return m; +}; + +Gossiper.prototype.setLocalState = function(k, v) { + this.my_state.updateLocal(k,v); +} + +Gossiper.prototype.getLocalState = function(k) { + return this.my_state.getValue(k); +} + +Gossiper.prototype.peerKeys = function(peer) { + return this.peers[peer].getKeys(); +} + +Gossiper.prototype.peerValue = function(peer, k) { + return this.peers[peer].getValue(k); +} + +Gossiper.prototype.allPeers = function() { + var keys = []; + for(var k in this.peers) { keys.push(k) }; + return keys; +} + +Gossiper.prototype.livePeers = function() { + var keys = []; + for(var k in this.peers) { if(k.alive) { keys.push(k)} }; + return keys; +} + +Gossiper.prototype.deadPeers = function() { + var keys = []; + for(var k in this.peers) { if(!k.alive) { keys.push(k) } }; + return keys; +} diff --git a/lib/peer_state.js b/lib/peer_state.js new file mode 100644 index 0000000..19aa16f --- /dev/null +++ b/lib/peer_state.js @@ -0,0 +1,98 @@ +var AccrualFailureDetector = require('./accrual_failure_detector').AccrualFailureDetector, + EventEmitter = require('events').EventEmitter, + sys = require('sys'); + +var PeerState = exports.PeerState = function(name) { + EventEmitter.call(this); + this.max_version_seen = 0; + this.attrs = {}; + this.detector = new AccrualFailureDetector(); + this.alive = true; + this.heart_beat_version = 0; + this.PHI = 8; + this.name = name; +}; + +sys.inherits(PeerState, EventEmitter); + +PeerState.prototype.updateWithDelta = function(k,v,n) { + // It's possibly to get the same updates more than once if we're gossiping with multiple peers at once + // ignore them + if(n > this.max_version_seen) { + this.max_version_seen = n; + this.setKey(k,v,n); + if(k == '__heartbeat__') { + var d = new Date(); + this.detector.add(d.getTime()); + } + } +} + +/* This is used when the peerState is owned by this peer */ + +PeerState.prototype.updateLocal = function(k,v) { + this.max_version_seen += 1; + this.setKey(k,v,this.max_version_seen); +} + +PeerState.prototype.getValue = function(k) { + if(this.attrs[k] == undefined) { + return undefined; + } else { + return this.attrs[k][0]; + } +} + +PeerState.prototype.getKeys = function() { + var keys = []; + for(k in this.attrs) { keys.push(k) }; + return keys; +} + +PeerState.prototype.setKey = function(k,v,n) { + this.attrs[k] = [v,n]; + this.emit('update', k, v); +} + +PeerState.prototype.beatHeart = function() { + this.heart_beat_version += 1; + this.updateLocal('__heartbeat__', this.heart_beat_version); +} + +PeerState.prototype.deltasAfterVersion = function(lowest_version) { + deltas = [] + for(k in this.attrs) { + var value = this.attrs[k][0]; + var version = this.attrs[k][1]; + if(version > lowest_version) { + deltas.push([k,value,version]); + } + } + return deltas; +} + +PeerState.prototype.isSuspect = function() { + var d = new Date(); + var phi = this.detector.phi(d.getTime()); + if(phi > this.PHI) { + this.markDead(); + return true; + } else { + this.markAlive(); + return false; + } +} + +PeerState.prototype.markAlive = function() { + if(!this.alive) { + this.alive = true; + this.emit('peer_alive'); + } +} + +PeerState.prototype.markDead = function() { + if(this.alive) { + this.alive = false; + this.emit('peer_failed'); + } +} diff --git a/lib/scuttle.js b/lib/scuttle.js new file mode 100644 index 0000000..a8d17ad --- /dev/null +++ b/lib/scuttle.js @@ -0,0 +1,96 @@ +var PeerState = require('./peer_state').PeerState; +var Scuttle = exports.Scuttle = function(peers, local_peer) { + this.peers = peers; + this.local_peer = local_peer; +}; + +Scuttle.prototype.digest = function() { + var digest = {}; + for(i in this.peers) { + var p = this.peers[i]; + digest[i] = p.max_version_seen; + } + return digest; +} + +// HEART OF THE BEAST + +Scuttle.prototype.scuttle = function(digest) { + var deltas_with_peer = []; + var requests = {} + var new_peers = []; + for(var peer in digest) { + var local_version = this.maxVersionSeenForPeer(peer); + var local_peer = this.peers[peer]; + var digest_version = digest[peer]; + + if(!this.peers[peer]) { + // We don't know about this peer. Request all information. + requests[peer] = 0; + new_peers.push(peer); + } else if(local_version > digest[peer]) { + // We have more recent information for this peer. Build up deltas. + deltas_with_peer.push( { peer : peer, deltas : local_peer.deltasAfterVersion(digest[peer]) }); + } else if(local_version < digest[peer]) { + // They have more recent information, request it. + requests[peer] = local_version; + } else { + // Everything is the same. + } + } + + // Sort by peers with most deltas + deltas_with_peer.sort( function(a,b) { return b.deltas.length - a.deltas.length } ); + + var deltas = []; + for(i in deltas_with_peer) { + var peer = deltas_with_peer[i]; + var peer_deltas = peer.deltas; + + // Sort deltas by version number + peer_deltas.sort(function(a,b) { return a[2] - b[2]; }) + if(peer_deltas.length > 1) { + // console.log(peer_deltas); + } + for(j in peer_deltas) { + var delta = peer_deltas[j]; + delta.unshift(peer.peer); + deltas.push(delta); + } + } + + return { 'deltas' : deltas, + 'requests' : requests, + 'new_peers' : new_peers }; +} + +Scuttle.prototype.maxVersionSeenForPeer = function(peer) { + if(this.peers[peer]) { + return this.peers[peer].max_version_seen; + } else { + return 0; + } +} + +Scuttle.prototype.updateKnownState = function(deltas) { + for(i in deltas) { + var d = deltas[i]; + + var peer_name = d.shift(); + var peer_state = this.peers[peer_name]; + peer_state.updateWithDelta(d[0],d[1],d[2]); + } +}; + +Scuttle.prototype.fetchDeltas = function(requests) { + var deltas = [] + for(i in requests) { + var peer_deltas = this.peers[i].deltasAfterVersion(requests[i]); + peer_deltas.sort(function(a,b) { return a[2] - b[2]; }); + for(j in peer_deltas) { + peer_deltas[j].unshift(i); + deltas.push(peer_deltas[j]); + } + } + return deltas; +} diff --git a/simulation/example.js b/simulation/example.js new file mode 100644 index 0000000..37ddfee --- /dev/null +++ b/simulation/example.js @@ -0,0 +1,24 @@ +var Gossiper = require('gossiper').Gossiper; +// Create a seed peer. +var seed = new Gossiper(9000, []); +seed.start(); + +// Create 20 new peers and point them at the seed (usually this would happen in 20 separate processes) +// To prevent having a single point of failure you would probably have multiple seeds +for(var i = 9001; i <= 9020;i++) { + var g = new Gossiper(i, ['127.0.0.1:9000']); + g.start(); + + g.on('update', function(peer, k, v) { + if(k == 'somekey') { + console.log("peer " + peer + " set " + k + " to " + v); // peer 127.0.0.1:9999 set somekey to somevalue + } + }); +} + +// Add another peer which updates it's state after 15 seconds +var updater = new Gossiper(9999, ['127.0.0.1:9000']); +updater.start(); +setTimeout(function() { + updater.setLocalState('somekey', 'somevalue'); +}, 15000); diff --git a/simulation/s1.js b/simulation/s1.js new file mode 100644 index 0000000..2e1fc06 --- /dev/null +++ b/simulation/s1.js @@ -0,0 +1,35 @@ +var Gossiper = require('gossiper').Gossiper; + +var seed = new Gossiper(9000, []); +seed.start(); + +var n = 0; +var gs = []; +var start_time = undefined; +var count = 100; +for(var i = 9001; i < 9001+count;i++) { + var g = gs[i] = new Gossiper(i, ['127.0.0.1:9000']); + g.start(); + g.on('update', function(peer,k,v) { + if(k == "hi") { + console.log("hi received by " + this.peer_name + " at " + (new Date().getTime())); + n++; + if(n == count) { + console.log("fully propogated"); + console.log("took " + (new Date().getTime() - start_time)); + process.exit(); + } + } + }); +} + +var g = new Gossiper(9999, ['127.0.0.1:9000']); +g.start(); + +setTimeout(function() { + console.log(seed.allPeers()); + // Set value for 'hi' + g.setLocalState('hi', 'hello'); + start_time = new Date().getTime(); + console.log('hi sent ' + (new Date().getTime())); +}, 10000); diff --git a/simulation/s2.js b/simulation/s2.js new file mode 100644 index 0000000..bdf856e --- /dev/null +++ b/simulation/s2.js @@ -0,0 +1,27 @@ +var Gossiper = require('gossiper').Gossiper; + +var seed = new Gossiper(9000, []); +seed.start(); + +var n = 0; +var gs = []; +var start_time = undefined; +var count = 100; +var setup_peer = function(this_peer) { + this_peer.start(); + this_peer.on('peer_failed', function(peer) { + console.log(this_peer.peer_name + " thinks " + peer + " is dead"); + }); + this_peer.on('peer_alive', function(peer) { + console.log(this_peer.peer_name + " thinks " + peer + " is alive"); + }); +} +for(var i = 9001; i < 9001+count;i++) { + var g = gs[i] = new Gossiper(i, ['127.0.0.1:9000']); + setup_peer(g); +} +// kill one of the nodes +setTimeout(function() { + gs[9020].stop(); + setTimeout(function() { gs[9020].start() }, 30000); +}, 5000); diff --git a/simulation/s3.js b/simulation/s3.js new file mode 100644 index 0000000..9a7e2a8 --- /dev/null +++ b/simulation/s3.js @@ -0,0 +1,33 @@ +var Gossiper = require('gossiper').Gossiper; +var sys = require('sys'); + +var seed1 = new Gossiper(9000, []); +seed1.start(); + +var seed2 = new Gossiper(9001, []); +seed2.start(); + +var n = 0; +var gs = []; +var count = 100; +var peers_done = 0; +var setup_peer = function(this_peer) { + var n = 0; + this_peer.on('new_peer', function() { + n++; + if(n == 100) { + console.log('peer done'); + peers_done++; + if(peers_done == 100) { + console.log("all peers know about each other"); + process.exit(); + } + } + }); +} + +for(var i = 9101; i <= 9101+count;i++) { + var g = gs[i] = new Gossiper(i, ['127.0.0.1:9000', '127.0.0.1:9001']); + setup_peer(g); + g.start(); +} diff --git a/test/accrual_failure_detector.test.js b/test/accrual_failure_detector.test.js new file mode 100644 index 0000000..75609c9 --- /dev/null +++ b/test/accrual_failure_detector.test.js @@ -0,0 +1,33 @@ +var AccrualFailureDetector = require('accrual_failure_detector').AccrualFailureDetector; + +module.exports = { + 'should have a low phi value after only a second' : function(assert) { + var afd = new AccrualFailureDetector(); + var time = 0; + for(var i = 0;i < 100;i++) { + time += 1000; + afd.add(time); + } + assert.ok(afd.phi(time + 1000) < 0.5); + }, + + 'should have a high phi value after ten seconds' : function(assert) { + var afd = new AccrualFailureDetector(); + var time = 0; + for(var i = 0;i < 100;i++) { + time += 1000; + afd.add(time); + } + assert.ok(afd.phi(time + 10000) > 4); + }, + + 'should only keep last 1000 values' : function(assert) { + var afd = new AccrualFailureDetector(); + var time = 0; + for(var i = 0;i < 2000;i++) { + time += 1000; + afd.add(time); + } + assert.equal(1000, afd.intervals.length); + } +} diff --git a/test/gossiper.test.js b/test/gossiper.test.js new file mode 100644 index 0000000..53d320a --- /dev/null +++ b/test/gossiper.test.js @@ -0,0 +1,58 @@ +var Gossiper = require('gossiper').Gossiper, + PeerState = require('peer_state').PeerState; + +module.exports = { + 'should be able to set and retrieve local state' : function(assert) { + var g = new Gossiper(); + g.setLocalState('hi', 'hello'); + assert.equal('hello', g.getLocalState('hi')); + }, + 'should be able to get a list of keys for a peer' : function(assert) { + var g = new Gossiper(); + g.peers['p1'] = new PeerState(); + g.peers['p1'].attrs['keyz'] = []; + g.peers['p1'].attrs['keyzy'] = []; + assert.deepEqual(['keyz','keyzy'], g.peerKeys('p1')); + }, + 'should be able to get the value of a key for a peer' : function(assert) { + var g = new Gossiper(); + g.peers['p1'] = new PeerState(); + g.peers['p1'].attrs['keyz'] = ['hi', 1]; + assert.equal('hi', g.peerValue('p1','keyz')); + }, + 'should be able to get a list of peers' : function(assert) { + var g = new Gossiper(); + g.peers['p1'] = new PeerState(); + g.peers['p2'] = new PeerState(); + assert.deepEqual(['p1','p2'], g.allPeers()); + }, + 'should emit new_peer event when we learn about a new peer' : function(assert, beforeExit) { + var g = new Gossiper(); + // mock scuttle + g.scuttle = { 'scuttle' : function(v) { + return { 'new_peers' : ['127.0.0.1:8010'] }; + }} ; + + var emitted = false; + g.on('new_peer', function() { + emitted = true; + }); + g.firstResponseMessage({}); + beforeExit(function() { + assert.ok(emitted); + }); + }, + 'should emit update event when we learn more about a peer' : function(assert, beforeExit) { + var g = new Gossiper(); + g.peers['127.0.0.1:8010'] = new PeerState(); + g.handleNewPeers(['127.0.0.1:8010']); + var update = null; + g.on('update', function(peer,k,v) { + update = [peer,k,v]; + }); + g.peers['127.0.0.1:8010'].updateLocal('howdy', 'yall'); + beforeExit(function() { + assert.deepEqual(['127.0.0.1:8010', 'howdy', 'yall'], update); + }); + } +} diff --git a/test/peer_state.test.js b/test/peer_state.test.js new file mode 100644 index 0000000..74c2631 --- /dev/null +++ b/test/peer_state.test.js @@ -0,0 +1,51 @@ +var PeerState = require('peer_state').PeerState; +module.exports = { + // UpdateWithDelta + "updateWithDelta should set key to value" : function(assert) { + var ps = new PeerState(); + ps.updateWithDelta('a', 'hello', 12); + assert.equal('hello', ps.getValue('a')); + }, + + "updateWithDelta should update the max version" : function(assert) { + var ps = new PeerState(); + ps.updateWithDelta('a', 'hello', 12); + ps.updateWithDelta('a', 'hello', 14); + assert.equal(14, ps.max_version_seen); + }, + + "updates should trigger 'update' event" : function(assert, beforeExit) { + var ps = new PeerState(); + var n = 0; + ps.on('update', function(k,v) { + ++n; + assert.equal('a', k); + assert.equal('hello', v); + }); + ps.updateWithDelta('a', 'hello', 12); + beforeExit(function() { assert.equal(1, n) }); + }, + + // updateLocal + "updateLocal should set key to value" : function(assert) { + var ps = new PeerState(); + ps.updateLocal('a', 'hello', 12); + assert.equal('hello', ps.getValue('a')); + }, + + "updateLocal should increment the max version" : function(assert) { + var ps = new PeerState(); + ps.updateLocal('a', 'hello'); + ps.updateLocal('a', 'hello'); + assert.equal(2, ps.max_version_seen); + }, + + // deltasAfterVersion + "deltasAfterVersion should return all deltas after a version number" : function(assert) { + var ps = new PeerState(); + ps.updateLocal('a', 1); + ps.updateLocal('b', 'blah'); + ps.updateLocal('a', 'super'); + assert.deepEqual([['a','super','3']], ps.deltasAfterVersion(2)); + } +} diff --git a/test/scuttle.test.js b/test/scuttle.test.js new file mode 100644 index 0000000..f8fba58 --- /dev/null +++ b/test/scuttle.test.js @@ -0,0 +1,51 @@ +var Scuttle = require('scuttle').Scuttle; +var PeerState = require('peer_state').PeerState; + +module.exports = { + // digest + 'digest should have max versions we have seen' : function(assert) { + var p1 = new PeerState(); + p1.max_version_seen = 10; + var p2 = new PeerState(); + p2.max_version_seen = 12; + var p3 = new PeerState(); + p3.max_version_seen = 22; + + var peers = { + 'a' : p1, + 'b' : p2, + 'c' : p3 + } + + var scuttle = new Scuttle(peers); + assert.deepEqual( { 'a' : 10, 'b' : 12, 'c' : 22 }, + scuttle.digest()); + }, + + // scuttle + // scuttle new peer + 'new peers should be in result' : function(assert) { + var scuttle = new Scuttle({}); + var res = scuttle.scuttle( { 'new_peer' : 12 } ) + assert.deepEqual(['new_peer'], res.new_peers); + }, + 'request all information about a new peer' : function(assert) { + var scuttle = new Scuttle({}); + var res = scuttle.scuttle( { 'new_peer' : 12 } ) + assert.deepEqual({ 'new_peer' : 0}, res.requests); + }, + // scuttle deltas + 'send peer all deltas for peers we know more about' : function(assert) { + var p1 = new PeerState(); + p1.updateLocal('hi', 'hello'); + p1.updateLocal('meh', 'goodbye'); + var scuttle = new Scuttle({'me' : p1}); + var res = scuttle.scuttle( {'me' : 0, 'new_peer' : 12 } ) + assert.deepEqual([['me', 'hi', 'hello', 1], + ['me', 'meh', 'goodbye', 2]], + res.deltas); + } + + // deltas should be sorted by version number + // deltas should be ordered by the peer with the most +} diff --git a/test/serial/.heartbeat.test.js.swp b/test/serial/.heartbeat.test.js.swp new file mode 100644 index 0000000..6dee9a7 Binary files /dev/null and b/test/serial/.heartbeat.test.js.swp differ diff --git a/test/serial/.simple.test.js.swp b/test/serial/.simple.test.js.swp new file mode 100644 index 0000000..45b20d7 Binary files /dev/null and b/test/serial/.simple.test.js.swp differ diff --git a/test/serial/heartbeat.test.js b/test/serial/heartbeat.test.js new file mode 100644 index 0000000..e1df1bc --- /dev/null +++ b/test/serial/heartbeat.test.js @@ -0,0 +1,47 @@ +var Gossiper = require('gossiper').Gossiper; + +module.exports = { + 'heartbeat' : function(assert, beforeExit) { + var seed = new Gossiper(7000, []); + seed.start(); + + var g1 = new Gossiper(7001, ['127.0.0.1:7000']); + g1.start(); + + var g2 = new Gossiper(7002, ['127.0.0.1:7000']); + g2.start(); + + var dead_emitted = false; + g2.on('peer_failed', function(peer) { + dead_emitted = true; + assert.equal('127.0.0.1:7001', peer); + }); + + var alive_emitted = false; + g2.on('peer_alive', function(peer) { + alive_emitted = true; + assert.equal('127.0.0.1:7001', peer); + }); + + setTimeout(function() { + console.log("stopping g1"); + g1.stop(); + }, 10000); + + setTimeout(function() { + console.log("starting g1"); + g1.start(); + }, 45000); + + setTimeout(function() { + g1.stop(); + seed.stop(); + g2.stop(); + }, 55000); + + beforeExit(function() { + assert.ok(dead_emitted); + assert.ok(alive_emitted); + }); + }, +} diff --git a/test/serial/simple.test.js b/test/serial/simple.test.js new file mode 100644 index 0000000..e10158e --- /dev/null +++ b/test/serial/simple.test.js @@ -0,0 +1,30 @@ +var Gossiper = require('gossiper').Gossiper; + +module.exports = { + 'basic test' : function(assert, beforeExit) { + var seed = new Gossiper(7000, []); + seed.start(); + + var g1 = new Gossiper(7001, ['127.0.0.1:7000']); + g1.start(); + g1.setLocalState('holla','at'); + + var g2 = new Gossiper(7002, ['127.0.0.1:7000']); + g2.start(); + g2.setLocalState('your','node'); + + setTimeout(function() { + seed.stop(); + g1.stop(); + g2.stop(); + }, 10000); + + beforeExit(function() { + assert.equal('node', g1.peerValue('127.0.0.1:7002', 'your')); + assert.equal('node', g2.peerValue('127.0.0.1:7002', 'your')); + assert.equal('node', seed.peerValue('127.0.0.1:7002', 'your')); + assert.equal('at', g2.peerValue('127.0.0.1:7001', 'holla')); + }); + } + +}