Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
uv_write2 uv_read2_start binding
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Oct 7, 2011
1 parent f018be3 commit 471c570
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 245 deletions.
8 changes: 4 additions & 4 deletions src/pipe_wrap.cc
Expand Up @@ -86,16 +86,16 @@ Handle<Value> PipeWrap::New(const Arguments& args) {
assert(args.IsConstructCall());

HandleScope scope;
PipeWrap* wrap = new PipeWrap(args.This());
PipeWrap* wrap = new PipeWrap(args.This(), args[0]->IsTrue());
assert(wrap);

return scope.Close(args.This());
}


PipeWrap::PipeWrap(Handle<Object> object) : StreamWrap(object,
(uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, 0);
PipeWrap::PipeWrap(Handle<Object> object, bool ipc)
: StreamWrap(object, (uv_stream_t*) &handle_) {
int r = uv_pipe_init(uv_default_loop(), &handle_, ipc);
assert(r == 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
handle_.data = reinterpret_cast<void*>(this);
Expand Down
2 changes: 1 addition & 1 deletion src/pipe_wrap.h
Expand Up @@ -12,7 +12,7 @@ class PipeWrap : StreamWrap {
static void Initialize(v8::Handle<v8::Object> target);

private:
PipeWrap(v8::Handle<v8::Object> object);
PipeWrap(v8::Handle<v8::Object> object, bool ipc);

static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Bind(const v8::Arguments& args);
Expand Down
80 changes: 74 additions & 6 deletions src/stream_wrap.cc
Expand Up @@ -2,6 +2,7 @@
#include <node_buffer.h>
#include <handle_wrap.h>
#include <stream_wrap.h>
#include <tcp_wrap.h>
#include <req_wrap.h>


Expand Down Expand Up @@ -95,7 +96,14 @@ Handle<Value> StreamWrap::ReadStart(const Arguments& args) {

UNWRAP

int r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
int r;
if (ipc_pipe) {
r = uv_read2_start(wrap->stream_, OnAlloc, OnRead2);
} else {
r = uv_read_start(wrap->stream_, OnAlloc, OnRead);
}

// Error starting the tcp.
if (r) SetErrno(uv_last_error(uv_default_loop()).code);
Expand Down Expand Up @@ -170,9 +178,13 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
return buf;
}

void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {

void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
HandleScope scope;

assert(pending == UV_UNKNOWN_HANDLE); // TODO

StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);

// We should not be getting this callback if someone as already called
Expand Down Expand Up @@ -201,25 +213,59 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
}

if (nread > 0) {
Local<Value> argv[3] = {
int argc = 3;
Local<Value> argv[4] = {
slab_v,
Integer::New(wrap->slab_offset_),
Integer::New(nread)
};
MakeCallback(wrap->object_, "onread", 3, argv);

if (pending == UV_TCP) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_accept(handle, pending_wrap->GetStream());
assert(r == 0);

argv[3] = pending_obj;
argc++;
} else {
// We only support sending UV_TCP right now.
assert(pending == UV_UNKNOWN_HANDLE);
}

MakeCallback(wrap->object_, "onread", argc, argv);
}
}


void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
}


void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
}


Handle<Value> StreamWrap::Write(const Arguments& args) {
HandleScope scope;

UNWRAP

bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;

// The first argument is a buffer.
assert(Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();

size_t offset = 0;
size_t length = Buffer::Length(buffer_obj);

Expand All @@ -239,7 +285,29 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;

int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
int r;

if (!ipc_pipe) {
r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite);
} else {
uv_stream_t* send_stream = NULL;

if (args.Length() > 3) {
assert(args[3]->IsObject());
Local<Object> send_stream_obj = args[3]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
send_stream = send_stream_wrap->GetStream();
}

r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
send_stream,
StreamWrap::AfterWrite);
}

req_wrap->Dispatched();

Expand Down
7 changes: 6 additions & 1 deletion src/stream_wrap.h
Expand Up @@ -32,9 +32,14 @@ class StreamWrap : public HandleWrap {
// Callbacks for libuv
static void AfterWrite(uv_write_t* req, int status);
static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size);
static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void AfterShutdown(uv_shutdown_t* req, int status);

static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static void OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
uv_handle_type pending);
static void OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending);

size_t slab_offset_;
uv_stream_t* stream_;
};
Expand Down

0 comments on commit 471c570

Please sign in to comment.