Skip to content

Commit

Permalink
Merge pull request #67 from nodejitsu/new-api
Browse files Browse the repository at this point in the history
New api
  • Loading branch information
jcrugzz committed Apr 9, 2014
2 parents 6cd192f + 1baa7a5 commit eaa7646
Show file tree
Hide file tree
Showing 43 changed files with 228 additions and 160 deletions.
45 changes: 31 additions & 14 deletions README.md
@@ -1,5 +1,7 @@
# godot

![](https://i.cloudup.com/zCF6jLRpLf.png)

Godot is a streaming real-time event processor based on [Riemann][riemann] written in Node.js

* [Usage](#usage)
Expand All @@ -14,7 +16,7 @@ Godot is a streaming real-time event processor based on [Riemann][riemann] writt
Here is a simple example of a [Reactor](#reactors) server that will send an email to `user@host.com` if the [Producer](#producer) server for `app.server` fails to send a heartbeat after 60 seconds.

``` js
var godot = require('../lib/godot');
var godot = require('godot');

//
// Reactor server which will email `user@host.com`
Expand All @@ -27,10 +29,12 @@ Here is a simple example of a [Reactor](#reactors) server that will send an emai
//
type: 'udp',
reactors: [
godot.reactor()
.where('service', '*/health/heartbeat')
.expire(1000 * 60)
.email({ to: 'user@host.com' })
function (socket) {
socket
.pipe(godot.where('service', '*/health/heartbeat'))
.pipe(godot.expire(1000 * 60))
.pipe(godot.email({ to: 'user@host.com' }))
}
]
}).listen(1337);

Expand Down Expand Up @@ -79,7 +83,9 @@ Similar to [Riemann][riemann], events in `godot` are simply JSON sent over UDP o
```

## Reactors
Reactors in Godot are **readable and writable** [Stream][stream] instances which consume [Events](#events) and produce actions or aggregate data flow.
Reactors in Godot are **readable and writable** [Stream][stream] instances which consume [Events](#events) and produce actions or aggregate data flow. In the example above, you can see that each entry in the array of reactors is defined by wrapping it in a simple function. This function takes a single argument: a stream that represents the data coming over the wire. That stream can be piped to any `godot` stream or any Transform stream you find on NPM!

*Note:* Reactors are currently still streams1 streams (so they do not handle backpressure), but this will begin to change in the near future for node `0.12.x` (for performance reasons).

### Primitives

Expand All @@ -98,26 +104,34 @@ There are several core Reactor primitives available in `godot` which can be comp
Here are two possible rollup examples:

```js

var godot = require('godot');

//
// Rolls up 10,000 events every 5-minute interval
// then sends them in an email
//
var rollup =
reactor()
.rollup(1000 * 60 * 5, 10000)
var rollup = function (socket) {
socket
.pipe(godot.rollup(1000 * 60 * 5, 10000))
.pipe(godot.email({ to: 'me@nodejitsu.com' }))
}

//
// Scaling Rollup, rolls up 10,000 events every 5min interval for 1 hour,
// then rolls up 10,000 events every 30mins
// then rolls up 10,000 events every 30mins and emails them out
//

var scalingRollup =
reactor()
.rollup(function (period) {
var scalingRollup = function (socket) {
socket
.pipe(godot.rollup(function (period) {
if(period < 12) {
return 1000 * 60 * 5;
}
return 1000 * 60 * 30;
}, 10000)
}, 10000))
.pipe(godot.email({ to: 'me@nodejitsu.com' }))
}
```

## Producers
Expand All @@ -132,6 +146,9 @@ All tests are written in [vows][vows] and can be run with [npm][npm]:
```

#### Copyright (C) 2012. Nodejitsu Inc.
#### License: MIT

_Sound Wave designed by Alessandro Suraci from thenounproject.com_

[riemann]: http://aphyr.github.com/riemann/
[stream]: http://nodejs.org/api/stream.html
Expand Down
12 changes: 12 additions & 0 deletions example/simple-server.js
@@ -0,0 +1,12 @@

//
// Minimal example: start a TCP godot server on port 1337 with a single
// reactor that pipes everything it receives into `godot.console`.
//
var godot = require('../');

//
// Reactor factory. The server calls this with the stream of incoming
// events; we simply pipe that stream into a new console reactor.
//
function logToConsole(socket) {
  socket.pipe(new godot.console());
}

godot.createServer({
  type: 'tcp',
  reactors: [logToConsole]
}).listen(1337);
4 changes: 2 additions & 2 deletions lib/godot.js
Expand Up @@ -17,7 +17,7 @@ exports.net = require('./godot/net');
// Expose `reactor` module for processing and
// reacting to events.
//
exports.reactor = require('./godot/reactor');
require('./godot/reactor')(exports);

//
// ### @producer {Object}
Expand Down Expand Up @@ -68,4 +68,4 @@ exports.createClient = function (options) {
options.type = options.type || 'udp';

return new exports.net.Client(options);
};
};
3 changes: 2 additions & 1 deletion lib/godot/common/filter-stream.js
Expand Up @@ -13,6 +13,7 @@ var utile = require('utile'),
// A simple readable and writable stream that filters key by value.
//
var FilterStream = module.exports = function FilterStream(key, value) {
if (!(this instanceof FilterStream)) { return new FilterStream(key, value) }
this.key = key;
this.value = value

Expand All @@ -32,4 +33,4 @@ FilterStream.prototype.write = function (data) {
if (data[this.key] === this.value) {
this.emit('data', data);
}
};
};
2 changes: 2 additions & 0 deletions lib/godot/common/read-write-stream.js
Expand Up @@ -13,6 +13,8 @@ var stream = require('stream'),
// A simple Readable and Writable base stream.
//
var ReadWriteStream = module.exports = function ReadWriteStream() {
if (!(this instanceof ReadWriteStream)) { return new ReadWriteStream() }

this.readable = true;
this.writable = true;

Expand Down
44 changes: 24 additions & 20 deletions lib/godot/net/server.js
Expand Up @@ -9,8 +9,10 @@ var dgram = require('dgram'),
events = require('events'),
net = require('net'),
utile = require('utile'),
common = require('../common'),
jsonStream = require('json-stream'),
uuid = require('node-uuid'),
Socket = require('./socket'),
common = require('../common'),
ReadWriteStream = common.ReadWriteStream;

//
Expand Down Expand Up @@ -45,6 +47,8 @@ var Server = module.exports = function Server(options) {
? options.multiplex
: true;

this.rawReactors = options.reactors;

if (Array.isArray(options.reactors)) {
options.reactors.forEach(function (reactor) {
self.add(reactor);
Expand All @@ -70,8 +74,11 @@ utile.inherits(Server, events.EventEmitter);
//
Server.prototype.add = function (reactor) {
this.emit('add', reactor);
reactor.on('error', this.emit.bind(this, 'error'));
reactor.on('reactor:error', this.emit.bind(this, 'reactor:error'));
//
// Ok so we have this function that is a `factory` for a reactor
// or something like that
//
reactor.id = reactor.id || uuid.v4();
this.reactors[reactor.id] = reactor;
//
// Add reactor to the running set
Expand Down Expand Up @@ -100,7 +107,7 @@ Server.prototype.add = function (reactor) {
//
Server.prototype.remove = function (reactor) {
this.emit('remove', reactor);
this.reactors[reactor.id].removeAllListeners();
this.reactors[reactor.id].socket.removeAllListeners();
delete this.reactors[reactor.id];

//
Expand Down Expand Up @@ -221,11 +228,11 @@ Server.prototype.close = function (callback) {
// the socket ends up writing to
//
Server.prototype.createReactor = function (id) {
var source = new ReadWriteStream();
var socket = new Socket();
return {
dest: this.reactors[id].createStream(source),
source: source
};
socket: socket,
reactor: this.reactors[id](socket)
}
};

//
Expand Down Expand Up @@ -280,10 +287,9 @@ Server.prototype._onUdpMessage = function (msg, rinfo) {
// TODO: Streaming JSON parsing sounds like the right things to do here
//
msg = JSON.parse(('' + msg).replace('\n', ''));

reactors.forEach(function (reactor) {
reactor.source.write(msg);
});
for(var i = 0; i < reactors.length; i++) {
reactors[i].socket.write(msg);
}
};

//
Expand Down Expand Up @@ -313,10 +319,9 @@ Server.prototype._onTcpSocket = function (socket) {
socket.setEncoding('utf8');

parser.on('data', function (event) {
reactors.forEach(function (reactor) {
var copy = common.clone(event);
reactor.source.write(copy);
});
for(var i = 0; i < reactors.length; i++) {
reactors[i].socket.write(event);
}
});

socket.pipe(parser);
Expand All @@ -343,10 +348,9 @@ Server.prototype._onUnixSocket = function (socket) {
socket.setEncoding('utf8');

parser.on('data', function (event) {
reactors.forEach(function (reactor) {
var copy = common.clone(event);
reactor.source.write(copy);
});
for(var i = 0; i < reactors.length; i++) {
reactors[i].socket.write(event);
}
});

socket.pipe(parser);
Expand Down
24 changes: 24 additions & 0 deletions lib/godot/net/socket.js
@@ -0,0 +1,24 @@

var Transform = require('readable-stream').Transform,
util = require('util'),
clone = require('../common').clone;

var extend = util._extend;

//
// ### function Socket (options)
// #### @options {Object} Optional stream options, merged over `{ objectMode: true }`.
// Pseudo "Socket": an object-mode Transform stream that is handed to each
// reactor factory. Every chunk written to it is cloned before being pushed
// downstream, so consumers always receive a fresh copy of the data event.
// May be called with or without `new`.
//
var Socket = module.exports = function (options) {
  if (this instanceof Socket) {
    // Default to object mode; any keys in `options` are copied over the defaults.
    var settings = extend({ objectMode: true }, options);
    Transform.call(this, settings);
    return;
  }

  return new Socket(options);
};

util.inherits(Socket, Transform);

//
// ### function _transform (chunk, encoding, callback)
// Pushes a clone of `chunk` downstream, then signals completion through
// `callback` when one was supplied. `encoding` is unused.
//
Socket.prototype._transform = function (chunk, encoding, callback) {
  var copy = clone(chunk);
  this.push(copy);

  if (callback) {
    callback();
  }
};
};

3 changes: 2 additions & 1 deletion lib/godot/reactor/all-meta.js
Expand Up @@ -15,11 +15,12 @@ var util = require('util'),
// events with any of a given key (or set of keys).
//
//
// AllMeta constructor. Forwards every argument it receives to `HasMeta`
// and then tags the instance with `type = 'all'`.
//
// May be called with or without `new`. When called as a plain function the
// arguments are forwarded to a freshly created instance; a bare
// `new AllMeta()` here would silently drop them.
//
var AllMeta = module.exports = function () {
  if (!(this instanceof AllMeta)) {
    // `new` cannot take an arguments object directly in ES5, so create the
    // instance by hand and re-enter the constructor with the same arguments.
    var instance = Object.create(AllMeta.prototype);
    AllMeta.apply(instance, arguments);
    return instance;
  }

  HasMeta.apply(this, arguments);
  this.type = 'all';
};

//
// Inherit from HasMeta reactor.
//
util.inherits(AllMeta, HasMeta);
util.inherits(AllMeta, HasMeta);
3 changes: 2 additions & 1 deletion lib/godot/reactor/any-meta.js
Expand Up @@ -15,11 +15,12 @@ var util = require('util'),
// events with any of a given key (or set of keys).
//
//
// AnyMeta constructor. Forwards every argument it receives to `HasMeta`
// and then tags the instance with `type = 'any'`.
//
// May be called with or without `new`. When called as a plain function the
// arguments are forwarded to a freshly created instance; a bare
// `new AnyMeta()` here would silently drop them.
//
var AnyMeta = module.exports = function () {
  if (!(this instanceof AnyMeta)) {
    // `new` cannot take an arguments object directly in ES5, so create the
    // instance by hand and re-enter the constructor with the same arguments.
    var instance = Object.create(AnyMeta.prototype);
    AnyMeta.apply(instance, arguments);
    return instance;
  }

  HasMeta.apply(this, arguments);
  this.type = 'any';
};

//
// Inherit from HasMeta reactor.
//
util.inherits(AnyMeta, HasMeta);
util.inherits(AnyMeta, HasMeta);
13 changes: 6 additions & 7 deletions lib/godot/reactor/around.js
@@ -1,13 +1,12 @@
/*
* around.js: Stream for piping to multiple independent reactors and passing through values
* around.js: Stream for piping to multiple independent reactors and passing through values
*
* (C) 2013, Nodejitsu Inc.
*
*/

var util = require('util'),
ReadWriteStream = require('../common/read-write-stream'),
Reactor = require('./reactor');
ReadWriteStream = require('../common/read-write-stream');

//
// ### function Around (reactor0, reactor1, ...)
Expand All @@ -22,19 +21,19 @@ var Around = module.exports = function () {
self = this;

reactors.forEach(function (reactor) {
if (!(reactor instanceof Reactor)) {
throw new Error('This reactor a set of godot.reactor() arguments');
if (!(typeof reactor !== 'function' )) {
throw new Error('This reactor takes a set of reactor factories');
}
});

this.reactors = reactors;
this.streams = reactors.map(function (reactor) {
var source = self.pipe(new ReadWriteStream());
return reactor.createStream(source);
return reactor(source);
});
};

//
// Inherit from ReadWriteStream
//
util.inherits(Around, ReadWriteStream);
util.inherits(Around, ReadWriteStream);
13 changes: 8 additions & 5 deletions lib/godot/reactor/by.js
Expand Up @@ -7,8 +7,7 @@

var util = require('util'),
FilterStream = require('../common/filter-stream'),
ReadWriteStream = require('../common/read-write-stream'),
Reactor = require('./reactor');
ReadWriteStream = require('../common/read-write-stream');

//
// ### function By (keys, reactor)
Expand All @@ -20,11 +19,12 @@ var util = require('util'),
// of streams based on a key change.
//
var By = module.exports = function (keys, reactor, options) {
if (!(this instanceof By)) { return new By(keys, reactor, options )}
ReadWriteStream.call(this);

if ((typeof keys !== 'string' && !Array.isArray(keys))
|| !(reactor instanceof Reactor)) {
throw new Error('This reactor takes key(s) and godot.reactor() as an argument');
|| typeof reactor !== 'function') {
throw new Error('This reactor takes key(s) and a reactor factory function as an argument');
}

this.keys = !Array.isArray(keys) ? [keys] : keys;
Expand Down Expand Up @@ -56,12 +56,15 @@ util.inherits(By, ReadWriteStream);
By.prototype.write = function (data) {
var self = this;

//
// TODO: Internal error handling here with this internal pipe chain
//
this.keys.forEach(function (key) {
var value = data[key];

if (!self.streams[key][value]) {
self.sources[key][value] = new FilterStream(key, value);
self.streams[key][value] = self.reactor.createStream(self.sources[key][value]);
self.streams[key][value] = self.reactor(self.sources[key][value]);
if (self.recombine) {
self.streams[key][value].on('data', self.emit.bind(self, 'data'));
}
Expand Down
1 change: 1 addition & 0 deletions lib/godot/reactor/change.js
Expand Up @@ -17,6 +17,7 @@ var util = require('util'),
// Constructor function of the Change stream responsible for filtering events
//
var Change = module.exports = function (key, options) {
if (!(this instanceof Change)) { return new Change(key, options) }
if (!key) {
throw new Error('a key is required for this reactor');
}
Expand Down

0 comments on commit eaa7646

Please sign in to comment.