Skip to content

Commit 3493a91

Browse files
committedDec 12, 2017
Cherry-picked from dad9ab0.
This does not incorporate the updated MRI 2.4 test.
1 parent 99b7edc commit 3493a91

File tree

3 files changed

+61
-37
lines changed

3 files changed

+61
-37
lines changed
 

‎core/src/main/java/org/jruby/RubyIO.java

+5-10
Original file line numberDiff line numberDiff line change
@@ -2035,7 +2035,8 @@ protected IRubyObject rbIoClose(Ruby runtime) {
20352035
if (fptr.fd() == null) return runtime.getNil();
20362036

20372037
// interrupt waiting threads
2038-
fptr.interruptBlockingThreads();
2038+
fptr.interruptBlockingThreads(context);
2039+
fptr.waitForBlockingThreads(context);
20392040
fptr.cleanup(runtime, false);
20402041

20412042
if (fptr.getProcess() != null) {
@@ -4353,7 +4354,7 @@ private static ByteList getNilByteList(Ruby runtime) {
43534354
*/
43544355
public void addBlockingThread(RubyThread thread) {
43554356
OpenFile fptr = openFile;
4356-
if (openFile != null) openFile.addBlockingThread(thread);
4357+
if (fptr != null) fptr.addBlockingThread(thread);
43574358
}
43584359

43594360
/**
@@ -4362,14 +4363,8 @@ public void addBlockingThread(RubyThread thread) {
43624363
* @param thread A thread blocking on this IO
43634364
*/
43644365
public void removeBlockingThread(RubyThread thread) {
4365-
if (openFile != null) openFile.removeBlockingThread(thread);
4366-
}
4367-
4368-
/**
4369-
* Fire an IOError in all threads blocking on this IO object
4370-
*/
4371-
protected void interruptBlockingThreads() {
4372-
if (openFile != null) openFile.interruptBlockingThreads();
4366+
OpenFile fptr = this.openFile;
4367+
if (fptr != null) fptr.removeBlockingThread(thread);
43734368
}
43744369

43754370
/**

‎core/src/main/java/org/jruby/RubyThread.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.jruby.util.StringSupport;
8383
import org.jruby.util.TypeConverter;
8484
import org.jruby.util.io.BlockingIO;
85+
import org.jruby.util.io.ChannelFD;
8586
import org.jruby.util.io.OpenFile;
8687
import org.jruby.util.log.Logger;
8788
import org.jruby.util.log.LoggerFactory;
@@ -1766,7 +1767,8 @@ public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
17661767
*/
17671768
public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
17681769
// Use selectables but only if they're not associated with a file (which has odd select semantics)
1769-
if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
1770+
ChannelFD fd = fptr == null ? null : fptr.fd();
1771+
if (channel instanceof SelectableChannel && fd != null) {
17701772
SelectableChannel selectable = (SelectableChannel)channel;
17711773

17721774
// ensure we have fptr locked, but release it to avoid deadlock

‎core/src/main/java/org/jruby/util/io/OpenFile.java

+53-26
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.jruby.ext.fcntl.FcntlLibrary;
2424
import org.jruby.platform.Platform;
2525
import org.jruby.runtime.Block;
26+
import org.jruby.runtime.Helpers;
2627
import org.jruby.runtime.ThreadContext;
2728
import org.jruby.runtime.builtin.IRubyObject;
2829
import org.jruby.util.ByteList;
@@ -41,8 +42,8 @@
4142
import java.nio.channels.Selector;
4243
import java.nio.channels.SocketChannel;
4344
import java.nio.channels.WritableByteChannel;
44-
import java.util.ArrayList;
45-
import java.util.List;
45+
import java.util.HashSet;
46+
import java.util.Set;
4647
import java.util.concurrent.locks.ReentrantLock;
4748
import java.util.concurrent.locks.ReentrantReadWriteLock;
4849

@@ -149,7 +150,7 @@ public static class Buffer {
149150

150151
private final Ruby runtime;
151152

152-
protected List<RubyThread> blockingThreads;
153+
protected volatile Set<RubyThread> blockingThreads;
153154

154155
public void clearStdio() {
155156
stdio_file = null;
@@ -2609,14 +2610,19 @@ public static void fdFixCloexec(PosixShim posix, int fd) {
26092610
* @param thread A thread blocking on this IO
26102611
*/
26112612
public void addBlockingThread(RubyThread thread) {
2612-
boolean locked = lock();
2613-
try {
2614-
if (blockingThreads == null) {
2615-
blockingThreads = new ArrayList<RubyThread>(1);
2613+
Set<RubyThread> blockingThreads = this.blockingThreads;
2614+
2615+
if (blockingThreads == null) {
2616+
synchronized (this) {
2617+
blockingThreads = this.blockingThreads;
2618+
if (blockingThreads == null) {
2619+
this.blockingThreads = blockingThreads = new HashSet<RubyThread>(1);
2620+
}
26162621
}
2622+
}
2623+
2624+
synchronized (blockingThreads) {
26172625
blockingThreads.add(thread);
2618-
} finally {
2619-
if (locked) unlock();
26202626
}
26212627
}
26222628

@@ -2628,15 +2634,13 @@ public void addBlockingThread(RubyThread thread) {
26282634
public void removeBlockingThread(RubyThread thread) {
26292635
boolean locked = lock();
26302636
try {
2637+
Set<RubyThread> blockingThreads = this.blockingThreads;
2638+
26312639
if (blockingThreads == null) {
26322640
return;
26332641
}
2634-
for (int i = 0; i < blockingThreads.size(); i++) {
2635-
if (blockingThreads.get(i) == thread) {
2636-
// not using remove(Object) here to avoid the equals() call
2637-
blockingThreads.remove(i);
2638-
}
2639-
}
2642+
2643+
blockingThreads.remove(thread);
26402644
} finally {
26412645
if (locked) unlock();
26422646
}
@@ -2645,20 +2649,43 @@ public void removeBlockingThread(RubyThread thread) {
26452649
/**
26462650
* Fire an IOError in all threads blocking on this IO object
26472651
*/
2648-
public void interruptBlockingThreads() {
2649-
boolean locked = lock();
2650-
try {
2651-
if (blockingThreads == null) {
2652-
return;
2653-
}
2654-
for (int i = 0; i < blockingThreads.size(); i++) {
2655-
RubyThread thread = blockingThreads.get(i);
2652+
public void interruptBlockingThreads(ThreadContext context) {
2653+
Set<RubyThread> blockingThreads = this.blockingThreads;
2654+
2655+
if (blockingThreads == null) {
2656+
return;
2657+
}
2658+
2659+
synchronized (blockingThreads) {
2660+
for (RubyThread thread : blockingThreads) {
2661+
// If it's the current thread, ignore it since we're the one doing the interrupting
2662+
if (thread == context.getThread()) continue;
26562663

26572664
// raise will also wake the thread from selection
2658-
thread.raise(new IRubyObject[]{runtime.newIOError("stream closed").getException()}, Block.NULL_BLOCK);
2665+
RubyException exception = (RubyException) runtime.getIOError().newInstance(context, runtime.newString("stream closed"), Block.NULL_BLOCK);
2666+
thread.raise(Helpers.arrayOf(exception), Block.NULL_BLOCK);
2667+
}
2668+
}
2669+
}
2670+
2671+
/**
2672+
* Wait until all blocking threads have exited their blocking area. Use in combination with
2673+
* interruptBlockingThreads to ensure every blocking thread has moved on before proceding to
2674+
* manipulate the IO.
2675+
*/
2676+
public void waitForBlockingThreads(ThreadContext context) {
2677+
Set<RubyThread> blockingThreads = this.blockingThreads;
2678+
2679+
if (blockingThreads == null) {
2680+
return;
2681+
}
2682+
2683+
while (blockingThreads.size() > 0) {
2684+
try {
2685+
context.getThread().sleep(1);
2686+
} catch (InterruptedException ie) {
2687+
break;
26592688
}
2660-
} finally {
2661-
if (locked) unlock();
26622689
}
26632690
}
26642691

0 commit comments

Comments
 (0)
Please sign in to comment.