Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
pipe: allow queueing pending handles
Browse files Browse the repository at this point in the history
Introduce `int uv_pipe_pending_count(uv_pipe_t*)` and
`uv_handle_type uv_pipe_pending_type(uv_pipe_t*)`. They should be
used in an IPC pipe's read callback to accept incoming handles:

    int count = uv_pipe_pending_count(pipe);
    int i;
    for (i = 0; i < count; i++) {
      uv_handle_type type = uv_pipe_pending_type(pipe);
      /* ... */
      uv_accept(...);
    }
  • Loading branch information
indutny committed Mar 3, 2014
1 parent 409c7b3 commit b284c8d
Show file tree
Hide file tree
Showing 18 changed files with 556 additions and 252 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Expand Up @@ -158,6 +158,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-pipe-bind-error.c \
test/test-pipe-connect-error.c \
test/test-pipe-getsockname.c \
test/test-pipe-sendmsg.c \
test/test-pipe-server-close.c \
test/test-platform-output.c \
test/test-poll-close.c \
Expand Down
1 change: 1 addition & 0 deletions checksparse.sh
Expand Up @@ -115,6 +115,7 @@ test/test-pass-always.c
test/test-ping-pong.c
test/test-pipe-bind-error.c
test/test-pipe-connect-error.c
test/test-pipe-sendmsg.c
test/test-pipe-server-close.c
test/test-platform-output.c
test/test-poll-close.c
Expand Down
1 change: 1 addition & 0 deletions include/uv-unix.h
Expand Up @@ -230,6 +230,7 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \

#define UV_TCP_PRIVATE_FIELDS /* empty */
Expand Down
5 changes: 2 additions & 3 deletions include/uv-win.h
Expand Up @@ -424,10 +424,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
unsigned char reserved[sizeof(void*)]; \
struct { \
WSAPROTOCOL_INFOW* socket_info; \
int tcp_connection; \
void* queue[2]; \
int queue_len; \
} pending_ipc_info; \
uv_write_t* non_overlapped_writes_tail;

Expand Down
28 changes: 9 additions & 19 deletions include/uv.h
Expand Up @@ -398,17 +398,6 @@ typedef void (*uv_alloc_cb)(uv_handle_t* handle,
typedef void (*uv_read_cb)(uv_stream_t* stream,
ssize_t nread,
const uv_buf_t* buf);

/*
* Just like the uv_read_cb except that if the pending parameter is true
* then you can use uv_accept() to pull the new handle into the process.
* If no handle is pending then pending will be UV_UNKNOWN_HANDLE.
*/
typedef void (*uv_read2_cb)(uv_pipe_t* pipe,
ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending);

typedef void (*uv_write_cb)(uv_write_t* req, int status);
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
Expand Down Expand Up @@ -611,7 +600,6 @@ UV_EXTERN uv_buf_t uv_buf_init(char* base, unsigned int len);
size_t write_queue_size; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
uv_read2_cb read2_cb; \
/* private */ \
UV_STREAM_PRIVATE_FIELDS

Expand Down Expand Up @@ -660,13 +648,6 @@ UV_EXTERN int uv_read_start(uv_stream_t*,

UV_EXTERN int uv_read_stop(uv_stream_t*);

/*
* Extended read methods for receiving handles over a pipe. The pipe must be
* initialized with ipc == 1.
*/
UV_EXTERN int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb);


/*
* Write data to stream. Buffers are written in order. Example:
Expand Down Expand Up @@ -1213,6 +1194,15 @@ UV_EXTERN int uv_pipe_getsockname(const uv_pipe_t* handle,
*/
UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count);

/*
* Used to receive handles over an IPC pipe.
*
* First call `uv_pipe_pending_count`. If it returns a value > 0, initialize
* a handle of the type returned by `uv_pipe_pending_type` and then call
* `uv_accept(pipe, handle)`.
*/
UV_EXTERN int uv_pipe_pending_count(uv_pipe_t* handle);
UV_EXTERN uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle);

/*
* uv_poll_t is a subclass of uv_handle_t.
Expand Down
10 changes: 10 additions & 0 deletions src/unix/internal.h
Expand Up @@ -120,6 +120,8 @@
# define O_CLOEXEC 0x00100000
#endif

typedef struct uv__stream_queued_fds_s uv__stream_queued_fds_t;

/* handle flags */
enum {
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
Expand All @@ -142,6 +144,13 @@ typedef enum {
UV_CLOCK_FAST = 1 /* Use the fastest clock with <= 1ms granularity. */
} uv_clocktype_t;

/*
 * Record of file descriptors queued on a stream. uv_pipe_pending_count()
 * reaches it through the stream handle's untyped `queued_fds` pointer, which
 * is NULL when no record is attached.
 */
struct uv__stream_queued_fds_s {
/* NOTE(review): presumably the allocated capacity of `fds` - confirm. */
unsigned int size;
/*
 * uv_pipe_pending_count() reports `1 + offset` pending handles when a record
 * is attached, i.e. `offset` counts entries beyond the stream's accepted_fd.
 */
unsigned int offset;
/*
 * Declared with a single element. NOTE(review): presumably the record is
 * over-allocated so that `fds` can hold more entries - confirm against the
 * allocation site.
 */
int fds[1];
};


/* core */
int uv__nonblock(int fd, int set);
int uv__close(int fd);
Expand Down Expand Up @@ -226,6 +235,7 @@ void uv__tcp_close(uv_tcp_t* handle);
void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);
uv_handle_type uv__handle_type(int fd);

#if defined(__APPLE__)
int uv___stream_fd(uv_stream_t* handle);
Expand Down
28 changes: 28 additions & 0 deletions src/unix/pipe.c
Expand Up @@ -246,3 +246,31 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {

void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
}


/*
 * Report how many handles are waiting to be accepted on `handle`.
 *
 * Returns 0 when `handle` is not an IPC pipe or when `accepted_fd` is -1.
 * Otherwise the descriptor in `accepted_fd` counts as one pending handle,
 * and the `offset` of the attached queued-fds record (if any) is added
 * on top of it.
 */
int uv_pipe_pending_count(uv_pipe_t* handle) {
  uv__stream_queued_fds_t* pending;

  /* Nothing can be pending on a non-IPC pipe or without an accepted fd. */
  if (!handle->ipc || handle->accepted_fd == -1)
    return 0;

  pending = handle->queued_fds;
  if (pending == NULL)
    return 1;

  /* One for accepted_fd plus everything recorded in the queue. */
  return pending->offset + 1;
}


/*
 * Report the handle type of the descriptor currently held in
 * `handle->accepted_fd`, as classified by uv__handle_type().
 *
 * Returns UV_UNKNOWN_HANDLE when `handle` is not an IPC pipe or when
 * `accepted_fd` is -1.
 */
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
  if (!handle->ipc || handle->accepted_fd == -1)
    return UV_UNKNOWN_HANDLE;

  return uv__handle_type(handle->accepted_fd);
}

0 comments on commit b284c8d

Please sign in to comment.