Skip to content

Commit

Permalink
Showing 2 changed files with 75 additions and 37 deletions.
77 changes: 74 additions & 3 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
@@ -1417,27 +1417,98 @@ private boolean abortOnException(Ruby runtime) {
/**
* Return the main thread of the runtime that the given receiver belongs to.
*
* @param receiver the object whose runtime is looked up via {@code getRuntime()};
* it is dereferenced unconditionally, so it must not be null.
* @return the thread that the runtime's thread service reports as its main thread.
*/
public static RubyThread mainThread(IRubyObject receiver) {
return receiver.getRuntime().getThreadService().getMainThread();
}


/**
* Perform an interruptible select operation on the channel of the given IO,
* waiting for the requested operations. No timeout is supplied by this overload.
*
* This is a convenience form that delegates to {@link #select(Channel, OpenFile, int)}
* using the IO's channel and open file.
*
* @param io the RubyIO that contains the channel, for managing blocked threads list.
* It is dereferenced unconditionally, so it must not be null.
* @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
* @return true if the IO's channel became ready for the requested operations, false if
* it was not selectable.
*/
public boolean select(RubyIO io, int ops) {
return select(io.getChannel(), io.getOpenFile(), ops);
}


/**
* Perform an interruptible select operation on the channel of the given IO,
* waiting for the requested operations or the given timeout.
*
* This is a convenience form that delegates to {@link #select(Channel, OpenFile, int, long)}
* using the IO's channel and open file.
*
* @param io the RubyIO that contains the channel, for managing blocked threads list.
* It is dereferenced unconditionally, so it must not be null.
* @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
* @param timeout a timeout in ms to limit the select. Less than zero selects forever,
* zero selects and returns ready channels or nothing immediately, and
* greater than zero selects for at most that many ms.
* @return true if the IO's channel became ready for the requested operations, false if
* it timed out or was not selectable.
*/
public boolean select(RubyIO io, int ops, long timeout) {
return select(io.getChannel(), io.getOpenFile(), ops, timeout);
}

/**
 * Wait, interruptibly and without a time limit, for the given channel to become
 * ready for the requested operations.
 *
 * Equivalent to calling {@link #select(Channel, OpenFile, int, long)} with a
 * negative timeout.
 *
 * @param channel the channel to select against. Documented as yielding {@code true}
 *                straight away when it is not a selectable channel
 *                (NOTE(review): the return description historically said "false"
 *                for this case; confirm against the timed overload).
 * @param fptr the open file that owns the channel; used for managing the list of
 *             blocked threads.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @return {@code true} if the channel became ready for the requested operations;
 *         otherwise whatever the timed overload reports.
 */
public boolean select(Channel channel, OpenFile fptr, int ops) {
    // A negative timeout is documented on the timed overload as "select forever".
    final long selectForever = -1L;
    return select(channel, fptr, ops, selectForever);
}

/**
 * Wait, interruptibly and without a time limit, for the given channel to become
 * ready for the requested operations, taking the open file from an optional IO.
 *
 * Equivalent to calling {@link #select(Channel, OpenFile, int, long)} with the
 * IO's open file (or {@code null} when no IO is given) and a negative timeout.
 *
 * @param channel the channel to select against. Documented as yielding {@code true}
 *                straight away when it is not a selectable channel
 *                (NOTE(review): the return description historically said "false"
 *                for this case; confirm against the timed overload).
 * @param io the RubyIO that contains the channel, used for managing the list of
 *           blocked threads; may be {@code null}, in which case a null open file
 *           is passed on.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @return {@code true} if the channel became ready for the requested operations;
 *         otherwise whatever the timed overload reports.
 */
public boolean select(Channel channel, RubyIO io, int ops) {
    // A negative timeout is documented on the timed overload as "select forever".
    final long selectForever = -1L;
    return select(channel, io != null ? io.getOpenFile() : null, ops, selectForever);
}

/**
 * Wait, interruptibly, for the given channel to become ready for the requested
 * operations or for the timeout to elapse, taking the open file from an optional IO.
 *
 * Equivalent to calling {@link #select(Channel, OpenFile, int, long)} with the
 * IO's open file, or {@code null} when no IO is given.
 *
 * @param channel the channel to select against. Documented as yielding {@code true}
 *                straight away when it is not a selectable channel
 *                (NOTE(review): the return description historically said "false"
 *                for this case; confirm against the timed overload).
 * @param io the RubyIO that contains the channel, used for managing the list of
 *           blocked threads; may be {@code null}, in which case a null open file
 *           is passed on.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @param timeout limit for the select, in milliseconds: negative waits forever,
 *                zero checks readiness and returns immediately, positive waits
 *                at most that long.
 * @return {@code true} if the channel became ready for the requested operations;
 *         {@code false} if it timed out, or otherwise whatever the timed overload reports.
 */
public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
    return select(
            channel,
            io != null ? io.getOpenFile() : null,
            ops,
            timeout);
}

/**
* Perform an interruptible select operation on the given channel and fptr,
* waiting for the requested operations or the given timeout.
*
* @param channel the channel to perform a select against. If this is not
* a selectable channel, then this method will just return true.
* @param fptr the fptr that contains the channel, for managing blocked threads list.
* @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
* @param timeout a timeout in ms to limit the select. Less than zero selects forever,
* zero selects and returns ready channels or nothing immediately, and
* greater than zero selects for at most that many ms.
* @return true if the channel became ready for the requested operations, false if
* it timed out or was not selectable.
*/
public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
// Use selectables but only if they're not associated with a file (which has odd select semantics)
if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
@@ -1449,7 +1520,7 @@ public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
SelectionKey key = null;
try {
selectable.configureBlocking(false);

if (fptr != null) fptr.addBlockingThread(this);
currentSelector = getRuntime().getSelectorPool().get(selectable.provider());

35 changes: 1 addition & 34 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
@@ -508,40 +508,7 @@ public boolean ready(Ruby runtime, RubyThread thread, int ops, long timeout) {
boolean locked = lock();
try {
if (fd.chSelect != null) {
int ready_stat = 0;
Selector sel = SelectorFactory.openWithRetryFrom(null, fd.chSelect.provider());
synchronized (fd.chSelect.blockingLock()) {
boolean is_block = fd.chSelect.isBlocking();
boolean addedThread = false;
try {
fd.chSelect.configureBlocking(false);
fd.chSelect.register(sel, ops);
if (timeout == -1) {
ready_stat = sel.selectNow();
} else {
addedThread = true;
addBlockingThread(thread);
// release lock while selecting
unlock();
try {
sel.select(timeout);
} finally {
lock();
}
}
sel.close();
} finally {
if (addedThread) removeBlockingThread(thread);
if (sel != null) {
try {
sel.close();
} catch (Exception e) {
}
}
fd.chSelect.configureBlocking(is_block);
}
}
return ready_stat == 1;
return thread.select(fd.chSelect, this, ops, timeout);
} else {
if (fd.chSeek != null) {
return fd.chSeek.position() != -1

0 comments on commit 739e00f

Please sign in to comment.