
Comparing changes

base repository: jruby/jruby
base: bfc43934bcf4
head repository: jruby/jruby
compare: 908512a89d23
  • 2 commits
  • 2 files changed
  • 2 contributors

Commits on Dec 19, 2015

  1. Concurrency improvements to Queue/SizedQueue.

    The implementation is now based on Doug Lea’s LinkedBlockingQueue.
    
    Concurrency-wise, this is now a dual-locked (head/tail) linked queue;
    since Ruby's standard library queues have blocking operations, a
    completely lock-free queue is not a practical endeavor.
    thedarkone committed Dec 19, 2015 · commit 7f8c26f
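
For context on the LinkedBlockingQueue design this commit adopts, here is a minimal, hypothetical sketch of a two-lock (head/tail) bounded linked queue in plain Java. It is not the JRuby code from this pull request — the class and its names are invented and simplified — but it shows the pattern the diff builds on: producers and consumers hold separate locks, an AtomicInteger count mediates between them, and wake-ups cascade so a put only crosses over to the take side when the queue transitions from empty (and a take only crosses to the put side when it transitions from full).

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: a minimal two-lock ("head/tail") bounded linked
// queue in the style of Doug Lea's LinkedBlockingQueue. This is NOT the
// JRuby class changed in this pull request.
final class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head;                        // head.item is always null (dummy node)
    private Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        this.capacity = capacity;
        last = head = new Node<>(null);
    }

    void put(E e) throws InterruptedException {
        int c = -1;
        final Node<E> node = new Node<>(e);
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity)      // wait for space, holding only putLock
                notFull.await();
            last = last.next = node;             // link at the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity)                // cascade: wake the next waiting producer
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)                              // empty -> non-empty: cross over, wake a consumer
            signalNotEmpty();
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0)             // wait for an item, holding only takeLock
                notEmpty.await();
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h;                          // self-link the old dummy node to help GC
            head = first;
            x = first.item;
            first.item = null;                   // "first" becomes the new dummy node
            c = count.getAndDecrement();
            if (c > 1)                           // cascade: wake the next waiting consumer
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)                       // full -> not-full: cross over, wake a producer
            signalNotFull();
        return x;
    }

    private void signalNotEmpty() {
        takeLock.lock();
        try { notEmpty.signal(); } finally { takeLock.unlock(); }
    }

    private void signalNotFull() {
        putLock.lock();
        try { notFull.signal(); } finally { putLock.unlock(); }
    }
}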

Commits on Dec 28, 2015

  1. Merge pull request #3553 from thedarkone/ruby-2-3-improved-queues

    [Ruby 2.3] Concurrency improvements to Queue/SizedQueue
    headius committed Dec 28, 2015 · commit 908512a
Showing with 477 additions and 169 deletions.
  1. +409 −77 core/src/main/java/org/jruby/ext/thread/Queue.java
  2. +68 −92 core/src/main/java/org/jruby/ext/thread/SizedQueue.java
486 changes: 409 additions & 77 deletions core/src/main/java/org/jruby/ext/thread/Queue.java
@@ -27,8 +27,8 @@
***** END LICENSE BLOCK *****/
package org.jruby.ext.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@@ -47,18 +47,199 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.marshal.DataType;

/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

/**
* The "Queue" class from the 'thread' library.
*
* The implementation is a kernel of Doug Lea's LinkedBlockingQueue with
* Ruby related tweaks: closeable (blocks out producers, no effect on consumers)
* and capacity adjustable (Ruby allows sized_queue.max = 123 post construction).
* Relevant changes noted in comments below.
*
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
* linked nodes.
* This queue orders elements FIFO (first-in-first-out).
* The <em>head</em> of the queue is that element that has been on the
* queue the longest time.
* The <em>tail</em> of the queue is that element that has been on the
* queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
* Linked queues typically have higher throughput than array-based queues but
* less predictable performance in most concurrent applications.
*
* <p>The optional capacity bound constructor argument serves as a
* way to prevent excessive queue expansion. The capacity, if unspecified,
* is equal to {@link Integer#MAX_VALUE}. Linked nodes are
* dynamically created upon each insertion unless this would bring the
* queue above capacity.
*
* @since 1.5
* @author Doug Lea
*/
@JRubyClass(name = "Queue")
public class Queue extends RubyObject implements DataType {
protected final ReentrantLock lock = new ReentrantLock();
protected final Condition popCond = lock.newCondition();
protected volatile List<IRubyObject> que;
protected volatile boolean closed;

/*
* A variant of the "two lock queue" algorithm. The putLock gates
* entry to put (and offer), and has an associated condition for
* waiting puts. Similarly for the takeLock. The "count" field
* that they both rely on is maintained as an atomic to avoid
* needing to get both locks in most cases. Also, to minimize need
* for puts to get takeLock and vice-versa, cascading notifies are
* used. When a put notices that it has enabled at least one takeInternal,
* it signals taker. That taker in turn signals others if more
* items have been entered since the signal. And symmetrically for
* takes signalling puts. Operations such as remove(Object) and
* iterators acquire both locks.
*
* Visibility between writers and readers is provided as follows:
*
* Whenever an element is enqueued, the putLock is acquired and
* count updated. A subsequent reader guarantees visibility to the
* enqueued Node by either acquiring the putLock (via fullyLock)
* or by acquiring the takeLock, and then reading n = count.get();
* this gives visibility to the first n items.
*
* To implement weakly consistent iterators, it appears we need to
* keep all Nodes GC-reachable from a predecessor dequeued Node.
* That would cause two problems:
* - allow a rogue Iterator to cause unbounded memory retention
* - cause cross-generational linking of old Nodes to new Nodes if
* a Node was tenured while live, which generational GCs have a
* hard time dealing with, causing repeated major collections.
* However, only non-deleted Nodes need to be reachable from
* dequeued Nodes, and reachability does not necessarily have to
* be of the kind understood by the GC. We use the trick of
* linking a Node that has just been dequeued to itself. Such a
* self-link implicitly means to advance to head.next.
*/


/**
* Linked list node class
*/
static class Node {
IRubyObject item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node next;

Node(IRubyObject x) { item = x; }
}

protected volatile boolean closed = false;

/** The capacity bound, or Integer.MAX_VALUE if none */
// LinkedBlockingQueue diffs:
// Having this volatile allows for lock-free & non-blocking push() for sized queues.
// The capacity is also no longer final because of SizedQueue#max=.
protected volatile int capacity;

/** Current number of elements */
protected final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
protected transient Node last;

/** Lock held by takeInternal, poll, etc */
protected final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
protected final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
protected final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
protected final Condition notFull = putLock.newCondition();

/**
* Signals a waiting takeInternal. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
protected void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* Signals a waiting put. Called only from take/poll.
*/
protected void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

protected void enqueue(Node node) {
last = last.next = node;
}

protected IRubyObject dequeue() {
Node h = head;
Node first = h.next;
h.next = h; // help GC
head = first;
IRubyObject x = first.item;
first.item = null;
return x;
}

/**
* Locks to prevent both puts and takes.
*/
protected void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlocks to allow both puts and takes.
*/
protected void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

protected void initializedCheck() {
// Ruby's initialized check seems to be a genius way to make all methods slower :);
// here we piggyback on capacity not being allowed to equal 0.
if (capacity == 0) throw getRuntime().newTypeError(this + " not initialized");
}

public Queue(Ruby runtime, RubyClass type) {
super(runtime, type);
// LinkedBlockingQueue diff: leaving capacity setup to initialize().
last = head = new Node(null);
}

public static void setup(Ruby runtime) {
@@ -77,80 +258,170 @@ public IRubyObject allocate(Ruby runtime, RubyClass klass) {

@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context) {
que = new ArrayList<>();

this.capacity = Integer.MAX_VALUE;
return this;
}

/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
@JRubyMethod
public IRubyObject clear(ThreadContext context) {
initializedCheck();
try {
lock.lockInterruptibly();

getQue().clear();
clearInternal();
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#clear");
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
throw createInterruptedError(context, "clear");
}

return this;
}

protected void clearInternal() throws InterruptedException {
final ReentrantLock putLock = this.putLock;
final ReentrantLock takeLock = this.takeLock;
// LinkedBlockingQueue diff: lock acquisition is interruptible
putLock.lockInterruptibly();
try {
takeLock.lockInterruptibly();
try {
for (Node p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
takeLock.unlock();
}
} finally {
putLock.unlock();
}
}

@JRubyMethod(name = "empty?")
public RubyBoolean empty_p(ThreadContext context) {
return context.runtime.newBoolean(getQue().isEmpty());
initializedCheck();
return context.runtime.newBoolean(count.get() == 0);
}

@JRubyMethod(name = {"length", "size"})
public RubyNumeric length(ThreadContext context) {
return RubyNumeric.int2fix(context.runtime, getQue().size());
initializedCheck();
return RubyNumeric.int2fix(context.runtime, count.get());
}

@JRubyMethod
public RubyNumeric num_waiting(ThreadContext context) {
lock.lock();
initializedCheck();
final ReentrantLock takeLock = this.takeLock;
try {
return context.runtime.newFixnum(lock.getWaitQueueLength(popCond));
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
takeLock.lockInterruptibly();
try {
return context.runtime.newFixnum(takeLock.getWaitQueueLength(notEmpty));
} finally {
takeLock.unlock();
}
} catch (InterruptedException ie) {
throw createInterruptedError(context, "num_waiting");
}
}

@JRubyMethod(name = {"pop", "deq", "shift"})
public IRubyObject pop(ThreadContext context) {
initializedCheck();
try {
return context.getThread().executeTask(context, this, BLOCKING_POP_TASK);
} catch (InterruptedException ie) {
// FIXME: is this the right thing to do?
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#pop");
throw createInterruptedError(context, "pop");
}
}

@JRubyMethod(name = {"pop", "deq", "shift"})
public IRubyObject pop(ThreadContext context, IRubyObject arg0) {
initializedCheck();
try {
return context.getThread().executeTask(context, this, !arg0.isTrue() ? BLOCKING_POP_TASK : NONBLOCKING_POP_TASK);
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#pop");
throw createInterruptedError(context, "pop");
}
}

@JRubyMethod(name = {"push", "<<", "enq"})
public IRubyObject push(ThreadContext context, IRubyObject value) {
lock.lock();
initializedCheck();
try {
putInternal(context, value);
} catch (InterruptedException ie) {
throw createInterruptedError(context, "push");
}

return this;
}

/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
protected void putInternal(ThreadContext context, IRubyObject e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/takeInternal/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
if (closed) {
boolean isClosed;
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
// LinkedBlockingQueue diff:
// * newly check the closed flag,
// * count.get() == capacity is now a ">=" check; otherwise, when
// the queue (if a SizedQueue) gets full (i.e. some threads
// sleep/spin in the following loop) and capacity is then
// shrunk (via SizedQueue#max=), blocked producers would
// think they can go on when in fact the queue is still full.
while (!(isClosed = closed) && count.get() >= capacity) {
notFull.await();
}

if (isClosed) {
// wake the next in line
notFull.signal();
// note that this is now a new early exit from the method;
// this doesn't matter because, for closed queues, it is
// not a producer's responsibility to wake the blocked consumers
// (they wake each other, while the first in line gets notified
// by the close() caller)
raiseClosedError(context);
}

getQue().add(value);
popCond.signal();
enqueue(node);
c = count.getAndIncrement();

if (c + 1 < capacity)
notFull.signal();
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
putLock.unlock();
}

return this;
if (c == 0)
signalNotEmpty();
}

@JRubyMethod
@@ -160,37 +431,57 @@ public IRubyObject marshal_dump(ThreadContext context) {

@JRubyMethod
public IRubyObject close(ThreadContext context) {
lock.lock();
initializedCheck();
try {
doClose(context);
closeInternal();
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#pop");
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
throw createInterruptedError(context, "close");
}

return this;
}

@JRubyMethod(name = "closed?")
public IRubyObject closed_p(ThreadContext context) {
return context.runtime.newBoolean(closed);
}

protected IRubyObject doClose(ThreadContext context) throws InterruptedException {
if (!closed) {
closed = true;

if (lock.hasWaiters(popCond)) {
popCond.signalAll();
protected void closeInternal() throws InterruptedException {
final ReentrantLock putLock = this.putLock;
final ReentrantLock takeLock = this.takeLock;
final AtomicInteger count = this.count;
int c;
putLock.lockInterruptibly();
try {
takeLock.lockInterruptibly();
try {
if (closed) {
return;
}

closed = true;

c = count.get();
// queue's item count can exceed capacity because of SizedQueue#max=
if (c >= capacity) {
// any blocked producers are now free to error out, wake the first
// in line
notFull.signal();
} else if (c == 0) {
// wake the first blocked consumer
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
} finally {
putLock.unlock();
}
}

return this;
@JRubyMethod(name = "closed?")
public IRubyObject closed_p(ThreadContext context) {
initializedCheck();
return context.runtime.newBoolean(closed);
}

public synchronized void shutdown() {
closed = true;
public synchronized void shutdown() throws InterruptedException {
closeInternal();
}

public boolean isShutdown() {
@@ -205,47 +496,87 @@ public synchronized void checkShutdown() {
}

protected long java_length() {
return getQue().size();
return count.get();
}

protected IRubyObject popInternal(ThreadContext context, boolean should_block) throws InterruptedException {
lock.lock();
protected IRubyObject takeInternal(ThreadContext context) throws InterruptedException {
IRubyObject x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
boolean isClosed;
boolean notFullSignalNeeded = false;

takeLock.lockInterruptibly();
try {
return should_block ? popBlocking(context) : popNonblocking(context);
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
}
// LinkedBlockingQueue diff: newly checking closed flag
while (!(isClosed = closed) && count.get() == 0) {
notEmpty.await();
}

protected IRubyObject popBlocking(ThreadContext context) throws InterruptedException {
while (getQue().isEmpty()) {
if (closed) {
return context.nil;
// LinkedBlockingQueue diff: dequeue conditionally (if there
// are values to dequeue).
boolean canDequeue = !isClosed || count.get() != 0;
if (canDequeue) {
x = dequeue();
c = count.getAndDecrement();
} else {
x = context.nil;
}
else {
popCond.await();

// LinkedBlockingQueue diff: wake the next in line consumer
// for closed queue as well
if (c > 1 || isClosed)
notEmpty.signal();

if (canDequeue) {
// LinkedBlockingQueue diff: moved this check into the locked
// section because of SizedQueue#max=; this might not be
// necessary (just being overly cautious).
notFullSignalNeeded = c == capacity;
}
} finally {
takeLock.unlock();
}

return getQue().remove(0);
if (notFullSignalNeeded)
signalNotFull();

return x;
}

protected IRubyObject popNonblocking(ThreadContext context) throws InterruptedException {
lock.lock();
public IRubyObject pollInternal() throws InterruptedException {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
IRubyObject x = null;
boolean notFullSignalNeeded = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
if (getQue().isEmpty()) {
throw context.runtime.newThreadError("queue empty");
if (count.get() > 0) {
x = dequeue();
int c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
// LinkedBlockingQueue diff: moved this check into the locked
// section because of SizedQueue#max=; this might not be
// necessary (just being overly cautious). With that move the
// `int c` declaration is also now in the locked region.
notFullSignalNeeded = c == capacity;
}

return getQue().remove(0);
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
takeLock.unlock();
}
if (notFullSignalNeeded)
signalNotFull();
return x;
}


private static final RubyThread.Task<Queue, IRubyObject> BLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>() {
public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
return queue.popInternal(context, true);
return queue.takeInternal(context);
}
public void wakeup(RubyThread thread, Queue queue) {
thread.getNativeThread().interrupt();
@@ -254,7 +585,12 @@ public void wakeup(RubyThread thread, Queue queue) {

private static final RubyThread.Task<Queue, IRubyObject> NONBLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>() {
public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
return queue.popInternal(context, false);
IRubyObject result = queue.pollInternal();
if (result == null) {
throw context.runtime.newThreadError("queue empty");
} else {
return result;
}
}
public void wakeup(RubyThread thread, Queue queue) {
thread.getNativeThread().interrupt();
@@ -265,11 +601,7 @@ public IRubyObject raiseClosedError(ThreadContext context) {
throw context.runtime.newRaiseException(context.runtime.getClass("ClosedQueueError"), "queue closed");
}

protected List<IRubyObject> getQue() {
List<IRubyObject> que = this.que;

if (que == null) throw getRuntime().newTypeError(this + " not initialized");

return que;
protected RaiseException createInterruptedError(ThreadContext context, String methodName) {
return context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#" + methodName);
}
}
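
Before moving on to SizedQueue.java, the close()-related changes above are easier to see end to end in compact form. The sketch below is a hypothetical, single-lock illustration (deliberately simpler than the dual-lock queue in the diff, with invented names) of the Ruby-style close semantics described in the comments: close() blocks out producers, consumers may keep draining and then receive a sentinel (nil in Ruby, null here), and once closed the first waiter in each line passes the wake-up along the chain.

import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: close() semantics on a bounded queue, compressed
// into a single lock for readability. Not the JRuby implementation.
final class CloseableQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private boolean closed;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    CloseableQueue(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (!closed && items.size() >= capacity)
                notFull.await();
            if (closed) {
                notFull.signal();                 // pass the wake-up to the next blocked producer
                throw new IllegalStateException("queue closed");
            }
            items.addLast(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {        // returns null once closed and drained
        lock.lockInterruptibly();
        try {
            while (!closed && items.isEmpty())
                notEmpty.await();
            E x = items.isEmpty() ? null : items.pollFirst();
            if (closed)
                notEmpty.signal();                // closed: consumers wake each other in turn
            if (x != null)
                notFull.signal();                 // an item left the queue; a producer may proceed
            return x;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed = true;
            notFull.signal();                     // first blocked producer will raise
            notEmpty.signal();                    // first blocked consumer will drain or return
        } finally {
            lock.unlock();
        }
    }
}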
160 changes: 68 additions & 92 deletions core/src/main/java/org/jruby/ext/thread/SizedQueue.java
@@ -40,16 +40,15 @@
import org.jruby.runtime.builtin.IRubyObject;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* The "SizedQueue" class from the 'thread' library.
*/
@JRubyClass(name = "SizedQueue", parent = "Queue")
public class SizedQueue extends Queue {
protected final Condition pushCond = lock.newCondition();
protected volatile int max;

protected SizedQueue(Ruby runtime, RubyClass type) {
super(runtime, type);
}
@@ -71,92 +70,110 @@ public IRubyObject allocate(Ruby runtime, RubyClass klass) {
cSizedQueue.defineAnnotatedMethods(SizedQueue.class);
}

@JRubyMethod
@Override
public IRubyObject clear(ThreadContext context) {
lock.lock();
try {
getQue().clear();

pushCond.signalAll();
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}

return this;
}

@JRubyMethod
public RubyNumeric max(ThreadContext context) {
return RubyNumeric.int2fix(context.runtime, max);
return RubyNumeric.int2fix(context.runtime, capacity);
}

@JRubyMethod(name = "max=")
public synchronized IRubyObject max_set(ThreadContext context, IRubyObject arg) {
initializedCheck();
Ruby runtime = context.runtime;
int max = RubyNumeric.num2int(arg), diff = 0;

if (max <= 0) {
throw runtime.newArgumentError("queue size must be positive");
}

lock.lock();
fullyLock();
try {
if (max > this.max) {
diff = max - this.max;
if (count.get() >= capacity && max > capacity) {
diff = max - capacity;
}
this.max = max;
capacity = max;
while (diff-- > 0) {
pushCond.signal();
notFull.signal();
}
return arg;
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
fullyUnlock();
}
}

@JRubyMethod(name = "initialize", visibility = Visibility.PRIVATE)
public synchronized IRubyObject initialize(ThreadContext context, IRubyObject arg) {
que = new ArrayList<>();

capacity = Integer.MAX_VALUE; // don't trigger initializedCheck() trap in max_set
max_set(context, arg);

return this;
}

@JRubyMethod
public RubyNumeric num_waiting(ThreadContext context) {
initializedCheck();
final ReentrantLock takeLock = this.takeLock;
final ReentrantLock putLock = this.putLock;
try {
takeLock.lockInterruptibly();
try {
putLock.lockInterruptibly();
try {
return context.runtime.newFixnum(takeLock.getWaitQueueLength(notEmpty) + putLock.getWaitQueueLength(notFull));
} finally {
putLock.unlock();
}
} finally {
takeLock.unlock();
}
} catch (InterruptedException ie) {
throw createInterruptedError(context, "num_waiting");
}
}

@JRubyMethod(name = {"push", "<<", "enq"}, required = 1, optional = 1)
public IRubyObject push(ThreadContext context, final IRubyObject[] argv) {
initializedCheck();

boolean should_block = shouldBlock(context, argv);

try {
return context.getThread().executeTask(context, argv[0], should_block ? blockingPushTask : nonblockingPushTask);
} catch (InterruptedException ie) {
throw context.runtime.newThreadError("interrupted in " + getMetaClass().getName() + "#push");
throw createInterruptedError(context, "push");
}
}

protected boolean offerInternal(ThreadContext context, IRubyObject e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (closed) {
raiseClosedError(context);
}
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

private final RubyThread.Task<IRubyObject, IRubyObject> blockingPushTask = new RubyThread.Task<IRubyObject, IRubyObject>() {
@Override
public IRubyObject run(ThreadContext context, IRubyObject value) throws InterruptedException {
lock.lock();
try {
while (getQue().size() >= max) {
if (closed) {
return raiseClosedError(context);
}
else {
pushCond.await();
}
}

if (closed) {
raiseClosedError(context);
}

return push(context, value);
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
putInternal(context, value);
return SizedQueue.this;
}

@Override
@@ -168,20 +185,10 @@ public void wakeup(RubyThread thread, IRubyObject value) {
private final RubyThread.Task<IRubyObject, IRubyObject> nonblockingPushTask = new RubyThread.Task<IRubyObject, IRubyObject>() {
@Override
public IRubyObject run(ThreadContext context, IRubyObject value) {
lock.lock();
try {
if (getQue().size() >= max) {
throw context.runtime.newThreadError("queue full");
}

if (closed) {
raiseClosedError(context);
}

return push(context, value);
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
if (!offerInternal(context, value)) {
throw context.runtime.newThreadError("queue full");
}
return SizedQueue.this;
}

@Override
@@ -198,35 +205,4 @@ private static boolean shouldBlock(ThreadContext context, IRubyObject[] argv) {
}
return should_block;
}

@Override
protected IRubyObject doClose(ThreadContext context) throws InterruptedException {
if (!closed) {
closed = true;

if (lock.hasWaiters(popCond)) {
popCond.signalAll();
}

if (lock.hasWaiters(pushCond)) {
pushCond.signalAll();
}
}

return this;
}

@Override
protected IRubyObject popInternal(ThreadContext context, boolean should_block) throws InterruptedException {
lock.lock();
try {
IRubyObject result = should_block ? popBlocking(context) : popNonblocking(context);

pushCond.signal();

return result;
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
}
}
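
Finally, a worked scenario (illustrative, not part of the diff) for why the producer wait guard in putInternal — and the matching count.get() >= capacity check in closeInternal — compare with ">=" rather than "==": once SizedQueue#max= can shrink capacity below the current element count, equality would never be observed again and blocked producers could slip through.

// Hypothetical timeline (Ruby-level API, shown for illustration only):
//
//   q = SizedQueue.new(5)          # capacity == 5
//   5.times { q.push(:item) }      # count == 5; further producers block on notFull
//   q.max = 3                      # capacity shrinks to 3, so count (5) > capacity (3)
//
// With an "count.get() == capacity" guard, a woken producer would see 5 != 3,
// stop waiting and enqueue into an already over-full queue. The ">=" guard
// keeps it parked until consumers drain the queue below the new bound:
while (!(isClosed = closed) && count.get() >= capacity) {
    notFull.await();
}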