Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'ry/master' into merge-v0.6
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacs committed Feb 1, 2012
2 parents 18d179c + db3c4ef commit bd21038
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 48 deletions.
42 changes: 16 additions & 26 deletions lib/child_process.js
Expand Up @@ -68,29 +68,13 @@ function mergeOptions(target, overrides) {


function setupChannel(target, channel) {
var isWindows = process.platform === 'win32';
target._channel = channel;

var jsonBuffer = '';

if (isWindows) {
var setSimultaneousAccepts = function(handle) {
var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
}

channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
if (recvHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(recvHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
Expand Down Expand Up @@ -140,10 +124,8 @@ function setupChannel(target, channel) {

var buffer = Buffer(JSON.stringify(message) + '\n');

if (sendHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(sendHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);

Expand Down Expand Up @@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) {
self._handle = isolates.create(options.args, options.options);
if (!self._handle) throw new Error('Cannot create isolate.');

self._handle.onmessage = function(msg) {
self._handle.onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
self.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

self.emit('message', msg, recvHandle);
};

self._handle.onexit = function() {
Expand All @@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) {
};


Isolate.prototype.send = function(msg) {
Isolate.prototype.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
if (!this._handle) throw new Error('Isolate not running.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return this._handle.send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return this._handle.send(msg, sendHandle);
};
23 changes: 23 additions & 0 deletions lib/net.js
Expand Up @@ -942,3 +942,26 @@ exports.isIPv4 = function(input) {
exports.isIPv6 = function(input) {
return exports.isIP(input) === 6;
};


if (process.platform === 'win32') {
var simultaneousAccepts;

exports._setSimultaneousAccepts = function(handle) {
if (typeof handle === 'undefined') {
return;
}

if (typeof simultaneousAccepts === 'undefined') {
simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
}

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
} else {
exports._setSimultaneousAccepts = function(handle) {}
}
25 changes: 20 additions & 5 deletions src/node.js
Expand Up @@ -123,17 +123,27 @@

if (process.tid === 1) return;

var net = NativeModule.require('net');

// isolate initialization
process.send = function(msg) {
process.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return process._send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return process._send(msg, sendHandle);
};

process._onmessage = function(msg) {
process._onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
process.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

process.emit('message', msg, recvHandle);
};

process.exit = process._exit;
Expand Down Expand Up @@ -441,10 +451,15 @@
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap')
process.binding('tcp_wrap');

cp._forkChild();
assert(process.send);
} else if (process.tid !== 1) {
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap');
}
}

Expand Down
123 changes: 106 additions & 17 deletions src/node_isolate.cc
Expand Up @@ -26,6 +26,7 @@
#include <node_isolate.h>
#include <node_internals.h>
#include <node_object_wrap.h>
#include <tcp_wrap.h>

#include <stdlib.h>
#include <string.h>
Expand All @@ -34,6 +35,8 @@

#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)

#define ISOLATEMESSAGE_SHARED_STREAM 0x0001


namespace node {

Expand Down Expand Up @@ -166,23 +169,35 @@ class Channel {


struct IsolateMessage {
size_t size_;
char* data_;
int flags;
struct {
size_t size_;
char* buffer_;
} data_;
uv_stream_info_t shared_stream_info_;

IsolateMessage(const char* buffer, size_t size,
uv_stream_info_t* shared_stream_info) {
flags = 0;

IsolateMessage(const char* data, size_t size) {
// make a copy for now
size_ = size;
data_ = new char[size];
memcpy(data_, data, size);
data_.size_ = size;
data_.buffer_ = new char[size];
memcpy(data_.buffer_, buffer, size);

if (shared_stream_info) {
flags |= ISOLATEMESSAGE_SHARED_STREAM;
shared_stream_info_ = *shared_stream_info;
}
}

~IsolateMessage() {
delete[] data_;
delete[] data_.buffer_;
}

static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
assert(data == msg->data_);
assert(data == msg->data_.buffer_);
delete msg;
}
};
Expand All @@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

isolate->send_channel_->Send(msg);

return Undefined();
Expand All @@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
Isolate* self = static_cast<Isolate*>(arg);
NODE_ISOLATE_CHECK(self);

Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(self->globals_.process, "_onmessage", argc, argv);
}


Expand Down Expand Up @@ -442,9 +495,30 @@ struct IsolateWrap: public ObjectWrap {
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(handle_, "onmessage", argc, argv);
}

// TODO merge with Isolate::Send(), it's almost identical
Expand All @@ -457,9 +531,24 @@ struct IsolateWrap: public ObjectWrap {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
self->send_channel_->Send(msg);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

self->send_channel_->Send(msg);
return Undefined();
}

Expand Down

0 comments on commit bd21038

Please sign in to comment.