23
23
import org .jruby .ext .fcntl .FcntlLibrary ;
24
24
import org .jruby .platform .Platform ;
25
25
import org .jruby .runtime .Block ;
26
+ import org .jruby .runtime .Helpers ;
26
27
import org .jruby .runtime .ThreadContext ;
27
28
import org .jruby .runtime .builtin .IRubyObject ;
28
29
import org .jruby .util .ByteList ;
41
42
import java .nio .channels .Selector ;
42
43
import java .nio .channels .SocketChannel ;
43
44
import java .nio .channels .WritableByteChannel ;
44
- import java .util .ArrayList ;
45
- import java .util .List ;
45
+ import java .util .HashSet ;
46
+ import java .util .Set ;
46
47
import java .util .concurrent .locks .ReentrantLock ;
47
48
import java .util .concurrent .locks .ReentrantReadWriteLock ;
48
49
@@ -149,7 +150,7 @@ public static class Buffer {
149
150
150
151
private final Ruby runtime ;
151
152
152
- protected List <RubyThread > blockingThreads ;
153
+ protected volatile Set <RubyThread > blockingThreads ;
153
154
154
155
public void clearStdio () {
155
156
stdio_file = null ;
@@ -2609,14 +2610,19 @@ public static void fdFixCloexec(PosixShim posix, int fd) {
2609
2610
* @param thread A thread blocking on this IO
2610
2611
*/
2611
2612
public void addBlockingThread (RubyThread thread ) {
2612
- boolean locked = lock ();
2613
- try {
2614
- if (blockingThreads == null ) {
2615
- blockingThreads = new ArrayList <RubyThread >(1 );
2613
+ Set <RubyThread > blockingThreads = this .blockingThreads ;
2614
+
2615
+ if (blockingThreads == null ) {
2616
+ synchronized (this ) {
2617
+ blockingThreads = this .blockingThreads ;
2618
+ if (blockingThreads == null ) {
2619
+ this .blockingThreads = blockingThreads = new HashSet <RubyThread >(1 );
2620
+ }
2616
2621
}
2622
+ }
2623
+
2624
+ synchronized (blockingThreads ) {
2617
2625
blockingThreads .add (thread );
2618
- } finally {
2619
- if (locked ) unlock ();
2620
2626
}
2621
2627
}
2622
2628
@@ -2628,15 +2634,13 @@ public void addBlockingThread(RubyThread thread) {
2628
2634
public void removeBlockingThread (RubyThread thread ) {
2629
2635
boolean locked = lock ();
2630
2636
try {
2637
+ Set <RubyThread > blockingThreads = this .blockingThreads ;
2638
+
2631
2639
if (blockingThreads == null ) {
2632
2640
return ;
2633
2641
}
2634
- for (int i = 0 ; i < blockingThreads .size (); i ++) {
2635
- if (blockingThreads .get (i ) == thread ) {
2636
- // not using remove(Object) here to avoid the equals() call
2637
- blockingThreads .remove (i );
2638
- }
2639
- }
2642
+
2643
+ blockingThreads .remove (thread );
2640
2644
} finally {
2641
2645
if (locked ) unlock ();
2642
2646
}
@@ -2645,20 +2649,43 @@ public void removeBlockingThread(RubyThread thread) {
2645
2649
/**
2646
2650
* Fire an IOError in all threads blocking on this IO object
2647
2651
*/
2648
- public void interruptBlockingThreads () {
2649
- boolean locked = lock ();
2650
- try {
2651
- if (blockingThreads == null ) {
2652
- return ;
2653
- }
2654
- for (int i = 0 ; i < blockingThreads .size (); i ++) {
2655
- RubyThread thread = blockingThreads .get (i );
2652
+ public void interruptBlockingThreads (ThreadContext context ) {
2653
+ Set <RubyThread > blockingThreads = this .blockingThreads ;
2654
+
2655
+ if (blockingThreads == null ) {
2656
+ return ;
2657
+ }
2658
+
2659
+ synchronized (blockingThreads ) {
2660
+ for (RubyThread thread : blockingThreads ) {
2661
+ // If it's the current thread, ignore it since we're the one doing the interrupting
2662
+ if (thread == context .getThread ()) continue ;
2656
2663
2657
2664
// raise will also wake the thread from selection
2658
- thread .raise (new IRubyObject []{runtime .newIOError ("stream closed" ).getException ()}, Block .NULL_BLOCK );
2665
+ RubyException exception = (RubyException ) runtime .getIOError ().newInstance (context , runtime .newString ("stream closed" ), Block .NULL_BLOCK );
2666
+ thread .raise (Helpers .arrayOf (exception ), Block .NULL_BLOCK );
2667
+ }
2668
+ }
2669
+ }
2670
+
2671
+ /**
2672
+ * Wait until all blocking threads have exited their blocking area. Use in combination with
2673
+ * interruptBlockingThreads to ensure every blocking thread has moved on before proceding to
2674
+ * manipulate the IO.
2675
+ */
2676
+ public void waitForBlockingThreads (ThreadContext context ) {
2677
+ Set <RubyThread > blockingThreads = this .blockingThreads ;
2678
+
2679
+ if (blockingThreads == null ) {
2680
+ return ;
2681
+ }
2682
+
2683
+ while (blockingThreads .size () > 0 ) {
2684
+ try {
2685
+ context .getThread ().sleep (1 );
2686
+ } catch (InterruptedException ie ) {
2687
+ break ;
2659
2688
}
2660
- } finally {
2661
- if (locked ) unlock ();
2662
2689
}
2663
2690
}
2664
2691
0 commit comments