Skip to content
This repository was archived by the owner on Feb 18, 2021. It is now read-only.

WIP: test peer churn under sustained forwarding load #271

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 63 additions & 14 deletions service-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -696,17 +696,6 @@ function refreshServicePeer(serviceName, hostPort) {
self.ensurePeerConnected(serviceName, peer, 'service peer refresh', now);
};

ServiceDispatchHandler.prototype.deletePeerIndex =
function deletePeerIndex(serviceName, hostPort) {
var self = this;

if (self.partialAffinityEnabled) {
deleteIndexEntry(self.connectedServicePeers, serviceName, hostPort);
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}
deleteIndexEntry(self.knownPeers, hostPort, serviceName);
};

ServiceDispatchHandler.prototype.ensurePeerConnected =
function ensurePeerConnected(serviceName, peer, reason, now) {
var self = this;
Expand Down Expand Up @@ -743,24 +732,53 @@ function connectSinglePeer(hostPort, connectInfo) {
var self = this;

if (self.peersToPrune[hostPort]) {

self.logger.info('SKIPPING PEER CONNECTION: PRUNED', self.extendLogInfo({
hostPort: hostPort,
connectInfo: connectInfo
}));

return;
}

var serviceName = connectInfo.serviceName;
var serviceChannel = self.getServiceChannel(serviceName);
if (!serviceChannel) {

self.logger.info('SKIPPING PEER CONNECTION: NO SERVICE CHANNEL', self.extendLogInfo({
hostPort: hostPort,
connectInfo: connectInfo
}));

return;
}

var peer = serviceChannel.peers.get(hostPort);
if (!peer) {

self.logger.info('SKIPPING PEER CONNECTION: NO PEER', self.extendLogInfo({
hostPort: hostPort,
connectInfo: connectInfo
}));

return;
}

if (peer.draining) {

self.logger.info('SKIPPING PEER CONNECTION: PEER DRAINING', self.extendLogInfo({
hostPort: hostPort,
connectInfo: connectInfo
}));

return;
}

self.logger.info('CONNECTING TO PEER', self.extendLogInfo({
hostPort: hostPort,
connectInfo: connectInfo
}));

peer.connectTo();
};

Expand Down Expand Up @@ -833,6 +851,12 @@ function addNewPartialPeer(serviceChannel, hostPort, now) {
partialRange.addWorker(hostPort, now);
}

self.logger.info('ADD NEW PARTIAL PEER KNOWS', {
serviceName: serviceName,
hostPort: hostPort,
now: now
});

// Unmark recently seen peers, so they don't get reaped
deleteIndexEntry(self.peersToReap, hostPort, serviceName);
// Mark known peers, so they are candidates for future reaping
Expand Down Expand Up @@ -875,6 +899,12 @@ function freshenPartialPeer(peer, serviceName, now) {
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}

self.logger.info('FRESHEN PARTIAL PEER KNOWS', {
serviceName: serviceName,
hostPort: hostPort,
now: now
});

// Unmark recently seen peers, so they don't get reaped
deleteIndexEntry(self.peersToReap, peer.hostPort, serviceName);
// Mark known peers, so they are candidates for future reaping
Expand Down Expand Up @@ -937,13 +967,13 @@ function ensurePartialConnections(serviceChannel, serviceName, hostPort, reason,
if (!result.noop) {
self.logger.info(
'implementing affinity change',
self.extendLogInfo(partialRange.extendLogInfo({
result.extendLogInfo({
serviceName: serviceName,
reason: reason,
causingWorker: hostPort,
numToConnect: result.toConnect.length,
numToDisconnect: result.toDisconnect.length
}))
})
);
result.implement();
}
Expand Down Expand Up @@ -1373,6 +1403,11 @@ function reapSinglePeer(hostPort, serviceNames, now) {
var self = this;

if (self.knownPeers[hostPort]) {
self.logger.info('REAP ABORT, WHO KNOWS', {
serviceNames: serviceNames,
hostPort: hostPort,
now: now
});
return;
}

Expand Down Expand Up @@ -1406,7 +1441,13 @@ function reapSinglePeer(hostPort, serviceNames, now) {
if (serviceChannel) {
serviceChannel.peers.delete(hostPort);
}
self.deletePeerIndex(serviceName, hostPort);

if (self.partialAffinityEnabled) {
deleteIndexEntry(self.connectedServicePeers, serviceName, hostPort);
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}
deleteIndexEntry(self.knownPeers, hostPort, serviceName);

var partialRange = self.partialRanges[serviceName];
if (partialRange) {
partialRange.removeWorker(hostPort, now);
Expand Down Expand Up @@ -1757,7 +1798,15 @@ function audit() {
worker = this.toDisconnect[i];
peer = this.serviceChannel.peers.get(worker);
if (!peer) {
// this.proxy.logger.warn(
// 'DEBUG workers before',
// {workers: this.partialRange.workers.join(','), worker: worker}
// );
this.removeWorker(worker, 'toDisconnect');
// this.proxy.logger.warn(
// 'DEBUG workers after',
// {workers: this.partialRange.workers.join(',')}
// );
++this.staleToDisconnect;
}
}
Expand Down
Loading