Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
UNIX: Loop on blocking streams
Browse files Browse the repository at this point in the history
Also removes a superfluous syscall during uv_tty_init for writable TTY
streams.
  • Loading branch information
ry committed Nov 2, 2011
1 parent 74b49e8 commit e1bee05
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
37 changes: 31 additions & 6 deletions src/unix/stream.c
Expand Up @@ -62,6 +62,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->accepted_fd = -1;
stream->fd = -1;
stream->delayed_error = 0;
stream->blocking = 0;
ngx_queue_init(&stream->write_queue);
ngx_queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;
Expand Down Expand Up @@ -340,9 +341,9 @@ static void uv__write(uv_stream_t* stream) {
int iovcnt;
ssize_t n;

assert(stream->fd >= 0);
start:

/* TODO: should probably while(1) here until EAGAIN */
assert(stream->fd >= 0);

/* Get the request at the head of the queue. */
req = uv_write_queue_head(stream);
Expand All @@ -353,14 +354,16 @@ static void uv__write(uv_stream_t* stream) {

assert(req->handle == stream);

/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->bufcnt - req->write_index;

/* Now do the actual writev. Note that we've been updating the pointers
/*
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/

Expand Down Expand Up @@ -409,6 +412,9 @@ static void uv__write(uv_stream_t* stream) {
stream->write_queue_size -= uv__write_req_size(req);
uv__write_req_finish(req);
return;
} else if (stream->blocking) {
/* If this is a blocking stream, try again. */
goto start;
}
} else {
/* Successful write */
Expand All @@ -426,8 +432,17 @@ static void uv__write(uv_stream_t* stream) {
stream->write_queue_size -= n;
n = 0;

/* There is more to write. Break and ensure the watcher is pending. */
break;
/* There is more to write. */
if (stream->blocking) {
/*
* If we're blocking then we should not be enabling the write
* watcher - instead we need to try again.
*/
goto start;
} else {
/* Break loop and ensure the watcher is pending. */
break;
}

} else {
/* Finished writing the buf at index req->write_index. */
Expand All @@ -453,6 +468,9 @@ static void uv__write(uv_stream_t* stream) {
/* Either we've counted n down to zero or we've got EAGAIN. */
assert(n == 0 || n == -1);

/* Only non-blocking streams should use the write_watcher. */
assert(!stream->blocking);

/* We're not done. */
ev_io_start(stream->loop->ev, &stream->write_watcher);
}
Expand Down Expand Up @@ -862,6 +880,13 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
if (empty_queue) {
uv__write(stream);
} else {
/*
* blocking streams should never have anything in the queue.
* if this assert fires then somehow the blocking stream isn't being
* sufficently flushed in uv__write.
*/
assert(!stream->blocking);

ev_io_start(stream->loop->ev, &stream->write_watcher);
}

Expand Down
1 change: 0 additions & 1 deletion src/unix/tty.c
Expand Up @@ -41,7 +41,6 @@ int uv_tty_init(uv_loop_t* loop, uv_tty_t* tty, int fd, int readable) {
uv__stream_open((uv_stream_t*)tty, fd, UV_READABLE);
} else {
/* Note: writable tty we set to blocking mode. */
uv__nonblock(fd, 0);
uv__stream_open((uv_stream_t*)tty, fd, UV_WRITABLE);
tty->blocking = 1;
}
Expand Down

0 comments on commit e1bee05

Please sign in to comment.