Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
win: guarantee write callback ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern authored and piscisaureus committed Sep 16, 2011
1 parent 2640aae commit f0044c0
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 23 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Expand Up @@ -21,3 +21,4 @@ Clifford Heath <clifford.heath@gmail.com>
Jorge Chamorro Bieling <jorge@jorgechamorro.com>
Luis Lavena <luislavena@gmail.com>
Matthew Sporleder <msporleder@gmail.com>
Pieter Noordhuis <pcnoordhuis@gmail.com>
4 changes: 3 additions & 1 deletion include/uv-private/uv-win.h
Expand Up @@ -99,7 +99,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct uv_req_s* next_req;

#define UV_WRITE_PRIVATE_FIELDS \
/* empty */
int done; \
uv_write_t* next_write;

#define UV_CONNECT_PRIVATE_FIELDS \
/* empty */
Expand All @@ -125,6 +126,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);

#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
uv_write_t* write_reqs_tail; \
uv_shutdown_t* shutdown_req;

#define uv_stream_server_fields \
Expand Down
1 change: 1 addition & 0 deletions src/win/internal.h
Expand Up @@ -110,6 +110,7 @@ void uv_process_reqs(uv_loop_t* loop);
*/
void uv_stream_init(uv_loop_t* loop, uv_stream_t* handle);
void uv_connection_init(uv_stream_t* handle);
void uv_insert_pending_write_req(uv_stream_t* handle, uv_write_t* req);

size_t uv_count_bufs(uv_buf_t bufs[], int count);

Expand Down
43 changes: 31 additions & 12 deletions src/win/pipe.c
Expand Up @@ -747,8 +747,11 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->done = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));

uv_insert_pending_write_req((uv_stream_t*)handle, req);

result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
Expand Down Expand Up @@ -886,22 +889,38 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,

handle->write_queue_size -= req->queued_bytes;

if (req->cb) {
if (!REQ_SUCCESS(req)) {
loop->last_error = GET_REQ_UV_ERROR(req);
((uv_write_cb)req->cb)(req, -1);
req->done = 1;

while (handle->write_reqs_tail) {
req = handle->write_reqs_tail->next_write;

if (!req->done) {
break;
}

if (req == handle->write_reqs_tail) {
handle->write_reqs_tail = NULL;
} else {
((uv_write_cb)req->cb)(req, 0);
handle->write_reqs_tail->next_write = req->next_write;
}
}

handle->write_reqs_pending--;
if (handle->write_reqs_pending == 0 &&
handle->flags & UV_HANDLE_SHUTTING) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
if (req->cb) {
if (!REQ_SUCCESS(req)) {
loop->last_error = GET_REQ_UV_ERROR(req);
((uv_write_cb)req->cb)(req, -1);
} else {
((uv_write_cb)req->cb)(req, 0);
}
}

DECREASE_PENDING_REQ_COUNT(handle);
handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_SHUTTING &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}

DECREASE_PENDING_REQ_COUNT(handle);
}
}


Expand Down
14 changes: 14 additions & 0 deletions src/win/stream.c
Expand Up @@ -41,13 +41,27 @@ void uv_stream_init(uv_loop_t* loop, uv_stream_t* handle) {
void uv_connection_init(uv_stream_t* handle) {
handle->flags |= UV_HANDLE_CONNECTION;
handle->write_reqs_pending = 0;
handle->write_reqs_tail = NULL;

uv_req_init(handle->loop, (uv_req_t*) &(handle->read_req));
handle->read_req.type = UV_READ;
handle->read_req.data = handle;
}


void uv_insert_pending_write_req(uv_stream_t* handle, uv_write_t* req) {
req->next_write = NULL;
if (handle->write_reqs_tail) {
req->next_write = handle->write_reqs_tail->next_write;
handle->write_reqs_tail->next_write = req;
handle->write_reqs_tail = req;
} else {
req->next_write = req;
handle->write_reqs_tail = req;
}
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
switch (stream->type) {
case UV_TCP:
Expand Down
39 changes: 29 additions & 10 deletions src/win/tcp.c
Expand Up @@ -644,8 +644,11 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->done = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));

uv_insert_pending_write_req((uv_stream_t*)handle, req);

result = WSASend(handle->socket,
(WSABUF*)bufs,
bufcnt,
Expand Down Expand Up @@ -781,18 +784,34 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,

handle->write_queue_size -= req->queued_bytes;

if (req->cb) {
loop->last_error = GET_REQ_UV_SOCK_ERROR(req);
((uv_write_cb)req->cb)(req, loop->last_error.code == UV_OK ? 0 : -1);
}
req->done = 1;

handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_SHUTTING &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}
while (handle->write_reqs_tail) {
req = handle->write_reqs_tail->next_write;

DECREASE_PENDING_REQ_COUNT(handle);
if (!req->done) {
break;
}

if (req == handle->write_reqs_tail) {
handle->write_reqs_tail = NULL;
} else {
handle->write_reqs_tail->next_write = req->next_write;
}

if (req->cb) {
loop->last_error = GET_REQ_UV_SOCK_ERROR(req);
((uv_write_cb)req->cb)(req, loop->last_error.code == UV_OK ? 0 : -1);
}

handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_SHUTTING &&
handle->write_reqs_pending == 0) {
uv_want_endgame(loop, (uv_handle_t*)handle);
}

DECREASE_PENDING_REQ_COUNT(handle);
}
}


Expand Down

0 comments on commit f0044c0

Please sign in to comment.