Navigation Menu

Skip to content

Commit

Permalink
[api test] Added .tag() reactor for setting tags based on values ca…
Browse files Browse the repository at this point in the history
…lculated from other reactors.
  • Loading branch information
indexzero committed Mar 11, 2013
1 parent 74555a8 commit b84a118
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
50 changes: 50 additions & 0 deletions lib/godot/reactor/tag.js
@@ -0,0 +1,50 @@
/*
* tag.js: Stream for adding a tag, whose value is calculated by a second reactor, to any data received.
*
* (C) 2013, Nodejitsu Inc.
*
*/

var util = require('util'),
ReadWriteStream = require('../common/read-write-stream'),
Reactor = require('./reactor');

//
// ### function Tag (tag, reactor)
// #### @tag {string} Tag to use for the value of `reactor`.
// #### @reactor {godot.reactor().type()} Reactor to be created
// Constructor function for the tag stream. Data written to this
// instance is passed through a stream created from `reactor`; the
// metric which that stream emits is appended to the tags of the
// data as `<tag>:<metric>` and the original metric (stashed in
// `_metric` by `write`) is restored before re-emitting the data.
//
var Tag = module.exports = function (tag, reactor) {
  ReadWriteStream.call(this);

  var self = this;

  this.tag = tag;
  this.stream = new ReadWriteStream();
  this.reactor = reactor.createStream(this.stream);
  this.reactor.on('data', function (data) {
    //
    // Default `tags` BEFORE pushing onto it: data which arrives
    // without any tags would otherwise throw a TypeError.
    //
    data.tags = data.tags || [];
    data.tags.push(self.tag + ':' + data.metric);

    //
    // Restore the metric which was stashed by `write` and remove
    // the temporary property so that it does not leak downstream.
    //
    data.metric = data._metric;
    delete data._metric;
    self.emit('data', data);
  });
};

//
// Inherit from ReadWriteStream
//
util.inherits(Tag, ReadWriteStream);

//
// ### function write (data)
// #### @data {Object} Data to pass through the wrapped reactor.
// Stashes the current `metric` of `data` in `_metric`, so that it
// can be put back once the reactor has emitted, and then forwards
// `data` to the internal stream associated with this instance.
//
Tag.prototype.write = function (data) {
  var source = this.stream;

  data._metric = data.metric;
  source.write(data);
};
58 changes: 58 additions & 0 deletions test/reactor/tag-test.js
@@ -0,0 +1,58 @@
/*
* tag-test.js: Tests for the Tag reactor stream.
*
* (C) 2012, Nodejitsu Inc.
*
*/

var assert = require('assert'),
vows = require('vows'),
range = require('r...e'),
windowStream = require('window-stream'),
godot = require('../../lib/godot'),
helpers = require('../helpers'),
macros = require('../macros').reactor;

//
// Constant equal to `1 - e^(-5/60)`.
// NOTE(review): not referenced by the batch below; confirm whether
// it is still needed in this test file.
//
var M1_ALPHA = 1 - Math.exp(-5/60);

//
// ### function assertTaggedAverages (all)
// #### @all {Array} Every data event emitted by the reactor under test.
// Asserts that each event keeps its original metric (1, 2, 3, ...)
// and carries an `avg:<value>` tag equal to the mean of the (at most)
// ten most recent metrics.
//
function assertTaggedAverages(all) {
  all.forEach(function (data, index) {
    var position = index + 1,
        first = position < 10 ? 1 : position - 9,
        expected, tagged;

    //
    // The metrics which are expected to be in the window at this point.
    //
    expected = range(first, position).toArray().map(function (n) {
      return { metric: n };
    });

    tagged = data.tags.filter(function (tag) {
      return tag.indexOf('avg') !== -1;
    })[0];

    assert.equal(data.metric, position);
    assert.isString(tagged);
    assert.equal(parseFloat(tagged.split(':')[1]), godot.math.mean(expected));
  });
}

vows.describe('godot/reactor/tag').addBatch({
  "Godot tag": {
    "with a simple movingAverage": macros.shouldEmitDataSync(
      godot
        .reactor()
        .tag('avg', godot.reactor().movingAverage({
          average: 'simple',
          window: new windowStream.EventWindow({ size: 10 })
        })),
      helpers.timeSeries({
        metric: function (num) {
          return num;
        }
      }, 1000, 10),
      100,
      assertTaggedAverages
    )
  }
}).export(module);

0 comments on commit b84a118

Please sign in to comment.