Skip to content

Commit

Permalink
OpenFile.ready needs to use interruptible select logic.
Browse files Browse the repository at this point in the history
This should fix some test hangs on Travis.
  • Loading branch information
headius committed Nov 11, 2014
1 parent fad8d69 commit 739e00f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 37 deletions.
77 changes: 74 additions & 3 deletions core/src/main/java/org/jruby/RubyThread.java
Expand Up @@ -1417,27 +1417,98 @@ private boolean abortOnException(Ruby runtime) {
public static RubyThread mainThread(IRubyObject receiver) {
    // Resolve the runtime from the receiver, then ask its thread service
    // for the main thread.
    final Ruby runtime = receiver.getRuntime();
    return runtime.getThreadService().getMainThread();
}


/**
 * Convenience overload: perform an interruptible select on the channel held by
 * the given RubyIO, without a timeout argument.
 *
 * <p>The channel and open-file structure are read from {@code io} and handed to
 * {@link #select(Channel, OpenFile, int)}.
 *
 * @param io the RubyIO that contains the channel, for managing blocked threads list.
 *           It is dereferenced unconditionally here, so it must not be null.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @return the result of the delegated select; documented as true if the IO's channel
 *         became ready for the requested operations.
 */
public boolean select(RubyIO io, int ops) {
    final Channel channel = io.getChannel();
    final OpenFile fptr = io.getOpenFile();
    return select(channel, fptr, ops);
}


/**
 * Convenience overload: perform an interruptible select on the channel held by
 * the given RubyIO, waiting for the requested operations or the given timeout.
 *
 * <p>The channel and open-file structure are read from {@code io} and handed to
 * {@link #select(Channel, OpenFile, int, long)}.
 *
 * @param io the RubyIO that contains the channel, for managing blocked threads list.
 *           It is dereferenced unconditionally here, so it must not be null.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
 *                zero selects and returns ready channels or nothing immediately, and
 *                greater than zero selects for at most that many ms.
 * @return the result of the delegated select; documented as true if the IO's channel
 *         became ready for the requested operations, false if it timed out.
 */
public boolean select(RubyIO io, int ops, long timeout) {
    final Channel channel = io.getChannel();
    final OpenFile fptr = io.getOpenFile();
    return select(channel, fptr, ops, timeout);
}

/**
 * Perform an interruptible select operation on the given channel and fptr,
 * waiting for the requested operations with no time limit.
 *
 * <p>Equivalent to calling {@link #select(Channel, OpenFile, int, long)} with a
 * timeout of -1.
 *
 * @param channel the channel to perform a select against.
 * @param fptr the fptr that contains the channel, for managing blocked threads list.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @return the result of the delegated select; documented as true if the channel
 *         became ready for the requested operations.
 */
public boolean select(Channel channel, OpenFile fptr, int ops) {
    // A negative timeout is the "no limit" value described by the timed overload.
    final long noTimeout = -1;
    return select(channel, fptr, ops, noTimeout);
}

/**
 * Perform an interruptible select operation on the given channel, waiting for
 * the requested operations with no time limit.
 *
 * <p>The open-file structure is taken from {@code io} when one is supplied, and
 * the call is forwarded to {@link #select(Channel, OpenFile, int, long)} with a
 * timeout of -1.
 *
 * @param channel the channel to perform a select against.
 * @param io the RubyIO that contains the channel, for managing blocked threads list.
 *           May be null, in which case a null fptr is passed along.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @return the result of the delegated select; documented as true if the channel
 *         became ready for the requested operations.
 */
public boolean select(Channel channel, RubyIO io, int ops) {
    OpenFile fptr = null;
    if (io != null) {
        fptr = io.getOpenFile();
    }
    return select(channel, fptr, ops, -1L);
}

/**
 * Perform an interruptible select operation on the given channel, waiting for
 * the requested operations or the given timeout.
 *
 * <p>The open-file structure is taken from {@code io} when one is supplied, and
 * the call is forwarded to {@link #select(Channel, OpenFile, int, long)}.
 *
 * @param channel the channel to perform a select against.
 * @param io the RubyIO that contains the channel, for managing blocked threads list.
 *           May be null, in which case a null fptr is passed along.
 * @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
 * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
 *                zero selects and returns ready channels or nothing immediately, and
 *                greater than zero selects for at most that many ms.
 * @return the result of the delegated select; documented as true if the channel
 *         became ready for the requested operations, false if it timed out.
 */
public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
    final OpenFile fptr = (io != null) ? io.getOpenFile() : null;
    return select(channel, fptr, ops, timeout);
}

/**
* Perform an interruptible select operation on the given channel and fptr,
* waiting for the requested operations or the given timeout.
*
* @param channel the channel to perform a select against. If this is not
* a selectable channel, then this method will just return true.
* @param fptr the fptr that contains the channel, for managing blocked threads list.
* @param ops the operations to wait for, from {@link java.nio.channels.SelectionKey}.
* @param timeout a timeout in ms to limit the select. Less than zero selects forever,
* zero selects and returns ready channels or nothing immediately, and
* greater than zero selects for at most that many ms.
* @return true if the channel became ready for the requested operations, false if
* it timed out or was not selectable.
*/
public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
// Use selectables but only if they're not associated with a file (which has odd select semantics)
if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
Expand All @@ -1449,7 +1520,7 @@ public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
SelectionKey key = null;
try {
selectable.configureBlocking(false);

if (fptr != null) fptr.addBlockingThread(this);
currentSelector = getRuntime().getSelectorPool().get(selectable.provider());

Expand Down
35 changes: 1 addition & 34 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Expand Up @@ -508,40 +508,7 @@ public boolean ready(Ruby runtime, RubyThread thread, int ops, long timeout) {
boolean locked = lock();
try {
if (fd.chSelect != null) {
int ready_stat = 0;
Selector sel = SelectorFactory.openWithRetryFrom(null, fd.chSelect.provider());
synchronized (fd.chSelect.blockingLock()) {
boolean is_block = fd.chSelect.isBlocking();
boolean addedThread = false;
try {
fd.chSelect.configureBlocking(false);
fd.chSelect.register(sel, ops);
if (timeout == -1) {
ready_stat = sel.selectNow();
} else {
addedThread = true;
addBlockingThread(thread);
// release lock while selecting
unlock();
try {
sel.select(timeout);
} finally {
lock();
}
}
sel.close();
} finally {
if (addedThread) removeBlockingThread(thread);
if (sel != null) {
try {
sel.close();
} catch (Exception e) {
}
}
fd.chSelect.configureBlocking(is_block);
}
}
return ready_stat == 1;
return thread.select(fd.chSelect, this, ops, timeout);
} else {
if (fd.chSeek != null) {
return fd.chSeek.position() != -1
Expand Down

0 comments on commit 739e00f

Please sign in to comment.