Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 663d310ade14
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: e010ef7f7d0a
Choose a head ref
  • 3 commits
  • 8 files changed
  • 1 contributor

Commits on Jun 11, 2015

  1. [Truffle] Make sure all others threads are killed when the main thread dies.
    
    * pauseAllThreadsAndExecute() can throw an exception because it calls poll()
      if for instance SIGINT arrives before.
    * All shutdown-like methods should not throw any Ruby exception.
    eregon authored and pitr-ch committed Jun 11, 2015
    Copy the full SHA
    e4e6106 View commit details
  2. Copy the full SHA
    3db37ee View commit details
  3. [Truffle] Define ConditionVariable in core.

    * More limited than the Ruby version but reuses JVM classes
      and does not need to maintain a Hash.
    eregon authored and pitr-ch committed Jun 11, 2015
    2
    Copy the full SHA
    e010ef7 View commit details
96 changes: 1 addition & 95 deletions lib/ruby/truffle/mri/thread.rb
Original file line number Diff line number Diff line change
@@ -26,101 +26,7 @@ class ThreadError < StandardError
Thread.abort_on_exception = true
end

#
# ConditionVariable objects augment class Mutex. Using condition variables,
# it is possible to suspend while in the middle of a critical section until a
# resource becomes available.
#
# Example:
#
# require 'thread'
#
# mutex = Mutex.new
# resource = ConditionVariable.new
#
# a = Thread.new {
# mutex.synchronize {
# # Thread 'a' now needs the resource
# resource.wait(mutex)
# # 'a' can now have the resource
# }
# }
#
# b = Thread.new {
# mutex.synchronize {
# # Thread 'b' has finished using the resource
# resource.signal
# }
# }
#
class ConditionVariable
#
# Creates a new ConditionVariable
#
def initialize
# Waiting threads are the keys of this Hash (the value is always true);
# #signal removes one entry with shift, #broadcast takes all keys at once.
@waiters = {}
# Every read and write of @waiters in this class happens inside
# @waiters_mutex.synchronize.
@waiters_mutex = Mutex.new
end

#
# Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
#
# If +timeout+ is given, this method returns after +timeout+ seconds passed,
# even if no other thread has signaled.
#
# Returns +self+.
#
def wait(mutex, timeout=nil)
# Outer block: StandardError interrupts are deferred (:never) so that the
# ensure clause below runs to completion.
Thread.handle_interrupt(StandardError => :never) do
begin
# Inner block: interrupts are allowed again, but only at blocking calls
# (:on_blocking), i.e. while registering and while sleeping on the mutex.
Thread.handle_interrupt(StandardError => :on_blocking) do
@waiters_mutex.synchronize do
@waiters[Thread.current] = true
end
mutex.sleep timeout
end
ensure
# Always unregister the current thread, however mutex.sleep exited
# (signal, timeout or exception).
@waiters_mutex.synchronize do
@waiters.delete(Thread.current)
end
end
end
self
end

#
# Wakes up the first thread in line waiting for this lock.
#
# Returns +self+.
#
def signal
Thread.handle_interrupt(StandardError => :on_blocking) do
begin
# shift removes and returns one [thread, true] pair, or nil when the
# Hash is empty; only the thread is kept.
t, _ = @waiters_mutex.synchronize { @waiters.shift }
t.run if t
rescue ThreadError
# t.run raised; retry re-executes the begin block, which takes the next
# registered waiter (if any).
retry # t was already dead?
end
end
self
end

#
# Wakes up all threads waiting for this lock.
#
# Returns +self+.
#
def broadcast
Thread.handle_interrupt(StandardError => :on_blocking) do
threads = nil
# Snapshot and clear the waiter set under the lock; the threads are woken
# afterwards, outside @waiters_mutex.
@waiters_mutex.synchronize do
threads = @waiters.keys
@waiters.clear
end
for t in threads
begin
t.run
rescue ThreadError
# Ignored: a ThreadError from run on one thread must not prevent waking
# the remaining ones.
end
end
end
self
end
end
# Truffle: ConditionVariable is defined in Java.

#
# This class provides a way to synchronize communication between threads.
4 changes: 4 additions & 0 deletions truffle/src/main/java/org/jruby/truffle/nodes/RubyGuards.java
Original file line number Diff line number Diff line change
@@ -128,6 +128,10 @@ public static boolean isRubyUnboundMethod(RubyBasicObject value) {
return value.getDynamicObject().getShape().getObjectType() == UnboundMethodNodes.UNBOUND_METHOD_TYPE;
}

/**
 * Guard: tells whether {@code value} is a Mutex, decided by comparing the
 * object type recorded in its shape with {@link MutexNodes#MUTEX_TYPE}.
 */
public static boolean isRubyMutex(RubyBasicObject value) {
    final Object objectType = value.getDynamicObject().getShape().getObjectType();
    return MutexNodes.MUTEX_TYPE == objectType;
}

/**
 * Guard: tells whether {@code value} is an instance of {@link RubyBasicObject}.
 * Returns {@code false} for {@code null}, as {@code instanceof} always does.
 */
public static boolean isRubyBasicObject(Object value) {
return value instanceof RubyBasicObject;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (c) 2015 Oracle and/or its affiliates. All rights reserved. This
* code is released under a tri EPL/GPL/LGPL license. You can use it,
* redistribute it and/or modify it under the terms of the:
*
* Eclipse Public License version 1.0
* GNU General Public License version 2
* GNU Lesser General Public License version 2.1
*/
package org.jruby.truffle.nodes.core;

import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.*;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.truffle.nodes.objects.Allocator;
import org.jruby.truffle.runtime.NotProvided;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.core.RubyBasicObject;
import org.jruby.truffle.runtime.core.RubyClass;
import org.jruby.truffle.runtime.object.BasicObjectType;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;

import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@CoreClass(name = "ConditionVariable")
public abstract class ConditionVariableNodes {

// Object type placed in the shape of every ConditionVariable object
// (see createShape(CONDITION_VARIABLE_TYPE) in the static initializer below).
private static class ConditionVariableType extends BasicObjectType {
}

public static final ConditionVariableType CONDITION_VARIABLE_TYPE = new ConditionVariableType();

// Hidden property holding an AtomicReference to the Mutex this condition
// variable is bound to; the reference is empty until the first wait.
private static final HiddenKey ASSOCIATED_MUTEX_IDENTIFIER = new HiddenKey("associated_mutex");
private static final Property ASSOCIATED_MUTEX_PROPERTY;
// Hidden property holding an AtomicReference to the java.util.concurrent
// Condition created from the associated mutex's lock; also empty until the
// first wait.
private static final HiddenKey CONDITION_IDENTIFIER = new HiddenKey("condition");
private static final Property CONDITION_PROPERTY;
// Factory for objects with the shape built below. newInstance takes the
// initial property values in the order the properties were added:
// associated mutex reference first, condition reference second.
private static final DynamicObjectFactory CONDITION_VARIABLE_FACTORY;

static {
// Both locations are typed AtomicReference and declared Final + NonNull:
// the reference objects themselves are set once at allocation, and only
// their contents change afterwards.
Shape.Allocator allocator = RubyBasicObject.LAYOUT.createAllocator();
ASSOCIATED_MUTEX_PROPERTY = Property.create(ASSOCIATED_MUTEX_IDENTIFIER,
allocator.locationForType(AtomicReference.class, EnumSet.of(LocationModifier.Final, LocationModifier.NonNull)), 0);
CONDITION_PROPERTY = Property.create(CONDITION_IDENTIFIER,
allocator.locationForType(AtomicReference.class, EnumSet.of(LocationModifier.Final, LocationModifier.NonNull)), 0);
Shape shape = RubyBasicObject.LAYOUT.createShape(CONDITION_VARIABLE_TYPE)
.addProperty(ASSOCIATED_MUTEX_PROPERTY)
.addProperty(CONDITION_PROPERTY);
CONDITION_VARIABLE_FACTORY = shape.createFactory();
}

/**
 * Allocator for ConditionVariable objects. Each new object starts with two
 * empty references: one for the associated Mutex and one for the
 * {@link Condition}; both are filled in by the first call to wait.
 */
public static class ConditionVariableAllocator implements Allocator {
    @Override
    public RubyBasicObject allocate(RubyContext context, RubyClass rubyClass, Node currentNode) {
        // Argument order must match the order the properties were added to the shape.
        final AtomicReference<RubyBasicObject> associatedMutex = new AtomicReference<RubyBasicObject>();
        final AtomicReference<Condition> condition = new AtomicReference<Condition>();
        return new RubyBasicObject(rubyClass, CONDITION_VARIABLE_FACTORY.newInstance(associatedMutex, condition));
    }
}

/**
 * Returns the reference holding the Mutex that {@code conditionVariable} is
 * bound to. The reference is never {@code null}, but its content is
 * {@code null} until a Mutex has been associated by the first wait.
 *
 * @param conditionVariable a ConditionVariable object (not a Mutex); its shape
 *                          must contain the associated-mutex property
 */
@SuppressWarnings("unchecked")
protected static AtomicReference<RubyBasicObject> getAssociatedMutex(RubyBasicObject conditionVariable) {
    assert conditionVariable.getDynamicObject().getShape().hasProperty(ASSOCIATED_MUTEX_IDENTIFIER);
    // Unchecked cast is safe: the allocator always stores an AtomicReference<RubyBasicObject> here.
    return (AtomicReference<RubyBasicObject>) ASSOCIATED_MUTEX_PROPERTY.get(conditionVariable.getDynamicObject(), true);
}

/**
 * Returns the reference holding the {@link Condition} of
 * {@code conditionVariable}. The reference itself is never {@code null}; its
 * content stays {@code null} until the first wait creates the condition.
 */
@SuppressWarnings("unchecked")
protected static AtomicReference<Condition> getCondition(RubyBasicObject conditionVariable) {
    assert conditionVariable.getDynamicObject().getShape().hasProperty(CONDITION_IDENTIFIER);
    final Object stored = CONDITION_PROPERTY.get(conditionVariable.getDynamicObject(), true);
    return (AtomicReference<Condition>) stored;
}

/**
 * ConditionVariable#broadcast: wakes up every thread waiting on the receiver.
 */
@CoreMethod(names = "broadcast")
public abstract static class BroadcastNode extends UnaryCoreMethodNode {

    public BroadcastNode(RubyContext context, SourceSection sourceSection) {
        super(context, sourceSection);
    }

    /**
     * Signals all waiters and returns the receiver. Does nothing when no
     * condition has been created yet; raises a Ruby ThreadError when the
     * calling thread does not hold the associated Mutex.
     */
    @Specialization
    public RubyBasicObject broadcast(RubyBasicObject conditionVariable) {
        // The condition is read before the mutex: wait stores the mutex first,
        // so a non-null condition implies a non-null associated mutex.
        final Condition waiters = getCondition(conditionVariable).get();
        final RubyBasicObject owner = getAssociatedMutex(conditionVariable).get();

        if (waiters != null) {
            final boolean heldByCaller = MutexNodes.getLock(owner).isHeldByCurrentThread();
            if (!heldByCaller) {
                CompilerDirectives.transferToInterpreter();
                throw new RaiseException(getContext().getCoreLibrary().threadError("Called ConditionVariable#broadcast without holding associated Mutex", this));
            }

            waiters.signalAll();
        }

        return conditionVariable;
    }

}

/**
 * ConditionVariable#signal: wakes up one thread waiting on the receiver.
 */
@CoreMethod(names = "signal")
public abstract static class SignalNode extends UnaryCoreMethodNode {

    public SignalNode(RubyContext context, SourceSection sourceSection) {
        super(context, sourceSection);
    }

    /**
     * Signals a single waiter and returns the receiver. Does nothing when no
     * condition has been created yet; raises a Ruby ThreadError when the
     * calling thread does not hold the associated Mutex.
     */
    @Specialization
    public RubyBasicObject signal(RubyBasicObject conditionVariable) {
        // The condition is read before the mutex: wait stores the mutex first,
        // so a non-null condition implies a non-null associated mutex.
        final Condition waiters = getCondition(conditionVariable).get();
        final RubyBasicObject owner = getAssociatedMutex(conditionVariable).get();

        if (waiters != null) {
            final boolean heldByCaller = MutexNodes.getLock(owner).isHeldByCurrentThread();
            if (!heldByCaller) {
                CompilerDirectives.transferToInterpreter();
                throw new RaiseException(getContext().getCoreLibrary().threadError("Called ConditionVariable#signal without holding associated Mutex", this));
            }

            waiters.signal();
        }

        return conditionVariable;
    }

}

/**
 * ConditionVariable#wait(mutex, timeout = nil): releases {@code mutex}, waits
 * for a signal (or for the timeout to elapse) and re-acquires {@code mutex}.
 *
 * A condition variable is bound to the first Mutex it is used with; waiting
 * with a different Mutex afterwards raises a Ruby ThreadError.
 */
@CoreMethod(names = "wait", required = 1, optional = 1)
public abstract static class WaitNode extends CoreMethodArrayArgumentsNode {

    public WaitNode(RubyContext context, SourceSection sourceSection) {
        super(context, sourceSection);
    }

    /** How to block on the condition: untimed, or with the remaining timeout. */
    private static interface WaitAction {
        void wait(Condition condition) throws InterruptedException;
    }

    /** Untimed wait: blocks until signalled. */
    @Specialization(guards = "isRubyMutex(mutex)")
    RubyBasicObject wait(RubyBasicObject conditionVariable, RubyBasicObject mutex, NotProvided timeout) {
        return waitOn(conditionVariable, mutex, new WaitAction() {
            @Override
            public void wait(Condition condition) throws InterruptedException {
                condition.await();
            }
        });
    }

    /** Timed wait with an integer number of seconds; delegates to the double variant. */
    @Specialization(guards = "isRubyMutex(mutex)")
    RubyBasicObject wait(RubyBasicObject conditionVariable, RubyBasicObject mutex, final int timeout) {
        return wait(conditionVariable, mutex, (double) timeout);
    }

    /**
     * Timed wait; {@code timeout} is in seconds. Negative (and NaN) timeouts are
     * treated as zero.
     */
    @Specialization(guards = "isRubyMutex(mutex)")
    RubyBasicObject wait(RubyBasicObject conditionVariable, RubyBasicObject mutex, final double timeout) {
        // Measure with the monotonic clock so wall-clock adjustments cannot
        // stretch or shorten the wait, and keep everything in nanoseconds so no
        // millis -> nanos multiplication can overflow. The double -> long cast
        // saturates for huge values; clamping to >= 0 keeps the subtraction
        // below from wrapping around.
        final long start = System.nanoTime();
        final long timeoutInNanos = Math.max(0L, (long) (timeout * 1e9));

        return waitOn(conditionVariable, mutex, new WaitAction() {
            @Override
            public void wait(Condition condition) throws InterruptedException {
                // May run more than once if the blocking action is retried
                // (presumably after an interrupt), so recompute what is left.
                final long elapsed = System.nanoTime() - start;
                final long remaining = Math.max(0L, timeoutInNanos - elapsed);
                condition.awaitNanos(remaining);
            }
        });
    }

    /**
     * Binds {@code conditionVariable} to {@code mutex} on first use, then blocks
     * using {@code waitAction}.
     *
     * Raises a Ruby ThreadError when the calling thread does not hold
     * {@code mutex}, or when the condition variable is already bound to a
     * different Mutex.
     *
     * @return {@code conditionVariable}
     */
    private RubyBasicObject waitOn(RubyBasicObject conditionVariable, RubyBasicObject mutex, final WaitAction waitAction) {
        final ReentrantLock lock = MutexNodes.getLock(mutex);

        // Condition#await requires the lock to be held and would otherwise throw
        // a Java IllegalMonitorStateException; report it as a Ruby ThreadError,
        // like signal and broadcast do. Checking before the association also
        // avoids binding the condition variable on a call that fails.
        if (!lock.isHeldByCurrentThread()) {
            CompilerDirectives.transferToInterpreter();
            throw new RaiseException(getContext().getCoreLibrary().threadError("Called ConditionVariable#wait without holding associated Mutex", this));
        }

        final AtomicReference<RubyBasicObject> associatedMutexReference = getAssociatedMutex(conditionVariable);
        final AtomicReference<Condition> conditionReference = getCondition(conditionVariable);

        final Condition condition;
        if (associatedMutexReference.compareAndSet(null, mutex)) {
            // First wait on this condition variable: create the condition.
            condition = lock.newCondition();
            conditionReference.set(condition);
        } else if (associatedMutexReference.get() == mutex) {
            // Every thread reaching this point holds the mutex's lock, so the
            // thread that won the compareAndSet above has already stored the
            // condition before this thread could acquire the lock.
            condition = conditionReference.get();
        } else {
            CompilerDirectives.transferToInterpreter();
            throw new RaiseException(getContext().getCoreLibrary().threadError("Attempt to associate a ConditionVariable which already has a Mutex", this));
        }

        getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Boolean>() {
            @Override
            public Boolean block() throws InterruptedException {
                waitAction.wait(condition);
                return SUCCESS;
            }
        });

        return conditionVariable;
    }

}

}
Original file line number Diff line number Diff line change
@@ -14,12 +14,14 @@
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.*;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.truffle.nodes.objects.Allocator;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.core.RubyBasicObject;
import org.jruby.truffle.runtime.core.RubyClass;
import org.jruby.truffle.runtime.core.RubyThread;
import org.jruby.truffle.runtime.object.BasicObjectType;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;

import java.util.EnumSet;
@@ -28,14 +30,19 @@
@CoreClass(name = "Mutex")
public abstract class MutexNodes {

private static class MutexType extends BasicObjectType {
}

public static final MutexType MUTEX_TYPE = new MutexType();

private static final HiddenKey LOCK_IDENTIFIER = new HiddenKey("lock");
private static final Property LOCK_PROPERTY;
private static final DynamicObjectFactory MUTEX_FACTORY;

static {
Shape.Allocator allocator = RubyBasicObject.LAYOUT.createAllocator();
LOCK_PROPERTY = Property.create(LOCK_IDENTIFIER, allocator.locationForType(ReentrantLock.class, EnumSet.of(LocationModifier.Final, LocationModifier.NonNull)), 0);
Shape shape = RubyBasicObject.EMPTY_SHAPE.addProperty(LOCK_PROPERTY);
Shape shape = RubyBasicObject.LAYOUT.createShape(MUTEX_TYPE).addProperty(LOCK_PROPERTY);
MUTEX_FACTORY = shape.createFactory();
}

Original file line number Diff line number Diff line change
@@ -642,15 +642,7 @@ public RubyNode wrap(RubyNode node) {

@Override
public void shutdown() {
try {
innerShutdown(true);
} catch (RaiseException e) {
final RubyException rubyException = e.getRubyException();

for (String line : Backtrace.DISPLAY_FORMATTER.format(e.getRubyException().getContext(), rubyException, rubyException.getBacktrace())) {
System.err.println(line);
}
}
innerShutdown(true);
}

public PrintStream getDebugStandardOut() {
Original file line number Diff line number Diff line change
@@ -16,7 +16,9 @@
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.source.Source;
import com.oracle.truffle.api.source.SourceSection;

import jnr.constants.platform.Errno;

import org.jcodings.Encoding;
import org.jcodings.EncodingDB;
import org.jcodings.transcode.EConvFlags;
@@ -303,6 +305,7 @@ public CoreLibrary(RubyContext context) {

arrayClass = defineClass("Array", new ArrayNodes.ArrayAllocator());
bindingClass = defineClass("Binding", new RubyBinding.BindingAllocator());
defineClass("ConditionVariable", new ConditionVariableNodes.ConditionVariableAllocator());
dirClass = defineClass("Dir");
encodingClass = defineClass("Encoding", NO_ALLOCATOR);
falseClass = defineClass("FalseClass", NO_ALLOCATOR);
@@ -421,6 +424,7 @@ private void addCoreMethods() {
coreMethodNodeManager.addCoreMethodNodes(BindingNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(BignumNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(ClassNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(ConditionVariableNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(ExceptionNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(FalseClassNodesFactory.getFactories());
coreMethodNodeManager.addCoreMethodNodes(FiberNodesFactory.getFactories());
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
import com.oracle.truffle.api.Truffle;
import com.oracle.truffle.api.nodes.InvalidAssumptionException;
import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.runtime.RubyContext;
@@ -131,13 +132,8 @@ public void pauseAllThreadsAndExecute(Node currentNode, boolean deferred, Safepo
RubyThread thread = context.getThreadManager().getCurrentThread();

// Need to lock interruptibly since we are in the registered threads.
while (true) {
try {
lock.lockInterruptibly();
break;
} catch (InterruptedException e) {
poll(currentNode);
}
while (!lock.tryLock()) {
poll(currentNode);
}

try {
Original file line number Diff line number Diff line change
@@ -11,8 +11,12 @@

import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.backtrace.Backtrace;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.core.RubyException;
import org.jruby.truffle.runtime.core.RubyThread;

import java.util.Collections;
@@ -147,19 +151,35 @@ public synchronized void unregisterThread(RubyThread thread) {
}

public void shutdown() {
// kill all threads except main
context.getSafepointManager().pauseAllThreadsAndExecute(null, false, new SafepointAction() {
@Override
public synchronized void run(RubyThread thread, Node currentNode) {
if (thread != rootThread && Thread.currentThread() == thread.getRootFiberJavaThread()) {
thread.shutdown();
try {
killOtherThreads();
} finally {
rootThread.getFiberManager().shutdown();
rootThread.getRootFiber().cleanup();
rootThread.cleanup();
}
}

private void killOtherThreads() {
while (true) {
try {
context.getSafepointManager().pauseAllThreadsAndExecute(null, false, new SafepointAction() {
@Override
public synchronized void run(RubyThread thread, Node currentNode) {
if (thread != rootThread && Thread.currentThread() == thread.getRootFiberJavaThread()) {
thread.shutdown();
}
}
});
break; // Successfully executed the safepoint and sent the exceptions.
} catch (RaiseException e) {
final RubyException rubyException = e.getRubyException();

for (String line : Backtrace.DISPLAY_FORMATTER.format(e.getRubyException().getContext(), rubyException, rubyException.getBacktrace())) {
System.err.println(line);
}
}
});

rootThread.getFiberManager().shutdown();
rootThread.getRootFiber().cleanup();
rootThread.cleanup();
}
}

}