Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Do load balancing test in test-child-process-fork2.
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Oct 7, 2011
1 parent 153629c commit 26c08a3
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 47 deletions.
43 changes: 25 additions & 18 deletions lib/child_process_uv.js
Expand Up @@ -20,6 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var EventEmitter = require('events').EventEmitter;
var net = require('net');
var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');
Expand All @@ -39,8 +40,7 @@ function createPipe(ipc) {
}

function createSocket(pipe, readable) {
var Socket = require('net').Socket;
var s = new Socket({ handle: pipe });
var s = new net.Socket({ handle: pipe });

if (readable) {
s.writable = false;
Expand Down Expand Up @@ -75,16 +75,24 @@ function setupChannel(target, channel) {

channel.onread = function(pool, offset, length, recvStream) {
if (pool) {
for (var i = 0; i < length; i++) {
if (pool[offset + i] === LF) {
jsonBuffer += pool.toString('ascii', offset, offset + i);
var message = JSON.parse(jsonBuffer);
jsonBuffer = pool.toString('ascii', i, length);
offset = i + 1;

target.emit('message', message, recvStream);
jsonBuffer += pool.toString('ascii', offset, offset + length);

var i;
while ((i = jsonBuffer.indexOf('\n')) >= 0) {
var json = jsonBuffer.slice(0, i);
var message = JSON.parse(json);
jsonBuffer = jsonBuffer.slice(i + 1);

if (recvStream) {
// TODO support other types of stream.
// TODO probably need a queue of recvStreams
var server = new net.Server();
server._handle = recvStream;
}

target.emit('message', message, server);
}

} else {
channel.close();
target._channel = null;
Expand All @@ -94,13 +102,12 @@ function setupChannel(target, channel) {
target.send = function(message, sendStream) {
if (!target._channel) throw new Error("channel closed");

// Open up net.Socket instances
if (sendStream instanceof require('net').Socket ||
sendStream instanceof require('net').Server) {
sendStream = sendStream._handle;
if (!sendStream) {
throw new Error("sendStream handle not yet opened");
// Open up net.Server instances
if (sendStream) {
if (false == sendStream instanceof net.Server) {
throw new Error("sendStream must be instance of net.Server");
}
sendStream = sendStream._handle;
}

// For overflow protection don't write if channel queue is too deep.
Expand All @@ -114,10 +121,10 @@ function setupChannel(target, channel) {

if (!writeReq) {
throw new Error(errno + " cannot write to IPC channel.");
} else {
writeReq.oncomplete = nop;
}

writeReq.oncomplete = nop;

return true;
};

Expand Down
52 changes: 29 additions & 23 deletions lib/net_uv.js
Expand Up @@ -621,48 +621,54 @@ function toPort(x) { return (x = Number(x)) >= 0 ? x : false; }
function listen(self, address, port, addressType) {
var r = 0;

// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if (!self._handle) {
// assign handle in listen, and clean up if bind or listen fails
self._handle =
(port == -1 && addressType == -1) ? createPipe() : createTCP();
}

self._handle.socket = self;
self._handle.onconnection = onconnection;
if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = self._handle.bind6(address, port);
} else {
r = self._handle.bind(address, port);
}
}

if (address || port) {
debug('bind to ' + address);
if (addressType == 6) {
r = self._handle.bind6(address, port);
} else {
r = self._handle.bind(address, port);
if (r) {
self._handle.close();
self._handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
return;
}
}

self._handle.onconnection = onconnection;
self._handle.socket = self;

r = self._handle.listen(self._backlog || 128);

if (r) {
self._handle.close();
self._handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
} else {
r = self._handle.listen(self._backlog || 128);
if (r) {
self._handle.close();
self._handle = null;

process.nextTick(function() {
self.emit('error', errnoException(errno, 'listen'));
});
} else {
process.nextTick(function() {
self.emit('listening');
});
}
return;
}

process.nextTick(function() {
self.emit('listening');
});
}


Server.prototype.listen = function() {
var self = this;

Expand Down
20 changes: 19 additions & 1 deletion test/fixtures/fork2.js
@@ -1,9 +1,27 @@
var assert = require('assert');
var net = require('net');

var connections = 0;

process.on('message', function(m, server) {
console.log('CHILD got message:', m);
assert.ok(m.hello);

assert.ok(server);
process.send({ gotHandle: true });
assert.ok(server instanceof net.Server);

// TODO need better API for this.
server._backlog = 9;

server.listen(function() {
process.send({ gotHandle: true });
});

server.on('connection', function(c) {
connections++;
console.log('CHILD got connection');
c.destroy();
process.send({ childConnections: connections });
});
});

38 changes: 33 additions & 5 deletions test/simple/test-child-process-fork2.js
Expand Up @@ -3,25 +3,53 @@ var common = require('../common');
var fork = require('child_process').fork;
var net = require('net');

var socketCloses = 0;
var N = 10;

var n = fork(common.fixturesDir + '/fork2.js');

var messageCount = 0;

var server = new net.Server();
var server = new net.Server(function(c) {
console.log('PARENT got connection');
c.destroy();
});

// TODO need better API for this.
server._backlog = 9;

server.listen(common.PORT, function() {
console.log('PARENT send child server handle');
n.send({ hello: 'world' }, server);
});

function makeConnections() {
for (var i = 0; i < N; i++) {
var socket = net.connect(common.PORT, function() {
console.log("CLIENT connected");
});

socket.on("close", function() {
socketCloses++;
console.log("CLIENT closed " + socketCloses);
if (socketCloses == N) {
n.kill();
server.close();
}
});
}
}

n.on('message', function(m) {
console.log('PARENT got message:', m);
assert.ok(m.gotHandle);
if (m.gotHandle) {
makeConnections();
}
messageCount++;
n.kill();
server.close();
});

process.on('exit', function() {
assert.equal(1, messageCount);
assert.equal(10, socketCloses);
assert.ok(messageCount > 1);
});

0 comments on commit 26c08a3

Please sign in to comment.