Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
Merge branch 'poll'
Browse files Browse the repository at this point in the history
Closes: #316, #401
Ref: #265, #349
  • Loading branch information
piscisaureus committed May 3, 2012
2 parents e387554 + b9504f7 commit acd0afb
Show file tree
Hide file tree
Showing 16 changed files with 1,475 additions and 0 deletions.
1 change: 1 addition & 0 deletions config-unix.mk
Expand Up @@ -37,6 +37,7 @@ OBJS += src/unix/fs.o
OBJS += src/unix/idle.o
OBJS += src/unix/loop.o
OBJS += src/unix/pipe.o
OBJS += src/unix/poll.o
OBJS += src/unix/prepare.o
OBJS += src/unix/process.o
OBJS += src/unix/stream.o
Expand Down
7 changes: 7 additions & 0 deletions include/uv-private/uv-unix.h
Expand Up @@ -45,6 +45,8 @@ typedef struct {

typedef int uv_file;

typedef int uv_os_sock_t;

#define UV_ONCE_INIT PTHREAD_ONCE_INIT

typedef pthread_once_t uv_once_t;
Expand Down Expand Up @@ -162,6 +164,11 @@ typedef void* uv_lib_t;
const char* pipe_fname; /* strdup'ed */


/*
 * UV_POLL
 *
 * Unix-specific members of uv_poll_t: the libev I/O watcher that the poll
 * implementation configures with ev_io_set() and starts/stops on the loop.
 */
#define UV_POLL_PRIVATE_FIELDS \
ev_io io_watcher;


/* UV_PREPARE */ \
#define UV_PREPARE_PRIVATE_FIELDS \
ev_prepare prepare_watcher; \
Expand Down
23 changes: 23 additions & 0 deletions include/uv-private/uv-win.h
Expand Up @@ -146,6 +146,8 @@ typedef struct _AFD_POLL_INFO {
AFD_POLL_HANDLE_INFO Handles[1];
} AFD_POLL_INFO, *PAFD_POLL_INFO;

#define UV_MSAFD_PROVIDER_COUNT 3


/**
* It should be possible to cast uv_buf_t[] to WSABUF[]
Expand All @@ -158,6 +160,8 @@ typedef struct uv_buf_t {

typedef int uv_file;

typedef SOCKET uv_os_sock_t;

typedef HANDLE uv_thread_t;

typedef CRITICAL_SECTION uv_mutex_t;
Expand Down Expand Up @@ -219,6 +223,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_prepare_t* next_prepare_handle; \
uv_check_t* next_check_handle; \
uv_idle_t* next_idle_handle; \
/* This handle holds the peer sockets for the fast variant of uv_poll_t */ \
SOCKET poll_peer_sockets[UV_MSAFD_PROVIDER_COUNT]; \
/* State used by uv_ares. */ \
ares_channel ares_chan; \
int ares_active_sockets; \
uv_timer_t ares_polling_timer; \
Expand All @@ -237,6 +244,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
UV_ARES_CLEANUP_REQ, \
UV_FS_EVENT_REQ, \
UV_GETADDRINFO_REQ, \
UV_POLL_REQ, \
UV_PROCESS_EXIT, \
UV_PROCESS_CLOSE, \
UV_READ, \
Expand Down Expand Up @@ -386,6 +394,21 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
COORD saved_position; \
WORD saved_attributes;

/*
 * Windows-specific members of uv_poll_t.
 * NOTE(review): the *_1 / *_2 members appear to form two parallel poll
 * request slots -- confirm against the Windows poll implementation.
 */
#define UV_POLL_PRIVATE_FIELDS \
SOCKET socket; \
/* Used in fast mode */ \
SOCKET peer_socket; \
AFD_POLL_INFO afd_poll_info_1; \
AFD_POLL_INFO afd_poll_info_2; \
/* Used in fast and slow mode. */ \
uv_req_t poll_req_1; \
uv_req_t poll_req_2; \
unsigned char submitted_events_1; \
unsigned char submitted_events_2; \
unsigned char mask_events_1; \
unsigned char mask_events_2; \
unsigned char events;

#define UV_TIMER_PRIVATE_FIELDS \
RB_ENTRY(uv_timer_s) tree_entry; \
int64_t due; \
Expand Down
72 changes: 72 additions & 0 deletions include/uv.h
Expand Up @@ -164,6 +164,7 @@ typedef enum {
#define XX(uc, lc) UV_##uc,
UV_HANDLE_TYPE_MAP(XX)
#undef XX
UV_POLL,
UV_FILE,
UV_HANDLE_TYPE_PRIVATE
UV_HANDLE_TYPE_MAX
Expand All @@ -189,6 +190,7 @@ typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_tty_s uv_tty_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
Expand Down Expand Up @@ -288,6 +290,7 @@ typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
typedef void (*uv_connection_cb)(uv_stream_t* server, int status);
typedef void (*uv_close_cb)(uv_handle_t* handle);
typedef void (*uv_poll_cb)(uv_poll_t* handle, int status, int events);
typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
/* TODO: do these really need a status argument? */
typedef void (*uv_async_cb)(uv_async_t* handle, int status);
Expand Down Expand Up @@ -926,6 +929,74 @@ UV_EXTERN void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count);


/*
* uv_poll_t is a subclass of uv_handle_t.
*
* The uv_poll watcher is used to watch file descriptors for readability and
* writability, similar to the purpose of poll(2).
*
* The purpose of uv_poll is to enable integrating external libraries that
* rely on the event loop to signal it about the socket status changes, like
* c-ares or libssh2. Using uv_poll_t for any other purpose is not
* recommended; uv_tcp_t, uv_udp_t, etc. provide an implementation that is
* much faster and more scalable than what can be achieved with uv_poll_t,
* especially on Windows.
*
* It is possible that uv_poll occasionally signals that a file descriptor is
* readable or writable even when it isn't. The user should therefore always
* be prepared to handle EAGAIN or equivalent when it attempts to read from or
* write to the fd.
*
* It is not okay to have multiple active uv_poll watchers for the same socket.
* This can cause libuv to busyloop or otherwise malfunction.
*
* The user should not close a file descriptor while it is being polled by an
* active uv_poll watcher. This can cause the poll watcher to report an error,
* but it might also start polling another socket. However the fd can be safely
* closed immediately after a call to uv_poll_stop() or uv_close().
*
* On windows only sockets can be polled with uv_poll. On unix any file
* descriptor that would be accepted by poll(2) can be used with uv_poll.
*/
/* Poll handle: the common handle fields, the user callback, and the */
/* platform-specific state supplied by UV_POLL_PRIVATE_FIELDS. */
struct uv_poll_s {
UV_HANDLE_FIELDS
/* Callback of type uv_poll_cb; receives the handle, a status and the */
/* detected events. It is supplied through uv_poll_start(). */
uv_poll_cb poll_cb;
UV_POLL_PRIVATE_FIELDS
};

/*
 * Event bits used in the `events` bitmask passed to uv_poll_start() and
 * reported to the uv_poll_cb callback. The values may be OR-ed together.
 */
enum uv_poll_event {
  UV_READABLE = 1 << 0,
  UV_WRITABLE = 1 << 1
};

/* Initialize the poll watcher using a file descriptor. */
UV_EXTERN int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd);

/* Initialize the poll watcher using a socket descriptor. On unix this is */
/* identical to uv_poll_init. On windows it takes a SOCKET handle. */
UV_EXTERN int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
uv_os_sock_t socket);

/*
* Starts polling the file descriptor. `events` is a bitmask made up of
* UV_READABLE and UV_WRITABLE. As soon as an event is detected the callback
* will be called with `status` set to 0, and the detected events set in the
* `events` field.
*
* If an error happens while polling, status may be set to -1 and the error
* code can be retrieved with uv_last_error. The user should not close the
* socket while uv_poll is active. If the user does that anyway, the callback
* *may* be called reporting an error status, but this is not guaranteed.
*
* Calling uv_poll_start on an uv_poll watcher that is already active is fine.
* Doing so will update the events mask that is being watched for.
*/
UV_EXTERN int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);

/* Stops polling the file descriptor. */
UV_EXTERN int uv_poll_stop(uv_poll_t* handle);


/*
* uv_prepare_t is a subclass of uv_handle_t.
*
Expand Down Expand Up @@ -1543,6 +1614,7 @@ struct uv_counters_s {
uint64_t udp_init;
uint64_t pipe_init;
uint64_t tty_init;
uint64_t poll_init;
uint64_t prepare_init;
uint64_t check_init;
uint64_t idle_init;
Expand Down
9 changes: 9 additions & 0 deletions src/unix/core.c
Expand Up @@ -107,6 +107,10 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
uv__fs_event_close((uv_fs_event_t*)handle);
break;

case UV_POLL:
uv__poll_close((uv_poll_t*)handle);
break;

default:
assert(0);
}
Expand Down Expand Up @@ -249,6 +253,9 @@ void uv__finish_close(uv_handle_t* handle) {
case UV_FS_EVENT:
break;

case UV_POLL:
break;

default:
assert(0);
break;
Expand Down Expand Up @@ -285,6 +292,8 @@ int64_t uv_now(uv_loop_t* loop) {

int uv_is_active(const uv_handle_t* handle) {
switch (handle->type) {
case UV_POLL:
return uv__poll_active((const uv_poll_t*)handle);
case UV_CHECK:
return uv__check_active((const uv_check_t*)handle);
case UV_IDLE:
Expand Down
4 changes: 4 additions & 0 deletions src/unix/internal.h
Expand Up @@ -141,6 +141,10 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);

/* poll */
void uv__poll_close(uv_poll_t* handle);
int uv__poll_active(const uv_poll_t* handle);

/* various */
int uv__check_active(const uv_check_t* handle);
int uv__idle_active(const uv_idle_t* handle);
Expand Down
130 changes: 130 additions & 0 deletions src/unix/poll.c
@@ -0,0 +1,130 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "internal.h"

#include <unistd.h>
#include <assert.h>
#include <errno.h>


/*
 * libev I/O callback for a uv_poll_t watcher.
 *
 * `watcher->data` holds the owning uv_poll_t. The libev event bits in
 * `ev_events` are translated to UV_READABLE / UV_WRITABLE and handed to the
 * user's poll callback with status 0. If libev reports EV_ERROR the callback
 * is invoked with status -1 and no events, after recording EBADF as the
 * loop's last error.
 */
static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
  uv_poll_t* poll_handle = watcher->data;
  int reported;

  if ((ev_events & EV_ERROR) == 0) {
    /* Only read and/or write readiness is expected on the normal path. */
    assert(ev_events & (EV_READ | EV_WRITE));
    assert((ev_events & ~(EV_READ | EV_WRITE)) == 0);

    reported = (ev_events & EV_READ) ? UV_READABLE : 0;
    if (ev_events & EV_WRITE)
      reported |= UV_WRITABLE;

    poll_handle->poll_cb(poll_handle, 0, reported);
    return;
  }

  /* Error path: libev has already stopped the watcher on its own, so the */
  /* loop reference that a regular stop would restore is re-added here. */
  uv_ref(poll_handle->loop);
  uv__set_sys_error(poll_handle->loop, EBADF);
  poll_handle->poll_cb(poll_handle, -1, 0);
}


/*
 * Initializes `handle` as a UV_POLL handle on `loop` for file descriptor
 * `fd`. The watcher is prepared but not started; no callback is set until
 * uv_poll_start() is called. Always returns 0.
 */
int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
  ev_io* w;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL);
  loop->counters.poll_init += 1;

  handle->poll_cb = NULL;
  handle->fd = fd;

  /* Point the libev watcher back at its handle so that uv__poll_io() can */
  /* recover it from watcher->data. */
  w = &handle->io_watcher;
  ev_init(w, uv__poll_io);
  w->data = handle;

  return 0;
}


/*
 * Socket flavour of uv_poll_init(). In this (unix) implementation the socket
 * value is simply forwarded as the file descriptor to uv_poll_init(), whose
 * result is returned.
 */
int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
    uv_os_sock_t socket) {
  int fd;

  fd = socket;
  return uv_poll_init(loop, handle, fd);
}


/*
 * Stops the handle's libev watcher if it is currently active. Does nothing
 * when the watcher is already inactive.
 */
static void uv__poll_stop(uv_poll_t* handle) {
  ev_io* w = &handle->io_watcher;

  if (!ev_is_active(w))
    return;

  ev_io_stop(handle->loop->ev, w);
  /* Counterpart of the uv_unref() done when the watcher was started. */
  uv_ref(handle->loop);
}


/*
 * Public entry point: stops polling the file descriptor. Must not be called
 * on a handle that is closing or closed (asserted). Always returns 0.
 */
int uv_poll_stop(uv_poll_t* handle) {
  assert((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0);

  uv__poll_stop(handle);

  return 0;
}


/*
 * Starts (or re-arms) polling of the handle's file descriptor.
 *
 * handle   Poll handle; must not be closing or closed (asserted).
 * events   Bitmask of UV_READABLE and/or UV_WRITABLE (asserted). A mask of 0
 *          stops the watcher instead of starting it.
 * poll_cb  Callback stored on the handle and invoked from uv__poll_io().
 *
 * Calling this on an already active handle replaces the watched event mask.
 * Always returns 0.
 */
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
  int ev_events;
  int was_active;

  assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0);
  assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));

  if (events == 0) {
    uv__poll_stop(handle);
    return 0;
  }

  /* Translate the uv event mask into libev event bits. */
  ev_events = 0;
  if (events & UV_READABLE)
    ev_events |= EV_READ;
  if (events & UV_WRITABLE)
    ev_events |= EV_WRITE;

  was_active = ev_is_active(&handle->io_watcher);

  /* libev does not allow ev_io_set() on an active watcher, and */
  /* ev_io_start() on an active watcher is a no-op, so a changed event mask */
  /* would never reach the backend. Stop the watcher first; the stop/start */
  /* pair leaves libev's own reference count balanced. */
  if (was_active)
    ev_io_stop(handle->loop->ev, &handle->io_watcher);

  ev_io_set(&handle->io_watcher, handle->fd, ev_events);
  ev_io_start(handle->loop->ev, &handle->io_watcher);

  /* Only compensate for the watcher's loop reference on the transition */
  /* from inactive to active; a re-arm must not touch the refcount. */
  if (!was_active)
    uv_unref(handle->loop);

  handle->poll_cb = poll_cb;

  return 0;
}


/* Close hook for UV_POLL handles, called from uv_close(): stops the */
/* watcher if it is still active. */
void uv__poll_close(uv_poll_t* handle) {
uv__poll_stop(handle);
}


/* Activity query used by uv_is_active() for UV_POLL handles: returns the */
/* result of ev_is_active() on the handle's libev watcher. */
int uv__poll_active(const uv_poll_t* handle) {
return ev_is_active(&handle->io_watcher);
}
10 changes: 10 additions & 0 deletions src/win/core.c
Expand Up @@ -84,6 +84,8 @@ static void uv_loop_init(uv_loop_t* loop) {
loop->next_check_handle = NULL;
loop->next_idle_handle = NULL;

memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets);

loop->ares_active_sockets = 0;
loop->ares_chan = NULL;

Expand Down Expand Up @@ -130,6 +132,14 @@ uv_loop_t* uv_loop_new(void) {

/*
 * Destroys a loop. The default loop (uv_default_loop_) is left untouched;
 * for any other loop the poll peer sockets that were opened are closed and
 * the loop memory is freed.
 */
void uv_loop_delete(uv_loop_t* loop) {
  int idx;
  SOCKET peer;

  if (loop == &uv_default_loop_)
    return;

  for (idx = 0; idx < ARRAY_SIZE(loop->poll_peer_sockets); idx++) {
    peer = loop->poll_peer_sockets[idx];

    /* Skip slots that hold no socket: 0 or INVALID_SOCKET. */
    if (peer == 0 || peer == INVALID_SOCKET)
      continue;

    closesocket(peer);
  }

  free(loop);
}
Expand Down

0 comments on commit acd0afb

Please sign in to comment.