Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
windows: use UV_HANDLE_READABLE and UV_HANDLE_WRITABLE
Browse files Browse the repository at this point in the history
These flags supersede UV_HANDLE_EOF and UV_HANDLE_SHUTTING. This patch
also moves a lot of stream-related state-checking code to stream.c.
  • Loading branch information
piscisaureus committed Sep 3, 2012
1 parent 483043b commit 032c041
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 99 deletions.
4 changes: 2 additions & 2 deletions src/win/internal.h
Expand Up @@ -52,8 +52,8 @@
#define UV_HANDLE_LISTENING 0x00000800
#define UV_HANDLE_CONNECTION 0x00001000
#define UV_HANDLE_CONNECTED 0x00002000
#define UV_HANDLE_EOF 0x00004000
#define UV_HANDLE_SHUTTING 0x00008000
#define UV_HANDLE_READABLE 0x00008000
#define UV_HANDLE_WRITABLE 0x00010000
#define UV_HANDLE_READ_PENDING 0x00020000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x00040000
#define UV_HANDLE_ZERO_READ 0x00080000
Expand Down
43 changes: 13 additions & 30 deletions src/win/pipe.c
Expand Up @@ -114,7 +114,7 @@ static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = 0;
*duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
return pipeHandle;
}

Expand All @@ -133,7 +133,7 @@ static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
NULL);

if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_SHUTTING;
*duplex_flags = UV_HANDLE_READABLE;
return pipeHandle;
}
}
Expand All @@ -148,7 +148,7 @@ static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
NULL);

if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_EOF;
*duplex_flags = UV_HANDLE_WRITABLE;
return pipeHandle;
}
}
Expand Down Expand Up @@ -316,7 +316,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
/* Failure */
UNREGISTER_HANDLE_REQ(loop, handle, req);

handle->flags &= ~UV_HANDLE_SHUTTING;
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
if (req->cb) {
uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status));
req->cb(req, -1);
Expand All @@ -343,7 +343,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
/* Failure. */
UNREGISTER_HANDLE_REQ(loop, handle, req);

handle->flags &= ~UV_HANDLE_SHUTTING;
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
if (req->cb) {
uv__set_sys_error(loop, GetLastError());
req->cb(req, -1);
Expand Down Expand Up @@ -630,7 +630,7 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
}

if (handle->flags & UV_HANDLE_CONNECTION) {
handle->flags |= UV_HANDLE_SHUTTING;
handle->flags &= ~UV_HANDLE_WRITABLE;
eof_timer_destroy(handle);
}

Expand Down Expand Up @@ -659,6 +659,7 @@ void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
uv_want_endgame(loop, (uv_handle_t*) handle);
}

handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(handle);
}

Expand Down Expand Up @@ -746,6 +747,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
/* Initialize the client handle and copy the pipeHandle to the client */
uv_pipe_connection_init(pipe_client);
pipe_client->handle = req->pipeHandle;
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

/* Prepare the req to pick up a new connection */
server->pending_accepts = req->next_pending;
Expand Down Expand Up @@ -964,16 +966,6 @@ static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb, uv_read2_cb read2_cb) {
uv_loop_t* loop = handle->loop;

if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv__set_artificial_error(loop, UV_EINVAL);
return -1;
}

if (handle->flags & UV_HANDLE_EOF) {
uv__set_artificial_error(loop, UV_EOF);
return -1;
}

handle->flags |= UV_HANDLE_READING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->read_cb = read_cb;
Expand Down Expand Up @@ -1072,16 +1064,6 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,

assert(handle->handle != INVALID_HANDLE_VALUE);

if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv__set_artificial_error(loop, UV_EINVAL);
return -1;
}

if (handle->flags & UV_HANDLE_SHUTTING) {
uv__set_artificial_error(loop, UV_EOF);
return -1;
}

uv_req_init(loop, (uv_req_t*) req);
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
Expand Down Expand Up @@ -1253,7 +1235,7 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
/* so discard it. */
eof_timer_destroy(handle);

handle->flags |= UV_HANDLE_EOF;
handle->flags &= ~UV_HANDLE_READABLE;
uv_read_stop((uv_stream_t*) handle);

uv__set_artificial_error(loop, UV_EOF);
Expand Down Expand Up @@ -1483,8 +1465,8 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_queue_non_overlapped_write(handle);
}

if (handle->write_reqs_pending == 0 &&
handle->flags & UV_HANDLE_SHUTTING) {
if (handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}

Expand Down Expand Up @@ -1548,7 +1530,7 @@ void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,

/* Initialize and optionally start the eof timer. */
/* This makes no sense if we've already seen EOF. */
if (!(handle->flags & UV_HANDLE_EOF)) {
if (handle->flags & UV_HANDLE_READABLE) {
eof_timer_init(handle);

/* If reading start the timer right now. */
Expand Down Expand Up @@ -1662,6 +1644,7 @@ void uv_pipe_open(uv_pipe_t* pipe, uv_file file) {

uv_pipe_connection_init(pipe);
pipe->handle = os_handle;
pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
Expand Down
3 changes: 3 additions & 0 deletions src/win/process-stdio.c
Expand Up @@ -163,6 +163,9 @@ static uv_err_t uv__create_stdio_pipe_pair(uv_loop_t* loop,
}
}

/* The server end is now readable and writable. */
server_pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

*child_pipe_ptr = child_pipe;
return uv_ok_;

Expand Down
51 changes: 41 additions & 10 deletions src/win/stream.c
Expand Up @@ -55,6 +55,16 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {

int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
if (handle->flags & UV_HANDLE_READING) {

This comment has been minimized.

Copy link
@saghul

saghul Sep 3, 2012

Contributor

The Unix version doesn't seem to do this; it seems to allow multiple calls to uv_read_start. Is this a bug or a feature? :-)

This comment has been minimized.

Copy link
@piscisaureus

piscisaureus Sep 3, 2012

Author

Do you care?

Windows never allowed this. Insofar as we can support this, it should be semantically equivalent to uv_read_stop() followed by a uv_read_start[2]() call.

This comment has been minimized.

Copy link
@saghul

saghul Sep 3, 2012

Contributor

Well, it would be unexpected that the same user code silently works on Unix but gives an error on Windows, wouldn't it? Cross platform consistency is nice :-)

This comment has been minimized.

Copy link
@piscisaureus

piscisaureus Sep 3, 2012

Author

@bnoordhuis what would your preference be? Disallow or stop+start. I think I prefer stop+start.

This comment has been minimized.

Copy link
@piscisaureus

piscisaureus Sep 3, 2012

Author

@saghul Fair enough.

This comment has been minimized.

Copy link
@bnoordhuis

bnoordhuis Sep 3, 2012

Contributor

what would your preference be? Disallow or stop+start. I think I prefer stop+start.

In general or in this particular case? It's something of a mess right now: some functions silently return, others raise an error, and yet others do some kind of restart.

For the sake of consistency, all libuv functions should react the same. What that reaction is - return an error, abort, restart - is TBD. :-)

Aborting (or printing a warning and returning an error) is my personal favorite at the moment. It's kind of harsh but calling start repeatedly often indicates a bug or logic error in your code. Correctness before convenience and all that.

uv__set_sys_error(handle->loop, UV_EALREADY);
return -1;
}

if (!(handle->flags & UV_HANDLE_READABLE)) {
uv__set_artificial_error(handle->loop, UV_ENOTCONN);
return -1;
}

switch (handle->type) {
case UV_TCP:
return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb);
Expand All @@ -71,6 +81,16 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,

int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb) {
if (handle->flags & UV_HANDLE_READING) {
uv__set_sys_error(handle->loop, UV_EALREADY);
return -1;
}

if (!(handle->flags & UV_HANDLE_READABLE)) {
uv__set_artificial_error(handle->loop, UV_ENOTCONN);
return -1;
}

switch (handle->type) {
case UV_NAMED_PIPE:
return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb);
Expand All @@ -82,14 +102,15 @@ int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,


int uv_read_stop(uv_stream_t* handle) {
if (!(handle->flags & UV_HANDLE_READING))
return 0;

if (handle->type == UV_TTY) {
return uv_tty_read_stop((uv_tty_t*) handle);
} else if (handle->flags & UV_HANDLE_READING) {
} else {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(handle->loop, handle);
return 0;
} else {
return 0;
}
}

Expand All @@ -98,6 +119,11 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
uv_loop_t* loop = handle->loop;

if (!(handle->flags & UV_HANDLE_WRITABLE)) {
uv__set_artificial_error(loop, UV_EPIPE);
return -1;
}

switch (handle->type) {
case UV_TCP:
return uv_tcp_write(loop, req, (uv_tcp_t*) handle, bufs, bufcnt, cb);
Expand All @@ -117,6 +143,11 @@ int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
uv_loop_t* loop = handle->loop;

if (!(handle->flags & UV_HANDLE_WRITABLE)) {
uv__set_artificial_error(loop, UV_EPIPE);
return -1;
}

switch (handle->type) {
case UV_NAMED_PIPE:
return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb);
Expand All @@ -131,13 +162,13 @@ int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
uv_loop_t* loop = handle->loop;

if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv__set_sys_error(loop, WSAEINVAL);
if (!(handle->flags & UV_HANDLE_WRITABLE)) {
uv__set_artificial_error(loop, UV_EPIPE);
return -1;
}

if (handle->flags & UV_HANDLE_SHUTTING) {
uv__set_sys_error(loop, WSAESHUTDOWN);
if (!(handle->flags & UV_HANDLE_WRITABLE)) {
uv__set_artificial_error(loop, UV_EPIPE);
return -1;
}

Expand All @@ -146,7 +177,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
req->handle = handle;
req->cb = cb;

handle->flags |= UV_HANDLE_SHUTTING;
handle->flags &= ~UV_HANDLE_WRITABLE;
handle->shutdown_req = req;
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
Expand All @@ -158,10 +189,10 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {


int uv_is_readable(const uv_stream_t* handle) {
return !(handle->flags & UV_HANDLE_EOF);
return !!(handle->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* handle) {
return !(handle->flags & UV_HANDLE_SHUTTING);
return !!(handle->flags & UV_HANDLE_WRITABLE);
}
37 changes: 7 additions & 30 deletions src/win/tcp.c
Expand Up @@ -604,7 +604,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
} else {
uv_connection_init((uv_stream_t*) client);
/* AcceptEx() implicitly binds the accepted socket. */
client->flags |= UV_HANDLE_BOUND;
client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}

/* Prepare the req to pick up a new connection */
Expand Down Expand Up @@ -645,21 +645,6 @@ int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop;

if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv__set_sys_error(loop, WSAEINVAL);
return -1;
}

if (handle->flags & UV_HANDLE_READING) {
uv__set_sys_error(loop, WSAEALREADY);
return -1;
}

if (handle->flags & UV_HANDLE_EOF) {
uv__set_sys_error(loop, WSAESHUTDOWN);
return -1;
}

handle->flags |= UV_HANDLE_READING;
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
Expand Down Expand Up @@ -854,16 +839,6 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
int result;
DWORD bytes;

if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv__set_sys_error(loop, WSAEINVAL);
return -1;
}

if (handle->flags & UV_HANDLE_SHUTTING) {
uv__set_sys_error(loop, WSAESHUTDOWN);
return -1;
}

uv_req_init(loop, (uv_req_t*) req);
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
Expand Down Expand Up @@ -969,7 +944,7 @@ void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
handle->flags |= UV_HANDLE_EOF;
handle->flags &= ~UV_HANDLE_READABLE;

uv__set_error(loop, UV_EOF, ERROR_SUCCESS);
buf.base = 0;
Expand Down Expand Up @@ -1000,9 +975,8 @@ void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
}
} else {
/* Connection closed */
handle->flags &= ~UV_HANDLE_READING;
handle->flags &= ~(UV_HANDLE_READING | UV_HANDLE_READABLE);
DECREASE_ACTIVE_COUNT(loop, handle);
handle->flags |= UV_HANDLE_EOF;

uv__set_error(loop, UV_EOF, ERROR_SUCCESS);
handle->read_cb((uv_stream_t*)handle, -1, buf);
Expand Down Expand Up @@ -1069,7 +1043,7 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
}

handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_SHUTTING &&
if (handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
Expand Down Expand Up @@ -1138,6 +1112,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
NULL,
0) == 0) {
uv_connection_init((uv_stream_t*)handle);
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
loop->active_tcp_streams++;
((uv_connect_cb)req->cb)(req, 0);
} else {
Expand Down Expand Up @@ -1180,6 +1155,7 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,

if (tcp_connection) {
uv_connection_init((uv_stream_t*)tcp);
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
}

tcp->flags |= UV_HANDLE_BOUND;
Expand Down Expand Up @@ -1395,6 +1371,7 @@ void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
}

tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(tcp);

if (tcp->reqs_pending == 0) {
Expand Down

0 comments on commit 032c041

Please sign in to comment.