Skip to content
This repository was archived by the owner on Feb 18, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions test/lib/turnip.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

var collectParallel = require('collect-parallel/array');
var net = require('net');

// A Turnip is a dirt-stupid TCP server that notes and closes all incoming
// connections.

module.exports.Turnips = Turnips;
module.exports.Turnip = Turnip;

function Turnips() {
this.turnips = [];
}

Turnips.forAll =
function forAll(things, getPortHost, allReady) {
var turnips = new Turnips();
collectParallel(things, function createEachTurnip(thing, i, ready) {
var portHost = getPortHost;
turnips.turnips[i] = new Turnip(portHost[0], portHost[1], ready);
}, allReady);
return turnips;
};

Turnips.prototype.destroy =
function destroy(cb) {
collectParallel(this.turnips, function destroyEach(turnip, i, done) {
turnip.destroy(done);
}, cb);
};

Turnips.prototype.takeConnLogs =
function takeConnLogs() {
var logs = [];
for (var i = 0; i < this.turnips.length; ++i) {
logs[i] = this.turnips[i].takeConnLog();
}
return logs;
};

function Turnip(port, host, onListening) {
this.server = net.createServer(boundOnConnection);
this.connLog = [];
this.server.listen(port, host, onListening);
this.port = port;
this.host = host;
this.hostPort = host + ':' + port;

var self = this;

function boundOnConnection(socket) {
self.onConnection(socket);
}
}

Turnip.prototype.destroy =
function destroy(cb) {
this.server.close(cb);
};

Turnip.prototype.takeConnLog =
function takeConnLog() {
var log = this.connLog;
this.connLog = [];
return log;
};

Turnip.prototype.onConnection =
function onConnection(socket) {
this.connLog.push({
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort
});
socket.end();
};

if (require.main === module) {
var process = require('process');
var util = require('util');
main(process.argv.slice(2), process.stdout);
}

function main(args, logs) {
/* eslint-disable no-console */
var port = parseInt(args[0], 10) || 0;
var host = args[1];
var turnip = new Turnip(port, host, turnipListening);
process.on('SIGINT', onSigInt);

function turnipListening() {
var addr = turnip.server.address();
log('turnip on %j', addr);
}

function onSigInt() {
log('destroying turnip');
turnip.destroy(finish);
}

function finish() {
log('turnip conn log:', turnip.takeConnLog());
}

function log() {
var time = (new Date()).toISOString();
var mess = util.format.apply(null, arguments);
mess = util.format('%s %s\n', time, mess);
logs.write(mess);
}
}
113 changes: 37 additions & 76 deletions test/peer-churn.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@

var collectParallel = require('collect-parallel/array');
var CountedReadySignal = require('ready-signal/counted');
var net = require('net');
var timers = require('timers');
var util = require('util');

var Turnips = require('./lib/turnip').Turnips;
var allocCluster = require('./lib/test-cluster.js');
var CollapsedAssert = require('./lib/collapsed-assert.js');

/* eslint-disable no-multi-spaces */
var PERIOD = 100;
var REQUEST_TIMEOUT = 200;
var REQUEST_FACTOR = 1;
var COOL_OFF_PERIODS = 10;
var CLUSTER_SIZE = 10;
var CHURN_FACTOR = 0.5;
var K_VALUE = 5;
var SERVICE_SIZE = 10;
var PERIOD = 100;
var REQUEST_TIMEOUT = 2 * PERIOD;
var REQUEST_FACTOR = 1;
var SETTLE_PERIODS = 10;
var CLUSTER_SIZE = 10;
var CHURN_FACTOR = 0.5;
var K_VALUE = 5;
var SERVICE_SIZE = 10;
var ENDPOINT_DELAY = 0.5 * PERIOD;
var ENDPOINT_DELAY_FUZZ = 0.50;

function fuzzedPeriods(N) {
return 1.05 * N * PERIOD;
Expand Down Expand Up @@ -166,7 +168,7 @@ allocCluster.test('peer churn', {
].indexOf(record.msg) >= 0, 'expected reaping logs');
});

timers.setTimeout(thenTurnip, fuzzedPeriods(COOL_OFF_PERIODS));
timers.setTimeout(thenTurnip, fuzzedPeriods(SETTLE_PERIODS));
}

function thenTurnip() {
Expand All @@ -186,28 +188,37 @@ allocCluster.test('peer churn', {
].indexOf(record.msg) >= 0, 'expected peer churn logs');
});

turnips = createTurnips(first, thenWaitAndSee);
turnips = Turnips.forAll(first, function getRemotePortHost(remote) {
var parts = remote.channel.hostPort.split(':');
var host = parts[0];
var port = parseInt(parts[1], 10);
return [port, host];
}, thenWaitAndSee);
}

function thenWaitAndSee() {
assert.comment('- thenWaitAndSee');

checkNoLogs('turnip', cluster, assert);

timers.setTimeout(function tendTurnips() {
checkNoLogs('turnip tending', cluster, assert);
checkTurnips();
sendAfterChurn();
}, fuzzedPeriods(COOL_OFF_PERIODS));
timers.setTimeout(thenTendAndSend, fuzzedPeriods(SETTLE_PERIODS));
}

function thenTendAndSend() {
assert.comment('- thenTendAndSend');
checkNoLogs('turnip tending', cluster, assert);
checkTurnips();
sendAfterChurn();
}

function checkTurnips() {
var logs = turnips.takeConnLogs();
for (var i = 0; i < logs.length; ++i) {
var log = logs[i];
var turnip = turnips.turnips[i];
assert.equal(log.length, 0, util.format(
'expected no connections to turnip[%s] (%s)',
i, turnips[i].remote.channel.hostPort
i, turnip.hostPort
));
}
}
Expand All @@ -234,7 +245,7 @@ allocCluster.test('peer churn', {

checkTurnips();

destroyAll(second.concat(turnips).concat([client]), finish);
destroyAll(second.concat([turnips, client]), finish);
}

function finish() {
Expand Down Expand Up @@ -276,8 +287,13 @@ function createRemotes(cluster, N, opts, cb) {
}

function who(req, res) {
res.headers.as = 'raw';
res.sendOk('', req.channel.hostPort);
var delay = ENDPOINT_DELAY + 1 + (0.5 - Math.random()) * ENDPOINT_DELAY_FUZZ;
timers.setTimeout(thenRespond, delay);

function thenRespond() {
res.headers.as = 'raw';
res.sendOk(delay.toString(), req.channel.hostPort);
}
}

function checkExitsTo(cluster, serviceName, cohort, desc, assert) {
Expand All @@ -299,7 +315,7 @@ function checkRequestsTo(serviceName, cohort, desc, chan, assert, cb) {
serviceName: serviceName,
timeout: REQUEST_TIMEOUT
}, 'who', '', '', function sent(err, res, arg2, arg3) {
cassert.ifError(err, 'no unexpected error');
cassert.ifError(err, 'no unexpected check request error');
var serverHostPort = String(arg3);
cassert.ok(cohort.some(function isit(remote) {
return remote.hostPort === serverHostPort;
Expand Down Expand Up @@ -370,58 +386,3 @@ function checkNoLogs(desc, cluster, assert) {
}
}
}

// creates dirst-stupid fixture tcp servers in place of each destroyed remote
function createTurnips(remotes, cb) {
var turnips = [];
turnips.takeConnLogs = takeConnLogs;
collectParallel(remotes, createEachTurnip, cb);
return turnips;

function takeConnLogs() {
var logs = [];
for (var i = 0; i < turnips.length; ++i) {
logs[i] = turnips[i].takeConnLog();
}
return logs;
}

function createEachTurnip(remote, i, done) {
turnips[i] = createTurnip(remote, done);
}
}

function createTurnip(remote, cb) {
var turnip = {
remote: remote,
server: net.createServer(onConnection),
connLog: [],
destroy: destroy,
takeConnLog: takeConnLog
};

var parts = remote.channel.hostPort.split(':');
var host = parts[0];
var port = parseInt(parts[1], 10);
turnip.server.listen(port, host, cb);

return turnip;

function takeConnLog() {
var log = turnip.connLog;
turnip.connLog = [];
return log;
}

function destroy(destroyCb) {
turnip.server.close(destroyCb);
}

function onConnection(socket) {
turnip.connLog.push({
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort
});
socket.end();
}
}