Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
prefork II
  • Loading branch information
piscisaureus committed Sep 15, 2011
1 parent cfc5182 commit 8afa067
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 35 deletions.
4 changes: 3 additions & 1 deletion include/uv-win.h
Expand Up @@ -61,7 +61,6 @@ typedef struct uv_buf_t {
OVERLAPPED overlapped; \
size_t queued_bytes; \
}; \
HANDLE wait; \
}; \
struct uv_req_s* next_req;

Expand All @@ -83,10 +82,13 @@ typedef struct uv_buf_t {
HANDLE pipeHandle; \
struct uv_pipe_accept_s* next_pending; \
} uv_pipe_accept_t; \
\
typedef struct uv_tcp_accept_s { /* State of one accept request (type UV_ACCEPT) on a listening TCP handle. */ \
UV_REQ_FIELDS /* Shared request members defined elsewhere; the TCP code uses its overlapped, type and data fields. */ \
SOCKET accept_socket; /* Set to INVALID_SOCKET at listen time; holds the accept socket once an accept is queued. */ \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; /* NOTE(review): presumably the AcceptEx address buffer (two addresses, 16 bytes padding each) - confirm. */ \
HANDLE event_handle; /* Event created at listen time when the handle emulates IOCP, NULL otherwise; placed in overlapped.hEvent with the low bit set. */ \
HANDLE wait_handle; /* Wait registration from RegisterWaitForSingleObject; NULL until the first registration. */ \
struct uv_tcp_accept_s* next_pending; /* NOTE(review): presumably links pending accept requests - confirm against users. */ \
} uv_tcp_accept_t;

Expand Down
55 changes: 37 additions & 18 deletions prefork.c
Expand Up @@ -5,8 +5,6 @@
#include <stdio.h>
#include <string.h>

#define NUM_CHILDREN 3 /* Not including the master server */

int server_id;
int accepted = 0;

Expand Down Expand Up @@ -53,7 +51,7 @@ void spawn(int id, SOCKET sock) {
uv_process_t* process;
WSAPROTOCOL_INFOW* blob;
uv_process_options_t options;
char* args[3];
char* args[4];
char id_str[3];
uv_write_t* wr_req;
uv_buf_t buf;
Expand All @@ -64,8 +62,9 @@ void spawn(int id, SOCKET sock) {
_snprintf(id_str, sizeof id_str, "%d", id);

args[0] = exe_path;
args[1] = id_str;
args[2] = NULL;
args[1] = "--child";
args[2] = id_str;
args[3] = NULL;

r = uv_pipe_init(in);
CHECK(r == 0);
Expand Down Expand Up @@ -97,7 +96,7 @@ void cl_close_cb(uv_handle_t* handle) {
}

void cl_write_cb(uv_write_t* req, int status) {
CHECK(status == 0);
//CHECK(status == 0);

uv_close((uv_handle_t*) req->handle, cl_close_cb);

Expand All @@ -111,7 +110,12 @@ void cl_write(uv_tcp_t* handle) {
uv_write_t* req = malloc(sizeof *req);

r = uv_write(req, (uv_stream_t*) handle, &buf, 1, cl_write_cb);
CHECK(r == 0);
if (r) {
LOG("error");
uv_close((uv_handle_t*) handle, cl_close_cb);
free(req);
}


// Pretend our server is very busy:
// Sleep(10);
Expand Down Expand Up @@ -146,18 +150,13 @@ void master() {
r = uv_tcp_init(&server);
CHECK(r == 0);

r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 8000));
r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 80));
CHECK(r == 0);

exe_path_size = sizeof exe_path;
r = uv_exepath(exe_path, &exe_path_size);
CHECK(r == 0);
exe_path[exe_path_size] = '\0';

// Spawn slaves
for (i = NUM_CHILDREN; i > 0; i--) {
spawn(i, server.socket);
}
}


Expand All @@ -179,25 +178,45 @@ void slave() {
CHECK(r == 0);
}


int main(int argv, char** argc) {
int r;

uv_init();

if (argv == 1) {
if (argv < 3) {
int i;
int num_children = 0;

if (argv == 2) {
num_children = strtol(argc[1], NULL, 10);
}

/* We're the master process */
server_id = 0;

master();

// Start listening now
LOG("listen\n");
r = uv_listen((uv_stream_t*) &server, 10, connection_cb);
CHECK(r == 0);

// Spawn slaves
for (i = num_children; i > 0; i--) {
spawn(i, server.socket);
}
} else {
/* We're a slave process */
server_id = strtol(argc[1], NULL, 10);
server_id = strtol(argc[2], NULL, 10);
slave();

// Start listening now
LOG("listen\n");
r = uv_listen((uv_stream_t*) &server, 10, connection_cb);
CHECK(r == 0);
}

// Start listening now
r = uv_listen((uv_stream_t*) &server, 512, connection_cb);
CHECK(r == 0);

r = uv_timer_init(&timer);
CHECK(r == 0);
Expand Down
29 changes: 13 additions & 16 deletions src/win/tcp.c
Expand Up @@ -234,11 +234,7 @@ static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
assert(req != NULL);
assert(!timed_out);

UnregisterWait(req->wait);
CloseHandle(req->overlapped.hEvent);
req->overlapped.hEvent = NULL;

PostQueuedCompletionStatus(LOOP->iocp, req->overlapped.InternalHigh, NULL, &req->overlapped);
PostQueuedCompletionStatus(LOOP->iocp, req->overlapped.InternalHigh, 0, &req->overlapped);
}

static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
Expand Down Expand Up @@ -271,16 +267,8 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {

/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));

if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = CreateEvent(NULL, 0, 0, NULL);
if (!req->overlapped.hEvent) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
return;
}
req->overlapped.hEvent = (HANDLE) ((DWORD) req->overlapped.hEvent | 1);
req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
}

success = pAcceptExFamily(handle->socket,
Expand All @@ -301,8 +289,8 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait, req->overlapped.hEvent, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
if ((handle->flags & UV_HANDLE_EMULATE_IOCP) && !req->wait_handle) {
if (!RegisterWaitForSingleObject(&req->wait_handle, req->overlapped.hEvent, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req((uv_req_t*)req);
return;
Expand Down Expand Up @@ -421,6 +409,15 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = NULL;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
} else {
req->event_handle = NULL;
}
uv_tcp_queue_accept(handle, req);
}

Expand Down

0 comments on commit 8afa067

Please sign in to comment.