Skip to content

Commit

Permalink
Merge pull request #40 from nodejitsu/feature-meta
Browse files Browse the repository at this point in the history
Add "meta" property to event data and associated Reactors
  • Loading branch information
indexzero committed Mar 14, 2013
2 parents 1db34f5 + bad34ce commit 8c92a04
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 7 deletions.
5 changes: 3 additions & 2 deletions README.md
Expand Up @@ -63,8 +63,9 @@ Similar to [Riemann][riemann], events in `godot` are simply JSON sent over UDP o
state: "Any string less than 255 bytes, e.g. 'ok', 'warning', 'critical'",
time: "The time of the event, in unix epoch seconds",
description: "Freeform text",
tags: "Freeform list of strings, e.g. ['rate', 'fooproduct', 'transient']"
metric: "A number associated with this event, e.g. the number of reqs/sec."
tags: "Freeform list of strings, e.g. ['rate', 'fooproduct', 'transient']",
meta: "Freeform set of key:value pairs e.g. { 'ewma': 12345 }",
metric: "A number associated with this event, e.g. the number of reqs/sec.",
ttl: "A floating-point time, in seconds, that this event is considered valid for."
}
```
Expand Down
7 changes: 7 additions & 0 deletions lib/godot.js
Expand Up @@ -26,6 +26,13 @@ exports.reactor = require('./godot/reactor');
//
exports.producer = require('./godot/producer');

//
// ### @common {Object}
// Expose `common` module for performing basic
// streaming.
//
exports.common = require('./godot/common');

//
// ### @math {Object}
// Expose `math` module for performing basic
Expand Down
8 changes: 7 additions & 1 deletion lib/godot/common/index.js
Expand Up @@ -28,4 +28,10 @@ exports.ReadWriteStream = require('./read-write-stream');
// ### @JsonParser {function}
// Constructor function for the godot streaming JsonParser
//
exports.JsonParser = require('./json-parser');
exports.JsonParser = require('./json-parser');

//
// ### @FilterStream {function}
// Constructor function for the base FilterStream
//
exports.FilterStream = require('./filter-stream');
25 changes: 25 additions & 0 deletions lib/godot/reactor/all-meta.js
@@ -0,0 +1,25 @@
/*
* all-meta.js: Stream for filtering events with a given meta key (or set of keys).
*
* (C) 2012, Nodejitsu Inc.
*
*/

var util = require('util'),
HasMeta = require('./has-meta');

//
// ### function AllMeta (keys|key0, key1, ..., keyN)
// #### @keys|key0..keyN {Array|arguments} Full set of keys to filter over.
// Constructor function of the AllMeta stream responsible for filtering
// events with any of a given key (or set of keys).
//
var AllMeta = module.exports = function () {
HasMeta.apply(this, arguments);
this.type = 'all';
};

//
// Inherit from HasMeta reactor.
//
util.inherits(AllMeta, HasMeta);
25 changes: 25 additions & 0 deletions lib/godot/reactor/any-meta.js
@@ -0,0 +1,25 @@
/*
* any-meta.js: Stream for filtering events with any of a given meta key (or set of keys).
*
* (C) 2012, Nodejitsu Inc.
*
*/

var util = require('util'),
HasMeta = require('./has-meta');

//
// ### function AnyMeta (keys|key0, key1, ..., keyN)
// #### @keys|key0..keyN {Array|arguments} Full set of keys to filter over.
// Constructor function of the AnyMeta stream responsible for filtering
// events with any of a given key (or set of keys).
//
var AnyMeta = module.exports = function () {
HasMeta.apply(this, arguments);
this.type = 'any';
};

//
// Inherit from HasMeta reactor.
//
util.inherits(AnyMeta, HasMeta);
98 changes: 98 additions & 0 deletions lib/godot/reactor/has-meta.js
@@ -0,0 +1,98 @@
/*
* has-meta.js: Stream for filtering events with a given meta key (or set of keys).
*
* (C) 2012, Nodejitsu Inc.
*
*/

var util = require('util'),
ReadWriteStream = require('../common/read-write-stream');

//
// ### function HasMeta ([type], keys|key0, key, ..., keyN)
// #### @type {any|all} Type of tag filtering to perform: any or all.
// #### @keys|key0..keyN {Array|arguments} Full set of keys to filter over.
// Constructor function of the HasMeta stream responsible for filtering
// events with a given meta key (or set of keys).
//
var HasMeta = module.exports = function (type) {
ReadWriteStream.call(this);

var keys = Array.prototype.slice.call(arguments);

if (type === 'any' || type === 'all') {
this.type = type;
keys.splice(0, 1);
}
else {
this.type === 'any';
}

//
// Create a lookup table of any values provided.
// If no value is provided set the key to `null`
// since we will be checking for `undefined`.
//
this.lookup = keys.reduce(function (all, key) {
if (Array.isArray(key)) {
key.forEach(function (kkey) {
all[kkey] = null;
});
}
else if (typeof key === 'object') {
Object.keys(key).forEach(function (kkey) {
all[kkey] = key[kkey];
});
}
else {
all[key] = null;
}

return all;
}, {});

//
// Store the list of keys for iterating over later.
//
this.keys = Object.keys(this.lookup);
};

//
// Inherit from ReadWriteStream
//
util.inherits(HasMeta, ReadWriteStream);

//
// ### function write (data)
// #### @data {Object} JSON to rollup.
// Only filters `data` according to `this.keys`.
//
HasMeta.prototype.write = function (data) {
var self = this;

//
// If there are no tags on the data return
//
if (!data.meta) {
return;
}

//
// Helper function for checking a given `key`.
//
function hasKey(key) {
return data.meta[key] !== undefined
&& (self.lookup[key] === null ||
self.lookup[key] === data.meta[key]);
}

var valid = this.type === 'all'
? this.keys.every(hasKey)
: this.keys.some(hasKey);

if (!valid) {
return;
}

this.emit('data', data);
};
50 changes: 50 additions & 0 deletions lib/godot/reactor/meta.js
@@ -0,0 +1,50 @@
/*
* meta.js: Stream for setting the value (and any meta) from a second reactor on any data received.
*
* (C) 2013, Nodejitsu Inc.
*
*/

var util = require('util'),
ReadWriteStream = require('../common/read-write-stream'),
Reactor = require('./reactor');

//
// ### function Meta (tag, reactor)
// #### @key {string} Meta key to use for the value of `reactor`.
// #### @reactor {godot.reactor().type()} Reactor to be created
// Constructor function for the Meta stream responsible for setting
// the value (and any meta) from a second reactor on any data received.
//
var Meta = module.exports = function (key, reactor) {
ReadWriteStream.call(this);

var self = this;

this.key = key;
this.stream = new ReadWriteStream();
this.reactor = reactor.createStream(this.stream);
this.reactor.on('data', function (data) {
data.meta = data.meta || {};
data.meta[key] = data.metric;
data.metric = data._metric;
delete data._metric;
self.emit('data', data);
});
};

//
// Inherit from ReadWriteStream
//
util.inherits(Meta, ReadWriteStream);

//
// ### function write (data)
// Writes the `data` to the meta stream associated
// with this instance and sets `_metric` so it can
// be replaced later on.
//
Meta.prototype.write = function (data) {
data._metric = data.metric;
this.stream.write(data);
};
82 changes: 82 additions & 0 deletions test/fixtures/meta.json
@@ -0,0 +1,82 @@
[
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1,
"b": 1,
"c": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1,
"b": 1,
"c": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1,
"b": 1,
"c": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1,
"b": 1,
"c": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1,
"b": 1,
"c": 1
}
},
{
"service": "charlie/app/health/ping",
"metric": 1,
"meta": {
"a": 1
}
}
]
20 changes: 16 additions & 4 deletions test/helpers/index.js
Expand Up @@ -58,10 +58,22 @@ exports.timeSeries = function (event, length, duration) {
now = +Date.now();

return range(1, intervals).map(function (interval) {
return Object.keys(event).reduce(function (obj, key) {
obj[key] = typeof event[key] === 'function'
? event[key](interval)
: event[key]
return Object.keys(event).reduce(function reduceKey(obj, key) {
if (typeof event[key] === 'function') {
obj[key] = event[key](interval)
}
else if (Array.isArray(event[key])) {
obj[key] = event[key].slice();
}
else if (typeof event[key] === 'object') {
obj[key] = Object.keys(event[key]).reduce(function (value, kkey) {
value[kkey] = event[key][kkey];
return value;
}, {});
}
else {
obj[key] = event[key];
}

return obj;
}, { time: now + (interval * duration) });
Expand Down

0 comments on commit 8c92a04

Please sign in to comment.