Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
windows: Enable passing of TCP connections over IPC
Browse files: browse the repository at this point in the history
  • Loading branch information
Igor Zinkovsky committed Mar 30, 2012
1 parent 31ff986 commit 70925c3
Show file tree
Hide file tree
Showing 9 changed files with 635 additions and 289 deletions.
5 changes: 4 additions & 1 deletion include/uv-private/uv-win.h
Expand Up @@ -318,7 +318,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
WSAPROTOCOL_INFOW* pending_socket_info; \
struct { \
WSAPROTOCOL_INFOW* socket_info; \
int tcp_connection; \
} pending_ipc_info; \
uv_write_t* non_overlapped_writes_tail;

#define UV_PIPE_PRIVATE_FIELDS \
Expand Down
15 changes: 1 addition & 14 deletions src/win/handle.c
Expand Up @@ -71,7 +71,6 @@ int uv_is_active(uv_handle_t* handle) {


void uv_close(uv_handle_t* handle, uv_close_cb cb) {
uv_tcp_t* tcp;
uv_pipe_t* pipe;
uv_udp_t* udp;
uv_process_t* process;
Expand All @@ -88,19 +87,7 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) {
/* Handle-specific close actions */
switch (handle->type) {
case UV_TCP:
tcp = (uv_tcp_t*)handle;
/* If we don't shut down before calling closesocket, Windows will */
/* silently discard the kernel send buffer and reset the connection. */
if ((tcp->flags & UV_HANDLE_CONNECTION) &&
!(tcp->flags & UV_HANDLE_SHUT)) {
shutdown(tcp->socket, SD_SEND);
tcp->flags |= UV_HANDLE_SHUTTING | UV_HANDLE_SHUT;
}
tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
closesocket(tcp->socket);
if (tcp->reqs_pending == 0) {
uv_want_endgame(loop, handle);
}
uv_tcp_close((uv_tcp_t*)handle);
return;

case UV_NAMED_PIPE:
Expand Down
6 changes: 4 additions & 2 deletions src/win/internal.h
Expand Up @@ -68,11 +68,12 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x00200000
#define UV_HANDLE_TTY_SAVED_POSITION 0x00400000
#define UV_HANDLE_TTY_SAVED_ATTRIBUTES 0x00800000
#define UV_HANDLE_SHARED_TCP_SERVER 0x01000000
#define UV_HANDLE_SHARED_TCP_SOCKET 0x01000000
#define UV_HANDLE_TCP_NODELAY 0x02000000
#define UV_HANDLE_TCP_KEEPALIVE 0x04000000
#define UV_HANDLE_TCP_SINGLE_ACCEPT 0x08000000
#define UV_HANDLE_TCP_ACCEPT_STATE_CHANGING 0x10000000
#define UV_HANDLE_TCP_SOCKET_CLOSED 0x20000000

void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);
Expand Down Expand Up @@ -143,7 +144,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,

void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);

int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
int tcp_connection);

int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info);
Expand Down
59 changes: 34 additions & 25 deletions src/win/pipe.c
Expand Up @@ -42,8 +42,9 @@ static const int64_t eof_timeout = 50; /* ms */
static const int default_pending_pipe_instances = 4;

/* IPC protocol flags. */
#define UV_IPC_RAW_DATA 0x0001
#define UV_IPC_UV_STREAM 0x0002
#define UV_IPC_RAW_DATA 0x0001
#define UV_IPC_TCP_SERVER 0x0002
#define UV_IPC_TCP_CONNECTION 0x0004

/* IPC frame header. */
typedef struct {
Expand Down Expand Up @@ -79,7 +80,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->name = NULL;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_socket_info = NULL;
handle->pending_ipc_info.socket_info = NULL;
handle->pending_ipc_info.tcp_connection = 0;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;

Expand Down Expand Up @@ -356,9 +358,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;

if (handle->flags & UV_HANDLE_CONNECTION) {
if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
if (handle->pending_ipc_info.socket_info) {
free(handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info = NULL;
}

if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
Expand Down Expand Up @@ -711,13 +713,14 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_pipe_accept_t* req;

if (server->ipc) {
if (!server->pending_socket_info) {
if (!server->pending_ipc_info.socket_info) {
/* No valid pending sockets. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}

return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
server->pending_ipc_info.tcp_connection);
} else {
pipe_client = (uv_pipe_t*)client;

Expand Down Expand Up @@ -1051,9 +1054,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
return -1;
}

/* Only TCP server handles are supported for sharing. */
if (send_handle && (send_handle->type != UV_TCP ||
send_handle->flags & UV_HANDLE_CONNECTION)) {
/* Only TCP handles are supported for sharing. */
if (send_handle && send_handle->type != UV_TCP) {
uv__set_artificial_error(loop, UV_ENOTSUP);
return -1;
}
Expand Down Expand Up @@ -1091,7 +1093,11 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
&ipc_frame.socket_info)) {
return -1;
}
ipc_frame.header.flags |= UV_IPC_UV_STREAM;
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;

if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
}
}

if (bufcnt == 1) {
Expand Down Expand Up @@ -1132,7 +1138,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,

result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_UV_STREAM ?
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header),
NULL,
&ipc_header_req->overlapped);
Expand All @@ -1146,7 +1152,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
ipc_header_req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ?
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header);
handle->write_queue_size += req->queued_bytes;
}
Expand Down Expand Up @@ -1330,9 +1336,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
}

assert(bytes == sizeof(ipc_frame.header));
assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
UV_IPC_TCP_CONNECTION));

if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
assert(avail - sizeof(ipc_frame.header) >=
sizeof(ipc_frame.socket_info));

Expand All @@ -1350,14 +1357,16 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));

/* Store the pending socket info. */
assert(!handle->pending_socket_info);
handle->pending_socket_info =
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
if (!handle->pending_socket_info) {
assert(!handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info =
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
if (!handle->pending_ipc_info.socket_info) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}

*(handle->pending_socket_info) = ipc_frame.socket_info;
*(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
handle->pending_ipc_info.tcp_connection =
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
}

if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
Expand Down Expand Up @@ -1385,14 +1394,14 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}

if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
if (handle->pending_ipc_info.socket_info) {
free(handle->pending_ipc_info.socket_info);
handle->pending_ipc_info.socket_info = NULL;
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
Expand Down

0 comments on commit 70925c3

Please sign in to comment.