Skip to content
This repository was archived by the owner on Feb 18, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions partial_range.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ function PartialRange(relayHostPort, minPeersPerWorker, minPeersPerRelay) {
this.length = NaN;
this.start = NaN;
this.stop = NaN;
this.connectedWorkers = Object.create(null);
this.pendingCompute = null;
}

Expand Down
84 changes: 39 additions & 45 deletions service-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ function ServiceDispatchHandler(options) {
* exitServices :: Map<serviceName, lastRefresh>
* peersToReap :: Map<hostPort, lastRefresh>
* knownPeers :: Map<hostPort, lastRefresh>
* connectedServicePeers :: Map<serviceName, Map<hostPort, lastRefresh>>
* connectedPeerServices :: Map<hostPort, Map<serviceName, lastRefresh>>
*
* PartialRange :: {
Expand All @@ -124,8 +123,8 @@ function ServiceDispatchHandler(options) {
* affineWorkers :: ?Array<hostPort> // the computed subset of workers
* }
*
* connectedServicePeers and connectedPeerServices are updated by
* connection events, maybe subject to partial affinity.
* connectedPeerServices is updated by connection events, maybe subject to
* partial affinity.
*
* On every advertise knownPeers is updated.
*
Expand All @@ -135,7 +134,6 @@ function ServiceDispatchHandler(options) {
self.relaysFor = Object.create(null);
self.partialRanges = Object.create(null);
self.exitServices = Object.create(null);
self.connectedServicePeers = Object.create(null);
self.connectedPeerServices = Object.create(null);
self.peersToReap = Object.create(null);
self.knownPeers = Object.create(null);
Expand Down Expand Up @@ -697,7 +695,8 @@ function deletePeerIndex(serviceName, hostPort) {
var self = this;

if (self.partialAffinityEnabled) {
deleteIndexEntry(self.connectedServicePeers, serviceName, hostPort);
var partialRange = self.partialRanges[serviceName];
delete partialRange.connectedWorkers[hostPort];
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}
deleteIndexEntry(self.knownPeers, hostPort, serviceName);
Expand All @@ -708,7 +707,8 @@ function ensurePeerConnected(serviceName, peer, reason, now) {
var self = this;

if (self.partialAffinityEnabled) {
addIndexEntry(self.connectedServicePeers, serviceName, peer.hostPort, now);
var partialRange = self.partialRanges[serviceName];
partialRange.connectedWorkers[peer.hostPort] = now;
addIndexEntry(self.connectedPeerServices, peer.hostPort, serviceName, now);
}
delete self.peersToPrune[peer.hostPort];
Expand Down Expand Up @@ -843,11 +843,11 @@ function addNewPartialPeer(serviceChannel, hostPort, now) {
// secondary indices since neither ensurePeerConnected nor
// ensurePeerDisconnected were called for the advertising peer
if (result.isAffine[hostPort]) {
addIndexEntry(self.connectedServicePeers, serviceName, hostPort, now);
partialRange.connectedWorkers[hostPort] = now;
addIndexEntry(self.connectedPeerServices, hostPort, serviceName, now);
delete self.peersToPrune[hostPort];
} else {
deleteIndexEntry(self.connectedServicePeers, serviceName, hostPort);
delete partialRange.connectedWorkers[hostPort];
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}
}
Expand All @@ -858,16 +858,16 @@ function freshenPartialPeer(peer, serviceName, now) {
var self = this;

var hostPort = peer.hostPort;
var connectedPeers = self.connectedServicePeers[serviceName];
var connected = connectedPeers && connectedPeers[hostPort];
var partialRange = self.getPartialRange(serviceName, 'refresh partial peer audit', now);
var connected = partialRange.connectedWorkers[hostPort];

// Update secondary indices
if (connected) {
addIndexEntry(self.connectedServicePeers, serviceName, peer.hostPort, now);
partialRange.connectedWorkers[hostPort] = now;
addIndexEntry(self.connectedPeerServices, hostPort, serviceName, now);
delete self.peersToPrune[hostPort];
} else {
deleteIndexEntry(self.connectedServicePeers, serviceName, peer.hostPort);
delete partialRange.connectedWorkers[hostPort];
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}

Expand All @@ -878,26 +878,23 @@ function freshenPartialPeer(peer, serviceName, now) {

// TODO: this audit shouldn't be necessary once we understand and fix
// why it was needed in the first place
var partialRange = self.getPartialRange(serviceName, 'refresh partial peer audit', now);
if (partialRange) {
var shouldConnect = partialRange.affineWorkers.indexOf(hostPort) >= 0;
var isConnected = !!connected;
if (isConnected !== shouldConnect) {
self.logger.warn(
'partial affinity audit fail',
self.extendLogInfo(partialRange.extendLogInfo({
path: 'freshenPartialPeer',
serviceName: serviceName,
serviceHostPort: hostPort,
isConnected: isConnected,
shouldConnect: shouldConnect
}))
);
if (shouldConnect) {
connected = now;
} else {
connected = null;
}
var isConnected = !!connected;
var shouldConnect = partialRange.affineWorkers.indexOf(hostPort) >= 0;
if (isConnected !== shouldConnect) {
self.logger.warn(
'partial affinity audit fail',
self.extendLogInfo(partialRange.extendLogInfo({
path: 'freshenPartialPeer',
serviceName: serviceName,
serviceHostPort: hostPort,
isConnected: isConnected,
shouldConnect: shouldConnect
}))
);
if (shouldConnect) {
connected = now;
} else {
connected = null;
}
}

Expand Down Expand Up @@ -951,7 +948,8 @@ function ensurePeerDisconnected(serviceName, peer, reason, now) {
var self = this;

if (self.partialAffinityEnabled) {
deleteIndexEntry(self.connectedServicePeers, serviceName, peer.hostPort);
var partialRange = self.partialRanges[serviceName];
delete partialRange.connectedWorkers[peer.hostPort];
deleteIndexEntry(self.connectedPeerServices, peer.hostPort, serviceName);
}

Expand Down Expand Up @@ -994,7 +992,7 @@ function removeServicePeer(serviceName, hostPort) {
// if ensurePartialConnections did no work, we need to celar the
// secondary indices since neither ensurePeerDisconnected was called
// for the unadvertising peer
deleteIndexEntry(self.connectedServicePeers, serviceName, hostPort);
delete partialRange.connectedWorkers[hostPort];
deleteIndexEntry(self.connectedPeerServices, hostPort, serviceName);
}
}
Expand Down Expand Up @@ -1596,7 +1594,6 @@ function setPartialAffinityEnabled(enabled) {
var self = this;
self.partialAffinityEnabled = !!enabled;
self.partialRanges = Object.create(null);
self.connectedServicePeers = Object.create(null);
self.connectedPeerServices = Object.create(null);
};

Expand Down Expand Up @@ -1651,8 +1648,7 @@ function AffinityChange(proxy, serviceChannel, partialRange, now) {
this.staleToConnect = 0;
this.staleToDisconnect = 0;

this.connectedPeers = null;
this.connectedPeerKeys = [];
this.connectedWorkerKeys = [];

this.compute();

Expand Down Expand Up @@ -1691,9 +1687,7 @@ function compute() {
var worker;
var peer;

this.connectedPeers = this.proxy.connectedServicePeers[this.serviceChannel.serviceName] || null;
this.connectedPeerKeys = this.connectedPeers ? Object.keys(this.connectedPeers) : [];

this.connectedWorkerKeys = Object.keys(this.partialRange.connectedWorkers);
this.toConnect = [];
this.toDisconnect = [];
this.isAffine = {};
Expand All @@ -1702,13 +1696,13 @@ function compute() {
worker = this.partialRange.affineWorkers[i];
this.isAffine[worker] = true;
peer = this.serviceChannel.peers.get(worker);
if (!(this.connectedPeers && this.connectedPeers[worker]) || !(peer && peer.isConnected('out'))) {
if (!(this.partialRange.connectedWorkers && this.partialRange.connectedWorkers[worker]) || !(peer && peer.isConnected('out'))) {
this.toConnect.push(worker);
}
}

for (i = 0; i < this.connectedPeerKeys.length; i++) {
worker = this.connectedPeerKeys[i];
for (i = 0; i < this.connectedWorkerKeys.length; i++) {
worker = this.connectedWorkerKeys[i];
if (!this.isAffine[worker] && !this.proxy.peersToPrune[worker]) {
this.toDisconnect.push(worker);
}
Expand Down Expand Up @@ -1754,8 +1748,8 @@ function audit() {
})
);
this.partialRange.removeWorker(worker, this.now);
if (this.connectedPeers) {
delete this.connectedPeers[worker];
if (this.partialRange.connectedWorkers) {
delete this.partialRange.connectedWorkers[worker];
}
++this.staleToDisconnect;
}
Expand Down