Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge branch 'ipc-binding'
  • Loading branch information
ry committed Oct 7, 2011
2 parents f018be3 + 26c5905 commit 51f7ba4
Show file tree
Hide file tree
Showing 13 changed files with 527 additions and 264 deletions.
2 changes: 1 addition & 1 deletion deps/uv/Makefile
Expand Up @@ -80,7 +80,7 @@ endif
TESTS=test/blackhole-server.c test/echo-server.c test/test-*.c
BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/benchmark-*.c

all: uv.a test/run-tests$(E) test/run-benchmarks$(E)
all: uv.a

$(CARES_OBJS): %.o: %.c
$(CC) -o $*.o -c $(CFLAGS) $(CPPFLAGS) $< -DHAVE_CONFIG_H
Expand Down
10 changes: 7 additions & 3 deletions deps/uv/src/unix/process.c
Expand Up @@ -110,6 +110,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
#endif
int status;
pid_t pid;
int flags;

uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS);
loop->counters.process_init++;
Expand Down Expand Up @@ -255,26 +256,29 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
assert(stdin_pipe[0] >= 0);
uv__close(stdin_pipe[0]);
uv__nonblock(stdin_pipe[1], 1);
flags = UV_WRITABLE | (options.stdin_stream->ipc ? UV_READABLE : 0);
uv__stream_open((uv_stream_t*)options.stdin_stream, stdin_pipe[1],
UV_WRITABLE);
flags);
}

if (stdout_pipe[0] >= 0) {
assert(options.stdout_stream);
assert(stdout_pipe[1] >= 0);
uv__close(stdout_pipe[1]);
uv__nonblock(stdout_pipe[0], 1);
flags = UV_READABLE | (options.stdout_stream->ipc ? UV_WRITABLE : 0);
uv__stream_open((uv_stream_t*)options.stdout_stream, stdout_pipe[0],
UV_READABLE);
flags);
}

if (stderr_pipe[0] >= 0) {
assert(options.stderr_stream);
assert(stderr_pipe[1] >= 0);
uv__close(stderr_pipe[1]);
uv__nonblock(stderr_pipe[0], 1);
flags = UV_READABLE | (options.stderr_stream->ipc ? UV_WRITABLE : 0);
uv__stream_open((uv_stream_t*)options.stderr_stream, stderr_pipe[0],
UV_READABLE);
flags);
}

return 0;
Expand Down
19 changes: 11 additions & 8 deletions deps/uv/src/unix/stream.c
Expand Up @@ -563,6 +563,7 @@ static void uv__read(uv_stream_t* stream) {
return;
} else {
/* Successful read */
size_t buflen = buf.len;

if (stream->read_cb) {
stream->read_cb(stream, nread, buf);
Expand Down Expand Up @@ -599,6 +600,11 @@ static void uv__read(uv_stream_t* stream) {
} else {
stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE);
}

/* Return if we didn't fill the buffer, there is no more data to read. */
if (nread < buflen) {
return;
}
}
}
}
Expand Down Expand Up @@ -907,14 +913,11 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,


/*
 * Stops reading from `stream`.
 *
 * Halts the stream's read watcher on its loop, clears the UV_READING flag and
 * drops the read/read2/alloc callbacks. Always returns 0.
 *
 * The handle is used as a generic stream; no TCP-specific alias is needed,
 * and each teardown step is performed exactly once.
 */
int uv_read_stop(uv_stream_t* stream) {
  ev_io_stop(stream->loop->ev, &stream->read_watcher);
  stream->flags &= ~UV_READING;
  stream->read_cb = NULL;
  stream->read2_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}

Expand Down
28 changes: 25 additions & 3 deletions deps/uv/test/test-fs.c
Expand Up @@ -1163,13 +1163,21 @@ TEST_IMPL(fs_symlink) {

TEST_IMPL(fs_utime) {
utime_check_t checkme;
const char* path = ".";
const char* path = "test_file";
double atime;
double mtime;
uv_fs_t req;
int r;

/* Setup. */
loop = uv_default_loop();
unlink(path);
r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT,
S_IWRITE | S_IREAD, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
uv_fs_req_cleanup(&req);
close(r);

atime = mtime = 400497753; /* 1982-09-10 11:22:33 */

Expand All @@ -1196,24 +1204,35 @@ TEST_IMPL(fs_utime) {
uv_run(loop);
ASSERT(utime_cb_count == 1);

/* Cleanup. */
unlink(path);

return 0;
}


TEST_IMPL(fs_futime) {
utime_check_t checkme;
const char* path = ".";
const char* path = "test_file";
double atime;
double mtime;
uv_file file;
uv_fs_t req;
int r;

/* Setup. */
loop = uv_default_loop();
unlink(path);
r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT,
S_IWRITE | S_IREAD, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
uv_fs_req_cleanup(&req);
close(r);

atime = mtime = 400497753; /* 1982-09-10 11:22:33 */

r = uv_fs_open(loop, &req, path, O_RDONLY, 0, NULL);
r = uv_fs_open(loop, &req, path, O_RDWR, 0, NULL);
ASSERT(r != -1);
ASSERT(req.result != -1);
file = req.result; /* FIXME probably not how it's supposed to be used */
Expand Down Expand Up @@ -1243,6 +1262,9 @@ TEST_IMPL(fs_futime) {
uv_run(loop);
ASSERT(futime_cb_count == 1);

/* Cleanup. */
unlink(path);

return 0;
}

Expand Down
121 changes: 117 additions & 4 deletions lib/child_process_uv.js
Expand Up @@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');

var LF = '\n'.charCodeAt(0);
var Pipe;


// constructors for lazy loading
// Returns a new Pipe handle, loading the `pipe_wrap` binding on first use and
// caching the constructor in the module-level `Pipe` variable.
//
// `ipc` is passed straight through to the Pipe constructor.
function createPipe(ipc) {
  // Lazy load. Note: the constructor is read as a plain property; writing
  // `new process.binding(...).Pipe` would apply `new` to process.binding.
  if (!Pipe) {
    Pipe = process.binding('pipe_wrap').Pipe;
  }

  return new Pipe(ipc);
}

function createSocket(pipe, readable) {
Expand Down Expand Up @@ -61,6 +68,106 @@ function mergeOptions(target, overrides) {
}


// Attaches a newline-delimited JSON message channel to `target`.
//
//  - Stores `channel` on `target._channel`.
//  - Installs `channel.onread`: incoming bytes are accumulated, and every
//    complete line is JSON-parsed and emitted on `target` as a 'message'
//    event. A read without a buffer closes the channel and clears
//    `target._channel`.
//  - Installs `target.send(message, fd)`: writes `message` as one JSON line.
//    Returns false (without writing) when the channel's write queue is over
//    1 MiB, true otherwise. Throws if `fd` is given, if the channel is
//    closed, or if the write cannot be queued.
//  - Starts reading from the channel.
function setupChannel(target, channel) {
  target._channel = channel;

  // Received text that has not yet been terminated by a newline.
  var jsonBuffer = '';

  channel.onread = function(pool, offset, length) {
    if (pool) {
      // Append exactly the bytes delivered by this read; a message may be
      // split across reads, and one read may carry several messages.
      // NOTE(review): 'ascii' decoding is kept from the original; non-ASCII
      // payloads would presumably need an encoding-aware decoder - confirm.
      jsonBuffer += pool.toString('ascii', offset, offset + length);

      var newline;
      while ((newline = jsonBuffer.indexOf('\n')) >= 0) {
        var json = jsonBuffer.slice(0, newline);
        // Consume the line before parsing/emitting so that a throwing parser
        // or listener cannot cause the same line to be processed twice.
        jsonBuffer = jsonBuffer.slice(newline + 1);

        target.emit('message', JSON.parse(json));
      }
    } else {
      channel.close();
      target._channel = null;
    }
  };

  target.send = function(message, fd) {
    if (fd) throw new Error("not yet implemented");

    if (!target._channel) throw new Error("channel closed");

    // For overflow protection don't write if channel queue is too deep.
    if (channel.writeQueueSize > 1024 * 1024) {
      return false;
    }

    var buffer = Buffer(JSON.stringify(message) + '\n');

    var writeReq = channel.write(buffer);

    if (!writeReq) {
      throw new Error(errno + " cannot write to IPC channel.");
    } else {
      // A completion handler is assigned, but nothing needs to happen.
      writeReq.oncomplete = nop;
    }

    return true;
  };

  channel.readStart();
}


// Intentionally empty function, for places where a callback must be supplied
// but there is nothing to do.
function nop() {}


exports.fork = function(modulePath, args, options) {
if (!options) options = {};

if (!args) args = [];
args.unshift(modulePath);

if (options.stdinStream) {
throw new Error("stdinStream not allowed for fork()");
}

if (options.customFds) {
throw new Error("customFds not allowed for fork()");
}

// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's.
options.customFds = [ -1, 1, 2 ];

// Just need to set this - child process won't actually use the fd.
// For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6.
options.env = { NODE_CHANNEL_FD: 42 };

// stdin is the IPC channel.
options.stdinStream = createPipe(true);

var child = spawn(process.execPath, args, options);

setupChannel(child, options.stdinStream);

child.on('exit', function() {
if (child._channel) {
child._channel.close();
}
});

return child;
};


exports._forkChild = function() {
// set process.send()
var p = createPipe(true);
p.open(0);
setupChannel(process, p);
};


exports.exec = function(command /*, options, callback */) {
var file, args, options, callback;

Expand Down Expand Up @@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) {
cwd: options ? options.cwd : null,
windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
envPairs: envPairs,
customFds: options ? options.customFds : null
customFds: options ? options.customFds : null,
stdinStream: options ? options.stdinStream : null
});

return child;
Expand Down Expand Up @@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter);


function setStreamOption(name, index, options) {
// Skip if we already have options.stdinStream
if (options[name]) return;

if (options.customFds &&
typeof options.customFds[index] == 'number' &&
options.customFds[index] !== -1) {
Expand All @@ -283,6 +394,8 @@ function setStreamOption(name, index, options) {
ChildProcess.prototype.spawn = function(options) {
var self = this;

debugger;

setStreamOption('stdinStream', 0, options);
setStreamOption('stdoutStream', 1, options);
setStreamOption('stderrStream', 2, options);
Expand Down
3 changes: 3 additions & 0 deletions src/node.cc
Expand Up @@ -1078,6 +1078,9 @@ void MakeCallback(Handle<Object> object,
HandleScope scope;

Local<Value> callback_v = object->Get(String::New(method));
if (!callback_v->IsFunction()) {
fprintf(stderr, "method = %s", method);
}
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);

Expand Down
8 changes: 4 additions & 4 deletions src/pipe_wrap.cc
Expand Up @@ -86,16 +86,16 @@ Handle<Value> PipeWrap::New(const Arguments& args) {
assert(args.IsConstructCall());

HandleScope scope;
PipeWrap* wrap = new PipeWrap(args.This());
PipeWrap* wrap = new PipeWrap(args.This(), args[0]->IsTrue());
assert(wrap);

return scope.Close(args.This());
}


PipeWrap::PipeWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, 0);
PipeWrap::PipeWrap(Handle<Object> object, bool ipc)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, ipc);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
handle_.data = reinterpret_cast<void*>(this);
Expand Down
2 changes: 1 addition & 1 deletion src/pipe_wrap.h
Expand Up @@ -12,7 +12,7 @@ class PipeWrap : StreamWrap {
static void Initialize(v8::Handle<v8::Object> target);

private:
PipeWrap(v8::Handle<v8::Object> object);
PipeWrap(v8::Handle<v8::Object> object, bool ipc);

static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);
Expand Down

0 comments on commit 51f7ba4

Please sign in to comment.