Skip to content

Commit

Permalink
[api] Allow for .map() to take an async map function.
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Mar 20, 2013
1 parent 87db80f commit dfebe96
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 4 deletions.
13 changes: 12 additions & 1 deletion lib/godot/reactor/map.js
Expand Up @@ -21,6 +21,7 @@ var Map = module.exports = function (mapFn) {

ReadWriteStream.call(this);
this.mapFn = mapFn;
this.async = mapFn.length === 2;
};

//
Expand All @@ -34,5 +35,15 @@ util.inherits(Map, ReadWriteStream);
// Emits data after it has been mutated by `this.mapFn`.
//
Map.prototype.write = function (data) {
  //
  // #### @data {Object} Event to pass through `this.mapFn`.
  // Synchronous map functions return the mapped event directly;
  // asynchronous ones (flagged by `this.async`) hand it to an
  // `(err, data)` callback. `this.mapFn` is invoked exactly once
  // per write so that each event is emitted at most once.
  //
  var self = this;

  if (!this.async) {
    this.emit('data', this.mapFn(data));
    return;
  }

  this.mapFn(data, function (err, mapped) {
    //
    // Events whose map function reports an error are dropped
    // rather than emitted downstream.
    //
    if (!err) {
      self.emit('data', mapped);
    }
  });
};
3 changes: 3 additions & 0 deletions test/fixtures/health.json
@@ -1,14 +1,17 @@
[
{
"service": "charlie/app/health/heartbeat",
"metric": 1,
"ttl": 50
},
{
"service": "charlie/app/health/memory",
"metric": 1,
"ttl": 50
},
{
"service": "charlie/app/health/heartbeat",
"metric": 1,
"ttl": 50
}
]
42 changes: 39 additions & 3 deletions test/macros/reactor.js
Expand Up @@ -28,9 +28,11 @@ exports.shouldEmitData = function (reactor, fixture, length, timeout, assertFn)
stream.on('data', function (data) { all.push(data) });
helpers.writeFixture(source, fixture);

setTimeout(function () {
that.callback(null, all)
}, timeout);
stream.on('end', function () {
setTimeout(function () {
that.callback(null, all)
}, timeout);
});
},
"should emit the appropriate events": function (err, all) {
assert.isNull(err);
Expand Down Expand Up @@ -72,6 +74,40 @@ exports.shouldEmitDataSync = function (reactor, fixture, length, assertFn) {
};
};

//
// ### function shouldHaveMetric (reactor, fixture, value)
// #### @reactor {Reactor} Reactor to assert against
// #### @fixture {string} Test fixture to write data to
// #### @value {number} Expected value of the metric on the last event.
// Test macro for asserting that the value of the `metric` property of
// the last event emitted from the `reactor` is `value` using
// the test `fixture`.
//
exports.shouldHaveMetric = function (reactor, fixture, value, timeout) {
return {
topic: function () {
var source = new ReadWriteStream(),
stream = reactor.createStream(source),
that = this,
last;

stream.on('data', function (data) { last = data });
stream.on('end', function () {
setTimeout(function () {
that.callback(null, last);
}, timeout || 100);
});

helpers.writeFixture(source, fixture);
},
"should have the appropriate `metric`": function (err, last) {
assert.isNull(err);
assert.isObject(last)
assert.equal(last.metric, value);
}
};
};

//
// ### function shouldHaveMetricSync (reactor, fixture, value)
// #### @reactor {Reactor} Reactor to assert against
Expand Down
12 changes: 12 additions & 0 deletions test/reactor/map-test.js
Expand Up @@ -20,5 +20,17 @@ vows.describe('godot/reactor/map').addBatch({
}),
'pings',
3
),
"Godot map async": macros.shouldHaveMetric(
godot
.reactor()
.map(function (data, callback) {
data.metric = data.metric * 3;
process.nextTick(function () {
callback(null, data);
});
}),
'health',
3
)
}).export(module);

0 comments on commit dfebe96

Please sign in to comment.