Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
support for sharing streams across isolates
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Zinkovsky committed Feb 1, 2012
1 parent 33b7fc2 commit db3c4ef
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 48 deletions.
42 changes: 16 additions & 26 deletions lib/child_process.js
Expand Up @@ -68,29 +68,13 @@ function mergeOptions(target, overrides) {


function setupChannel(target, channel) {
var isWindows = process.platform === 'win32';
target._channel = channel;

var jsonBuffer = '';

if (isWindows) {
var setSimultaneousAccepts = function(handle) {
var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
}

channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
if (recvHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(recvHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
Expand Down Expand Up @@ -140,10 +124,8 @@ function setupChannel(target, channel) {

var buffer = Buffer(JSON.stringify(message) + '\n');

if (sendHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(sendHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);

Expand Down Expand Up @@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) {
self._handle = isolates.create(options.args, options.options);
if (!self._handle) throw new Error('Cannot create isolate.');

self._handle.onmessage = function(msg) {
self._handle.onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
self.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

self.emit('message', msg, recvHandle);
};

self._handle.onexit = function() {
Expand All @@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) {
};


Isolate.prototype.send = function(msg) {
Isolate.prototype.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
if (!this._handle) throw new Error('Isolate not running.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return this._handle.send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return this._handle.send(msg, sendHandle);
};
23 changes: 23 additions & 0 deletions lib/net.js
Expand Up @@ -942,3 +942,26 @@ exports.isIPv4 = function(input) {
exports.isIPv6 = function(input) {
return exports.isIP(input) === 6;
};


if (process.platform === 'win32') {
var simultaneousAccepts;

exports._setSimultaneousAccepts = function(handle) {
if (typeof handle === 'undefined') {
return;
}

if (typeof simultaneousAccepts === 'undefined') {
simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
}

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
} else {
exports._setSimultaneousAccepts = function(handle) {}
}
25 changes: 20 additions & 5 deletions src/node.js
Expand Up @@ -123,17 +123,27 @@

if (process.tid === 1) return;

var net = NativeModule.require('net');

// isolate initialization
process.send = function(msg) {
process.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return process._send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return process._send(msg, sendHandle);
};

process._onmessage = function(msg) {
process._onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
process.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

process.emit('message', msg, recvHandle);
};

process.exit = process._exit;
Expand Down Expand Up @@ -439,10 +449,15 @@
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap')
process.binding('tcp_wrap');

cp._forkChild();
assert(process.send);
} else if (process.tid !== 1) {
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap');
}
}

Expand Down
123 changes: 106 additions & 17 deletions src/node_isolate.cc
Expand Up @@ -26,6 +26,7 @@
#include <node_isolate.h>
#include <node_internals.h>
#include <node_object_wrap.h>
#include <tcp_wrap.h>

#include <stdlib.h>
#include <string.h>
Expand All @@ -34,6 +35,8 @@

#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)

// Bit for IsolateMessage::flags: set when the message was constructed with a
// uv_stream_info_t, i.e. shared_stream_info_ holds an exported stream that the
// receiving side imports and passes along with the message.
#define ISOLATEMESSAGE_SHARED_STREAM 0x0001


namespace node {

Expand Down Expand Up @@ -166,23 +169,35 @@ class Channel {


struct IsolateMessage {
size_t size_;
char* data_;
int flags;
struct {
size_t size_;
char* buffer_;
} data_;
uv_stream_info_t shared_stream_info_;

IsolateMessage(const char* buffer, size_t size,
uv_stream_info_t* shared_stream_info) {
flags = 0;

IsolateMessage(const char* data, size_t size) {
// make a copy for now
size_ = size;
data_ = new char[size];
memcpy(data_, data, size);
data_.size_ = size;
data_.buffer_ = new char[size];
memcpy(data_.buffer_, buffer, size);

if (shared_stream_info) {
flags |= ISOLATEMESSAGE_SHARED_STREAM;
shared_stream_info_ = *shared_stream_info;
}
}

~IsolateMessage() {
delete[] data_;
delete[] data_.buffer_;
}

static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
assert(data == msg->data_);
assert(data == msg->data_.buffer_);
delete msg;
}
};
Expand All @@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

isolate->send_channel_->Send(msg);

return Undefined();
Expand All @@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
Isolate* self = static_cast<Isolate*>(arg);
NODE_ISOLATE_CHECK(self);

Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(self->globals_.process, "_onmessage", argc, argv);
}


Expand Down Expand Up @@ -442,9 +495,30 @@ struct IsolateWrap: public ObjectWrap {
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(handle_, "onmessage", argc, argv);
}

// TODO merge with Isolate::Send(), it's almost identical
Expand All @@ -457,9 +531,24 @@ struct IsolateWrap: public ObjectWrap {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
self->send_channel_->Send(msg);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

self->send_channel_->Send(msg);
return Undefined();
}

Expand Down

3 comments on commit db3c4ef

@AndreasMadsen
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the difference between UDP and TCP handlers on Windows, but it seems wrong to put setSimultaneousAccepts into the net module if it is also required when sending UDP handlers.

I know UDP is not yet supported but will be in cluster anyway.

@bnoordhuis
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems wrong to put setSimultaneousAccepts into the net module if it is also required when sending UDP handlers.

It's because accept() is for TCP sockets only.

@AndreasMadsen
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought so, thanks for making this clear :)

Please sign in to comment.