Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: implement timers in libuv
Browse files Browse the repository at this point in the history
* replace libev backed timers with a pure libuv implementation
* gut ev_run() and make it take a timeout instead of flags

Incidentally speeds up the loop_count_timed benchmark by about 100%.
  • Loading branch information
bnoordhuis committed May 31, 2012
1 parent 3f37ba8 commit c9396dd
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 166 deletions.
2 changes: 1 addition & 1 deletion include/uv-private/ev.h
Expand Up @@ -623,7 +623,7 @@ enum {
};

#if EV_PROTOTYPES
/* Pre-commit prototype: ev_run() was controlled by a flags argument. */
void ev_run (EV_P_ int flags EV_CPP (= 0));
/* Post-commit prototype: ev_run() takes the wait time that it forwards to
 * the backend poll, instead of flags. */
void ev_run (EV_P_ ev_tstamp waittime);
void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */

/*
Expand Down
43 changes: 27 additions & 16 deletions include/uv-private/uv-unix.h
Expand Up @@ -98,18 +98,21 @@ struct uv__io_s {
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS
#endif

/* Unix-specific private members of the loop structure.
 *
 * This diff hunk shows the macro twice: the first definition is the version
 * before the commit, the second is the version after it. The post-commit
 * version additionally carries the timer tree root (timer_handles) and the
 * cached loop time (time), both of which uv__loop_init() initializes.
 */
#define UV_LOOP_PRIVATE_FIELDS \
/* Poll result queue */ \
eio_channel uv_eio_channel; \
struct ev_loop* ev; \
/* Various things for libeio. */ \
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
#define UV_LOOP_PRIVATE_FIELDS \
/* Poll result queue */ \
eio_channel uv_eio_channel; \
struct ev_loop* ev; \
/* Various things for libeio. */ \
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
/* Hand-written equivalent of RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
uint64_t time; \
UV_LOOP_PRIVATE_PLATFORM_FIELDS

#define UV_REQ_BUFSML_SIZE (4)
Expand Down Expand Up @@ -186,7 +189,7 @@ struct uv__io_s {
uv__io_t io_watcher;


/* UV_PREPARE */ \
/* UV_PREPARE */
/* Private members for UV_PREPARE: the prepare callback and a queue node.
 * Presumably the node links the handle into the loop's prepare_handles
 * queue -- confirm against the prepare implementation. */
#define UV_PREPARE_PRIVATE_FIELDS \
uv_prepare_cb prepare_cb; \
ngx_queue_t queue;
Expand All @@ -211,9 +214,17 @@ struct uv__io_s {


/* UV_TIMER */
/* Pre-commit definition: the timer is backed by a libev ev_timer watcher
 * plus the user callback. */
#define UV_TIMER_PRIVATE_FIELDS \
ev_timer timer_watcher; \
uv_timer_cb timer_cb;
/* Post-commit definition: the libev watcher is replaced by a hand-written
 * red-black tree entry (tree_entry, see the RB_ENTRY note inside the macro),
 * the user callback (timer_cb) and two 64-bit values, timeout and repeat.
 * NOTE(review): timeout/repeat are presumably expressed on the same clock
 * as the loop's `time` field -- confirm against the timer implementation. */
#define UV_TIMER_PRIVATE_FIELDS \
/* RB_ENTRY(uv_timer_s) node; */ \
struct { \
struct uv_timer_s* rbe_left; \
struct uv_timer_s* rbe_right; \
struct uv_timer_s* rbe_parent; \
int rbe_color; \
} tree_entry; \
uv_timer_cb timer_cb; \
uint64_t timeout; \
uint64_t repeat;

#define UV_GETADDRINFO_PRIVATE_FIELDS \
uv_getaddrinfo_cb cb; \
Expand Down
22 changes: 15 additions & 7 deletions src/unix/core.c
Expand Up @@ -226,30 +226,38 @@ void uv_loop_delete(uv_loop_t* loop) {
}


/* Runs one libev poll pass for the loop.
 *
 * This diff hunk interleaves pre-commit and post-commit lines:
 *  - pre-commit:  takes a `block` flag and maps it to EVRUN_ONCE or
 *                 EVRUN_NOWAIT for ev_run();
 *  - post-commit: takes `timeout`, divides it by 1000 and passes the result
 *                 to ev_run() as its wait time (presumably a conversion from
 *                 milliseconds to seconds -- confirm against ev_tstamp).
 */
/* pre-commit signature */
static void uv__poll(uv_loop_t* loop, int block) {
/* post-commit signature */
static void uv__poll(uv_loop_t* loop, unsigned int timeout) {
/* bump the loop's refcount, otherwise libev does
 * a zero timeout poll and we end up busy looping
 *
 * NOTE(review): post-commit ev_run() hands the caller's wait time to
 * backend_poll(), so this rationale may be stale -- confirm before relying
 * on it.
 */
ev_ref(loop->ev);
/* pre-commit call */
ev_run(loop->ev, block ? EVRUN_ONCE : EVRUN_NOWAIT);
/* post-commit call */
ev_run(loop->ev, timeout / 1000.);
ev_unref(loop->ev);
}


/* Decides how long the next poll may wait.
 *
 * This diff hunk interleaves the removed uv__should_block() (signature and
 * single return statement) with its replacement uv__poll_timeout(); the
 * closing brace is shared context.
 *
 * pre-commit:  returns non-zero when the loop has active handles and the
 *              idle queue is empty.
 * post-commit: returns 0 when there are no active handles or when any idle
 *              handle is queued; otherwise returns uv__next_timeout(loop).
 */
/* pre-commit */
static int uv__should_block(uv_loop_t* loop) {
return loop->active_handles && ngx_queue_empty(&loop->idle_handles);
/* post-commit */
static unsigned int uv__poll_timeout(uv_loop_t* loop) {
/* nothing active: do not wait */
if (!uv__has_active_handles(loop))
return 0;

/* idle handles are queued: do not wait */
if (!ngx_queue_empty(&loop->idle_handles))
return 0;

/* otherwise the timer code supplies the wait time */
return uv__next_timeout(loop);
}


static int uv__run(uv_loop_t* loop) {
uv_update_time(loop);
uv__run_timers(loop);
uv__run_idle(loop);

if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
uv__run_prepare(loop);
/* Need to poll even if there are no active handles left, otherwise
* uv_work_t reqs won't complete...
*/
uv__poll(loop, uv__should_block(loop));
uv__poll(loop, uv__poll_timeout(loop));
uv__run_check(loop);
}

Expand Down Expand Up @@ -283,12 +291,12 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,


/* Refreshes the loop's notion of the current time.
 *
 * This diff hunk interleaves both versions of the body:
 *  - pre-commit:  asks libev to refresh its clock via ev_now_update();
 *  - post-commit: stores uv_hrtime() / 1000000 in loop->time (presumably a
 *                 nanosecond -> millisecond conversion -- confirm the unit
 *                 of uv_hrtime()).
 */
void uv_update_time(uv_loop_t* loop) {
/* pre-commit */
ev_now_update(loop->ev);
/* post-commit */
loop->time = uv_hrtime() / 1000000;
}


/* Returns the loop's current time as an int64_t.
 *
 * This diff hunk interleaves both versions of the body:
 *  - pre-commit:  libev's ev_now() value multiplied by 1000;
 *  - post-commit: the cached loop->time, which is declared uint64_t in the
 *                 loop's private fields and is converted implicitly to the
 *                 int64_t return type.
 */
int64_t uv_now(uv_loop_t* loop) {
/* pre-commit */
return (int64_t)(ev_now(loop->ev) * 1000);
/* post-commit */
return loop->time;
}


Expand Down
95 changes: 6 additions & 89 deletions src/unix/ev/ev.c
Expand Up @@ -2389,7 +2389,7 @@ time_update (EV_P_ ev_tstamp max_block)
}

void
ev_run (EV_P_ int flags)
ev_run (EV_P_ ev_tstamp waittime)
{
#if EV_FEATURE_API
++loop_depth;
Expand Down Expand Up @@ -2426,15 +2426,6 @@ ev_run (EV_P_ int flags)
}
#endif

#if EV_PREPARE_ENABLE
/* queue prepare watchers (and execute them) */
if (expect_false (preparecnt))
{
queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
EV_INVOKE_PENDING;
}
#endif

if (expect_false (loop_done))
break;

Expand All @@ -2445,90 +2436,16 @@ ev_run (EV_P_ int flags)
/* update fd-related kernel structures */
fd_reify (EV_A);

/* calculate blocking time */
{
ev_tstamp waittime = 0.;
ev_tstamp sleeptime = 0.;

/* remember old timestamp for io_blocktime calculation */
ev_tstamp prev_mn_now = mn_now;

/* update time to cancel out callback processing overhead */
time_update (EV_A_ 1e100);

if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt)))
{
waittime = MAX_BLOCKTIME;

if (timercnt)
{
ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge;
if (waittime > to) waittime = to;
}

#if EV_PERIODIC_ENABLE
if (periodiccnt)
{
ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now + backend_fudge;
if (waittime > to) waittime = to;
}
#endif

/* don't let timeouts decrease the waittime below timeout_blocktime */
if (expect_false (waittime < timeout_blocktime))
waittime = timeout_blocktime;

/* extra check because io_blocktime is commonly 0 */
if (expect_false (io_blocktime))
{
sleeptime = io_blocktime - (mn_now - prev_mn_now);

if (sleeptime > waittime - backend_fudge)
sleeptime = waittime - backend_fudge;

if (expect_true (sleeptime > 0.))
{
ev_sleep (sleeptime);
waittime -= sleeptime;
}
}
}

#if EV_FEATURE_API
++loop_count;
#endif
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
backend_poll (EV_A_ waittime);
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */

/* update ev_rt_now, do magic */
time_update (EV_A_ waittime + sleeptime);
}

/* queue pending timers and reschedule them */
timers_reify (EV_A); /* relative timers called last */
#if EV_PERIODIC_ENABLE
periodics_reify (EV_A); /* absolute timers called first */
#endif

#if EV_IDLE_ENABLE
/* queue idle watchers unless other events are pending */
idle_reify (EV_A);
#endif

#if EV_CHECK_ENABLE
/* queue check watchers, to be executed first */
if (expect_false (checkcnt))
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
++loop_count;
#endif
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
backend_poll (EV_A_ waittime);
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */

EV_INVOKE_PENDING;
}
while (expect_true (
activecnt
&& !loop_done
&& !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
));
while (0);

if (loop_done == EVBREAK_ONE)
loop_done = EVBREAK_CANCEL;
Expand Down
9 changes: 5 additions & 4 deletions src/unix/internal.h
Expand Up @@ -93,8 +93,7 @@ enum {
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */
UV_TIMER_REPEAT = 0x100
UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */
};

inline static void uv__req_init(uv_loop_t* loop,
Expand Down Expand Up @@ -151,15 +150,17 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
/* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);

/* poll (pre-commit location of this prototype; post-commit it is declared
 * with the other close helpers under "various") */
void uv__poll_close(uv_poll_t* handle);
/* timer */
/* Called by uv__run() in core.c right after uv_update_time(). */
void uv__run_timers(uv_loop_t* loop);
/* Its return value is what uv__poll_timeout() in core.c returns as the poll
 * timeout when the loop has active handles and no idle handles queued. */
unsigned int uv__next_timeout(uv_loop_t* loop);

/* various */
void uv__async_close(uv_async_t* handle);
void uv__check_close(uv_check_t* handle);
void uv__fs_event_close(uv_fs_event_t* handle);
void uv__idle_close(uv_idle_t* handle);
void uv__pipe_close(uv_pipe_t* handle);
void uv__poll_close(uv_poll_t* handle);
void uv__prepare_close(uv_prepare_t* handle);
void uv__process_close(uv_process_t* handle);
void uv__stream_close(uv_stream_t* handle);
Expand Down
2 changes: 2 additions & 0 deletions src/unix/loop.c
Expand Up @@ -37,13 +37,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
memset(loop, 0, sizeof(*loop));

RB_INIT(&loop->ares_handles);
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
ngx_queue_init(&loop->handle_queue);
loop->closing_handles = NULL;
loop->channel = NULL;
loop->time = uv_hrtime() / 1000000;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
eio_channel_init(&loop->uv_eio_channel, loop);
Expand Down

0 comments on commit c9396dd

Please sign in to comment.