Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Upgrade libuv to 4b9b692
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Sep 14, 2011
1 parent 92d4ed3 commit 0e0fbf7
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 63 deletions.
1 change: 1 addition & 0 deletions deps/uv/include/uv-private/uv-unix.h
Expand Up @@ -61,6 +61,7 @@ typedef int uv_file;
int write_index; \
uv_buf_t* bufs; \
int bufcnt; \
int error; \
uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];

#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
Expand Down
2 changes: 2 additions & 0 deletions deps/uv/src/unix/core.c
Expand Up @@ -233,6 +233,8 @@ void uv__finish_close(uv_handle_t* handle) {
case UV_TCP:
assert(!ev_is_active(&((uv_stream_t*)handle)->read_watcher));
assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;

case UV_UDP:
Expand Down
1 change: 1 addition & 0 deletions deps/uv/src/unix/internal.h
Expand Up @@ -83,6 +83,7 @@ void uv_fatal_error(const int errorno, const char* syscall);
void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
uv_handle_type type);
int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream);
void uv__stream_io(EV_P_ ev_io* watcher, int revents);
void uv__server_io(EV_P_ ev_io* watcher, int revents);
int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
Expand Down
139 changes: 76 additions & 63 deletions deps/uv/src/unix/stream.c
Expand Up @@ -31,7 +31,7 @@


static void uv__stream_connect(uv_stream_t*);
static uv_write_t* uv__write(uv_stream_t* stream);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);


Expand Down Expand Up @@ -103,6 +103,39 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
}


/* Flushes every outstanding write request of a stream that is being torn
 * down, invoking each request's callback exactly once.
 *
 * The stream must already carry the UV_CLOSED flag (asserted below).
 *
 *  - Requests still sitting in write_queue were never finished; their
 *    callbacks are invoked with status -1 after recording UV_EINTR.
 *  - Requests in write_completed_queue were already finished by uv__write().
 *    That queue holds failed requests too (uv__write() stores errno in
 *    req->error before parking the request there), so the status passed to
 *    the callback is derived from req->error, the same way
 *    uv__write_callbacks() does it, instead of unconditionally reporting
 *    success.
 */
void uv__stream_destroy(uv_stream_t* stream) {
  uv_write_t* req;
  ngx_queue_t* q;

  assert(stream->flags & UV_CLOSED);

  /* Requests that were never finished. */
  while (!ngx_queue_empty(&stream->write_queue)) {
    q = ngx_queue_head(&stream->write_queue);
    ngx_queue_remove(q);

    req = ngx_queue_data(q, uv_write_t, queue);

    /* Only a separately allocated buffer array is freed; bufsml lives
     * inside the request itself.
     */
    if (req->bufs != req->bufsml)
      free(req->bufs);

    /* Do not leave a dangling pointer visible to the callback. */
    req->bufs = NULL;

    if (req->cb) {
      uv_err_new_artificial(req->handle->loop, UV_EINTR);
      req->cb(req, -1);
    }
  }

  /* Requests that were finished but whose callback has not run yet. */
  while (!ngx_queue_empty(&stream->write_completed_queue)) {
    q = ngx_queue_head(&stream->write_completed_queue);
    ngx_queue_remove(q);

    req = ngx_queue_data(q, uv_write_t, queue);
    if (req->cb) {
      if (req->error) {
        /* The write failed before the stream was closed: report it. */
        uv_err_new_artificial(req->handle->loop, req->error);
        req->cb(req, -1);
      } else {
        uv_err_new_artificial(req->handle->loop, UV_OK);
        req->cb(req, 0);
      }
    }
  }
}


void uv__server_io(EV_P_ ev_io* watcher, int revents) {
int fd;
struct sockaddr_storage addr;
Expand Down Expand Up @@ -254,10 +287,28 @@ static void uv__drain(uv_stream_t* stream) {
}


/* Retires a write request: unlinks it from the queue it is currently on,
 * releases its buffer array and parks it on the owning stream's
 * write_completed_queue so that its callback is run later.
 */
static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* owner;

  owner = req->handle;

  /* Unlink the request from the queue it currently sits on. */
  ngx_queue_remove(&req->queue);

  /* The buffer array is heap memory only when it is not the inline bufsml
   * storage of the request.
   */
  if (req->bufs != req->bufsml)
    free(req->bufs);
  req->bufs = NULL;

  /* Queue the request for completion and feed an EV_WRITE event to the
   * stream's write watcher.
   */
  ngx_queue_insert_tail(&owner->write_completed_queue, &req->queue);
  ev_feed_event(owner->loop->ev, &owner->write_watcher, EV_WRITE);
}


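/* Tries to write the request at the head of the stream's write queue.
 * Returns nothing: on a write error other than EAGAIN the errno value is
 * stored in req->error and the request is finished, so the failure is
 * reported later through the request's callback.
 */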
static uv_write_t* uv__write(uv_stream_t* stream) {
static void uv__write(uv_stream_t* stream) {
uv_write_t* req;
struct iovec* iov;
int iovcnt;
Expand All @@ -271,7 +322,7 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
req = uv_write_queue_head(stream);
if (!req) {
assert(stream->write_queue_size == 0);
return NULL;
return;
}

assert(req->handle == stream);
Expand Down Expand Up @@ -299,8 +350,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
if (n < 0) {
if (errno != EAGAIN) {
/* Error */
uv_err_new(stream->loop, errno);
return req;
req->error = errno;
uv__write_req_finish(req);
return;
}
} else {
/* Successful write */
Expand Down Expand Up @@ -334,21 +386,9 @@ static uv_write_t* uv__write(uv_stream_t* stream) {
if (req->write_index == req->bufcnt) {
/* Then we're done! */
assert(n == 0);

/* Pop the req off tcp->write_queue. */
ngx_queue_remove(&req->queue);
if (req->bufs != req->bufsml) {
free(req->bufs);
}
req->bufs = NULL;

/* Add it to the write_completed_queue where it will have its
* callback called in the near future.
* TODO: start trying to write the next request.
*/
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
return NULL;
uv__write_req_finish(req);
/* TODO: start trying to write the next request. */
return;
}
}
}
Expand All @@ -359,8 +399,6 @@ static uv_write_t* uv__write(uv_stream_t* stream) {

/* We're not done. */
ev_io_start(stream->loop->ev, &stream->write_watcher);

return NULL;
}


Expand All @@ -378,7 +416,8 @@ static void uv__write_callbacks(uv_stream_t* stream) {

/* NOTE: call callback AFTER freeing the request data. */
if (req->cb) {
req->cb(req, 0);
uv_err_new_artificial(stream->loop, req->error);
req->cb(req, req->error ? -1 : 0);
}

callbacks_made++;
Expand Down Expand Up @@ -495,15 +534,8 @@ void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
}

if (revents & EV_WRITE) {
uv_write_t* req = uv__write(stream);
if (req) {
/* Error. Notify the user. */
if (req->cb) {
req->cb(req, -1);
}
} else {
uv__write_callbacks(stream);
}
uv__write(stream);
uv__write_callbacks(stream);
}
}
}
Expand Down Expand Up @@ -631,34 +663,29 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the uv_buf_t array.
*/
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
uv_stream_t* stream;
int empty_queue;

stream = (uv_stream_t*)handle;

/* Initialize the req */
uv__req_init((uv_req_t*) req);
req->cb = cb;
req->handle = handle;
ngx_queue_init(&req->queue);

assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE)
&& "uv_write (unix) does not yet support other types of streams");

empty_queue = (stream->write_queue_size == 0);

if (stream->fd < 0) {
uv_err_new(stream->loop, EBADF);
return -1;
}

ngx_queue_init(&req->queue);
req->type = UV_WRITE;
empty_queue = (stream->write_queue_size == 0);

/* Initialize the req */
uv__req_init((uv_req_t*) req);
req->cb = cb;
req->handle = stream;
req->error = 0;
req->type = UV_WRITE;
ngx_queue_init(&req->queue);

if (bufcnt < UV_REQ_BUFSML_SIZE) {
if (bufcnt <= UV_REQ_BUFSML_SIZE) {
req->bufs = req->bufsml;
}
else {
Expand Down Expand Up @@ -688,22 +715,8 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
* for the fd to become writable.
*/
if (empty_queue) {
if (uv__write(stream)) {
/* Error. uv_last_error has been set. */
return -1;
}
}

/* If the queue is now empty - we've flushed the request already. That
* means we need to make the callback. The callback can only be done on a
* fresh stack so we feed the event loop in order to service it.
*/
if (ngx_queue_empty(&stream->write_queue)) {
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
uv__write(stream);
} else {
/* Otherwise there is data to write - so we should wait for the file
* descriptor to become writable.
*/
ev_io_start(stream->loop->ev, &stream->write_watcher);
}

Expand Down
2 changes: 2 additions & 0 deletions deps/uv/test/test-list.h
Expand Up @@ -31,6 +31,7 @@ TEST_DECLARE (tcp_bind_error_fault)
TEST_DECLARE (tcp_bind_error_inval)
TEST_DECLARE (tcp_bind_localhost_ok)
TEST_DECLARE (tcp_listen_without_bind)
TEST_DECLARE (tcp_close)
TEST_DECLARE (tcp_bind6_error_addrinuse)
TEST_DECLARE (tcp_bind6_error_addrnotavail)
TEST_DECLARE (tcp_bind6_error_fault)
Expand Down Expand Up @@ -117,6 +118,7 @@ TASK_LIST_START
TEST_ENTRY (tcp_bind_error_inval)
TEST_ENTRY (tcp_bind_localhost_ok)
TEST_ENTRY (tcp_listen_without_bind)
TEST_ENTRY (tcp_close)

TEST_ENTRY (tcp_bind6_error_addrinuse)
TEST_ENTRY (tcp_bind6_error_addrnotavail)
Expand Down

0 comments on commit 0e0fbf7

Please sign in to comment.