Skip to content

Commit

Permalink
Showing 3 changed files with 61 additions and 37 deletions.
15 changes: 5 additions & 10 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
@@ -2035,7 +2035,8 @@ protected IRubyObject rbIoClose(Ruby runtime) {
if (fptr.fd() == null) return runtime.getNil();

// interrupt waiting threads
fptr.interruptBlockingThreads();
fptr.interruptBlockingThreads(context);
fptr.waitForBlockingThreads(context);
fptr.cleanup(runtime, false);

if (fptr.getProcess() != null) {
@@ -4353,7 +4354,7 @@ private static ByteList getNilByteList(Ruby runtime) {
*/
public void addBlockingThread(RubyThread thread) {
// NOTE(review): this is a diff rendering without +/- markers. The hunk header reports the
// same line count before and after, so the `openFile`-based statement below appears to be
// the removed form and the `fptr`-based statement its replacement; they are not meant to
// run together.
// Copy the field into a local so the null check and the call can use one reference.
OpenFile fptr = openFile;
if (openFile != null) openFile.addBlockingThread(thread);
if (fptr != null) fptr.addBlockingThread(thread);
}

/**
@@ -4362,14 +4363,8 @@ public void addBlockingThread(RubyThread thread) {
* @param thread A thread blocking on this IO
*/
public void removeBlockingThread(RubyThread thread) {
// NOTE(review): this is a diff rendering without +/- markers. Going by the hunk header
// (14 lines before, 8 after), everything from the next statement through the no-argument
// interruptBlockingThreads() wrapper appears to be removed text; the two `fptr` statements
// at the end of this span are the replacement body of removeBlockingThread.
if (openFile != null) openFile.removeBlockingThread(thread);
}

/**
* Fire an IOError in all threads blocking on this IO object
*/
protected void interruptBlockingThreads() {
// Forwarded to the OpenFile's no-argument interruptBlockingThreads() when one was present.
if (openFile != null) openFile.interruptBlockingThreads();
// Snapshot the field into a local, then forward the removal only if it is non-null.
OpenFile fptr = this.openFile;
if (fptr != null) fptr.removeBlockingThread(thread);
}

/**
4 changes: 3 additions & 1 deletion core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
@@ -82,6 +82,7 @@
import org.jruby.util.StringSupport;
import org.jruby.util.TypeConverter;
import org.jruby.util.io.BlockingIO;
import org.jruby.util.io.ChannelFD;
import org.jruby.util.io.OpenFile;
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;
@@ -1766,7 +1767,8 @@ public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
*/
public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
// Use selectables but only if they're not associated with a file (which has odd select semantics)
if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
ChannelFD fd = fptr == null ? null : fptr.fd();
if (channel instanceof SelectableChannel && fd != null) {
SelectableChannel selectable = (SelectableChannel)channel;

// ensure we have fptr locked, but release it to avoid deadlock
79 changes: 53 additions & 26 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.jruby.ext.fcntl.FcntlLibrary;
import org.jruby.platform.Platform;
import org.jruby.runtime.Block;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
@@ -41,8 +42,8 @@
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -149,7 +150,7 @@ public static class Buffer {

private final Ruby runtime;

protected List<RubyThread> blockingThreads;
protected volatile Set<RubyThread> blockingThreads;

public void clearStdio() {
stdio_file = null;
@@ -2609,14 +2610,19 @@ public static void fdFixCloexec(PosixShim posix, int fd) {
* @param thread A thread blocking on this IO
*/
public void addBlockingThread(RubyThread thread) {
// NOTE(review): this is a diff rendering without +/- markers. The lock()/try/ArrayList
// lines and the trailing finally/unlock lines appear to be the removed implementation;
// the Set-based lines in between are its replacement.
boolean locked = lock();
try {
if (blockingThreads == null) {
blockingThreads = new ArrayList<RubyThread>(1);
// Read the field once into a local; the rest of the replacement body works on this local.
Set<RubyThread> blockingThreads = this.blockingThreads;

if (blockingThreads == null) {
// Re-read and re-check while holding this object's monitor, so that concurrent callers
// do not each install their own set.
synchronized (this) {
blockingThreads = this.blockingThreads;
if (blockingThreads == null) {
// Publish the new set through the field and keep it in the local for the add below.
this.blockingThreads = blockingThreads = new HashSet<RubyThread>(1);
}
}
}

// The insertion itself is guarded by the set's own monitor rather than by lock().
synchronized (blockingThreads) {
blockingThreads.add(thread);
} finally {
if (locked) unlock();
}
}

@@ -2628,15 +2634,13 @@ public void addBlockingThread(RubyThread thread) {
public void removeBlockingThread(RubyThread thread) {
boolean locked = lock();
try {
Set<RubyThread> blockingThreads = this.blockingThreads;

if (blockingThreads == null) {
return;
}
for (int i = 0; i < blockingThreads.size(); i++) {
if (blockingThreads.get(i) == thread) {
// not using remove(Object) here to avoid the equals() call
blockingThreads.remove(i);
}
}

blockingThreads.remove(thread);
} finally {
if (locked) unlock();
}
@@ -2645,20 +2649,43 @@ public void removeBlockingThread(RubyThread thread) {
/**
* Fire an IOError in all threads blocking on this IO object
*
* @param context (signature taking a ThreadContext) used to identify the calling thread, which
*                is skipped, and to construct the exception that is raised in the others
*/
// NOTE(review): this is a diff rendering without +/- markers. The no-argument signature with
// lock()/try and the index-based loop appears to be the removed implementation; the signature
// taking a ThreadContext and the synchronized for-each loop appear to be its replacement.
public void interruptBlockingThreads() {
boolean locked = lock();
try {
if (blockingThreads == null) {
return;
}
for (int i = 0; i < blockingThreads.size(); i++) {
RubyThread thread = blockingThreads.get(i);
public void interruptBlockingThreads(ThreadContext context) {
// Read the field once into a local; null means no set has been created yet.
Set<RubyThread> blockingThreads = this.blockingThreads;

if (blockingThreads == null) {
return;
}

// Iterate while holding the set's monitor, the same monitor addBlockingThread uses for add().
synchronized (blockingThreads) {
for (RubyThread thread : blockingThreads) {
// If it's the current thread, ignore it since we're the one doing the interrupting
if (thread == context.getThread()) continue;

// raise will also wake the thread from selection
// NOTE(review): the first raise(...) below appears to be the removed form; the two
// statements after it build the IOError via newInstance(...) and raise that instead.
thread.raise(new IRubyObject[]{runtime.newIOError("stream closed").getException()}, Block.NULL_BLOCK);
RubyException exception = (RubyException) runtime.getIOError().newInstance(context, runtime.newString("stream closed"), Block.NULL_BLOCK);
thread.raise(Helpers.arrayOf(exception), Block.NULL_BLOCK);
}
}
}

/**
* Wait until all blocking threads have exited their blocking area. Use in combination with
* interruptBlockingThreads to ensure every blocking thread has moved on before proceeding to
* manipulate the IO.
*
* @param context context of the calling thread; that thread sleeps between polls of the set
*/
public void waitForBlockingThreads(ThreadContext context) {
// Read the field once into a local; the loop below polls this same set instance.
Set<RubyThread> blockingThreads = this.blockingThreads;

// The set is only created by addBlockingThread, so null means nothing ever registered.
if (blockingThreads == null) {
return;
}

// Poll until the set is empty, sleeping between checks.
// NOTE(review): size() is read without synchronized (blockingThreads), unlike the add and
// iterate paths shown in this diff — confirm this is safe for a HashSet.
while (blockingThreads.size() > 0) {
try {
// presumably a 1 ms sleep — TODO confirm the unit of RubyThread.sleep
context.getThread().sleep(1);
} catch (InterruptedException ie) {
// Interruption ends the wait early, even if threads are still registered.
// NOTE(review): the interrupt is neither propagated nor re-asserted — confirm intended.
break;
}
// NOTE(review): the finally/unlock lines below appear to be removed text left over from the
// old lock()-based interruptBlockingThreads(); no `locked` variable is declared in this method.
} finally {
if (locked) unlock();
}
}

0 comments on commit 3493a91

Please sign in to comment.