Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
windows ipc fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Zinkovsky committed Oct 6, 2011
1 parent aad62d9 commit 3502da6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 53 deletions.
9 changes: 2 additions & 7 deletions include/uv-private/uv-win.h
Expand Up @@ -43,11 +43,6 @@ typedef struct uv_buf_t {
char* base;
} uv_buf_t;

typedef struct uv_duplicate_socket_info_s {
WSAPROTOCOL_INFOW socket_info;
struct uv_duplicate_socket_info_s* next;
} uv_duplicate_socket_info_t;

typedef int uv_file;

RB_HEAD(uv_timer_tree_s, uv_timer_s);
Expand Down Expand Up @@ -103,7 +98,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct uv_req_s* next_req;

#define UV_WRITE_PRIVATE_FIELDS \
/* empty */
int ipc_header;

#define UV_CONNECT_PRIVATE_FIELDS \
/* empty */
Expand Down Expand Up @@ -181,7 +176,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
uv_duplicate_socket_info_t* pending_ipc_sockets;
WSAPROTOCOL_INFOW* pending_socket_info;

#define UV_PIPE_PRIVATE_FIELDS \
HANDLE handle; \
Expand Down
2 changes: 0 additions & 2 deletions src/win/internal.h
Expand Up @@ -66,8 +66,6 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_TTY_RAW 0x80000
#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000
#define UV_HANDLE_EMULATE_IOCP 0x200000
#define UV_HANDLE_DUPLICATED_SOCKET 0x400000
#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000

void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);
Expand Down
89 changes: 52 additions & 37 deletions src/win/pipe.c
Expand Up @@ -77,7 +77,9 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->name = NULL;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_ipc_sockets = NULL;
handle->pending_socket_info = NULL;

uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);

if (ipc) {
handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL;
Expand Down Expand Up @@ -196,7 +198,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
uv_duplicate_socket_info_t* socket_info, *next_socket_info;

if (handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
Expand Down Expand Up @@ -256,11 +257,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;

if (handle->flags & UV_HANDLE_CONNECTION) {
next_socket_info = handle->pending_ipc_sockets;
while (next_socket_info) {
socket_info = next_socket_info;
next_socket_info = next_socket_info->next;
free(socket_info);
if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
}
}

Expand Down Expand Up @@ -582,24 +581,18 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,


int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
int r;
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
uv_duplicate_socket_info_t* pending_socket;

if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
pending_socket = server->pending_ipc_sockets;
if (!pending_socket) {
if (!server->pending_socket_info) {
/* No valid pending sockets. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}

server->pending_ipc_sockets = pending_socket->next;
r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info);
free(pending_socket);
return r;
return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
} else {
pipe_client = (uv_pipe_t*)client;

Expand Down Expand Up @@ -754,7 +747,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_stream_t* send_handle, uv_write_cb cb) {
int result;
uv_tcp_t* tcp_send_handle;
uv_req_t* ipc_header_req;
uv_write_t* ipc_header_req;
DWORD written;
uv_ipc_frame_uv_stream ipc_frame;

Expand Down Expand Up @@ -784,6 +777,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->ipc_header = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));

if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
Expand All @@ -809,16 +803,26 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
* the first write, and then use the provided req for the second write.
*/
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
ipc_header_req = (uv_req_t*)req;
ipc_header_req = req;
} else {
ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req;
/* Initialize the req if needed. */
/*
* Try to use the preallocated write req if it's available.
* Otherwise allocate a new one.
*/
if (handle->ipc_header_write_req.type != UV_WRITE) {
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
handle->ipc_header_write_req.type = UV_WRITE;
handle->ipc_header_write_req.handle = (uv_stream_t*) handle;
handle->ipc_header_write_req.cb = NULL;
ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
} else {
ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
}

uv_req_init(loop, (uv_req_t*) ipc_header_req);
ipc_header_req->type = UV_WRITE;
ipc_header_req->handle = (uv_stream_t*) handle;
ipc_header_req->cb = NULL;
ipc_header_req->ipc_header = 1;
}

/* Write the header or the whole frame. */
Expand Down Expand Up @@ -947,7 +951,6 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
DWORD bytes, avail;
uv_buf_t buf;
uv_ipc_frame_uv_stream ipc_frame;
uv_duplicate_socket_info_t* pending_ipc_socket;

assert(handle->type == UV_NAMED_PIPE);

Expand Down Expand Up @@ -1015,16 +1018,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,

assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));

/* Insert a new pending socket entry. */
pending_ipc_socket =
(uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket));
if (!pending_ipc_socket) {
/* Store the pending socket info. */
assert(!handle->pending_socket_info);
handle->pending_socket_info =
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
if (!handle->pending_socket_info) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}

pending_ipc_socket->socket_info = ipc_frame.socket_info;
pending_ipc_socket->next = handle->pending_ipc_sockets;
handle->pending_ipc_sockets = pending_ipc_socket;
*(handle->pending_socket_info) = ipc_frame.socket_info;
}

if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
Expand Down Expand Up @@ -1052,10 +1054,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE);
handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}

if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
Expand Down Expand Up @@ -1087,12 +1094,20 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,

handle->write_queue_size -= req->queued_bytes;

if (req->cb) {
if (!REQ_SUCCESS(req)) {
uv__set_sys_error(loop, GET_REQ_ERROR(req));
((uv_write_cb)req->cb)(req, -1);
if (req->ipc_header) {
if (req == &handle->ipc_header_write_req) {
req->type = UV_UNKNOWN_REQ;
} else {
((uv_write_cb)req->cb)(req, 0);
free(req);
}
} else {
if (req->cb) {
if (!REQ_SUCCESS(req)) {
uv__set_sys_error(loop, GET_REQ_ERROR(req));
((uv_write_cb)req->cb)(req, -1);
} else {
((uv_write_cb)req->cb)(req, 0);
}
}
}

Expand Down
12 changes: 5 additions & 7 deletions src/win/tcp.c
Expand Up @@ -103,6 +103,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
handle->socket = INVALID_SOCKET;
handle->type = UV_TCP;
handle->reqs_pending = 0;
handle->func_acceptex = NULL;
handle->func_connectex = NULL;

loop->counters.tcp_init++;

Expand Down Expand Up @@ -410,12 +412,11 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;

if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_acceptex) {
if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}

if (listen(handle->socket, backlog) == SOCKET_ERROR) {
Expand Down Expand Up @@ -548,12 +549,11 @@ int uv__tcp_connect(uv_connect_t* req,
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;

if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_connectex) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}

uv_req_init(loop, (uv_req_t*) req);
Expand Down Expand Up @@ -609,12 +609,11 @@ int uv__tcp_connect6(uv_connect_t* req,
uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0)
return -1;

if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_connectex) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}

uv_req_init(loop, (uv_req_t*) req);
Expand Down Expand Up @@ -952,7 +951,6 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
}

tcp->flags |= UV_HANDLE_BOUND;
tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET;

return uv_tcp_set_socket(tcp->loop, tcp, socket, 1);
}

0 comments on commit 3502da6

Please sign in to comment.