Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
import/export streams across loops
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Zinkovsky committed Jan 20, 2012
1 parent 517bfc8 commit 9811e0f
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 4 deletions.
3 changes: 3 additions & 0 deletions include/uv-private/uv-unix.h
Expand Up @@ -192,6 +192,9 @@ typedef void* uv_lib_t;
struct termios orig_termios; \
int mode;

/*
 * Unix-specific fields of uv_stream_info_s: a single int named fd
 * (presumably the file descriptor of the exported stream - TODO confirm once
 * uv_export is implemented for Unix).
 */
#define UV_STREAM_INFO_PRIVATE_FIELDS \
int fd;

/* UV_FS_EVENT_PRIVATE_FIELDS */
#if defined(__linux__)

Expand Down
5 changes: 5 additions & 0 deletions include/uv-private/uv-win.h
Expand Up @@ -450,6 +450,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
wchar_t* dirw; \
char* buffer;

/*
 * Windows-specific fields of uv_stream_info_s. The anonymous union currently
 * has one member, socket_info, which holds the WSAPROTOCOL_INFOW describing
 * an exported TCP socket.
 */
#define UV_STREAM_INFO_PRIVATE_FIELDS \
union { \
WSAPROTOCOL_INFOW socket_info; \
};

int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size,
char* utf8Buffer, size_t utf8Size);
int uv_utf8_to_utf16(const char* utf8Buffer, wchar_t* utf16Buffer,
Expand Down
23 changes: 23 additions & 0 deletions include/uv.h
Expand Up @@ -179,6 +179,7 @@ typedef struct uv_process_s uv_process_t;
typedef struct uv_counters_s uv_counters_t;
typedef struct uv_cpu_info_s uv_cpu_info_t;
typedef struct uv_interface_address_s uv_interface_address_t;
typedef struct uv_stream_info_s uv_stream_info_t;
/* Request types */
typedef struct uv_req_s uv_req_t;
typedef struct uv_shutdown_s uv_shutdown_t;
Expand Down Expand Up @@ -529,6 +530,28 @@ UV_EXTERN int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
UV_EXTERN int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
int* namelen);

/*
 * uv_stream_info_t stores a stream that was exported with uv_export() so
 * that it can be imported into a different event loop within the same
 * process with uv_import().
 *
 * `type` records the handle type of the exported stream; the remaining
 * fields are platform-specific and come from UV_STREAM_INFO_PRIVATE_FIELDS.
 */
struct uv_stream_info_s {
uv_handle_type type;
UV_STREAM_INFO_PRIVATE_FIELDS
};

/*
 * Exports a uv_stream_t as a uv_stream_info_t value, which can then be used
 * to initialize shared streams within the same process (see uv_import).
 *
 * The Windows implementation supports UV_TCP streams only and returns 0 on
 * success, -1 on failure. The Unix implementation is currently a stub that
 * reports UV_ENOSYS.
 */
UV_EXTERN int uv_export(uv_stream_t* stream, uv_stream_info_t* info);

/*
 * Imports a uv_stream_info_t value (produced by uv_export) into `stream` to
 * initialize a shared stream.
 *
 * The Windows implementation supports UV_TCP streams only and fails with -1
 * when `info` does not describe a TCP stream. The Unix implementation is
 * currently a stub that reports UV_ENOSYS.
 */
UV_EXTERN int uv_import(uv_stream_t* stream, uv_stream_info_t* info);

/*
* uv_tcp_connect, uv_tcp_connect6
* These functions establish IPv4 and IPv6 TCP connections. Provide an
Expand Down
10 changes: 10 additions & 0 deletions src/unix/stream.c
Expand Up @@ -966,3 +966,13 @@ int uv_read_stop(uv_stream_t* stream) {
}


/*
 * Exporting streams is not implemented on Unix yet. Both arguments are
 * ignored and the call returns whatever reporting UV_ENOSYS yields.
 */
int uv_export(uv_stream_t* stream, uv_stream_info_t* info) {
  /* TODO: implement me. Until then the arguments are deliberately unused. */
  (void) stream;
  (void) info;

  return uv__new_artificial_error(UV_ENOSYS);
}


/*
 * Importing streams is not implemented on Unix yet. Both arguments are
 * ignored and the call returns whatever reporting UV_ENOSYS yields.
 */
int uv_import(uv_stream_t* stream, uv_stream_info_t* info) {
  /* TODO: implement me. Until then the arguments are deliberately unused. */
  (void) stream;
  (void) info;

  return uv__new_artificial_error(UV_ENOSYS);
}
5 changes: 4 additions & 1 deletion src/win/internal.h
Expand Up @@ -143,11 +143,14 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,

void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);

int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);

int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
LPWSAPROTOCOL_INFOW protocol_info);

/*
 * TCP back ends of uv_export()/uv_import(): move a TCP handle into and out
 * of a uv_stream_info_t. Both return 0 on success and -1 on failure.
 */
int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info);
int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info);


/*
* UDP
Expand Down
2 changes: 1 addition & 1 deletion src/win/pipe.c
Expand Up @@ -649,7 +649,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
return -1;
}

return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
return uv__tcp_import((uv_tcp_t*)client, server->pending_socket_info);
} else {
pipe_client = (uv_pipe_t*)client;

Expand Down
24 changes: 24 additions & 0 deletions src/win/stream.c
Expand Up @@ -186,3 +186,27 @@ size_t uv_count_bufs(uv_buf_t bufs[], int count) {

return bytes;
}


/*
 * Exports `stream` into `info` so it can be imported by another loop.
 *
 * Only UV_TCP streams are supported and are forwarded to uv_tcp_export().
 * Any other stream type trips the assertion (when assertions are enabled);
 * otherwise the loop error is set to WSAEINVAL and -1 is returned.
 */
int uv_export(uv_stream_t* stream, uv_stream_info_t* info) {
  if (stream->type == UV_TCP) {
    return uv_tcp_export((uv_tcp_t*) stream, info);
  }

  /* Unsupported stream type. */
  assert(0);
  uv__set_sys_error(stream->loop, WSAEINVAL);
  return -1;
}


/*
 * Imports the stream described by `info` into the initialized `stream`.
 *
 * Only UV_TCP streams are supported and are forwarded to uv_tcp_import().
 * Any other stream type trips the assertion (when assertions are enabled);
 * otherwise the loop error is set to WSAEINVAL and -1 is returned.
 */
int uv_import(uv_stream_t* stream, uv_stream_info_t* info) {
  if (stream->type == UV_TCP) {
    return uv_tcp_import((uv_tcp_t*) stream, info);
  }

  /* Unsupported stream type. */
  assert(0);
  uv__set_sys_error(stream->loop, WSAEINVAL);
  return -1;
}
25 changes: 23 additions & 2 deletions src/win/tcp.c
Expand Up @@ -1019,7 +1019,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
}


int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
SOCKET socket = WSASocketW(AF_INET,
SOCK_STREAM,
IPPROTO_IP,
Expand Down Expand Up @@ -1140,4 +1140,25 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
}

return 0;
}
}


/*
 * Exports a TCP handle: duplicates its socket for the current process into
 * info->socket_info and tags `info` as UV_TCP.
 *
 * Returns 0 on success, or -1 when duplicating the socket fails (in which
 * case info->type is left untouched).
 */
int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info) {
  int r;

  r = uv_tcp_duplicate_socket(tcp, GetCurrentProcessId(), &info->socket_info);
  if (r == -1) {
    return -1;
  }

  info->type = UV_TCP;
  return 0;
}


/*
 * Imports a TCP stream previously stored in `info` into `tcp`.
 *
 * Returns the result of uv__tcp_import() when `info` describes a TCP stream.
 * Otherwise sets the loop error to WSAEINVAL and returns -1.
 */
int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info) {
  if (info->type == UV_TCP) {
    return uv__tcp_import(tcp, &info->socket_info);
  }

  uv__set_sys_error(tcp->loop, WSAEINVAL);
  return -1;
}
224 changes: 224 additions & 0 deletions test/test-ipc-threads.c
@@ -0,0 +1,224 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "runner.h"
#include "task.h"

#include <stdio.h>
#include <string.h>

/*
 * State of one side of the test. There is one instance for the thread that
 * runs the default loop (`parent`) and one for the spawned thread (`child`).
 */
typedef struct {
/* Event loop on which this worker's handles are initialized and run. */
uv_loop_t* loop;
/* Thread handle; only child.thread is used (uv_thread_create/uv_thread_join). */
uv_thread_t thread;
/* Async handle owned by this side; closed after the first accepted connection. */
uv_async_t recv_channel;
/* Async handle used with uv_async_send() to notify the other side. */
uv_async_t send_channel;
/* Listening TCP handle (bound by the child, imported by the parent). */
uv_tcp_t server;
/* Handle for the single connection this worker accepts. */
uv_tcp_t conn;
/* Set to 1 once the first connection was accepted; later ones are ignored. */
int connection_accepted;
/* Number of close_cb invocations for handles whose data points at this worker. */
int close_cb_called;
} worker_t;

/* The two sides of the test: default-loop thread and spawned thread. */
static worker_t parent, child;

/*
 * Exported server stream. Written by the child thread via uv_export() and
 * read by the parent in on_parent_msg() via uv_import().
 * NOTE(review): the volatile qualifier is cast away at both call sites and
 * does not by itself synchronize the two threads; ordering presumably relies
 * on uv_async_send() - confirm.
 */
static volatile uv_stream_info_t dup_stream;

/*
 * One outgoing client connection: the connect request together with the TCP
 * handle it connects. Allocated with malloc() in make_many_connections() and
 * freed in close_client_conn_cb().
 */
typedef struct {
uv_connect_t conn_req;
uv_tcp_t conn;
} tcp_conn;

/* Number of client connections opened by make_many_connections(). */
#define CONN_COUNT 100

/*
 * Close callback shared by all worker-owned handles. The handle's data field
 * must point at the owning worker, whose close counter is bumped.
 */
static void close_cb(uv_handle_t* handle) {
  worker_t* worker;

  worker = (worker_t*) handle->data;
  ASSERT(worker);

  worker->close_cb_called += 1;
}


/*
 * Connection callback for both servers. The first incoming connection is
 * accepted and immediately closed, together with the worker's receive
 * channel and the server itself. Every later connection is ignored.
 */
static void on_connection(uv_stream_t* server, int status) {
  worker_t* worker = CONTAINING_RECORD(server, worker_t, server);
  int r;

  /* Only the first connection is handled. */
  if (worker->connection_accepted) {
    return;
  }

  ASSERT(status == 0);

  /* Accept the connection... */
  r = uv_tcp_init(server->loop, &worker->conn);
  ASSERT(r == 0);

  worker->conn.data = worker;

  r = uv_accept(server, (uv_stream_t*) &worker->conn);
  ASSERT(r == 0);

  worker->connection_accepted = 1;

  /* ...and tear everything down so that this worker's loop can finish. */
  uv_close((uv_handle_t*) &worker->conn, close_cb);
  uv_close((uv_handle_t*) &worker->recv_channel, close_cb);
  uv_close((uv_handle_t*) server, close_cb);
}


/*
 * Close callback for client connections: releases the heap-allocated
 * tcp_conn that the handle's data field points at.
 */
static void close_client_conn_cb(uv_handle_t* handle) {
  free(handle->data);
}


/*
 * Connect callback for client connections. The connection is closed right
 * away; `status` is not inspected.
 */
static void connect_cb(uv_connect_t* req, int status) {
  uv_handle_t* client = (uv_handle_t*) req->handle;

  uv_close(client, close_client_conn_cb);
}


static void make_many_connections() {
tcp_conn* conn;
struct sockaddr_in addr;
int r, i;

for (i = 0; i < CONN_COUNT; i++) {
conn = malloc(sizeof(*conn));
ASSERT(conn);

r = uv_tcp_init(uv_default_loop(), &conn->conn);
ASSERT(r == 0);

addr = uv_ip4_addr("127.0.0.1", TEST_PORT);

r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb);
ASSERT(r == 0);

conn->conn.data = conn;
}
}


/*
 * Async callback run on the parent loop once the child has exported its
 * server. Imports the shared server into the parent loop, starts listening
 * on it and then opens many client connections.
 */
void on_parent_msg(uv_async_t* handle, int status) {
  uv_stream_t* server = (uv_stream_t*) &parent.server;
  uv_stream_info_t* info = (uv_stream_info_t*) &dup_stream;
  int r;

  ASSERT(dup_stream.type == UV_TCP);

  /* Import the shared TCP server, and start listening on it. */
  r = uv_tcp_init(parent.loop, &parent.server);
  ASSERT(r == 0);

  parent.server.data = &parent;

  r = uv_import(server, info);
  ASSERT(r == 0);

  r = uv_listen(server, 12, on_connection);
  ASSERT(r == 0);

  /* Create a bunch of connections to get both servers to accept. */
  make_many_connections();
}


/*
 * Async callback registered on the child loop. Nothing in this test sends on
 * that channel, so reaching this callback is a failure: the assertion below
 * is always false.
 */
void on_child_msg(uv_async_t* handle, int status) {
ASSERT(!"no");
}


/*
 * Entry point of the child thread.
 *
 * Binds a TCP server on the child loop, exports it into dup_stream and
 * notifies the parent through child.send_channel. Depending on the flag
 * carried in `arg`, the server starts listening either before or after the
 * export/notification. The child loop is then run and the function checks
 * that exactly one connection was accepted and three handles were closed.
 *
 * arg: boolean flag smuggled through the thread argument; non-NULL means
 *      "listen after the export was written".
 */
static void child_thread_entry(void* arg) {
  int r;
  /*
   * Test the pointer against NULL instead of casting it to int: a direct
   * pointer-to-int cast truncates (and warns) where pointers are wider than
   * int, and the value is only ever used as a boolean.
   */
  int listen_after_write = (arg != NULL);

  r = uv_tcp_init(child.loop, &child.server);
  ASSERT(r == 0);

  child.server.data = &child;

  r = uv_tcp_bind(&child.server, uv_ip4_addr("0.0.0.0", TEST_PORT));
  ASSERT(r == 0);

  if (!listen_after_write) {
    r = uv_listen((uv_stream_t*)&child.server, 12, on_connection);
    ASSERT(r == 0);
  }

  /* Publish the server for the parent, then wake the parent loop. */
  r = uv_export((uv_stream_t*)&child.server,
                (uv_stream_info_t*)&dup_stream);
  ASSERT(r == 0);

  r = uv_async_send(&child.send_channel);
  ASSERT(r == 0);

  if (listen_after_write) {
    r = uv_listen((uv_stream_t*)&child.server, 12, on_connection);
    ASSERT(r == 0);
  }

  r = uv_run(child.loop);
  ASSERT(r == 0);

  /* on_connection accepts once and closes conn, recv_channel and server. */
  ASSERT(child.connection_accepted == 1);
  ASSERT(child.close_cb_called == 3);
}


/*
 * Drives one run of the test.
 *
 * Sets up the parent (default loop) and child (new loop) workers, wires up
 * the async channels, starts the child thread and runs the parent loop.
 * Afterwards it checks that the parent accepted exactly one connection and
 * saw three close callbacks, and joins the child thread.
 *
 * listen_after_write: forwarded to the child thread; non-zero makes the
 *                     child call uv_listen() only after exporting the server.
 */
static void run_ipc_threads_test(int listen_after_write) {
int r;

parent.loop = uv_default_loop();
child.loop = uv_loop_new();
ASSERT(child.loop);

/* Channel the child uses to wake the parent (handled by on_parent_msg). */
r = uv_async_init(parent.loop, &parent.recv_channel, on_parent_msg);
ASSERT(r == 0);
parent.recv_channel.data = &parent;

/* Channel living on the child loop; its callback must never fire. */
r = uv_async_init(child.loop, &parent.send_channel, on_child_msg);
ASSERT(r == 0);
parent.send_channel.data = &child;

/*
 * The child's channels are by-value copies of the parent's handles.
 * NOTE(review): copying initialized handles by value looks fragile -
 * confirm that it is supported by the async implementation.
 */
child.send_channel = parent.recv_channel;
child.recv_channel = parent.send_channel;

/* NOTE(review): the flag is passed by casting the int to a pointer. */
r = uv_thread_create(&child.thread, child_thread_entry, (void*)listen_after_write);
ASSERT(r == 0);

r = uv_run(parent.loop);
ASSERT(r == 0);

/* on_connection accepts once and closes conn, recv_channel and server. */
ASSERT(parent.connection_accepted == 1);
ASSERT(parent.close_cb_called == 3);

r = uv_thread_join(&child.thread);
ASSERT(r == 0);
}


/*
 * Variant in which the child starts listening only after it has exported the
 * server and notified the parent.
 */
TEST_IMPL(ipc_threads_listen_after_write) {
run_ipc_threads_test(1);
return 0;
}


/*
 * Variant in which the child is already listening when it exports the server
 * and notifies the parent.
 */
TEST_IMPL(ipc_threads_listen_before_write) {
run_ipc_threads_test(0);
return 0;
}

0 comments on commit 9811e0f

Please sign in to comment.