Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: implement async handles in libuv
Browse files Browse the repository at this point in the history
Replace libev backed async handles with a pure libuv implementation.
  • Loading branch information
bnoordhuis committed Jun 11, 2012
1 parent ddb5f55 commit 78bc0d6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 20 deletions.
11 changes: 8 additions & 3 deletions include/uv-private/uv-unix.h
Expand Up @@ -38,6 +38,7 @@

#include <semaphore.h>
#include <pthread.h>
#include <signal.h>

#if __sun
# include <sys/port.h>
Expand Down Expand Up @@ -113,6 +114,9 @@ struct uv__io_s {
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
ngx_queue_t async_handles; \
uv__io_t async_watcher; \
int async_pipefd[2]; \
/* RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
uint64_t time; \
Expand Down Expand Up @@ -211,9 +215,10 @@ struct uv__io_s {


/* UV_ASYNC */
#define UV_ASYNC_PRIVATE_FIELDS \
ev_async async_watcher; \
uv_async_cb async_cb;
#define UV_ASYNC_PRIVATE_FIELDS \
volatile sig_atomic_t pending; \
uv_async_cb async_cb; \
ngx_queue_t queue;


/* UV_TIMER */
Expand Down
94 changes: 77 additions & 17 deletions src/unix/async.c
Expand Up @@ -21,39 +21,99 @@
#include "uv.h"
#include "internal.h"

#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>

static void uv__async(EV_P_ ev_async* w, int revents) {
uv_async_t* async = container_of(w, uv_async_t, async_watcher);
static int uv__async_init(uv_loop_t* loop);
static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events);

if (async->async_cb) {
async->async_cb(async, 0);
}
}

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
if (uv__async_init(loop))
return uv__set_sys_error(loop, errno);

int uv_async_init(uv_loop_t* loop, uv_async_t* async, uv_async_cb async_cb) {
uv__handle_init(loop, (uv_handle_t*)async, UV_ASYNC);
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
loop->counters.async_init++;

ev_async_init(&async->async_watcher, uv__async);
async->async_cb = async_cb;
handle->async_cb = async_cb;
handle->pending = 0;

/* Note: This does not have symmetry with the other libev wrappers. */
ev_async_start(loop->ev, &async->async_watcher);
uv__handle_start(async);
ngx_queue_insert_tail(&loop->async_handles, &handle->queue);
uv__handle_start(handle);

return 0;
}


int uv_async_send(uv_async_t* async) {
ev_async_send(async->loop->ev, &async->async_watcher);
int uv_async_send(uv_async_t* handle) {
int r;

handle->pending = 1; /* XXX needs a memory barrier? */

do
r = write(handle->loop->async_pipefd[1], "x", 1);
while (r == -1 && errno == EINTR);

if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
return uv__set_sys_error(handle->loop, errno);

return 0;
}


void uv__async_close(uv_async_t* handle) {
ev_async_stop(handle->loop->ev, &handle->async_watcher);
uv__handle_ref(handle);
ngx_queue_remove(&handle->queue);
uv__handle_stop(handle);
}


static int uv__async_init(uv_loop_t* loop) {
if (loop->async_pipefd[0] != -1)
return 0;

if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK))
return -1;

uv__io_init(&loop->async_watcher,
uv__async_io,
loop->async_pipefd[0],
UV__IO_READ);
uv__io_start(loop, &loop->async_watcher);

return 0;
}


static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) {
char buf[1024];
ngx_queue_t* q;
uv_async_t* h;
ssize_t r;

while (1) {
r = read(loop->async_pipefd[0], buf, sizeof(buf));

if (r == sizeof(buf))
continue;

if (r != -1)
break;

if (errno == EAGAIN || errno == EWOULDBLOCK)
break;

if (errno == EINTR)
continue;

abort();
}

ngx_queue_foreach(q, &loop->async_handles) {
h = ngx_queue_data(q, uv_async_t, queue);
if (!h->pending) continue;
h->pending = 0;
h->async_cb(h, 0);
}
}
3 changes: 3 additions & 0 deletions src/unix/loop.c
Expand Up @@ -40,12 +40,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->async_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
ngx_queue_init(&loop->handle_queue);
loop->closing_handles = NULL;
loop->channel = NULL;
loop->time = uv_hrtime() / 1000000;
loop->async_pipefd[0] = -1;
loop->async_pipefd[1] = -1;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
eio_channel_init(&loop->uv_eio_channel, loop);
Expand Down

0 comments on commit 78bc0d6

Please sign in to comment.