Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
isolates: drain message queue completely
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoordhuis committed Jan 10, 2012
1 parent 787f62d commit 97e4b3a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 18 deletions.
20 changes: 13 additions & 7 deletions src/node_isolate.cc
Expand Up @@ -87,17 +87,23 @@ class Queue {
 uv_mutex_unlock(&mutex_);
 }

-T Consume() {
+bool Consume(T& item) {
+ngx_queue_t* q = NULL;
+
 uv_mutex_lock(&mutex_);
-ngx_queue_t* q = ngx_queue_head(&queue_);
-ngx_queue_remove(q);
+if (!ngx_queue_empty(&queue_)) {
+q = ngx_queue_head(&queue_);
+ngx_queue_remove(q);
+}
 uv_mutex_unlock(&mutex_);

+if (q == NULL) return false;
+
 Message* m = ngx_queue_data(q, Message, queue_);
-T item = m->item_;
+item = m->item_;
 delete m;

-return item;
+return true;
 }

 private:
Expand Down Expand Up @@ -140,8 +146,8 @@ class Channel {
 }

 void OnMessage() {
-T item = queue_.Consume();
-callback_(item, arg_);
+T item;
+while (queue_.Consume(item)) callback_(item, arg_);
 }

 void* arg_;
Expand Down
51 changes: 40 additions & 11 deletions test/simple/test-isolates-ping-pong.js
@@ -1,39 +1,68 @@
var isolates = process.binding('isolates');
var assert = require('assert');

var N = 4; // # of child isolates
var N_ISOLATES = 4;
var N_MESSAGES = 20;
var N_MESSAGES_PER_TICK = 4;

assert(N_MESSAGES % N_MESSAGES_PER_TICK == 0);

if (process.tid === 1)
master();
else
child();

function master() {
for (var i = 0; i < N; ++i) spawn();
for (var i = 0; i < N_ISOLATES; ++i) spawn();

function spawn() {
var isolate = isolates.create(process.argv);

var gotExit = false; // exit event emitted?
var msgId = 0; // message IDs seen so far
var tick = 0;

isolate.onexit = function() {
console.error("onexit isolate #%d", isolate.tid);
gotExit = true;
};
isolate.onmessage = function(m) {
console.error("parent received message '%s'", m);
isolate.send(Buffer('ACK ' + m));

isolate.onmessage = function(buf) {
var msg = JSON.parse(buf);
assert.equal(msg.id, msgId + 1); // verify that messages arrive in order
assert.equal(msg.tick, tick); // and on the proper tick (=full mq drain)
msgId = msg.id;
if (msgId % N_MESSAGES_PER_TICK == 0) tick++;
isolate.send(buf);
};

process.on('exit', function() {
assert.equal(gotExit, true);
assert.equal(msgId, N_MESSAGES);
assert.equal(tick, N_MESSAGES / N_MESSAGES_PER_TICK);
});
}
}

function child() {
var n = 0;
var msgId = 0;
var tick = 0;

function send() {
if (++n > 10) return;
process._send(Buffer('SYN' + n));
setTimeout(send, 10);
// Send multiple messages, verify that the message queue
// is completely drained on each tick of the event loop.
for (var i = 0; i < N_MESSAGES_PER_TICK; ++i) {
process.send({tick:tick, id:++msgId});
}

if (msgId < N_MESSAGES) {
setTimeout(send, 10);
}

tick++;
}

send();

process._onmessage = function(m) {
console.error("child %d received message '%s'", process.tid, m);
};
}

0 comments on commit 97e4b3a

Please sign in to comment.