Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
Work around windows udp bug, allow zero reads
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed Oct 20, 2011
1 parent e8a418e commit 51e9dbc
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 60 deletions.
25 changes: 24 additions & 1 deletion include/uv-private/uv-win.h
Expand Up @@ -104,6 +104,27 @@
DWORD dwFlags);
#endif

/*
 * Function-pointer type for a WSARecv-style receive routine. The udp handle
 * field func_wsarecv has this type; udp.c stores either WSARecv or
 * uv_wsarecv_workaround in it.
 */
typedef int (WSAAPI* LPFN_WSARECV)(
    SOCKET socket,
    LPWSABUF buffers,
    DWORD buffer_count,
    LPDWORD bytes,
    LPDWORD flags,
    LPWSAOVERLAPPED overlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine);

/*
 * Function-pointer type for a WSARecvFrom-style receive routine. The udp
 * handle field func_wsarecvfrom has this type; udp.c stores either
 * WSARecvFrom or uv_wsarecvfrom_workaround in it.
 */
typedef int (WSAAPI* LPFN_WSARECVFROM)(
    SOCKET socket,
    LPWSABUF buffers,
    DWORD buffer_count,
    LPDWORD bytes,
    LPDWORD flags,
    struct sockaddr* addr,
    LPINT addr_len,
    LPWSAOVERLAPPED overlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine);


/**
* It should be possible to cast uv_buf_t[] to WSABUF[]
Expand Down Expand Up @@ -236,7 +257,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct sockaddr_storage recv_from; \
int recv_from_len; \
uv_udp_recv_cb recv_cb; \
uv_alloc_cb alloc_cb;
uv_alloc_cb alloc_cb; \
LPFN_WSARECV func_wsarecv; \
LPFN_WSARECVFROM func_wsarecvfrom;

#define uv_pipe_server_fields \
uv_pipe_accept_t accept_reqs[4]; \
Expand Down
2 changes: 1 addition & 1 deletion include/uv.h
Expand Up @@ -1061,7 +1061,7 @@ int uv_fs_lstat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_link(uv_loop_t* loop, uv_fs_t* req, const char* path,
const char* new_path, uv_fs_cb cb);

/*
/*
* This flag can be used with uv_fs_symlink on Windows
* to specify whether path argument points to a directory.
*/
Expand Down
30 changes: 28 additions & 2 deletions src/win/internal.h
Expand Up @@ -307,14 +307,40 @@ uv_err_code uv_translate_sys_error(int sys_errno);


/*
* Initialization for the windows and winsock api
* Winapi and ntapi utility functions
*/
void uv_winapi_init();


/*
* Winsock utility functions
*/
void uv_winsock_init();

int uv_ntstatus_to_winsock_error(NTSTATUS status);

BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target);
BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target);

/*
 * Replacement for WSARecv. It takes its arguments in the same order as the
 * LPFN_WSARECV function-pointer type, and udp.c installs it as the handle's
 * func_wsarecv after SetFileCompletionNotificationModes succeeds on the
 * socket.
 */
int WSAAPI uv_wsarecv_workaround(
    SOCKET socket,
    WSABUF* buffers,
    DWORD buffer_count,
    DWORD* bytes,
    DWORD* flags,
    WSAOVERLAPPED* overlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine);
/*
 * Replacement for WSARecvFrom. It takes its arguments in the same order as
 * the LPFN_WSARECVFROM function-pointer type, and udp.c installs it as the
 * handle's func_wsarecvfrom after SetFileCompletionNotificationModes
 * succeeds on the socket.
 */
int WSAAPI uv_wsarecvfrom_workaround(
    SOCKET socket,
    WSABUF* buffers,
    DWORD buffer_count,
    DWORD* bytes,
    DWORD* flags,
    struct sockaddr* addr,
    int* addr_len,
    WSAOVERLAPPED* overlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE completion_routine);

/* Threads and synchronization */
/* Whether ipv6 is supported */
extern int uv_allow_ipv6;

/* Ip address used to bind to any port at any interface */
extern struct sockaddr_in uv_addr_ip4_any_;
extern struct sockaddr_in6 uv_addr_ip6_any_;


/*
* Threads and synchronization
*/
typedef struct uv_once_s {
unsigned char ran;
/* The actual event handle must be aligned to sizeof(HANDLE), so in */
Expand Down
114 changes: 65 additions & 49 deletions src/win/udp.c
Expand Up @@ -24,17 +24,15 @@
#include "uv.h"
#include "../uv-common.h"
#include "internal.h"
#include <stdio.h>

#if 0

/*
* Threshold of active udp streams for which to preallocate udp read buffers.
*/
const unsigned int uv_active_udp_streams_threshold = 0;

/* A zero-size buffer for use by uv_udp_read */
static char uv_zero_[] = "";
#endif

/* Counter to keep track of active udp streams */
static unsigned int active_udp_streams = 0;
Expand Down Expand Up @@ -63,6 +61,8 @@ int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name,
static int uv_udp_set_socket(uv_loop_t* loop, uv_udp_t* handle,
SOCKET socket) {
DWORD yes = 1;
WSAPROTOCOL_INFOW info;
int opt_len;

assert(handle->socket == INVALID_SOCKET);

Expand All @@ -89,14 +89,33 @@ static int uv_udp_set_socket(uv_loop_t* loop, uv_udp_t* handle,
}

if (pSetFileCompletionNotificationModes) {
if (pSetFileCompletionNotificationModes((HANDLE)socket,
FILE_SKIP_SET_EVENT_ON_HANDLE |
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) {
handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
} else if (GetLastError() != ERROR_INVALID_FUNCTION) {
/* All known versions of Windows that support */
/* SetFileCompletionNotificationModes have a bug that makes it impossible */
/* to use this function in conjunction with datagram sockets. We can work */
/* around that, but only if the user is using the default UDP driver (AFD) */
/* and has no other LSPs stacked on top. Here we check whether that is the */
/* case. */
opt_len = (int) sizeof info;
/* Compare the getsockopt() result with SOCKET_ERROR directly. Negating it */
/* first yields 0 or 1, which never equals SOCKET_ERROR, so a failed call */
/* would go unnoticed and `info` would be inspected without having been */
/* filled in. */
if (getsockopt(socket,
               SOL_SOCKET,
               SO_PROTOCOL_INFOW,
               (char*) &info,
               &opt_len) == SOCKET_ERROR) {
  uv__set_sys_error(loop, GetLastError());
  return -1;
}

if (info.ProtocolChain.ChainLen == 1) {
if (pSetFileCompletionNotificationModes((HANDLE)socket,
FILE_SKIP_SET_EVENT_ON_HANDLE |
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) {
handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
handle->func_wsarecv = uv_wsarecv_workaround;
handle->func_wsarecvfrom = uv_wsarecvfrom_workaround;
} else if (GetLastError() != ERROR_INVALID_FUNCTION) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
}
}

handle->socket = socket;
Expand All @@ -111,6 +130,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
handle->reqs_pending = 0;
handle->loop = loop;
handle->flags = 0;
handle->func_wsarecv = WSARecv;
handle->func_wsarecvfrom = WSARecvFrom;

uv_req_init(loop, (uv_req_t*) &(handle->recv_req));
handle->recv_req.type = UV_UDP_RECV;
Expand Down Expand Up @@ -248,10 +269,9 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) {
* Preallocate a read buffer if the number of active streams is below
* the threshold.
*/
#if 0
if (active_udp_streams < uv_active_udp_streams_threshold) {
handle->flags &= ~UV_HANDLE_ZERO_READ;
#endif

handle->recv_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536);
assert(handle->recv_buffer.len > 0);

Expand All @@ -260,15 +280,15 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) {
handle->recv_from_len = sizeof handle->recv_from;
flags = 0;

result = WSARecvFrom(handle->socket,
(WSABUF*) &buf,
1,
&bytes,
&flags,
(struct sockaddr*) &handle->recv_from,
&handle->recv_from_len,
&req->overlapped,
NULL);
result = handle->func_wsarecvfrom(handle->socket,
(WSABUF*) &buf,
1,
&bytes,
&flags,
(struct sockaddr*) &handle->recv_from,
&handle->recv_from_len,
&req->overlapped,
NULL);

if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
/* Process the req without IOCP. */
Expand All @@ -286,21 +306,21 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) {
uv_insert_pending_req(loop, req);
handle->reqs_pending++;
}
#if 0

} else {
handle->flags |= UV_HANDLE_ZERO_READ;

buf.base = (char*) uv_zero_;
buf.len = 0;
flags = MSG_PARTIAL;
flags = MSG_PEEK;

result = WSARecv(handle->socket,
(WSABUF*) &buf,
1,
&bytes,
&flags,
&req->overlapped,
NULL);
result = handle->func_wsarecv(handle->socket,
(WSABUF*) &buf,
1,
&bytes,
&flags,
&req->overlapped,
NULL);

if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
/* Process the req without IOCP. */
Expand All @@ -319,7 +339,6 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) {
handle->reqs_pending++;
}
}
#endif
}


Expand Down Expand Up @@ -448,34 +467,27 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,
handle->flags &= ~UV_HANDLE_READ_PENDING;

if (!REQ_SUCCESS(req) &&
GET_REQ_STATUS(req) != STATUS_RECEIVE_EXPEDITED) {
GET_REQ_SOCK_ERROR(req) != WSAEMSGSIZE) {
/* An error occurred doing the read. */
if ((handle->flags & UV_HANDLE_READING)) {
uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req));
if (handle->flags & UV_HANDLE_READING) {
uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req));
uv_udp_recv_stop(handle);
#if 0
buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
uv_buf_init(NULL, 0) : handle->recv_buffer;
#else
buf = handle->recv_buffer;
#endif
handle->recv_cb(handle, -1, buf, NULL, 0);
}
goto done;
}

#if 0
if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
#endif
/* Successful read */
partial = (GET_REQ_STATUS(req) == STATUS_RECEIVE_EXPEDITED);
partial = !REQ_SUCCESS(req);
handle->recv_cb(handle,
req->overlapped.InternalHigh,
handle->recv_buffer,
(struct sockaddr*) &handle->recv_from,
partial ? UV_UDP_PARTIAL : 0);
#if 0
} else {
} else if (handle->flags & UV_HANDLE_READING) {
DWORD bytes, err, flags;
struct sockaddr_storage from;
int from_len;
Expand All @@ -487,7 +499,8 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,

memset(&from, 0, sizeof from);
from_len = sizeof from;
flags = MSG_PARTIAL;

flags = 0;

if (WSARecvFrom(handle->socket,
(WSABUF*)&buf,
Expand All @@ -500,14 +513,18 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,
NULL) != SOCKET_ERROR) {

/* Message received */
handle->recv_cb(handle,
bytes,
buf,
(struct sockaddr*) &from,
(flags & MSG_PARTIAL) ? UV_UDP_PARTIAL : 0);
handle->recv_cb(handle, bytes, buf, (struct sockaddr*) &from, 0);
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
if (err == WSAEMSGSIZE) {
/* Message truncated */
handle->recv_cb(handle,
bytes,
buf,
(struct sockaddr*) &from,
UV_UDP_PARTIAL);
} if (err == WSAEWOULDBLOCK) {
/* Kernel buffer empty */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
handle->recv_cb(handle, 0, buf, NULL, 0);
} else {
Expand All @@ -517,7 +534,6 @@ void uv_process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,
}
}
}
#endif

done:
/* Post another read if still reading and not closing. */
Expand Down
8 changes: 8 additions & 0 deletions src/win/winapi.c
Expand Up @@ -27,6 +27,7 @@


sRtlNtStatusToDosError pRtlNtStatusToDosError;
sNtDeviceIoControlFile pNtDeviceIoControlFile;
sNtQueryInformationFile pNtQueryInformationFile;
sNtSetInformationFile pNtSetInformationFile;
sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx;
Expand Down Expand Up @@ -57,6 +58,13 @@ void uv_winapi_init() {
uv_fatal_error(GetLastError(), "GetProcAddress");
}

pNtDeviceIoControlFile = (sNtDeviceIoControlFile) GetProcAddress(
ntdll_module,
"NtDeviceIoControlFile");
if (pNtDeviceIoControlFile == NULL) {
uv_fatal_error(GetLastError(), "GetProcAddress");
}

pNtSetInformationFile = (sNtSetInformationFile) GetProcAddress(
ntdll_module,
"NtSetInformationFile");
Expand Down
18 changes: 18 additions & 0 deletions src/win/winapi.h
Expand Up @@ -4270,9 +4270,26 @@ typedef enum _FILE_INFORMATION_CLASS {
FILE_SPECIAL_ACCESS)
#endif

/*
 * Callback pointer type used for the ApcRoutine parameter of
 * sNtDeviceIoControlFile.
 */
typedef VOID (NTAPI *PIO_APC_ROUTINE)(
    PVOID ApcContext,
    PIO_STATUS_BLOCK IoStatusBlock,
    ULONG Reserved);

/* Type of the pRtlNtStatusToDosError function pointer. */
typedef ULONG (NTAPI *sRtlNtStatusToDosError)(NTSTATUS Status);

/*
 * Type of the pNtDeviceIoControlFile function pointer, which
 * uv_winapi_init() resolves with GetProcAddress on the
 * "NtDeviceIoControlFile" export of ntdll_module.
 */
typedef NTSTATUS (NTAPI *sNtDeviceIoControlFile)(
    HANDLE FileHandle,
    HANDLE Event,
    PIO_APC_ROUTINE ApcRoutine,
    PVOID ApcContext,
    PIO_STATUS_BLOCK IoStatusBlock,
    ULONG IoControlCode,
    PVOID InputBuffer,
    ULONG InputBufferLength,
    PVOID OutputBuffer,
    ULONG OutputBufferLength);

typedef NTSTATUS (NTAPI *sNtQueryInformationFile)
(HANDLE FileHandle,
PIO_STATUS_BLOCK IoStatusBlock,
Expand Down Expand Up @@ -4325,6 +4342,7 @@ typedef BOOLEAN (WINAPI* sCreateSymbolicLinkW)

/* Ntapi function pointers */
extern sRtlNtStatusToDosError pRtlNtStatusToDosError;
extern sNtDeviceIoControlFile pNtDeviceIoControlFile;
extern sNtQueryInformationFile pNtQueryInformationFile;
extern sNtSetInformationFile pNtSetInformationFile;

Expand Down

0 comments on commit 51e9dbc

Please sign in to comment.