Skip to content

Commit

Permalink
Merge pull request #54 from nodejitsu/by-recombine
Browse files Browse the repository at this point in the history
[api] implement recombine option
  • Loading branch information
mmalecki committed Jun 14, 2013
2 parents 20bd2c9 + 04ec58f commit 399f826
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
29 changes: 21 additions & 8 deletions lib/godot/reactor/by.js
Expand Up @@ -14,20 +14,28 @@ var util = require('util'),
// ### function By (keys, reactor, options)
// #### @keys {string|Array} Particular key(s) to listen for a change
// #### @reactor {godot.reactor().type()} Reactor or reactor chain to be created
// #### @options {Object} options object
// #### @options.recombine {Boolean} If true, recombines the data from the split streams
// Constructor function for the by stream to trigger the creation of a new set
// of streams based on a key change.
//
var By = module.exports = function (keys, reactor) {
var By = module.exports = function (keys, reactor, options) {
ReadWriteStream.call(this);

if ((typeof keys !== 'string' && !Array.isArray(keys))
|| !(reactor instanceof Reactor)) {
throw new Error('This reactor takes key(s) and godot.reactor() as an argument');
}

this.keys = !Array.isArray(keys) ? [keys] : keys;
this.reactor = reactor;
this.streams = this.keys.reduce(function (all, key) {
this.keys = !Array.isArray(keys) ? [keys] : keys;
this.options = options || {};
this.reactor = reactor;
this.recombine = this.options.recombine || false;
this.sources = this.keys.reduce(function (all, key) {
all[key] = {};
return all;
}, {});
this.streams = this.keys.reduce(function (all, key) {
all[key] = {};
return all;
}, {});
Expand All @@ -52,11 +60,16 @@ By.prototype.write = function (data) {
var value = data[key];

if (!self.streams[key][value]) {
var source = new FilterStream(key, value);
self.streams[key][value] = self.reactor.createStream(source);
self.pipe(source);
self.sources[key][value] = new FilterStream(key, value);
self.streams[key][value] = self.reactor.createStream(self.sources[key][value]);
if (self.recombine) {
self.streams[key][value].on('data', self.emit.bind(self, 'data'));
}
}
self.sources[key][value].write(data);
});

this.emit('data', data);
if (!this.recombine) {
this.emit('data', data);
}
};
28 changes: 25 additions & 3 deletions test/reactor/by-test.js
Expand Up @@ -12,7 +12,8 @@ var assert = require('assert'),

var counts = {
service: 0,
'service+ttl': 0
'service+ttl': 0,
'service+recombine': 0
};

vows.describe('godot/reactor/by').addBatch({
Expand Down Expand Up @@ -40,11 +41,32 @@ vows.describe('godot/reactor/by').addBatch({
),
'by',
6
),
"service, recombine": macros.shouldEmitDataSync(
godot.reactor()
.by(
'service',
godot.reactor().map(function (data, callback) {
counts['service+recombine']++;

//
// Calling `callback` twice causes the `data` event to be emitted
// twice on the `Map` stream, which doubles each
// message.
//
callback(null, data);
callback(null, data);
}),
{ recombine: true }
),
'by',
12
)
}
}).addBatch({
"Should emit pipe the events to the correct pipe-chains": function () {
"Should emit and pipe the events to the correct pipe-chains": function () {
assert.equal(counts.service, 6);
assert.equal(counts['service+ttl'], 12);
assert.equal(counts['service+recombine'], 6);
}
}).export(module);
}).export(module);

0 comments on commit 399f826

Please sign in to comment.