Skip to content

Commit

Permalink
Merge pull request #45 from nodejitsu/client-reconnect
Browse files Browse the repository at this point in the history
Client reconnect
  • Loading branch information
jcrugzz committed Mar 29, 2013
2 parents fac825c + 4a87801 commit dc34297
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 43 deletions.
81 changes: 59 additions & 22 deletions lib/godot/net/client.js
Expand Up @@ -6,7 +6,10 @@
*/

var dgram = require('dgram'),
net = require('net');
net = require('net'),
util = require('util'),
backoff = require('backoff'),
EventEmitter = require('events').EventEmitter;

//
// ### function Client (options)
Expand All @@ -19,6 +22,8 @@ var dgram = require('dgram'),
// Producers attached to a TCP or UDP client.
//
var Client = module.exports = function Client(options) {
EventEmitter.call(this);

if (!options || !options.type
|| !~['tcp', 'udp', 'unix'].indexOf(options.type)) {
throw new Error('Cannot create client without type: udp, tcp, unix');
Expand All @@ -30,6 +35,7 @@ var Client = module.exports = function Client(options) {
this.host = options.host;
this.port = options.port;
this.path = options.path;
this.reconnect = options.reconnect;
this.producers = {};
this.handlers = {
data: {},
Expand All @@ -42,6 +48,7 @@ var Client = module.exports = function Client(options) {
});
}
};
util.inherits(Client, EventEmitter);

//
// ### function add (producer)
Expand Down Expand Up @@ -117,7 +124,28 @@ Client.prototype.write = function (data) {
// Opens the underlying network connection for this client.
//
Client.prototype.connect = function (port, host, callback) {
var err;
var self = this,
connectBackoff, backoffType;

if (this.reconnect) {
if (typeof this.reconnect === 'object') {
backoffType = this.reconnect.type || 'exponential';
connectBackoff = backoff[backoffType](this.reconnect);
connectBackoff.failAfter(this.reconnect.maxTries || 10);
}
else {
connectBackoff = backoff.exponential();
connectBackoff.failAfter(10);
}

connectBackoff.on('fail', function (err) {
self.emit('error', err);
});

connectBackoff.on('ready', function () {
connect();
});
}

//
// Do some fancy arguments parsing to support everything
Expand All @@ -137,12 +165,36 @@ Client.prototype.connect = function (port, host, callback) {
});

function error (arg) {
err = new Error(arg + ' required to connect');
if (callback) {
return callback(err);
var err = new Error(arg + ' required to connect');
return callback
? callback(err)
: self.emit('error', err) ;
}

//
// Handler attached to the socket's `error` event. When a reconnect
// backoff was configured the error is handed to it: its `ready` event
// triggers another connect attempt and its `fail` event emits `error`
// on the client. Without a backoff the error is emitted on the client
// right away.
//
function onError(err) {
return connectBackoff ? connectBackoff.backoff(err) : self.emit('error', err);
}

function connect() {
if (self.type === 'tcp') {
self.socket = net.connect({ port: self.port, host: self.host }, callback);
}
else if (self.type === 'udp') {
self.socket = dgram.createSocket('udp4');
if (callback) {
process.nextTick(callback);
}
}
else if (self.type === 'unix') {
self.socket = net.connect({ path: self.path }, callback);
}

throw err;
self.socket.on('error', onError);
self.socket.on('connect', function () {
if (connectBackoff) {
connectBackoff.reset();
}
});
}

// Split cases due to unix using `this.path`
Expand All @@ -162,22 +214,7 @@ Client.prototype.connect = function (port, host, callback) {
}
}

if (this.type === 'tcp') {
this.socket = new net.Socket({ type: 'tcp4' });
this.socket.setEncoding('utf8');
this.socket.connect(this.port, this.host, callback);
}
else if (this.type === 'udp') {
this.socket = dgram.createSocket('udp4');
if (callback) {
callback();
}
}
else if (this.type === 'unix') {
this.socket = new net.Socket({type: 'unix' });
this.socket.setEncoding('utf8');
this.socket.connect(this.path, callback);
}
connect();
};

//
Expand Down
3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -21,7 +21,8 @@
"sendgrid-web": "0.0.2",
"telenode": "0.0.3",
"utile": "0.1.7",
"window-stream": "~0.3.1"
"window-stream": "~0.3.1",
"backoff": "2.1.x"
},
"devDependencies": {
"optimist": "0.3.4",
Expand Down
69 changes: 49 additions & 20 deletions test/macros/net.js
Expand Up @@ -12,6 +12,46 @@ var assert = require('assert'),
fs = require('fs'),
godot = require('../../lib/godot');

//
// ### function shouldStartServer(options, nested)
// #### @options {Options} Options to setup communication
// #### @options.type {udp|tcp} Network protocol.
// #### @options.port {number} Port to communicate over.
// #### @nested {Object} Vows context to use once server is started.
// Starts the server with specified options.
//
exports.shouldStartServer = function (options, nested) {
var context = {
topic: function () {
var self = this;

function create() {
mocks.net.createServer(options, function (err, server) {
if (err) {
console.log('Error creating mock server');
console.dir(err);
process.exit(1);
}

self.server = server;

self.callback(null, server);
});
}

options.type === 'unix'
? fs.unlink('unix.sock', create)
: create();
}
};

if (nested) {
context['after the server is created'] = nested;
}

return context;
};

//
// ### function shouldSendData (options, nested)
// #### @options {Options} Options to setup communication
Expand All @@ -31,26 +71,14 @@ exports.shouldSendData = function (options, nested) {
}

var context = {
topic: function () {
var callback = this.callback;
var that = this;
// Clears the socket for the unix sockets case
fs.unlink('unix.sock', function () {
async.series({
server: async.apply(mocks.net.createServer, options),
client: async.apply(helpers.net.createClient, options)
}, function (err, results) {
if (err) {
console.log('Error creating mock server');
console.dir(err);
process.exit(1);
}
that.server = results.server;
that.client = results.client;
topic: function (server) {
var self = this;

helpers.net.createClient(options, function (err, client) {
self.client = client;

results.server.once('data', function (data) {
callback(null, data);
});
server.once('data', function (data) {
self.callback(null, data);
});
});
},
Expand All @@ -65,6 +93,7 @@ exports.shouldSendData = function (options, nested) {
}
};


if (nested) {
Object.keys(nested).forEach(function (vow) {
if (!context.hasOwnProperty('after data is sent')) {
Expand All @@ -74,7 +103,7 @@ exports.shouldSendData = function (options, nested) {
});
}

return context;
return exports.shouldStartServer(options, context);
};

//
Expand Down
117 changes: 117 additions & 0 deletions test/net/client-reconnect-test.js
@@ -0,0 +1,117 @@
/*
* client-reconnect-test.js: Basic tests for the reconnection of net client.
*
* (C) 2013, Nodejitsu Inc.
*
*/

var assert = require('assert'),
vows = require('vows'),
async = require('utile').async,
godot = require('../../lib/godot'),
helpers = require('../helpers'),
macros = require('../macros'),
mocks = require('../mocks');

//
// Reconnect behaviour of the net client against a TCP port on which no
// server is listening (or on which one only starts listening later).
//
// In every topic the `error` listener is registered before `connect` is
// called, so that an `error` emitted synchronously by `connect` cannot
// go unhandled.
//
vows.describe('godot/net/client').addBatch({
  "Godot client": {
    //
    // Without `reconnect` the first connection failure is expected to
    // surface as an `error` event on the client.
    //
    "with no backoff and no server": {
      topic: function () {
        var callback = this.callback,
            port = helpers.nextPort;

        var client = godot.createClient({
          type: 'tcp',
          producers: [
            godot.producer(helpers.fixtures['producer-test'])
          ]
        });

        client.on('error', function (err) {
          callback(null, err);
        });
        client.connect(port);
      },
      "should emit an error": function (_, err) {
        assert(err);
        assert.instanceOf(err, Error);
      }
    },
    //
    // With `reconnect` the client retries before giving up; the topic
    // passes on the error together with the elapsed time in ms.
    //
    "with backoff and no server": {
      topic: function () {
        var callback = this.callback,
            port = helpers.nextPort,
            d = new Date();

        var client = godot.createClient({
          type: 'tcp',
          producers: [
            godot.producer(helpers.fixtures['producer-test'])
          ],
          reconnect: {
            type: 'exponential',
            maxTries: 2,
            initialDelay: 100,
            maxDelay: 300
          }
        });

        client.on('error', function (err) {
          callback(null, err, (new Date() - d));
        });
        client.connect(port);
      },
      "should emit an error": function (_, err) {
        assert(err);
        assert.instanceOf(err, Error);
      },
      "should take appropiate amount of time": function (_, err, t) {
        assert(t >= 300);
      }
    },
    //
    // Same reconnect settings, but a mock server starts listening while
    // the client is still retrying. The topic passes on the first data
    // received by the server together with the elapsed time in ms.
    //
    "with backoff and server eventually coming up": {
      topic: function () {
        var callback = this.callback,
            port = helpers.nextPort,
            d = new Date();

        var client = godot.createClient({
          type: 'tcp',
          producers: [
            godot.producer(helpers.fixtures['producer-test'])
          ],
          reconnect: {
            type: 'exponential',
            maxTries: 2,
            initialDelay: 100,
            maxDelay: 300
          }
        });

        //
        // Report failures through the vows callback instead of throwing
        // from inside an event handler, which would take down the whole
        // test run rather than fail this context.
        //
        client.on('error', function (err) {
          callback(err);
        });
        client.connect(port);

        //
        // The previous context shows the client only gives up after
        // ~300ms with these settings, i.e. the final retry is expected
        // around that mark. Start the server well before it (150ms)
        // so the test does not race server start-up against that
        // final attempt.
        //
        setTimeout(function () {
          mocks.net.createServer({ type: 'tcp', port: port }, function (err, server) {
            if (err) {
              return callback(err);
            }

            server.once('data', function (data) {
              callback(null, data, (new Date()) - d);
            });
          });
        }, 150);
      },
      "should send data": function (err, data) {
        assert(!err);
        assert(data);
      },
      "should take appropiate amount of time": function (_, data, t) {
        assert(t >= 200);
      }
    }
  }
}).export(module);

0 comments on commit dc34297

Please sign in to comment.