[rollup reactor] allow for scaling rollups by passing in a function that accepts the period as an argument
jcrugzz committed Jun 4, 2013
1 parent 273ce42 commit 6073ce8
Showing 2 changed files with 43 additions and 7 deletions.
27 changes: 27 additions & 0 deletions README.md
@@ -93,6 +93,33 @@ There are several core Reactor primitives available in `godot` which can be comp
* `.forward(options)`: Forwards all events to a remote server located at `options.host` and `options.port`.
* `.sms(options)`: Sends an sms to the specified [options][sms-options].
* `.where(key, value)|.where(filters)`: Filters events based on a single `key:value` pair or a set of `key:value` filters.
* `.rollup(interval, limit)|.rollup(options)`: Rolls up a `limit` number of events and emits them every `interval`. `interval` can also be a function, which lets you create varying intervals (see below).

#### Rollup
Here are two possible rollup examples:

```js
//
// Rolls up 10,000 events every 5 minute interval
//
var rollup =
  reactor()
    .rollup(1000 * 60 * 5, 10000)

//
// Scaling Rollup, rolls up 10,000 events every 5min interval for 1 hour,
// then rolls up 10,000 events every 30mins
//

var scalingRollup =
  reactor()
    .rollup(function (period) {
      if (period < 12) {
        return 1000 * 60 * 5;
      }
      return 1000 * 60 * 30;
    }, 10000)
```

## Producers
Producers in Godot are **readable** [Stream][stream] instances which produce [Events](#events). Events will be emitted by a given Producer every `ttl` milliseconds.
23 changes: 16 additions & 7 deletions lib/godot/reactor/rollup.js
@@ -31,10 +31,11 @@ var Rollup = module.exports = function (interval, limit) {
};
}

- this.interval = options.interval || 1000 * 60 * 60;
- this.limit = options.limit || 100;
- this.events = [];
- this.next = [];
+ this.interval = options.interval || 1000 * 60 * 60;
+ this.limit = options.limit || 100;
+ this.period = 0;
+ this.events = [];
+ this.next = [];
};

//
@@ -55,7 +56,7 @@ Rollup.prototype.write = function (data) {
else {
this.events.push(data);
}

if (!this.intervalId) {
this.resetInterval();
}
@@ -68,6 +69,13 @@ Rollup.prototype.write = function (data) {
Rollup.prototype.resetInterval = function () {
var self = this;

+ function interval () {
+   if (typeof self.interval === 'function') {
+     return self.interval(self.period);
+   }
+   return self.interval;
+ }

if (this.intervalId) {
clearInterval(this.intervalId);
}
@@ -82,6 +90,7 @@ Rollup.prototype.resetInterval = function () {
}

self.next.length = 0;
+ self.period += 1;
}
- }, this.interval);
- };
+ }, interval());
+ };
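
A minimal sketch of how the new `interval()` closure in `resetInterval` picks its value, pulled out so it can run on its own. `resolveInterval` and `scaling` are hypothetical names used only for illustration and are not part of godot. `scaling` mirrors the function from the README example, where twelve 5-minute periods add up to the first hour.

```javascript
// Standalone sketch of the interval resolution added in this commit.
// `resolveInterval` is a hypothetical helper, not a godot API.
function resolveInterval(interval, period) {
  // A function is called with the current period count;
  // anything else is used as a fixed delay in milliseconds.
  if (typeof interval === 'function') {
    return interval(period);
  }
  return interval;
}

// Same shape as the scaling function in the README example.
function scaling(period) {
  if (period < 12) {
    return 1000 * 60 * 5;   // 5 minutes for the first 12 periods
  }
  return 1000 * 60 * 30;    // 30 minutes afterwards
}

var fixed = resolveInterval(1000 * 60 * 5, 3); // 300000
var early = resolveInterval(scaling, 0);       // 300000
var late = resolveInterval(scaling, 12);       // 1800000
```

Because `setInterval` takes a fixed delay, the value appears to be read once per `resetInterval` call, so a new delay would only take effect the next time the timer is reset.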
