Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Backport Queue, SizedQueue, ThreadFiber and support code.
Browse files Browse the repository at this point in the history
headius committed Jan 7, 2015
1 parent b110106 commit afc5062
Showing 5 changed files with 320 additions and 192 deletions.
75 changes: 71 additions & 4 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@
import org.jruby.runtime.builtin.IRubyObject;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.jruby.anno.JRubyMethod;
@@ -154,6 +156,12 @@ public static enum Status {
/** The current task blocking a thread, to allow interrupting it in an appropriate way */
private volatile BlockingTask currentBlockingTask;

/** A function to use to unblock this thread, if possible */
private Unblocker unblockFunc;

/** Argument to pass to the unblocker */
private Object unblockArg;

/** The list of locks this thread currently holds, so they can be released on exit */
private final List<Lock> heldLocks = new Vector<Lock>();

@@ -927,7 +935,6 @@ public IRubyObject raise(IRubyObject[] args, Block block) {
* Ruby threads like Timeout's thread.
*
* @param args Same args as for Thread#raise
* @param block Same as for Thread#raise
*/
public void internalRaise(IRubyObject[] args) {
Ruby runtime = getRuntime();
@@ -1033,6 +1040,15 @@ public static interface BlockingTask {
public void wakeup();
}

public interface Unblocker<Data> {
public void wakeup(RubyThread thread, Data self);
}

public interface Task<Data, Return> extends Unblocker<Data> {
public Return run(ThreadContext context, Data data) throws InterruptedException;
public void wakeup(RubyThread thread, Data data);
}

public static final class SleepTask implements BlockingTask {
private final Object object;
private final long millis;
@@ -1057,6 +1073,30 @@ public void wakeup() {
}
}

private static final class SleepTask2 implements Task<Object[], Long> {
@Override
public Long run(ThreadContext context, Object[] data) throws InterruptedException {
long millis = (Long)data[1];
int nanos = (Integer)data[2];

long start = System.currentTimeMillis();
// TODO: nano handling?
if (millis == 0) {
((Semaphore) data[0]).acquire();
} else {
((Semaphore) data[0]).tryAcquire(millis, TimeUnit.MILLISECONDS);
}
return System.currentTimeMillis() - start;
}

@Override
public void wakeup(RubyThread thread, Object[] data) {
((Semaphore)data[0]).release();
}
}

private static final Task<Object[], Long> SLEEP_TASK2 = new SleepTask2();

public void executeBlockingTask(BlockingTask task) throws InterruptedException {
enterSleep();
try {
@@ -1070,6 +1110,25 @@ public void executeBlockingTask(BlockingTask task) throws InterruptedException {
}
}

public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
try {
this.unblockFunc = task;
this.unblockArg = data;

// check for interrupt before going into blocking call
pollThreadEvents(context);

enterSleep();

return task.run(context, data);
} finally {
exitSleep();
this.unblockFunc = null;
this.unblockArg = null;
pollThreadEvents(context);
}
}

public void enterSleep() {
status.set(Status.SLEEP);
}
@@ -1351,10 +1410,18 @@ public void interrupt() {
if (iowait != null) {
iowait.cancel();
}
BlockingTask task = currentBlockingTask;

Unblocker task = this.unblockFunc;
if (task != null) {
task.wakeup();
task.wakeup(this, unblockArg);
}

// deprecated
{
BlockingTask t = currentBlockingTask;
if (t != null) {
t.wakeup();
}
}
}
private volatile BlockingIO.Condition blockingIO = null;
180 changes: 116 additions & 64 deletions core/src/main/java/org/jruby/ext/fiber/ThreadFiber.java
Original file line number Diff line number Diff line change
@@ -18,64 +18,68 @@
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;

import org.jruby.ir.runtime.IRBreakJump;
import org.jruby.ir.runtime.IRReturnJump;
import org.jruby.ir.operands.IRException;

public class ThreadFiber extends RubyObject implements ExecutionContext {
public ThreadFiber(Ruby runtime, RubyClass klass) {
super(runtime, klass);
}

public static void initRootFiber(ThreadContext context) {
Ruby runtime = context.runtime;

ThreadFiber rootFiber = new ThreadFiber(runtime, runtime.getClass("Fiber")); // FIXME: getFiber()

assert runtime.getClass("SizedQueue") != null : "SizedQueue has not been loaded";
rootFiber.data = new FiberData(new SizedQueue(runtime, runtime.getClass("SizedQueue")), null, rootFiber);
rootFiber.data = new FiberData(new SizedQueue(runtime, runtime.getClass("SizedQueue"), 1), null, rootFiber);
rootFiber.thread = context.getThread();
context.setRootFiber(rootFiber);
}

@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context, Block block) {
Ruby runtime = context.runtime;

if (!block.isGiven()) throw runtime.newArgumentError("tried to create Proc object without block");
data = new FiberData(new SizedQueue(runtime, runtime.getClass("SizedQueue")), context.getFiberCurrentThread(), this);

data = new FiberData(new SizedQueue(runtime, runtime.getClass("SizedQueue"), 1), context.getFiberCurrentThread(), this);

FiberData currentFiberData = context.getFiber().data;

thread = createThread(runtime, data, currentFiberData.queue, block);

return context.nil;
}

@JRubyMethod(rest = true)
public IRubyObject resume(ThreadContext context, IRubyObject[] values) {
Ruby runtime = context.runtime;

if (data.prev != null || data.transferred) throw runtime.newFiberError("double resume");

if (!alive()) throw runtime.newFiberError("dead fiber called");

FiberData currentFiberData = context.getFiber().data;

if (this.data == currentFiberData) {
switch (values.length) {
case 0: return context.nil;
case 1: return values[0];
default: return runtime.newArrayNoCopyLight(values);
}
}

IRubyObject val;
switch (values.length) {
case 0: val = NEVER; break;
case 1: val = values[0]; break;
default: val = runtime.newArrayNoCopyLight(values);
}

if (data.parent != context.getFiberCurrentThread()) throw runtime.newFiberError("fiber called across threads");

data.prev = context.getFiber();

try {
@@ -86,64 +90,96 @@ public IRubyObject resume(ThreadContext context, IRubyObject[] values) {
}

private static IRubyObject exchangeWithFiber(ThreadContext context, FiberData currentFiberData, FiberData targetFiberData, IRubyObject val) {
targetFiberData.queue.push(context, val);
// At this point we consider ourselves "in" the resume, so we need to enforce exception-propagation
// rules for both the push (to wake up fiber) and pop (to wait for fiber). Failure to do this can
// cause interrupts destined for the fiber to be caught after the fiber is running but before the
// resuming thread has started waiting for it, leaving the fiber to run rather than receiving the
// interrupt, and the parent thread propagates the error.

// Note: these need to be separate try/catches because of the while loop.
try {
targetFiberData.queue.push(context, new IRubyObject[] {val});
} catch (RaiseException re) {
handleExceptionDuringExchange(context, currentFiberData, targetFiberData, re);
}

while (true) {
try {
IRubyObject result = currentFiberData.queue.pop(context);
if (result == NEVER) result = context.nil;
return result;
} catch (RaiseException re) {
// If we received a LJC we need to bubble it out
if (context.runtime.getLocalJumpError().isInstance(re.getException())) {
throw re;
}
handleExceptionDuringExchange(context, currentFiberData, targetFiberData, re);
}
}
}

// If we were trying to yield but our queue has been shut down,
// let the exception bubble out and (ideally) kill us.
if (currentFiberData.queue.isShutdown()) {
throw re;
}
/**
* Handle exceptions raised while exchanging data with a fiber.
*
* The rules work like this:
*
* <ul>
* <li>If the thread has called Fiber#resume on the fiber and an interrupt is sent to the thread,
* forward it to the fiber</li>
* <li>If the fiber has called Fiber.yield and an interrupt is sent to the fiber (e.g. Timeout.timeout(x) { Fiber.yield })
* forward it to the fiber's parent thread.</li>
* </ul>
*
* @param context
* @param currentFiberData
* @param targetFiberData
* @param re
*/
private static void handleExceptionDuringExchange(ThreadContext context, FiberData currentFiberData, FiberData targetFiberData, RaiseException re) {
// If we received a LJC we need to bubble it out
if (context.runtime.getLocalJumpError().isInstance(re.getException())) {
throw re;
}

// re-raise if the target fiber has been shut down
if (targetFiberData.queue.isShutdown()) {
throw re;
}
// If we were trying to yield but our queue has been shut down,
// let the exception bubble out and (ideally) kill us.
if (currentFiberData.queue.isShutdown()) {
throw re;
}

// Otherwise, we want to forward the exception to the target fiber
// since it has the ball
targetFiberData.fiber.get().thread.raise(re.getException());
}
// re-raise if the target fiber has been shut down
if (targetFiberData.queue.isShutdown()) {
throw re;
}

// Otherwise, we want to forward the exception to the target fiber
// since it has the ball
targetFiberData.fiber.get().thread.raise(re.getException());
}

@JRubyMethod(rest = true)
public IRubyObject __transfer__(ThreadContext context, IRubyObject[] values) {
Ruby runtime = context.runtime;

if (data.prev != null) throw runtime.newFiberError("double resume");

if (!alive()) throw runtime.newFiberError("dead fiber called");

FiberData currentFiberData = context.getFiber().data;

if (this.data == currentFiberData) {
switch (values.length) {
case 0: return context.nil;
case 1: return values[0];
default: return runtime.newArrayNoCopyLight(values);
}
}

IRubyObject val;
switch (values.length) {
case 0: val = NEVER; break;
case 1: val = values[0]; break;
default: val = runtime.newArrayNoCopyLight(values);
}

if (data.parent != context.getFiberCurrentThread()) throw runtime.newFiberError("fiber called across threads");

if (currentFiberData.prev != null) {
// new fiber should answer to current prev and this fiber is marked as transferred
data.prev = currentFiberData.prev;
@@ -152,42 +188,42 @@ public IRubyObject __transfer__(ThreadContext context, IRubyObject[] values) {
} else {
data.prev = context.getFiber();
}

try {
return exchangeWithFiber(context, currentFiberData, data, val);
} finally {
data.prev = null;
currentFiberData.transferred = false;
}
}

@JRubyMethod(meta = true)
public static IRubyObject yield(ThreadContext context, IRubyObject recv) {
return yield(context, recv, context.nil);
}

@JRubyMethod(meta = true)
public static IRubyObject yield(ThreadContext context, IRubyObject recv, IRubyObject value) {
Ruby runtime = context.runtime;

FiberData currentFiberData = context.getFiber().data;

if (currentFiberData.parent == null) throw runtime.newFiberError("can't yield from root fiber");

if (currentFiberData.prev == null) throw runtime.newFiberError("BUG: yield occured with null previous fiber. Report this at http://bugs.jruby.org");

if (currentFiberData.queue.isShutdown()) throw runtime.newFiberError("dead fiber yielded");

FiberData prevFiberData = currentFiberData.prev.data;

return exchangeWithFiber(context, currentFiberData, prevFiberData, value);
}

@JRubyMethod
public IRubyObject __alive__(ThreadContext context) {
return context.runtime.newBoolean(thread != null && thread.isAlive());
return context.runtime.newBoolean(alive());
}

@JRubyMethod(meta = true)
public static IRubyObject __current__(ThreadContext context, IRubyObject recv) {
return context.getFiber();
@@ -197,11 +233,11 @@ public static IRubyObject __current__(ThreadContext context, IRubyObject recv) {
public Map<Object, IRubyObject> getContextVariables() {
return thread.getContextVariables();
}

boolean alive() {
return thread != null && thread.isAlive() && !data.queue.isShutdown();
}

static RubyThread createThread(final Ruby runtime, final FiberData data, final SizedQueue queue, final Block block) {
final AtomicReference<RubyThread> fiberThread = new AtomicReference();
runtime.getFiberExecutor().execute(new Runnable() {
@@ -210,7 +246,7 @@ public void run() {
context.setFiber(data.fiber.get());
context.setRootThread(data.parent);
fiberThread.set(context.getThread());

IRubyObject init = data.queue.pop(context);

try {
@@ -223,7 +259,7 @@ public void run() {
result = block.yieldArray(context, init, null, null);
}

data.prev.data.queue.push(context, result);
data.prev.data.queue.push(context, new IRubyObject[] { result });
} finally {
data.queue.shutdown();
runtime.getThreadService().disposeCurrentThread();
@@ -232,6 +268,18 @@ public void run() {
if (data.prev != null) {
data.prev.thread.raise(fce.buildException(runtime).getException());
}
} catch (IRBreakJump bj) {
// This is one of the rare cases where IR flow-control jumps
// leaks into the runtime impl.
if (data.prev != null) {
data.prev.thread.raise(((RaiseException)IRException.BREAK_LocalJumpError.getException(runtime)).getException());
}
} catch (IRReturnJump rj) {
// This is one of the rare cases where IR flow-control jumps
// leaks into the runtime impl.
if (data.prev != null) {
data.prev.thread.raise(((RaiseException)IRException.RETURN_LocalJumpError.getException(runtime)).getException());
}
} catch (RaiseException re) {
if (data.prev != null) {
data.prev.thread.raise(re.getException());
@@ -240,22 +288,26 @@ public void run() {
if (data.prev != null) {
data.prev.thread.raise(JavaUtil.convertJavaToUsableRubyObject(runtime, t));
}
} finally {
// clear reference to the fiber's thread
ThreadFiber tf = data.fiber.get();
if (tf != null) tf.thread = null;
}
}
});

while (fiberThread.get() == null) {Thread.yield();}

return fiberThread.get();
}

protected void finalize() throws Throwable {
try {
FiberData data = this.data;
if (data != null) {
// we never interrupt or shutdown root fibers
if (data.parent == null) return;

data.queue.shutdown();
}

@@ -282,7 +334,7 @@ public FiberData getData() {
public RubyThread getThread() {
return thread;
}

public static class FiberData {
FiberData(SizedQueue queue, RubyThread parent, ThreadFiber fiber) {
this.queue = queue;
@@ -293,14 +345,14 @@ public static class FiberData {
public ThreadFiber getPrev() {
return prev;
}

final SizedQueue queue;
volatile ThreadFiber prev;
final RubyThread parent;
final WeakReference<ThreadFiber> fiber;
volatile boolean transferred;
}

volatile FiberData data;
volatile RubyThread thread;
}
152 changes: 96 additions & 56 deletions core/src/main/java/org/jruby/ext/thread/Queue.java
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
* rights and limitations under the License.
*
* Copyright (C) 2006 MenTaLguY <mental@rydia.net>
*
*
* Alternatively, the contents of this file may be used under the terms of
* either of the GNU General Public License Version 2 or later (the "GPL"),
* or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
@@ -27,38 +27,63 @@
***** END LICENSE BLOCK *****/
package org.jruby.ext.thread;

import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.Block;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;

/**
* The "Queue" class from the 'thread' library.
*/
@JRubyClass(name = "Queue")
public class Queue extends RubyObject {
private LinkedList entries;
protected volatile int numWaiting = 0;
protected BlockingQueue<IRubyObject> queue;
protected AtomicLong numWaiting = new AtomicLong();

@JRubyMethod(name = "new", rest = true, meta = true)
public static IRubyObject newInstance(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
Queue result = new Queue(context.runtime, (RubyClass) recv);
result.callInit(context, args, block);
return result;
}
final RubyThread.Task<Queue, IRubyObject> takeTask = new RubyThread.Task<Queue, IRubyObject>() {
@Override
public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
return queue.getQueueSafe().take();
}

@Override
public void wakeup(RubyThread thread, Queue data) {
thread.getNativeThread().interrupt();
}
};

final RubyThread.Task<IRubyObject[], IRubyObject> putTask = new RubyThread.Task<IRubyObject[], IRubyObject>() {
@Override
public IRubyObject run(ThreadContext context, IRubyObject[] args) throws InterruptedException {
final BlockingQueue<IRubyObject> queue = getQueueSafe();
if(args.length == 2 && args[1].isTrue() && queue.remainingCapacity() == 0) {
throw context.runtime.newThreadError("queue full");
}
queue.put(args[0]);
return context.nil;
}

@Override
public void wakeup(RubyThread thread, IRubyObject[] data) {
thread.getNativeThread().interrupt();
}
};

public Queue(Ruby runtime, RubyClass type) {
super(runtime, type);
entries = new LinkedList();
}

public static void setup(Ruby runtime) {
@@ -68,96 +93,111 @@ public IRubyObject allocate(Ruby runtime, RubyClass klass) {
return new Queue(runtime, klass);
}
});
cQueue.undefineMethod("initialize_copy");
cQueue.setReifiedClass(Queue.class);
cQueue.defineAnnotatedMethods(Queue.class);
}

@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context) {
queue = new LinkedBlockingQueue<IRubyObject>();
return this;
}

@JRubyMethod(name = "shutdown!")
public synchronized IRubyObject shutdown(ThreadContext context) {
entries = null;
notifyAll();
public IRubyObject shutdown(ThreadContext context) {
queue = null;
return context.runtime.getNil();
}

public synchronized void shutdown() {
entries = null;
notifyAll();
queue = null;
}

public boolean isShutdown() {
return entries == null;
return queue == null;
}

public BlockingQueue<IRubyObject> getQueueSafe() {
BlockingQueue<IRubyObject> queue = this.queue;
checkShutdown();
return queue;
}

public synchronized void checkShutdown(ThreadContext context) {
if (entries == null) {
throw new RaiseException(context.runtime, context.runtime.getThreadError(), "queue shut down", false);
public synchronized void checkShutdown() {
if (queue == null) {
Ruby runtime = getRuntime();
throw new RaiseException(runtime, runtime.getThreadError(), "queue shut down", false);
}
}

@JRubyMethod
public synchronized IRubyObject clear(ThreadContext context) {
checkShutdown(context);
entries.clear();
return context.runtime.getNil();
BlockingQueue<IRubyObject> queue = getQueueSafe();
queue.clear();
return this;
}

@JRubyMethod(name = "empty?")
public synchronized RubyBoolean empty_p(ThreadContext context) {
checkShutdown(context);
return context.runtime.newBoolean(entries.size() == 0);
public RubyBoolean empty_p(ThreadContext context) {
BlockingQueue<IRubyObject> queue = getQueueSafe();
return context.runtime.newBoolean(queue.size() == 0);
}

@JRubyMethod(name = {"length", "size"})
public synchronized RubyNumeric length(ThreadContext context) {
checkShutdown(context);
return RubyNumeric.int2fix(context.runtime, entries.size());
public RubyNumeric length(ThreadContext context) {
checkShutdown();
return RubyNumeric.int2fix(context.runtime, queue.size());
}

protected synchronized long java_length() {
return entries.size();
protected long java_length() {
return queue.size();
}

@JRubyMethod
public RubyNumeric num_waiting(ThreadContext context) {
return context.runtime.newFixnum(numWaiting);
return context.runtime.newFixnum(numWaiting.longValue());
}

@JRubyMethod(name = {"pop", "deq", "shift"})
public synchronized IRubyObject pop(ThreadContext context) {
public IRubyObject pop(ThreadContext context) {
return pop(context, true);
}

@JRubyMethod(name = {"pop", "deq", "shift"})
public synchronized IRubyObject pop(ThreadContext context, IRubyObject arg0) {
public IRubyObject pop(ThreadContext context, IRubyObject arg0) {
return pop(context, !arg0.isTrue());
}

@JRubyMethod(name = {"push", "<<", "enq"})
public synchronized IRubyObject push(ThreadContext context, IRubyObject value) {
checkShutdown(context);
entries.addLast(value);
notify();
return context.runtime.getNil();
@JRubyMethod(name = {"push", "<<", "enq"}, required = 1, optional = 1)
public IRubyObject push(ThreadContext context, final IRubyObject[] args) {
checkShutdown();
try {
context.getThread().executeTask(context, args, putTask);
return this;
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#push");
}
}

@JRubyMethod
public IRubyObject marshal_dump(ThreadContext context) {
return ThreadLibrary.undumpable(context, this);
}

private synchronized IRubyObject pop(ThreadContext context, boolean should_block) {
checkShutdown(context);
if (!should_block && entries.size() == 0) {
private IRubyObject pop(ThreadContext context, boolean should_block) {
final BlockingQueue<IRubyObject> queue = getQueueSafe();
if (!should_block && queue.size() == 0) {
throw new RaiseException(context.runtime, context.runtime.getThreadError(), "queue empty", false);
}
numWaiting++;
numWaiting.incrementAndGet();
try {
while (java_length() == 0) {
try {
context.getThread().wait_timeout(this, null);
} catch (InterruptedException e) {
}
checkShutdown(context);
}
return context.getThread().executeTask(context, this, takeTask);
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#pop");
} finally {
numWaiting--;
numWaiting.decrementAndGet();
}
return (IRubyObject) entries.removeFirst();
}

}
93 changes: 25 additions & 68 deletions core/src/main/java/org/jruby/ext/thread/SizedQueue.java
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
* rights and limitations under the License.
*
* Copyright (C) 2006 MenTaLguY <mental@rydia.net>
*
*
* Alternatively, the contents of this file may be used under the terms of
* either of the GNU General Public License Version 2 or later (the "GPL"),
* or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
@@ -37,24 +37,24 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.Visibility;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* The "SizedQueue" class from the 'thread' library.
*/
@JRubyClass(name = "SizedQueue", parent = "Queue")
public class SizedQueue extends Queue {
private int capacity;

@JRubyMethod(name = "new", rest = true, meta = true)
public static IRubyObject newInstance(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
SizedQueue result = new SizedQueue(context.runtime, (RubyClass) recv);
result.callInit(context, args, block);
return result;
public SizedQueue(Ruby runtime, RubyClass type) {
super(runtime, type);
}

public SizedQueue(Ruby runtime, RubyClass type) {
public SizedQueue(Ruby runtime, RubyClass type, int size) {
super(runtime, type);
capacity = 1;

this.queue = new ArrayBlockingQueue<IRubyObject>(size, false);
}

public static void setup(Ruby runtime) {
@@ -70,78 +70,35 @@ public IRubyObject allocate(Ruby runtime, RubyClass klass) {

@JRubyMethod
@Override
public synchronized IRubyObject clear(ThreadContext context) {
public IRubyObject clear(ThreadContext context) {
super.clear(context);
notifyAll();
return context.runtime.getNil();

return this;
}

@JRubyMethod
public synchronized RubyNumeric max(ThreadContext context) {
return RubyNumeric.int2fix(context.runtime, capacity);
public RubyNumeric max(ThreadContext context) {
return RubyNumeric.int2fix(context.runtime, queue.size() + queue.remainingCapacity());
}

@JRubyMethod(name = {"max="})
@JRubyMethod(name = "max=")
public synchronized IRubyObject max_set(ThreadContext context, IRubyObject arg) {
return initialize(context, arg);
BlockingQueue<IRubyObject> oldQueue = this.queue;
initialize(context, arg);
oldQueue.drainTo(this.queue);
return arg;
}

@JRubyMethod(name = {"initialize"}, visibility = Visibility.PRIVATE)
@JRubyMethod(name = "initialize", visibility = Visibility.PRIVATE)
public synchronized IRubyObject initialize(ThreadContext context, IRubyObject arg) {
int new_capacity = RubyNumeric.fix2int(arg);

if (new_capacity <= 0) {
context.runtime.newArgumentError("queue size must be positive");
}
int difference;
if (new_capacity > capacity) {
difference = new_capacity - capacity;
} else {
difference = 0;
throw context.runtime.newArgumentError("queue size must be positive");
}
capacity = new_capacity;
if (difference > 0) {
notifyAll();
}
return context.runtime.getNil();
}

@JRubyMethod(name = {"pop", "deq", "shift"})
@Override
public synchronized IRubyObject pop(ThreadContext context) {
IRubyObject result = super.pop(context);
notifyAll();
return result;
}

@JRubyMethod(name = {"pop", "deq", "shift"})
@Override
public synchronized IRubyObject pop(ThreadContext context, IRubyObject arg0) {
IRubyObject result = super.pop(context, arg0);
notifyAll();
return result;
}
this.queue = new ArrayBlockingQueue<IRubyObject>(new_capacity, false);

@JRubyMethod(name = {"push", "<<"})
@Override
public synchronized IRubyObject push(ThreadContext context, IRubyObject value) {
checkShutdown(context);
if (java_length() >= capacity) {
numWaiting++;
try {
while (java_length() >= capacity) {
try {
context.getThread().wait_timeout(this, null);
} catch (InterruptedException e) {
}
checkShutdown(context);
}
} finally {
numWaiting--;
}
}
super.push(context, value);
notifyAll();
return context.runtime.getNil();
return this;
}

}
12 changes: 12 additions & 0 deletions core/src/main/java/org/jruby/ext/thread/ThreadLibrary.java
Original file line number Diff line number Diff line change
@@ -33,6 +33,9 @@
import java.io.IOException;

import org.jruby.Ruby;
import org.jruby.RubyObject;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;

/**
@@ -46,4 +49,13 @@ public void load(final Ruby runtime, boolean wrap) throws IOException {
Queue.setup(runtime);
SizedQueue.setup(runtime);
}

/**
* Convenience method for objects that are undumpable. Always throws.
*
* @throws TypeError
*/
static IRubyObject undumpable(ThreadContext context, RubyObject self) {
throw context.runtime.newTypeError("can't dump " + self.type());
}
}

0 comments on commit afc5062

Please sign in to comment.