Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: replace ev_io with uv__io_t
Browse files Browse the repository at this point in the history
Replace ev_io usage with wrapper constructs.

This is preliminary work for the transition to a libev-less linux backend.
  • Loading branch information
bnoordhuis committed May 23, 2012
1 parent 7a64ec4 commit 3bc9707
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 155 deletions.
24 changes: 17 additions & 7 deletions include/uv-private/uv-unix.h
Expand Up @@ -70,6 +70,17 @@ typedef struct {
char* errmsg;
} uv_lib_t;

/* Forward declarations so the callback type below can refer to both structs. */
struct uv__io_s;
struct uv_loop_s;

typedef struct uv__io_s uv__io_t;
/* Callback signature for I/O watchers: receives the loop, the watcher that */
/* fired and the event mask that was reported for it. */
typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events);

/* Wrapper around a libev I/O watcher. It pairs the ev_io with the uv__io_cb */
/* that should be called when the watcher fires. */
struct uv__io_s {
/* The underlying libev watcher. */
ev_io io_watcher;
/* Callback to run for this watcher. */
uv__io_cb cb;
};

#define UV_REQ_TYPE_PRIVATE /* empty */

#if __linux__
Expand All @@ -78,7 +89,7 @@ typedef struct {
struct uv__inotify_watchers { \
struct uv_fs_event_s* rbh_root; \
} inotify_watchers; \
ev_io inotify_read_watcher; \
uv__io_t inotify_read_watcher; \
int inotify_fd;
#elif defined(PORT_SOURCE_FILE)
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS \
Expand Down Expand Up @@ -142,8 +153,8 @@ typedef struct {
#define UV_STREAM_PRIVATE_FIELDS \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
ev_io read_watcher; \
ev_io write_watcher; \
uv__io_t read_watcher; \
uv__io_t write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \
int delayed_error; \
Expand All @@ -160,8 +171,8 @@ typedef struct {
#define UV_UDP_PRIVATE_FIELDS \
uv_alloc_cb alloc_cb; \
uv_udp_recv_cb recv_cb; \
ev_io read_watcher; \
ev_io write_watcher; \
uv__io_t read_watcher; \
uv__io_t write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \

Expand All @@ -173,7 +184,7 @@ typedef struct {

/* UV_POLL */
#define UV_POLL_PRIVATE_FIELDS \
ev_io io_watcher;
uv__io_t io_watcher;


/* UV_PREPARE */ \
Expand Down Expand Up @@ -238,7 +249,6 @@ typedef struct {
struct uv_fs_event_s* rbe_parent; \
int rbe_color; \
} node; \
ev_io read_watcher; \
uv_fs_event_cb cb;

#elif defined(__APPLE__) \
Expand Down
39 changes: 39 additions & 0 deletions src/unix/core.c
Expand Up @@ -592,3 +592,42 @@ uv_err_t uv_chdir(const char* dir) {
return uv__new_sys_error(errno);
}
}


/* libev-facing trampoline shared by every uv__io_t watcher.
 *
 * Recovers the enclosing uv__io_t from the embedded ev_io, looks up the
 * uv_loop_t stored as the ev loop's userdata, and forwards the call to the
 * watcher's own callback. Only the read, write and error bits of `events`
 * are passed on.
 */
static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) {
  uv__io_t* io;
  uv_loop_t* loop;
  int revents;

  io = container_of(w, uv__io_t, io_watcher);
  loop = ev_userdata(ev);
  revents = events & (EV_READ | EV_WRITE | EV_ERROR);

  io->cb(loop, io, revents);
}


/* Initialize an I/O watcher.
 *
 * handle: watcher to set up.
 * cb:     callback stored on the watcher; uv__io_rw dispatches to it.
 * fd:     file descriptor to watch.
 * events: interest mask; only the EV_READ and EV_WRITE bits are kept.
 *
 * The watcher is initialized but not started.
 */
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
  int mask;

  mask = events & (EV_READ | EV_WRITE);

  handle->cb = cb;
  ev_io_init(&handle->io_watcher, uv__io_rw, fd, mask);
}


/* Reconfigure an already initialized I/O watcher.
 *
 * handle: watcher to update.
 * cb:     callback to store on the watcher.
 * fd:     file descriptor to watch.
 * events: interest mask; only the EV_READ and EV_WRITE bits are kept.
 *
 * The mask is filtered the same way uv__io_init() filters it, so a stray
 * error bit (UV__IO_ERROR) passed by a caller never ends up in the watcher's
 * interest set; libev documents EV_READ and EV_WRITE as the only valid
 * interest bits for an ev_io watcher.
 */
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
  ev_io_set(&handle->io_watcher, fd, events & (EV_READ | EV_WRITE));
  handle->cb = cb;
}


/* Start the watcher: hands the embedded ev_io to ev_io_start() on the */
/* libev loop owned by `loop`. */
void uv__io_start(uv_loop_t* loop, uv__io_t* handle) {
ev_io_start(loop->ev, &handle->io_watcher);
}


/* Stop the watcher: hands the embedded ev_io to ev_io_stop() on the */
/* libev loop owned by `loop`. */
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) {
ev_io_stop(loop->ev, &handle->io_watcher);
}


/* Feed `event` to the watcher through ev_feed_event() on the libev loop */
/* owned by `loop`. */
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) {
ev_feed_event(loop->ev, &handle->io_watcher, event);
}


/* Report whether the watcher is active, as determined by libev's */
/* ev_is_active() on the embedded ev_io. Returns that macro's result. */
int uv__io_active(uv__io_t* handle) {
return ev_is_active(&handle->io_watcher);
}
16 changes: 12 additions & 4 deletions src/unix/internal.h
Expand Up @@ -78,6 +78,10 @@
} \
while (0)

/* Event flags used with the uv__io_* functions. They are defined as aliases */
/* of the corresponding libev event constants. */
#define UV__IO_READ EV_READ
#define UV__IO_WRITE EV_WRITE
#define UV__IO_ERROR EV_ERROR

/* flags */
enum {
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
Expand Down Expand Up @@ -127,6 +131,13 @@ int uv__socket(int domain, int type, int protocol);
int uv__dup(int fd);
int uv_async_stop(uv_async_t* handle);

/* I/O watcher wrappers: thin layer over libev's ev_io. */
/* init/set configure the fd, event mask and callback; start/stop arm and */
/* disarm the watcher on a loop; feed passes an event to the watcher; */
/* active reports whether the watcher is currently active. */
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_start(uv_loop_t* loop, uv__io_t* handle);
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle);
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event);
int uv__io_active(uv__io_t* handle);

/* loop */
int uv__loop_init(uv_loop_t* loop, int default_loop);
void uv__loop_delete(uv_loop_t* loop);
Expand All @@ -143,8 +154,7 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
uv_handle_type type);
int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream);
void uv__stream_io(EV_P_ ev_io* watcher, int revents);
void uv__server_io(EV_P_ ev_io* watcher, int revents);
void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
socklen_t addrlen, uv_connect_cb cb);
Expand All @@ -156,11 +166,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);

/* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);

/* poll */
void uv__poll_close(uv_poll_t* handle);
int uv__poll_active(const uv_poll_t* handle);

/* various */
void uv__async_close(uv_async_t* handle);
Expand Down
22 changes: 9 additions & 13 deletions src/unix/linux/inotify.c
Expand Up @@ -51,7 +51,7 @@ static int compare_watchers(const uv_fs_event_t* a, const uv_fs_event_t* b) {
RB_GENERATE_STATIC(uv__inotify_watchers, uv_fs_event_s, node, compare_watchers)


static void uv__inotify_read(EV_P_ ev_io* w, int revents);
static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents);


static int new_inotify_fd(void) {
Expand Down Expand Up @@ -85,11 +85,11 @@ static int init_inotify(uv_loop_t* loop) {
return -1;
}

ev_io_init(&loop->inotify_read_watcher,
uv__inotify_read,
loop->inotify_fd,
EV_READ);
ev_io_start(loop->ev, &loop->inotify_read_watcher);
uv__io_init(&loop->inotify_read_watcher,
uv__inotify_read,
loop->inotify_fd,
UV__IO_READ);
uv__io_start(loop, &loop->inotify_read_watcher);

return 0;
}
Expand All @@ -112,22 +112,18 @@ static void remove_watcher(uv_fs_event_t* handle) {
}


static void uv__inotify_read(EV_P_ ev_io* w, int revents) {
static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int events) {
const struct uv__inotify_event* e;
uv_fs_event_t* handle;
uv_loop_t* uv_loop;
const char* filename;
ssize_t size;
int events;
const char *p;
/* needs to be large enough for sizeof(inotify_event) + strlen(filename) */
char buf[4096];

uv_loop = container_of(w, uv_loop_t, inotify_read_watcher);

while (1) {
do {
size = read(uv_loop->inotify_fd, buf, sizeof buf);
size = read(loop->inotify_fd, buf, sizeof buf);
}
while (size == -1 && errno == EINTR);

Expand All @@ -148,7 +144,7 @@ static void uv__inotify_read(EV_P_ ev_io* w, int revents) {
if (e->mask & ~(UV__IN_ATTRIB|UV__IN_MODIFY))
events |= UV_RENAME;

handle = find_watcher(uv_loop, e->wd);
handle = find_watcher(loop, e->wd);
if (handle == NULL)
continue; /* Handle has already been closed. */

Expand Down
9 changes: 5 additions & 4 deletions src/unix/loop.c
Expand Up @@ -66,10 +66,11 @@ void uv__loop_delete(uv_loop_t* loop) {
uv_ares_destroy(loop, loop->channel);
ev_loop_destroy(loop->ev);
#if __linux__
if (loop->inotify_fd == -1) return;
ev_io_stop(loop->ev, &loop->inotify_read_watcher);
close(loop->inotify_fd);
loop->inotify_fd = -1;
if (loop->inotify_fd != -1) {
uv__io_stop(loop, &loop->inotify_read_watcher);
close(loop->inotify_fd);
loop->inotify_fd = -1;
}
#endif
#if HAVE_PORTS_FS
if (loop->fs_fd != -1)
Expand Down
19 changes: 12 additions & 7 deletions src/unix/pipe.c
Expand Up @@ -29,6 +29,8 @@
#include <unistd.h>
#include <stdlib.h>

static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events);


int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
Expand Down Expand Up @@ -138,8 +140,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv__set_sys_error(handle->loop, errno);
} else {
handle->connection_cb = cb;
ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ);
ev_io_start(handle->loop->ev, &handle->read_watcher);
uv__io_init(&handle->read_watcher,
uv__pipe_accept,
handle->fd,
UV__IO_READ);
uv__io_start(handle->loop, &handle->read_watcher);
}

out:
Expand Down Expand Up @@ -211,8 +216,8 @@ void uv_pipe_connect(uv_connect_t* req,
uv__stream_open((uv_stream_t*)handle,
sockfd,
UV_STREAM_READABLE | UV_STREAM_WRITABLE);
ev_io_start(handle->loop->ev, &handle->read_watcher);
ev_io_start(handle->loop->ev, &handle->write_watcher);
uv__io_start(handle->loop, &handle->read_watcher);
uv__io_start(handle->loop, &handle->write_watcher);
status = 0;

out:
Expand All @@ -235,14 +240,14 @@ void uv_pipe_connect(uv_connect_t* req,


/* TODO merge with uv__server_io()? */
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
struct sockaddr_un saddr;
uv_pipe_t* pipe;
int saved_errno;
int sockfd;

saved_errno = errno;
pipe = watcher->data;
pipe = container_of(w, uv_pipe_t, read_watcher);

assert(pipe->type == UV_NAMED_PIPE);

Expand All @@ -257,7 +262,7 @@ void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
pipe->connection_cb((uv_stream_t*)pipe, 0);
if (pipe->accepted_fd == sockfd) {
/* The user hasn't called uv_accept() yet */
ev_io_stop(pipe->loop->ev, &pipe->read_watcher);
uv__io_stop(pipe->loop, &pipe->read_watcher);
}
}

Expand Down
58 changes: 25 additions & 33 deletions src/unix/poll.c
Expand Up @@ -27,11 +27,13 @@
#include <errno.h>


static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
uv_poll_t* handle = watcher->data;
int events;
static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_poll_t* handle;
int pevents;

handle = container_of(w, uv_poll_t, io_watcher);

if (ev_events & EV_ERROR) {
if (events & UV__IO_ERROR) {
/* An error happened. Libev has implicitly stopped the watcher, but we */
/* need to fix the refcount. */
uv__handle_stop(handle);
Expand All @@ -40,16 +42,13 @@ static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
return;
}

assert(ev_events & (EV_READ | EV_WRITE));
assert((ev_events & ~(EV_READ | EV_WRITE)) == 0);
pevents = 0;
if (events & UV__IO_READ)
pevents |= UV_READABLE;
if (events & UV__IO_WRITE)
pevents |= UV_WRITABLE;

events = 0;
if (ev_events & EV_READ)
events |= UV_READABLE;
if (ev_events & EV_WRITE)
events |= UV_WRITABLE;

handle->poll_cb(handle, 0, events);
handle->poll_cb(handle, 0, pevents);
}


Expand All @@ -59,9 +58,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {

handle->fd = fd;
handle->poll_cb = NULL;

ev_init(&handle->io_watcher, uv__poll_io);
handle->io_watcher.data = handle;
uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0);

return 0;
}
Expand All @@ -74,7 +71,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,


static void uv__poll_stop(uv_poll_t* handle) {
ev_io_stop(handle->loop->ev, &handle->io_watcher);
uv__io_stop(handle->loop, &handle->io_watcher);
uv__handle_stop(handle);
}

Expand All @@ -86,25 +83,25 @@ int uv_poll_stop(uv_poll_t* handle) {
}


int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
int ev_events;
int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
int events;

assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));

if (events == 0) {
if (pevents == 0) {
uv__poll_stop(handle);
return 0;
}

ev_events = 0;
if (events & UV_READABLE)
ev_events |= EV_READ;
if (events & UV_WRITABLE)
ev_events |= EV_WRITE;
events = 0;
if (pevents & UV_READABLE)
events |= UV__IO_READ;
if (pevents & UV_WRITABLE)
events |= UV__IO_WRITE;

ev_io_set(&handle->io_watcher, handle->fd, ev_events);
ev_io_start(handle->loop->ev, &handle->io_watcher);
uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
uv__io_start(handle->loop, &handle->io_watcher);

handle->poll_cb = poll_cb;
uv__handle_start(handle);
Expand All @@ -116,8 +113,3 @@ int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
/* Close hook for poll handles: delegates to uv__poll_stop(), which stops */
/* the handle's I/O watcher and marks the handle as stopped. */
void uv__poll_close(uv_poll_t* handle) {
uv__poll_stop(handle);
}


int uv__poll_active(const uv_poll_t* handle) {
return ev_is_active(&handle->io_watcher);
}

0 comments on commit 3bc9707

Please sign in to comment.