Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 62c5247fe25a
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 973a03f35802
Choose a head ref
  • 2 commits
  • 4 files changed
  • 1 contributor

Commits on Jan 28, 2015

  1. Copy the full SHA
    1a7f688 View commit details
  2. [Truffle] Refactor and fix SafepointManager.

    * New design so assumptionInvalidated() drives all barrier synchronizations.
    * The global lock is now better separated from barrier synchronization.
    * Support interrupts of the barrier by re-creating it (should be relatively rare).
    eregon committed Jan 28, 2015
    Copy the full SHA
    973a03f View commit details
14 changes: 11 additions & 3 deletions core/src/main/java/org/jruby/Ruby.java
Original file line number Diff line number Diff line change
@@ -904,14 +904,22 @@ public synchronized TruffleBridge getTruffleBridge() {
* explanation when the classes are not found.
*/

final Class<?> clazz;

try {
clazz = getJRubyClassLoader().loadClass("org.jruby.truffle.TruffleBridgeImpl");
} catch (Exception e) {
throw new UnsupportedOperationException("Support for Truffle has been removed from this distribution", e);
}

try {
Class<?> clazz = getJRubyClassLoader().loadClass("org.jruby.truffle.TruffleBridgeImpl");
Constructor<?> con = clazz.getConstructor(Ruby.class);
truffleBridge = (TruffleBridge) con.newInstance(this);
truffleBridge.init();
} catch (Exception e) {
throw new UnsupportedOperationException("Support for Truffle has been removed from this distribution", e);
throw new UnsupportedOperationException("Error while calling the constructor of Truffle Bridge", e);
}

truffleBridge.init();
}

return truffleBridge;
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ public void run() {
} finally {
status = Status.ABORTING;
context.getThreadManager().leaveGlobalLock();
context.getSafepointManager().leaveThread(finalThread);
context.getSafepointManager().leaveThread();
finalThread.manager.unregisterThread(finalThread);

finalThread.finished.countDown();
Original file line number Diff line number Diff line change
@@ -22,15 +22,14 @@

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SafepointManager {

private final RubyContext context;

@CompilerDirectives.CompilationFinal private Assumption assumption = Truffle.getRuntime().createAssumption();
private final Lock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantLock();
private CyclicBarrier barrier;
private int liveThreads = 1;
private Consumer<RubyThread> action;
@@ -51,12 +50,12 @@ public void enterThread() {
}
}

public void leaveThread(RubyThread thread) {
public void leaveThread() {
CompilerAsserts.neverPartOfCompilation();

// Leave only when there is no more running safepoint action.
while (!lock.tryLock()) {
pollWithoutGlobalLock(thread);
poll(false);
}
// SafepointManager lock acquired
try {
@@ -66,123 +65,108 @@ public void leaveThread(RubyThread thread) {
}
}

private void pollWithoutGlobalLock(RubyThread thread) {
try {
assumption.check();
} catch (InvalidAssumptionException e) {
assumptionInvalidated(thread);
}
/**
 * Public safepoint check entry point.
 *
 * Delegates to the private {@code poll(boolean)} overload with {@code true},
 * i.e. the caller is treated as holding the global lock.
 */
public void poll() {
poll(true);
}

public void poll() {
private void poll(boolean holdsGlobalLock) {
try {
assumption.check();
} catch (InvalidAssumptionException e) {
assumptionInvalidated(context.getThreadManager().getCurrentThread());
assumptionInvalidated(holdsGlobalLock);
}
}

private void assumptionInvalidated(RubyThread thread) {
// wait other threads to reach their safepoint
waitOnBarrier();

// wait the assumption to be renewed
waitOnBarrier();

try {
if (thread.getStatus() != Status.ABORTING) {
action.accept(thread);
}
} finally {
// wait other threads to finish their action
waitOnBarrier();
private void assumptionInvalidated(boolean holdsGlobalLock) {
RubyThread thread = null;
if (holdsGlobalLock) {
thread = context.getThreadManager().leaveGlobalLock();
}
}

public void pauseAllThreadsAndExecute(final Consumer<RubyThread> action) {
CompilerDirectives.transferToInterpreter();

lock.lock();
try {
SafepointManager.this.action = action;

barrier = new CyclicBarrier(liveThreads);
// clear the interrupted status which may have been set by interruptAllThreads().
Thread.interrupted();

assumption.invalidate();

context.getThreadManager().interruptAllThreads();

// wait for all threads to reach their safepoint
// wait other threads to reach their safepoint
waitOnBarrier();

assumption = Truffle.getRuntime().createAssumption();
if (lock.isHeldByCurrentThread()) {
assumption = Truffle.getRuntime().createAssumption();
}

// wait for all threads to see the new assumption
// wait the assumption to be renewed
waitOnBarrier();

try {
action.accept(context.getThreadManager().getCurrentThread());
if (holdsGlobalLock && thread.getStatus() != Status.ABORTING) {
runAction(thread);
}
} finally {
// wait for all threads to execute the action
// wait other threads to finish their action
waitOnBarrier();
}
} finally {
lock.unlock();
if (holdsGlobalLock) {
context.getThreadManager().enterGlobalLock(thread);
}
}
}

/**
 * Runs the currently registered safepoint action for the given thread
 * while holding the global lock.
 *
 * The global lock is entered for {@code thread} before the action is invoked
 * and is left again in the {@code finally} clause, so it is released even if
 * the action throws.
 *
 * @param thread the Ruby thread passed to {@code action.accept} and used to
 *               enter the global lock
 */
private void runAction(RubyThread thread) {
context.getThreadManager().enterGlobalLock(thread);
try {
action.accept(thread);
} finally {
// always give the global lock back, even when the action fails
context.getThreadManager().leaveGlobalLock();
}
}

public void pauseAllThreadsAndExecuteFromNonRubyThread(final Consumer<RubyThread> action) {
/**
 * Pauses all threads and executes {@code action}.
 *
 * Delegates to the private three-argument overload with
 * {@code isRubyThread = true} and {@code holdsGlobalLock = true}.
 *
 * @param action the consumer handed to the private overload
 */
public void pauseAllThreadsAndExecute(Consumer<RubyThread> action) {
pauseAllThreadsAndExecute(true, true, action);
}

/**
 * Variant of pauseAllThreadsAndExecute for callers that are not Ruby threads.
 *
 * Delegates to the private three-argument overload with
 * {@code isRubyThread = false} and {@code holdsGlobalLock = false}.
 *
 * @param action the consumer handed to the private overload
 */
public void pauseAllThreadsAndExecuteFromNonRubyThread(Consumer<RubyThread> action) {
pauseAllThreadsAndExecute(false, false, action);
}

private void pauseAllThreadsAndExecute(boolean isRubyThread, boolean holdsGlobalLock, Consumer<RubyThread> action) {
CompilerDirectives.transferToInterpreter();

assert !lock.isHeldByCurrentThread() : "reentering pauseAllThreadsAndExecute";
lock.lock();
try {
// The current (Java) thread is not a Ruby thread, so we do not touch the global lock.

SafepointManager.this.action = action;
this.action = action;

barrier = new CyclicBarrier(liveThreads + 1);
barrier = new CyclicBarrier(isRubyThread ? liveThreads : liveThreads + 1);

/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
* see the invalidation in poll() in their catch(InterruptedException) clause
* and wait on the barrier instead of retrying their blocking action. */
assumption.invalidate();

context.getThreadManager().interruptAllThreads();

// wait for all threads to reach their safepoint
waitOnBarrierNoGlobalLock();

assumption = Truffle.getRuntime().createAssumption();

// wait for all threads to see the new assumption
waitOnBarrierNoGlobalLock();

// wait for all Ruby threads to execute the action
waitOnBarrierNoGlobalLock();
assumptionInvalidated(isRubyThread && holdsGlobalLock);
} finally {
lock.unlock();
}
}

private void waitOnBarrier() {
final RubyThread runningThread = context.getThreadManager().leaveGlobalLock();

// clear the interrupted status which may have been set by interruptAllThreads().
Thread.interrupted();

try {
waitOnBarrierNoGlobalLock();
} finally {
context.getThreadManager().enterGlobalLock(runningThread);
}
}

private void waitOnBarrierNoGlobalLock() {
while (true) {
try {
barrier.await();
break;
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException("Should not be interrupted while waiting on the safepoint barrier", e);
} catch (BrokenBarrierException | InterruptedException e) {
// System.err.println("Safepoint barrier interrupted for thread " + Thread.currentThread());
if (lock.isHeldByCurrentThread()) {
barrier.reset();
} else {
// wait for the lock holder to repair the barrier
while (barrier.isBroken()) {
Thread.yield();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -124,6 +124,7 @@ public <T> T runOnce(BlockingActionWithoutGlobalLock<T> action) {
}

/**
 * Returns the value of the {@code currentThread} field.
 *
 * When assertions are enabled, first checks that the calling thread holds
 * {@code globalLock}; per the assertion message, the result is only correct
 * in that case.
 *
 * @return the {@code currentThread} field
 */
public RubyThread getCurrentThread() {
assert globalLock.isHeldByCurrentThread() : "getCurrentThread() is only correct if holding the global lock";
return currentThread;
}