Navigation Menu

Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: rework pending handle/req logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoordhuis committed May 29, 2012
1 parent 12ee388 commit 58a272e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 110 deletions.
4 changes: 2 additions & 2 deletions include/uv-private/uv-unix.h
Expand Up @@ -106,7 +106,7 @@ struct uv__io_s {
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
uv_handle_t* pending_handles; \
uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
Expand Down Expand Up @@ -144,7 +144,7 @@ struct uv__io_s {
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
int flags; \
uv_handle_t* next_pending; \
uv_handle_t* next_closing; \


#define UV_STREAM_PRIVATE_FIELDS \
Expand Down
156 changes: 70 additions & 86 deletions src/unix/core.c
Expand Up @@ -59,8 +59,6 @@
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;

static void uv__finish_close(uv_handle_t* handle);


void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
handle->close_cb = close_cb;
Expand Down Expand Up @@ -116,7 +114,72 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
}

handle->flags |= UV_CLOSING;
uv__make_pending(handle);

handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}


static void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;

switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;

case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;

case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;

case UV_FS_EVENT:
break;

case UV_POLL:
break;

default:
assert(0);
break;
}


if (handle->close_cb) {
handle->close_cb(handle);
}

uv__handle_unref(handle);
}


static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

p = loop->closing_handles;
loop->closing_handles = NULL;

while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}


Expand Down Expand Up @@ -163,36 +226,6 @@ void uv_loop_delete(uv_loop_t* loop) {
}


static void uv__run_pending(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

if (!loop->pending_handles)
return;

for (p = loop->pending_handles, loop->pending_handles = NULL; p; p = q) {
q = p->next_pending;
p->next_pending = NULL;
p->flags &= ~UV__PENDING;

if (p->flags & UV_CLOSING) {
uv__finish_close(p);
continue;
}

switch (p->type) {
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
uv__stream_pending((uv_stream_t*)p);
break;
default:
abort();
}
}
}


static void uv__poll(uv_loop_t* loop, int block) {
/* bump the loop's refcount, otherwise libev does
* a zero timeout poll and we end up busy looping
Expand All @@ -210,7 +243,6 @@ static int uv__should_block(uv_loop_t* loop) {

static int uv__run(uv_loop_t* loop) {
uv__run_idle(loop);
uv__run_pending(loop);

if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
uv__run_prepare(loop);
Expand All @@ -221,9 +253,9 @@ static int uv__run(uv_loop_t* loop) {
uv__run_check(loop);
}

return uv__has_pending_handles(loop)
|| uv__has_active_handles(loop)
|| uv__has_active_reqs(loop);
uv__run_closing_handles(loop);

return uv__has_active_handles(loop) || uv__has_active_reqs(loop);
}


Expand All @@ -245,55 +277,7 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
handle->loop = loop;
handle->type = type;
handle->flags = UV__REF; /* ref the loop when active */
handle->next_pending = NULL;
}


void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;

switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;

case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;

case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;

case UV_FS_EVENT:
break;

case UV_POLL:
break;

default:
assert(0);
break;
}


if (handle->close_cb) {
handle->close_cb(handle);
}

uv__handle_unref(handle);
handle->next_closing = NULL;
}


Expand Down
14 changes: 0 additions & 14 deletions src/unix/internal.h
Expand Up @@ -98,18 +98,6 @@ enum {
UV__PENDING = 0x800
};

inline static int uv__has_pending_handles(const uv_loop_t* loop) {
return loop->pending_handles != NULL;
}

inline static void uv__make_pending(uv_handle_t* h) {
if (h->flags & UV__PENDING) return;
h->next_pending = h->loop->pending_handles;
h->loop->pending_handles = h;
h->flags |= UV__PENDING;
}
#define uv__make_pending(h) uv__make_pending((uv_handle_t*)(h))

inline static void uv__req_init(uv_loop_t* loop,
uv_req_t* req,
uv_req_type type) {
Expand Down Expand Up @@ -180,8 +168,6 @@ void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);

void uv__stream_pending(uv_stream_t* handle);

#define UV__F_IPC (1 << 0)
#define UV__F_NONBLOCK (1 << 1)
int uv__make_socketpair(int fds[2], int flags);
Expand Down
2 changes: 1 addition & 1 deletion src/unix/loop.c
Expand Up @@ -41,7 +41,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
loop->pending_handles = NULL;
loop->closing_handles = NULL;
loop->channel = NULL;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
Expand Down
2 changes: 1 addition & 1 deletion src/unix/pipe.c
Expand Up @@ -218,7 +218,7 @@ void uv_pipe_connect(uv_connect_t* req,
ngx_queue_init(&req->queue);

/* Run callback on next tick. */
uv__make_pending(handle);
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);

/* Mimic the Windows pipe implementation, always
* return 0 and let the callback handle errors.
Expand Down
7 changes: 1 addition & 6 deletions src/unix/stream.c
Expand Up @@ -718,11 +718,6 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
}


void uv__stream_pending(uv_stream_t* handle) {
uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE);
}


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_stream_t* stream;

Expand Down Expand Up @@ -859,7 +854,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
uv__io_start(stream->loop, &stream->write_watcher);

if (stream->delayed_error)
uv__make_pending(stream);
uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);

return 0;
}
Expand Down

0 comments on commit 58a272e

Please sign in to comment.