Skip to content

Commit

Permalink
Merge pull request #60 from nodejitsu/v0.7.x
Browse files Browse the repository at this point in the history
v0.7.x
  • Loading branch information
jcrugzz committed Sep 16, 2013
2 parents 8bc5ffb + b133b74 commit 8a99c94
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 70 deletions.
7 changes: 3 additions & 4 deletions README.md
Expand Up @@ -51,12 +51,11 @@ Here is a simple example of a [Reactor](#reactors) server that will send an emai
})
],
//
// Add Reconnect logic that uses node-backoff
// Add Reconnect logic that uses `back`
//
reconnect: {
type: 'exponential',
maxTries: 2,
initialDelay: 100,
retries: 2,
minDelay: 100,
maxDelay: 300
}
}).connect(1337);
Expand Down
81 changes: 51 additions & 30 deletions lib/godot/net/client.js
Expand Up @@ -7,9 +7,11 @@

var dgram = require('dgram'),
net = require('net'),
util = require('util'),
backoff = require('backoff'),
EventEmitter = require('events').EventEmitter;
utile = require('utile'),
clone = utile.clone,
back = require('back'),
EventEmitter = require('events').EventEmitter,
noop = function () {};

//
// ### function Server (options)
Expand All @@ -29,13 +31,19 @@ var Client = module.exports = function Client(options) {
throw new Error('Cannot create client without type: udp, tcp, unix');
}

if(typeof options.reconnect !== 'undefined'
&& typeof options.reconnect!== 'object') {
throw new Error('Reconnect must be a defined object if used')
}

var self = this;

this.type = options.type;
this.host = options.host;
this.port = options.port;
this.path = options.path;
this.reconnect = options.reconnect;
this.attempt = null;
this.producers = {};
this.handlers = {
data: {},
Expand All @@ -52,7 +60,7 @@ var Client = module.exports = function Client(options) {
//
// Inherit from EventEmitter
//
util.inherits(Client, EventEmitter);
utile.inherits(Client, EventEmitter);

//
// ### function add (producer)
Expand Down Expand Up @@ -126,28 +134,7 @@ Client.prototype.write = function (data) {
// Opens the underlying network connection for this client.
//
Client.prototype.connect = function (port, host, callback) {
var self = this,
connectBackoff, backoffType;

if (this.reconnect) {
if (typeof this.reconnect === 'object') {
backoffType = this.reconnect.type || 'exponential';
connectBackoff = backoff[backoffType](this.reconnect);
connectBackoff.failAfter(this.reconnect.maxTries || 10);
}
else {
connectBackoff = backoff.exponential();
connectBackoff.failAfter(10);
}

connectBackoff.on('fail', function (err) {
self.emit('error', err);
});

connectBackoff.on('ready', function () {
connect();
});
}
var self = this;

//
// Do some fancy arguments parsing to support everything
Expand All @@ -173,8 +160,41 @@ Client.prototype.connect = function (port, host, callback) {
: self.emit('error', err) ;
}

function reconnect(err) {
self.attempt = self.attempt || clone(self.reconnect);
//
// Remark: Terminate the backoff when we have hit our fail condition with
// a noop to avoid an if statement
//
// TODO: Make this less coupled (I feel like i want this contained in
// `back` but eh)
//
return self.terminate
? noop()
: back(function (fail, backoff) {
//
// Remark: We are done here, emit error and set termination
//
if (fail) {
self.terminate = true;
self.attempt = null;
return self.emit('error', err);
}
//
// So we can listen on when reconnect events are about to fire
//
self.emit('reconnect');
//
// Attempt a CONNECT!
//
return connect();
}, self.attempt);
}

function onError(err) {
return connectBackoff ? connectBackoff.backoff(err) : self.emit('error', err);
return self.reconnect
? reconnect(err)
: self.emit('error', err);
}

function connect() {
Expand All @@ -193,9 +213,10 @@ Client.prototype.connect = function (port, host, callback) {

self.socket.on('error', onError);
self.socket.on('connect', function () {
if (connectBackoff) {
connectBackoff.reset();
}
//
// Remark: We have successfully connected so reset the terminate variable
//
self.terminate = false;
self.emit('connect');
});
}
Expand Down
1 change: 1 addition & 0 deletions lib/godot/net/server.js
Expand Up @@ -71,6 +71,7 @@ utile.inherits(Server, events.EventEmitter);
Server.prototype.add = function (reactor) {
this.emit('add', reactor);
reactor.on('error', this.emit.bind(this, 'error'));
reactor.on('reactor:error', this.emit.bind(this, 'reactor:error'));
this.reactors[reactor.id] = reactor;
};

Expand Down
9 changes: 5 additions & 4 deletions lib/godot/producer/producer.js
Expand Up @@ -8,10 +8,11 @@
var stream = require('stream'),
ip = require('ip'),
utile = require('utile'),
uuid = require('node-uuid');
uuid = require('node-uuid'),
tick = typeof setImmediate == 'undefined'
? process.nextTick
: setImmediate;

//
// ### function Producer (options)
// #### @options {Object} Options for this producer.
// Constructor function for the Producer object responsible
// for creating events to process.
Expand Down Expand Up @@ -107,7 +108,7 @@ Object.keys(Producer.prototype.types).forEach(function (key) {
//
if (value === 0) {
return (function tickProduce() {
process.nextTick(function () {
tick(function () {
self.produce();
tickProduce();
});
Expand Down
2 changes: 1 addition & 1 deletion lib/godot/reactor/email.js
Expand Up @@ -76,7 +76,7 @@ Email.prototype.write = function (data) {
self._last = new Date();

return err
? self.emit('error', err)
? self.emit('reactor:error', err)
: self.emit('data', data);
});
};
2 changes: 1 addition & 1 deletion lib/godot/reactor/graphite.js
Expand Up @@ -82,7 +82,7 @@ Graphite.prototype.write = function (data) {
metrics[metricName] = this.meta ? data.meta[this.meta] : data.metric;
this.client.write(metrics, data.time, function (err) {
self._last = now;
if (err) { return self.emit('error', err) }
if (err) { return self.emit('reactor:error', err) }
});

self.emit('data', data);
Expand Down
4 changes: 2 additions & 2 deletions lib/godot/reactor/map.js
Expand Up @@ -55,13 +55,13 @@ Map.prototype.write = function (data) {
if (!this.passThrough) {
return this.mapFn(data, function (err, data) {
return err
? self.emit('error', err)
? self.emit('reactor:error', err)
: self.emit('data', data);
});
}

this.mapFn(data, function (err) {
if (err) { self.emit('error', err) }
if (err) { self.emit('reactor:error', err) }
});

this.emit('data', data);
Expand Down
1 change: 1 addition & 0 deletions lib/godot/reactor/reactor.js
Expand Up @@ -75,6 +75,7 @@ Reactor.prototype.createStream = function (source) {
return this.reactors.reduce(function (last, nextOptions) {
var stream = wrapStream(nextOptions.Factory, nextOptions.args || []);
stream.on('error', self.emit.bind(self, 'error'));
stream.on('reactor:error', self.emit.bind(self, 'reactor:error'));
return last.pipe(stream);
}, source);
};
4 changes: 2 additions & 2 deletions lib/godot/reactor/redis.js
Expand Up @@ -34,7 +34,7 @@ var Redis = module.exports = function Redis(options, redisFn) {
if (!this.client) {
this.client = redis.createClient(this.port, this.host, this.redisOptions);

this.client.on('error', this.emit.bind(this, 'error'));
this.client.on('error', this.emit.bind(this, 'reactor:error'));
if (this.password) {
this.client.auth(this.password, function () {
// Remark: What if data is sent before we are authenticated?
Expand All @@ -56,7 +56,7 @@ utile.inherits(Redis, ReadWriteStream);
Redis.prototype.write = function (data) {
var self = this;
this.redisFn(this.client, data, function (err, data) {
if (err) { return self.emit('error', err) }
if (err) { return self.emit('reactor:error', err) }
});

this.emit('data', data);
Expand Down
4 changes: 2 additions & 2 deletions lib/godot/reactor/sms.js
Expand Up @@ -80,7 +80,7 @@ Sms.prototype.write = function (data) {
self._last = new Date();

return err
? self.emit('error', err)
? self.emit('reactor:error', err)
: self.emit('data', data);
});
};
};
8 changes: 4 additions & 4 deletions package.json
@@ -1,7 +1,7 @@
{
"name": "godot",
"description": "Godot is a streaming real-time event processor written in node.js",
"version": "0.6.1",
"version": "0.7.0-dev",
"author": "Nodejitsu Inc <info@nodejitsu.com>",
"contributors": [
{
Expand All @@ -26,8 +26,8 @@
"telenode": "0.0.3",
"utile": "0.2.x",
"window-stream": "~0.4.0",
"backoff": "2.1.x",
"json-stream": "0.1.x"
"json-stream": "0.2.x",
"back": "0.1.x"
},
"devDependencies": {
"optimist": "0.3.4",
Expand All @@ -36,7 +36,7 @@
},
"main": "./lib/godot",
"engines": {
"node": "0.8.x"
"node": "0.10.x"
},
"scripts": {
"test": "vows --spec -i"
Expand Down
16 changes: 10 additions & 6 deletions test/macros/reactor.js
Expand Up @@ -209,7 +209,10 @@ exports.shouldError = function (reactor, fixture, timeout) {
stream = reactor.createStream(source),
error;

reactor.on('error', function (err) { error = err });
reactor.once('reactor:error', function (err) {
error = err;
stream.end();
});

stream.on('data', function (data) { /*Data is emit through but we dont care*/ });
stream.on('end', function () {
Expand Down Expand Up @@ -237,18 +240,19 @@ exports.shouldErrorSync = function (reactor, fixture) {
return {
topic: function () {
var that = this,
source = new ReadWriteStream(),
stream = reactor.createStream(source);
source = new ReadWriteStream();
stream = this.stream = reactor.createStream(source);

reactor.on('error', function (err) {that.callback(err, null)});
reactor.once('reactor:error', function (err) {that.callback(err, null)});

stream.on('data', function (data) { that.callback(null, stream) });
stream.on('end', function () { that.callback(null, stream) });
stream.on('data', function (data) { });
stream.on('end', function () { });
helpers.writeFixture(source, fixture);
},
"should error": function (err, _) {
assert.instanceOf(err, Error);
assert.isNull(_);
this.stream.end();
}
};
};
16 changes: 7 additions & 9 deletions test/net/client-reconnect-test.js
Expand Up @@ -28,7 +28,7 @@ vows.describe('godot/net/client-reconnect').addBatch({
});

client.connect(port);
client.on('error', function (err) {
client.once('error', function (err) {
callback(null, err);
});
},
Expand All @@ -49,15 +49,14 @@ vows.describe('godot/net/client-reconnect').addBatch({
godot.producer(helpers.fixtures['producer-test'])
],
reconnect: {
type: 'exponential',
maxTries: 2,
initialDelay: 100,
retries: 2,
minDelay: 100,
maxDelay: 300
}
});

client.connect(port);
client.on('error', function (err) {
client.once('error', function (err) {
callback(null, err, (new Date() - d));
});
},
Expand All @@ -66,7 +65,7 @@ vows.describe('godot/net/client-reconnect').addBatch({
assert.instanceOf(err, Error);
},
"should take appropiate amount of time": function (_, err, t) {
assert(t >= 300);
assert(t >= 200);
}
},
"with backoff and server eventually coming up": {
Expand All @@ -81,9 +80,8 @@ vows.describe('godot/net/client-reconnect').addBatch({
godot.producer(helpers.fixtures['producer-test'])
],
reconnect: {
type: 'exponential',
maxTries: 2,
initialDelay: 100,
retries: 2,
minDelay: 100,
maxDelay: 300
}
});
Expand Down

0 comments on commit 8a99c94

Please sign in to comment.