Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 68f93a137df2
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b435b4fb893b
Choose a head ref
  • 2 commits
  • 14 files changed
  • 1 contributor

Commits on Feb 11, 2016

  1. Copy the full SHA
    df75a85 View commit details
  2. Copy the full SHA
    b435b4f View commit details
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved. This
* code is released under a tri EPL/GPL/LGPL license. You can use it,
* redistribute it and/or modify it under the terms of the:
*
* Eclipse Public License version 1.0
* GNU General Public License version 2
* GNU Lesser General Public License version 2.1
*/
package org.jruby.truffle.core.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A {@link BlockingQueue} that additionally exposes a {@link ReentrantLock}
 * and two {@link Condition}s ("not empty" and "not full") to its callers.
 *
 * NOTE(review): the name suggests implementations are backed by (or modelled
 * on) {@code java.util.concurrent.ArrayBlockingQueue}; no implementation is
 * declared in this file, so confirm against the implementing classes.
 *
 * @param <T> the type of elements held in the queue
 */
public interface ArrayBlockingQueueLocksConditions<T> extends BlockingQueue<T> {

/**
 * Returns the lock exposed by this queue.
 *
 * NOTE(review): presumably the lock that the two conditions below belong to,
 * which is what {@link ReentrantLock#getWaitQueueLength(Condition)} requires
 * of its argument; verify in the implementation.
 *
 * @return the queue's lock
 */
ReentrantLock getLock();

/**
 * Returns the "not empty" condition exposed by this queue.
 *
 * NOTE(review): presumably the condition consumers wait on while the queue
 * is empty; confirm against the implementation.
 *
 * @return the "not empty" condition
 */
Condition getNotEmptyCondition();

/**
 * Returns the "not full" condition exposed by this queue.
 *
 * NOTE(review): presumably the condition producers wait on while the queue
 * is at capacity; confirm against the implementation.
 *
 * @return the "not full" condition
 */
Condition getNotFullCondition();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved. This
* code is released under a tri EPL/GPL/LGPL license. You can use it,
* redistribute it and/or modify it under the terms of the:
*
* Eclipse Public License version 1.0
* GNU General Public License version 2
* GNU Lesser General Public License version 2.1
*/
package org.jruby.truffle.core.queue;

import java.util.Collection;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
 * A {@link BlockingQueue} that forwards every operation to another
 * {@code BlockingQueue} supplied at construction time.
 *
 * <p>The class adds no behaviour of its own; it exists so that subclasses can
 * layer extra methods on top of an existing queue implementation while still
 * being usable wherever a {@code BlockingQueue} is expected. Thread-safety,
 * capacity limits, null handling and iteration semantics are whatever the
 * wrapped queue provides.
 *
 * @param <T> the type of elements held in the queue
 */
public class DelegatingBlockingQueue<T> implements BlockingQueue<T> {

    private final BlockingQueue<T> queue;

    /**
     * Creates a wrapper around {@code queue}.
     *
     * @param queue the queue that receives every forwarded call
     */
    public DelegatingBlockingQueue(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    /**
     * Returns the wrapped queue so subclasses can reach functionality that is
     * not part of the {@link BlockingQueue} interface.
     *
     * @return the queue passed to the constructor
     */
    protected BlockingQueue<T> getQueue() {
        return queue;
    }

    @Override
    public boolean add(T t) {
        return queue.add(t);
    }

    @Override
    public boolean offer(T t) {
        return queue.offer(t);
    }

    @Override
    public void put(T t) throws InterruptedException {
        queue.put(t);
    }

    @Override
    public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
        return queue.offer(t, timeout, unit);
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Stream<T> parallelStream() {
        return queue.parallelStream();
    }

    @Override
    public Stream<T> stream() {
        return queue.stream();
    }

    @Override
    public Spliterator<T> spliterator() {
        return queue.spliterator();
    }

    /**
     * Returns the wrapped queue's hash code, which keeps this method
     * consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return queue.hashCode();
    }

    /**
     * Compares this wrapper with {@code o} in terms of the wrapped queue.
     *
     * <p>A wrapper is always equal to itself. When {@code o} is another
     * {@code DelegatingBlockingQueue}, the two wrapped queues are compared;
     * otherwise {@code o} is handed to the wrapped queue's own
     * {@code equals}. Without the first two cases a wrapper around a queue
     * that uses identity equality (as the JDK blocking queues do) would not
     * even be equal to itself, because the delegate would be asked whether it
     * equals the wrapper.
     *
     * <p>Note that {@code wrapper.equals(delegate)} may be {@code true} while
     * {@code delegate.equals(wrapper)} is {@code false}; the delegate knows
     * nothing about this class, so that asymmetry cannot be fixed from here.
     *
     * @param o the object to compare with, may be {@code null}
     * @return {@code true} if {@code o} is this wrapper, wraps an equal
     *         queue, or is equal to the wrapped queue
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o instanceof DelegatingBlockingQueue) {
            // Unwrap so that two views over equal queues compare equal,
            // matching the hash code both of them report.
            return queue.equals(((DelegatingBlockingQueue<?>) o).queue);
        }
        return queue.equals(o);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public boolean removeIf(Predicate<? super T> filter) {
        return queue.removeIf(filter);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        return queue.addAll(c);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    @Override
    public <T1> T1[] toArray(T1[] a) {
        return queue.toArray(a);
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2016 Oracle and/or its affiliates. All rights reserved. This
* code is released under a tri EPL/GPL/LGPL license. You can use it,
* redistribute it and/or modify it under the terms of the:
*
* Eclipse Public License version 1.0
* GNU General Public License version 2
* GNU Lesser General Public License version 2.1
*/
package org.jruby.truffle.core.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A {@link BlockingQueue} that additionally exposes a {@link ReentrantLock}
 * and a "not empty" {@link Condition} to its callers.
 *
 * NOTE(review): the name suggests implementations are backed by (or modelled
 * on) {@code java.util.concurrent.LinkedBlockingQueue}; no implementation is
 * declared in this file, so confirm against the implementing classes.
 *
 * @param <T> the type of elements held in the queue
 */
public interface LinkedBlockingQueueLocksConditions<T> extends BlockingQueue<T> {

/**
 * Returns the lock exposed by this queue.
 *
 * NOTE(review): presumably the lock that {@link #getNotEmptyCondition()}
 * belongs to, which is what
 * {@link ReentrantLock#getWaitQueueLength(Condition)} requires of its
 * argument; verify in the implementation.
 *
 * @return the queue's lock
 */
ReentrantLock getLock();

/**
 * Returns the "not empty" condition exposed by this queue.
 *
 * NOTE(review): presumably the condition consumers wait on while the queue
 * is empty; confirm against the implementation.
 *
 * @return the "not empty" condition
 */
Condition getNotEmptyCondition();

}
Original file line number Diff line number Diff line change
@@ -24,11 +24,11 @@ DynamicObjectFactory createQueueShape(DynamicObject logicalClass,
DynamicObject metaClass);

DynamicObject createQueue(DynamicObjectFactory factory,
LinkedBlockingQueue<Object> queue);
LinkedBlockingQueueLocksConditions<Object> queue);

boolean isQueue(ObjectType objectType);
boolean isQueue(DynamicObject object);

LinkedBlockingQueue<Object> getQueue(DynamicObject object);
LinkedBlockingQueueLocksConditions<Object> getQueue(DynamicObject object);

}
23 changes: 4 additions & 19 deletions truffle/src/main/java/org/jruby/truffle/core/queue/QueueNodes.java
Original file line number Diff line number Diff line change
@@ -27,11 +27,8 @@
import org.jruby.truffle.language.objects.AllocateObjectNode;
import org.jruby.truffle.language.objects.AllocateObjectNodeGen;

import java.lang.invoke.MethodHandle;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@CoreClass(name = "Queue")
@@ -49,7 +46,7 @@ public AllocateNode(RubyContext context, SourceSection sourceSection) {

@Specialization
public DynamicObject allocate(DynamicObject rubyClass) {
return allocateNode.allocate(rubyClass, new LinkedBlockingQueue<Object>());
return allocateNode.allocate(rubyClass, getContext().getNativePlatform().createLinkedBlockingQueueLocksConditions());
}

}
@@ -248,27 +245,15 @@ public Object marshal_dump(DynamicObject self) {
@CoreMethod(names = "num_waiting")
public abstract static class NumWaitingNode extends CoreMethodArrayArgumentsNode {

private static final MethodHandle TAKE_LOCK_FIELD_GETTER = MethodHandleUtils.getPrivateGetter(LinkedBlockingQueue.class, "takeLock");
private static final MethodHandle NOT_EMPTY_CONDITION_FIELD_GETTER = MethodHandleUtils.getPrivateGetter(LinkedBlockingQueue.class, "notEmpty");

public NumWaitingNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

@Specialization
public int num_waiting(DynamicObject self) {
final BlockingQueue<Object> queue = Layouts.QUEUE.getQueue(self);

final LinkedBlockingQueue<Object> linkedBlockingQueue = (LinkedBlockingQueue<Object>) queue;
final LinkedBlockingQueueLocksConditions<Object> queue = Layouts.QUEUE.getQueue(self);

final ReentrantLock lock;
final Condition notEmptyCondition;
try {
lock = (ReentrantLock) TAKE_LOCK_FIELD_GETTER.invokeExact(linkedBlockingQueue);
notEmptyCondition = (Condition) NOT_EMPTY_CONDITION_FIELD_GETTER.invokeExact(linkedBlockingQueue);
} catch (Throwable e) {
throw new RuntimeException(e);
}
final ReentrantLock lock = queue.getLock();

getContext().getThreadManager().runUntilResult(this, new BlockingAction<Boolean>() {
@Override
@@ -278,7 +263,7 @@ public Boolean block() throws InterruptedException {
}
});
try {
return lock.getWaitQueueLength(notEmptyCondition);
return lock.getWaitQueueLength(queue.getNotEmptyCondition());
} finally {
lock.unlock();
}
Original file line number Diff line number Diff line change
@@ -24,9 +24,9 @@ DynamicObjectFactory createSizedQueueShape(DynamicObject logicalClass,
DynamicObject metaClass);

DynamicObject createSizedQueue(DynamicObjectFactory factory,
@Nullable BlockingQueue<Object> queue);
@Nullable ArrayBlockingQueueLocksConditions<Object> queue);

BlockingQueue<Object> getQueue(DynamicObject object);
void setQueue(DynamicObject object, BlockingQueue<Object> queue);
ArrayBlockingQueueLocksConditions<Object> getQueue(DynamicObject object);
void setQueue(DynamicObject object, ArrayBlockingQueueLocksConditions<Object> queue);

}
Original file line number Diff line number Diff line change
@@ -25,10 +25,7 @@
import org.jruby.truffle.language.RubyNode;
import org.jruby.truffle.language.control.RaiseException;

import java.lang.invoke.MethodHandle;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
@@ -66,7 +63,7 @@ public DynamicObject initialize(DynamicObject self, int capacity) {
throw new RaiseException(getContext().getCoreLibrary().argumentError("queue size must be positive", this));
}

final BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(capacity);
final ArrayBlockingQueueLocksConditions<Object> blockingQueue = getContext().getNativePlatform().createArrayBlockingQueueLocksConditions(capacity);
Layouts.SIZED_QUEUE.setQueue(self, blockingQueue);
return self;
}
@@ -88,8 +85,8 @@ public int setMax(DynamicObject self, int newCapacity) {
throw new RaiseException(getContext().getCoreLibrary().argumentError("queue size must be positive", this));
}

final BlockingQueue<Object> oldQueue = Layouts.SIZED_QUEUE.getQueue(self);
final BlockingQueue<Object> newQueue = new ArrayBlockingQueue<Object>(newCapacity);
final ArrayBlockingQueueLocksConditions<Object> oldQueue = Layouts.SIZED_QUEUE.getQueue(self);
final ArrayBlockingQueueLocksConditions<Object> newQueue = getContext().getNativePlatform().createArrayBlockingQueueLocksConditions(newCapacity);

// TODO (eregon, 12 July 2015): racy and what to do if the new capacity is lower?
Object element;
@@ -282,30 +279,15 @@ public DynamicObject clear(DynamicObject self) {
@CoreMethod(names = "num_waiting")
public abstract static class NumWaitingNode extends CoreMethodArrayArgumentsNode {

private static final MethodHandle LOCK_FIELD_GETTER = MethodHandleUtils.getPrivateGetter(ArrayBlockingQueue.class, "lock");
private static final MethodHandle NOT_EMPTY_CONDITION_FIELD_GETTER = MethodHandleUtils.getPrivateGetter(ArrayBlockingQueue.class, "notEmpty");
private static final MethodHandle NOT_FULL_CONDITION_FIELD_GETTER = MethodHandleUtils.getPrivateGetter(ArrayBlockingQueue.class, "notFull");

public NumWaitingNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

@Specialization
public int num_waiting(DynamicObject self) {
final BlockingQueue<Object> queue = Layouts.SIZED_QUEUE.getQueue(self);

final ArrayBlockingQueue<Object> arrayBlockingQueue = (ArrayBlockingQueue<Object>) queue;
final ArrayBlockingQueueLocksConditions<Object> queue = Layouts.SIZED_QUEUE.getQueue(self);

final ReentrantLock lock;
final Condition notEmptyCondition;
final Condition notFullCondition;
try {
lock = (ReentrantLock) LOCK_FIELD_GETTER.invokeExact(arrayBlockingQueue);
notEmptyCondition = (Condition) NOT_EMPTY_CONDITION_FIELD_GETTER.invokeExact(arrayBlockingQueue);
notFullCondition = (Condition) NOT_FULL_CONDITION_FIELD_GETTER.invokeExact(arrayBlockingQueue);
} catch (Throwable e) {
throw new RuntimeException(e);
}
final ReentrantLock lock = queue.getLock();

getContext().getThreadManager().runUntilResult(this, new BlockingAction<Boolean>() {
@Override
@@ -315,7 +297,7 @@ public Boolean block() throws InterruptedException {
}
});
try {
return lock.getWaitQueueLength(notEmptyCondition) + lock.getWaitQueueLength(notFullCondition);
return lock.getWaitQueueLength(queue.getNotEmptyCondition()) + lock.getWaitQueueLength(queue.getNotFullCondition());
} finally {
lock.unlock();
}
Loading