Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Reimplement child_process.fork
Browse files Browse the repository at this point in the history
Fixes test/simple/test-child-process-fork.js
  • Loading branch information
ry committed Oct 7, 2011
1 parent 08c12de commit 26c5905
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 4 deletions.
121 changes: 117 additions & 4 deletions lib/child_process_uv.js
Expand Up @@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');

var LF = '\n'.charCodeAt(0);
var Pipe;


// constructors for lazy loading
function createPipe() {
var Pipe = process.binding('pipe_wrap').Pipe;
return new Pipe();
function createPipe(ipc) {
// Lazy load
if (!Pipe) {
Pipe = new process.binding('pipe_wrap').Pipe;
}

return new Pipe(ipc);
}

function createSocket(pipe, readable) {
Expand Down Expand Up @@ -61,6 +68,106 @@ function mergeOptions(target, overrides) {
}


function setupChannel(target, channel) {
target._channel = channel;

var jsonBuffer = '';

channel.onread = function(pool, offset, length) {
if (pool) {
for (var i = 0; i < length; i++) {
if (pool[offset + i] === LF) {
jsonBuffer += pool.toString('ascii', offset, offset + i);
var message = JSON.parse(jsonBuffer);
jsonBuffer = pool.toString('ascii', i, length);
offset = i + 1;

target.emit('message', message);
}
}
} else {
channel.close();
target._channel = null;
}
};

target.send = function(message, fd) {
if (fd) throw new Error("not yet implemented");

if (!target._channel) throw new Error("channel closed");

// For overflow protection don't write if channel queue is too deep.
if (channel.writeQueueSize > 1024 * 1024) {
return false;
}

var buffer = Buffer(JSON.stringify(message) + '\n');

var writeReq = channel.write(buffer);

if (!writeReq) {
throw new Error(errno + " cannot write to IPC channel.");
} else {
writeReq.oncomplete = nop;
}

return true;
};

channel.readStart();
}


function nop() { }


exports.fork = function(modulePath, args, options) {
if (!options) options = {};

if (!args) args = [];
args.unshift(modulePath);

if (options.stdinStream) {
throw new Error("stdinStream not allowed for fork()");
}

if (options.customFds) {
throw new Error("customFds not allowed for fork()");
}

// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's.
options.customFds = [ -1, 1, 2 ];

// Just need to set this - child process won't actually use the fd.
// For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6.
options.env = { NODE_CHANNEL_FD: 42 };

// stdin is the IPC channel.
options.stdinStream = createPipe(true);

var child = spawn(process.execPath, args, options);

setupChannel(child, options.stdinStream);

child.on('exit', function() {
if (child._channel) {
child._channel.close();
}
});

return child;
};


exports._forkChild = function() {
// set process.send()
var p = createPipe(true);
p.open(0);
setupChannel(process, p);
};


exports.exec = function(command /*, options, callback */) {
var file, args, options, callback;

Expand Down Expand Up @@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) {
cwd: options ? options.cwd : null,
windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
envPairs: envPairs,
customFds: options ? options.customFds : null
customFds: options ? options.customFds : null,
stdinStream: options ? options.stdinStream : null
});

return child;
Expand Down Expand Up @@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter);


function setStreamOption(name, index, options) {
// Skip if we already have options.stdinStream
if (options[name]) return;

if (options.customFds &&
typeof options.customFds[index] == 'number' &&
options.customFds[index] !== -1) {
Expand All @@ -283,6 +394,8 @@ function setStreamOption(name, index, options) {
ChildProcess.prototype.spawn = function(options) {
var self = this;

debugger;

setStreamOption('stdinStream', 0, options);
setStreamOption('stdoutStream', 1, options);
setStreamOption('stderrStream', 2, options);
Expand Down
3 changes: 3 additions & 0 deletions src/node.cc
Expand Up @@ -1078,6 +1078,9 @@ void MakeCallback(Handle<Object> object,
HandleScope scope;

Local<Value> callback_v = object->Get(String::New(method));
if (!callback_v->IsFunction()) {
fprintf(stderr, "method = %s", method);
}
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);

Expand Down

0 comments on commit 26c5905

Please sign in to comment.