Skip to content

Commit

Permalink
Backport Queue, SizedQueue, ThreadFiber and support code.
Browse files Browse the repository at this point in the history
  • Loading branch information
headius committed Jan 7, 2015
1 parent b110106 commit afc5062
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 192 deletions.
75 changes: 71 additions & 4 deletions core/src/main/java/org/jruby/RubyThread.java
Expand Up @@ -62,6 +62,8 @@
import org.jruby.runtime.builtin.IRubyObject;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.jruby.anno.JRubyMethod;
Expand Down Expand Up @@ -154,6 +156,12 @@ public static enum Status {
/** The current task blocking a thread, to allow interrupting it in an appropriate way */
private volatile BlockingTask currentBlockingTask;

/**
 * A function to use to unblock this thread, if possible.
 *
 * Assigned in executeTask and read in interrupt(); declared volatile, like
 * currentBlockingTask, so that a write made by the blocking thread is visible
 * to whichever thread performs the interrupt.
 */
private volatile Unblocker unblockFunc;

/**
 * Argument to pass to the unblocker.
 *
 * Volatile for the same reason as unblockFunc: interrupt() reads it after
 * finding a non-null unblockFunc.
 */
private volatile Object unblockArg;

/** The list of locks this thread currently holds, so they can be released on exit */
private final List<Lock> heldLocks = new Vector<Lock>();

Expand Down Expand Up @@ -927,7 +935,6 @@ public IRubyObject raise(IRubyObject[] args, Block block) {
* Ruby threads like Timeout's thread.
*
* @param args Same args as for Thread#raise
* @param block Same as for Thread#raise
*/
public void internalRaise(IRubyObject[] args) {
Ruby runtime = getRuntime();
Expand Down Expand Up @@ -1033,6 +1040,15 @@ public static interface BlockingTask {
public void wakeup();
}

/**
 * Callback that can wake a thread out of a blocking operation.
 *
 * @param <Data> type of the value handed back to the callback when it fires
 */
public interface Unblocker<Data> {
    /**
     * Wake the given thread.
     *
     * @param thread the thread to be woken
     * @param self the value associated with the blocking operation
     */
    void wakeup(RubyThread thread, Data self);
}

public interface Task<Data, Return> extends Unblocker<Data> {
public Return run(ThreadContext context, Data data) throws InterruptedException;
public void wakeup(RubyThread thread, Data data);
}

public static final class SleepTask implements BlockingTask {
private final Object object;
private final long millis;
Expand All @@ -1057,6 +1073,30 @@ public void wakeup() {
}
}

/**
 * A sleep implemented as a wait on a semaphore, so that it can be cut short
 * by releasing that semaphore.
 *
 * The data array is read positionally: index 0 is the {@link Semaphore} to
 * wait on, index 1 a {@code Long} number of milliseconds and index 2 an
 * {@code Integer} number of additional nanoseconds.
 */
private static final class SleepTask2 implements Task<Object[], Long> {
    /**
     * Wait on the semaphore for the requested time, or indefinitely when no
     * time is requested (zero milliseconds and no positive nanoseconds).
     *
     * @param context the context of the sleeping thread (not used here)
     * @param data semaphore, milliseconds and nanoseconds, in that order
     * @return the number of milliseconds actually spent waiting
     * @throws InterruptedException if the wait on the semaphore is interrupted
     */
    @Override
    public Long run(ThreadContext context, Object[] data) throws InterruptedException {
        long millis = (Long) data[1];
        int nanos = (Integer) data[2];
        Semaphore semaphore = (Semaphore) data[0];

        // nanoTime is monotonic; currentTimeMillis follows the wall clock and
        // can jump (even backwards) if the system time is adjusted mid-sleep.
        long start = System.nanoTime();

        if (millis == 0 && nanos <= 0) {
            // no duration requested: wait until released
            semaphore.acquire();
        } else {
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(millis);
            // toNanos saturates at Long.MAX_VALUE, so only add the nanosecond
            // part when doing so cannot overflow.
            if (nanos > 0 && timeoutNanos <= Long.MAX_VALUE - nanos) {
                timeoutNanos += nanos;
            }
            // The result is ignored on purpose: timing out and being released
            // early both simply end the sleep.
            semaphore.tryAcquire(timeoutNanos, TimeUnit.NANOSECONDS);
        }

        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /**
     * End the sleep early by releasing the semaphore that {@code run} waits on.
     *
     * @param thread the sleeping thread (not used here)
     * @param data the same array that was passed to {@code run}
     */
    @Override
    public void wakeup(RubyThread thread, Object[] data) {
        ((Semaphore) data[0]).release();
    }
}

/**
 * Shared SleepTask2 instance. The class declares no fields; everything it
 * needs for one sleep arrives through the data array, so a single instance
 * is enough.
 */
private static final Task<Object[], Long> SLEEP_TASK2 = new SleepTask2();

public void executeBlockingTask(BlockingTask task) throws InterruptedException {
enterSleep();
try {
Expand All @@ -1070,6 +1110,25 @@ public void executeBlockingTask(BlockingTask task) throws InterruptedException {
}
}

/**
 * Run a blocking task on behalf of this thread, registering the task and its
 * data for the duration so that interrupt() can wake it.
 *
 * Thread events are polled before the task starts and again after it ends,
 * whether it returns normally or throws.
 *
 * @param context the context of the thread executing the task
 * @param data the value passed to the task's run and wakeup methods
 * @param task the work to perform
 * @return whatever the task's run method returns
 * @throws InterruptedException if the task is interrupted
 */
public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
    try {
        // Store the argument before the function: interrupt() tests
        // unblockFunc for null and then hands it unblockArg, so the argument
        // should already be in place once the function becomes visible.
        this.unblockArg = data;
        this.unblockFunc = task;

        // check for interrupt before going into blocking call
        pollThreadEvents(context);

        enterSleep();

        return task.run(context, data);
    } finally {
        exitSleep();
        // Clear in the opposite order (function first) for the same reason.
        this.unblockFunc = null;
        this.unblockArg = null;
        pollThreadEvents(context);
    }
}

/**
 * Mark this thread as sleeping by setting its status to {@code Status.SLEEP}.
 */
public void enterSleep() {
    this.status.set(Status.SLEEP);
}
Expand Down Expand Up @@ -1351,10 +1410,18 @@ public void interrupt() {
if (iowait != null) {
iowait.cancel();
}
BlockingTask task = currentBlockingTask;

Unblocker task = this.unblockFunc;
if (task != null) {
task.wakeup();
task.wakeup(this, unblockArg);
}

// deprecated
{
BlockingTask t = currentBlockingTask;
if (t != null) {
t.wakeup();
}
}
}
private volatile BlockingIO.Condition blockingIO = null;
Expand Down

0 comments on commit afc5062

Please sign in to comment.