Skip to content

Commit

Permalink
Showing 1 changed file with 98 additions and 39 deletions.
137 changes: 98 additions & 39 deletions core/src/main/java/org/jruby/ext/socket/RubyBasicSocket.java
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
* rights and limitations under the License.
*
* Copyright (C) 2007 Ola Bini <ola@ologix.com>
*
*
* Alternatively, the contents of this file may be used under the terms of
* either of the GNU General Public License Version 2 or later (the "GPL"),
* or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
@@ -69,6 +69,7 @@

import jnr.constants.platform.SocketLevel;
import jnr.constants.platform.SocketOption;
import org.jruby.util.TypeConverter;
import org.jruby.util.io.Sockaddr;


@@ -143,53 +144,112 @@ public IRubyObject send(ThreadContext context, IRubyObject _mesg, IRubyObject _f
return send(context, _mesg, _flags);
}

/**
 * Ruby-visible {@code recv(length)}.
 *
 * Forwards to the private four-argument overload with no destination string
 * and no flags, so a fresh string is returned for the received bytes.
 *
 * @param context the current thread context
 * @param length  size of the receive buffer; converted with
 *                {@code RubyNumeric.fix2int} by the overload this delegates to
 * @return the result of the four-argument {@code recv} overload
 */
@JRubyMethod
public IRubyObject recv(ThreadContext context, IRubyObject length) {
    // The two nulls mean "no caller-supplied string" and "no flags".
    return recv(context, length, null, null);
}

/**
 * Ruby-visible {@code recv} for the multi-argument forms; unpacks {@code args}
 * into (length, str, flags) and forwards to the private four-argument overload.
 *
 * <ul>
 *   <li>3 args: {@code (length, str, flags)} - {@code flags} is converted with
 *       {@code convertToHash()}.</li>
 *   <li>2 args: {@code (length, x)} - {@code x} is treated as the flags hash when
 *       {@code TypeConverter.checkHashType} accepts it, otherwise as the
 *       destination string.</li>
 *   <li>1 arg: {@code (length)} only.</li>
 * </ul>
 *
 * @param context the current thread context
 * @param args    the call arguments as described above
 * @return the result of the four-argument {@code recv} overload
 */
@JRubyMethod(required = 2, optional = 1) // (length) required = 1 handled above
public IRubyObject recv(ThreadContext context, IRubyObject[] args) {
    IRubyObject length; RubyString str; IRubyObject flags;

    switch (args.length) {
        case 3:
            length = args[0];
            // NOTE(review): unchecked cast - a non-String second argument fails here; confirm intended.
            str = (RubyString) args[1];
            flags = args[2].convertToHash();
            break;
        case 2:
            length = args[0];
            flags = TypeConverter.checkHashType(context.runtime, args[1]);
            // Not a hash => the second argument is taken to be the destination string.
            // NOTE(review): same unchecked cast as above when args[1] is neither Hash nor String.
            str = flags.isNil() ? (RubyString) args[1] : null;
            break;
        case 1:
            length = args[0];
            str = null; flags = null;
            break;
        default:
            // NOTE(review): a nil length is handed to RubyNumeric.fix2int by the
            // private overload - confirm that is the intended failure for other arities.
            length = context.nil;
            str = null; flags = null;
    }

    return recv(context, length, str, flags);
}

@JRubyMethod
public IRubyObject recv(ThreadContext context, IRubyObject _length) {
Ruby runtime = context.runtime;
@Deprecated
public IRubyObject recv(ThreadContext context, IRubyObject length, IRubyObject flags) {
return recv(context, new IRubyObject[] { length, flags });
}

/**
 * Shared implementation behind every public {@code recv} overload.
 *
 * Allocates a buffer of {@code length} bytes, reads into it with
 * {@code doReceive}, and returns the bytes either in the caller-supplied
 * string or in a newly created one.
 *
 * @param context the current thread context
 * @param length  buffer size; converted with {@code RubyNumeric.fix2int}
 * @param str     string to overwrite with the received bytes, or {@code null}
 *                to allocate a new string
 * @param flags   currently ignored (see TODO)
 * @return {@code context.nil} when {@code doReceive} returns {@code null};
 *         otherwise {@code str} (updated in place) or a new string
 */
private IRubyObject recv(ThreadContext context, IRubyObject length,
        RubyString str, IRubyObject flags) {
    // TODO: implement flags
    final ByteBuffer buffer = ByteBuffer.allocate(RubyNumeric.fix2int(length));

    ByteList bytes = doReceive(context, buffer);

    if (bytes == null) return context.nil;

    if (str != null) {
        // Reuse the caller's string instead of allocating a new one.
        str.setValue(bytes);
        return str;
    }
    return RubyString.newString(context.runtime, bytes);
}

@JRubyMethod
public IRubyObject recv_nonblock(ThreadContext context, IRubyObject _length) {
Ruby runtime = context.runtime;
public IRubyObject recv_nonblock(ThreadContext context, IRubyObject length) {
return recv_nonblock(context, length, null, null);
}

ByteList bytes = doReceiveNonblock(context, RubyNumeric.fix2int(_length));
@JRubyMethod(required = 2, optional = 1) // (length) required = 1 handled above
public IRubyObject recv_nonblock(ThreadContext context, IRubyObject[] args) {
IRubyObject length; RubyString str; IRubyObject flags;

if (bytes == null) {
throw runtime.newErrnoEAGAINReadableError("recvfrom(2)");
switch (args.length) {
case 3:
length = args[0];
str = (RubyString) args[1];
flags = args[2].convertToHash();
break;
case 2:
length = args[0];
flags = TypeConverter.checkHashType(context.runtime, args[1]);
str = flags.isNil() ? (RubyString) args[1] : null;
break;
case 1:
length = args[0];
str = null; flags = null;
break;
default:
length = context.nil;
str = null; flags = null;
}

return RubyString.newString(runtime, bytes);
return recv_nonblock(context, length, str, flags);
}

@JRubyMethod
public IRubyObject recv_nonblock(ThreadContext context, IRubyObject _length, IRubyObject _flags) {
@Deprecated
public IRubyObject recv_nonblock(ThreadContext context, IRubyObject length, IRubyObject flags) {
return recv_nonblock(context, new IRubyObject[] { length, flags });
}

/**
 * Shared implementation behind every public {@code recv_nonblock} overload.
 *
 * Allocates a buffer of {@code length} bytes, reads into it with
 * {@code doReceiveNonblock}, and returns the bytes either in the
 * caller-supplied string or in a newly created one.
 *
 * @param context the current thread context
 * @param length  buffer size; converted with {@code RubyNumeric.fix2int}
 * @param str     string to overwrite with the received bytes, or {@code null}
 *                to allocate a new string
 * @param flags   currently ignored (see TODO)
 * @return {@code str} (updated in place) or a new string holding the bytes
 * @throws RaiseException the EAGAIN "readable" error for {@code recvfrom(2)}
 *         when {@code doReceiveNonblock} returns {@code null}
 */
private IRubyObject recv_nonblock(ThreadContext context, IRubyObject length,
        RubyString str, IRubyObject flags) {
    // TODO: implement flags
    final ByteBuffer buffer = ByteBuffer.allocate(RubyNumeric.fix2int(length));

    ByteList bytes = doReceiveNonblock(context, buffer);

    if (bytes == null) {
        // Nothing was read: report it as EAGAIN rather than returning nil.
        throw context.runtime.newErrnoEAGAINReadableError("recvfrom(2)");
    }

    if (str != null) {
        // Reuse the caller's string instead of allocating a new one.
        str.setValue(bytes);
        return str;
    }
    return RubyString.newString(context.runtime, bytes);
}

@JRubyMethod
@@ -216,7 +276,7 @@ public IRubyObject getsockopt(ThreadContext context, IRubyObject _level, IRubyOb
}

value = SocketType.forChannel(channel).getSocketOption(channel, opt);

return new Option(runtime, ProtocolFamily.PF_INET, level, opt, value);

default:
@@ -451,23 +511,21 @@ public IRubyObject readmsg_nonblock(ThreadContext context, IRubyObject[] args) {
throw context.runtime.newNotImplementedError("readmsg_nonblock is not implemented");
}

private ByteList doReceive(ThreadContext context, int length) {
private ByteList doReceive(ThreadContext context, final ByteBuffer buffer) {
Ruby runtime = context.runtime;
OpenFile fptr;

fptr = getOpenFileChecked();
fptr.checkReadable(context);

ByteBuffer buf = ByteBuffer.allocate(length);

try {
context.getThread().beforeBlockingCall();

int read = openFile.readChannel().read(buf);
int read = openFile.readChannel().read(buffer);

if (read == 0) return null;

return new ByteList(buf.array(), 0, buf.position());
return new ByteList(buffer.array(), 0, buffer.position());

} catch (IOException e) {
// All errors to sysread should be SystemCallErrors, but on a closed stream
@@ -484,7 +542,7 @@ private ByteList doReceive(ThreadContext context, int length) {
}
}

public ByteList doReceiveNonblock(ThreadContext context, int length) {
public ByteList doReceiveNonblock(ThreadContext context, final ByteBuffer buffer) {
Ruby runtime = context.runtime;
Channel channel = getChannel();

@@ -501,12 +559,13 @@ public ByteList doReceiveNonblock(ThreadContext context, int length) {
selectable.configureBlocking(false);

try {
return doReceive(context, length);
} finally {
return doReceive(context, buffer);
}
finally {
selectable.configureBlocking(oldBlocking);
}

} catch(IOException e) {
}
catch(IOException e) {
throw runtime.newIOErrorFromException(e);
}
}
@@ -626,7 +685,7 @@ protected void initSocket(ChannelFD fd) {
// see rsock_init_sock in MRI; sockets are initialized to binary
setAscii8bitBinmode();
}

/**
 * Fetches the channel of this socket's open file, going through
 * {@code getOpenFileChecked()} rather than reading the file directly.
 *
 * @return the channel reported by the checked open file
 */
private Channel getOpenChannel() {
    final Channel channel = getOpenFileChecked().channel();
    return channel;
}

0 comments on commit 775d7e5

Please sign in to comment.