Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 983d3f74b755
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: f3756b7f6950
Choose a head ref
  • 1 commit
  • 1 file changed
  • 1 contributor

Commits on Feb 11, 2015

  1. [Truffle] Use a Phaser instead of a CyclicBarrier. [Try #2]

    * No need to reconstruct the barrier, it does not break on interrupt.
    * No need for tricky locking scheme.
    * Entirely unsynchronized leaveThread.
    * Fix: main thread needs to call enterThread().
    eregon committed Feb 11, 2015
    Copy the full SHA
    f3756b7 View commit details
Showing with 32 additions and 73 deletions.
  1. +32 −73 truffle/src/main/java/org/jruby/truffle/runtime/subsystems/SafepointManager.java
Original file line number Diff line number Diff line change
@@ -20,53 +20,39 @@
import org.jruby.truffle.runtime.core.RubyThread;
import org.jruby.truffle.runtime.util.Consumer;

import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;

public class SafepointManager {

private final RubyContext context;

private final Set<Thread> runningThreads = new HashSet<Thread>();
private final Set<Thread> runningThreads = Collections.newSetFromMap(new ConcurrentHashMap<Thread, Boolean>());

@CompilerDirectives.CompilationFinal private Assumption assumption = Truffle.getRuntime().createAssumption();
private final ReentrantLock lock = new ReentrantLock();
private CyclicBarrier barrier;
private final Phaser phaser = new Phaser();
private Consumer<RubyThread> action;

public SafepointManager(RubyContext context) {
this.context = context;
runningThreads.add(Thread.currentThread());

enterThread();
}

public void enterThread() {
public synchronized void enterThread() {
CompilerAsserts.neverPartOfCompilation();

// Waits for lock to become available
lock.lock();
try {
runningThreads.add(Thread.currentThread());
} finally {
lock.unlock();
}
phaser.register();
runningThreads.add(Thread.currentThread());
}

public void leaveThread() {
CompilerAsserts.neverPartOfCompilation();

// Leave only when there is no more running safepoint action.
while (!lock.tryLock()) {
poll(false);
}
// SafepointManager lock acquired
try {
runningThreads.remove(Thread.currentThread());
} finally {
lock.unlock();
}
phaser.arriveAndDeregister();
runningThreads.remove(Thread.currentThread());
}

public void poll() {
@@ -77,47 +63,47 @@ private void poll(boolean holdsGlobalLock) {
try {
assumption.check();
} catch (InvalidAssumptionException e) {
assumptionInvalidated(holdsGlobalLock);
assumptionInvalidated(holdsGlobalLock, false);
}
}

private void assumptionInvalidated(boolean holdsGlobalLock) {
private void assumptionInvalidated(boolean holdsGlobalLock, boolean isDrivingThread) {
RubyThread thread = null;

if (holdsGlobalLock) {
thread = context.getThreadManager().leaveGlobalLock();
}

try {
step(thread);
step(thread, isDrivingThread);
} finally {
if (holdsGlobalLock) {
context.getThreadManager().enterGlobalLock(thread);
}
}
}

private void step(RubyThread thread) {
// clear the interrupted status which may have been set by interruptAllThreads().
Thread.interrupted();

private void step(RubyThread thread, boolean isDrivingThread) {
// wait other threads to reach their safepoint
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

if (lock.isHeldByCurrentThread()) {
if (isDrivingThread) {
assumption = Truffle.getRuntime().createAssumption();
}

// wait the assumption to be renewed
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

try {
if (thread != null && thread.getStatus() != Status.ABORTING) {
action.accept(thread);
}
} finally {
// wait other threads to finish their action
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

// clear the interrupted status which may have been set by interruptAllThreads().
Thread.interrupted();
}
}

@@ -134,46 +120,19 @@ public void pauseAllThreadsAndExecuteFromNonRubyThread(Consumer<RubyThread> acti
}
}

private void pauseAllThreadsAndExecute(boolean holdsGlobalLock, Consumer<RubyThread> action) {
private synchronized void pauseAllThreadsAndExecute(boolean holdsGlobalLock, Consumer<RubyThread> action) {
CompilerDirectives.transferToInterpreter();

assert !lock.isHeldByCurrentThread() : "reentering pauseAllThreadsAndExecute";
lock.lock();
try {
this.action = action;

barrier = new CyclicBarrier(runningThreads.size());

/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
* see the invalidation in poll() in their catch(InterruptedException) clause
* and wait on the barrier instead of retrying their blocking action. */
assumption.invalidate();
interruptAllThreads();
this.action = action;

assumptionInvalidated(holdsGlobalLock);
} finally {
lock.unlock();
}
}
/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
* see the invalidation in poll() in their catch(InterruptedException) clause
* and wait on the barrier instead of retrying their blocking action. */
assumption.invalidate();
interruptAllThreads();

private void waitOnBarrier() {
while (true) {
try {
barrier.await();
break;
} catch (BrokenBarrierException | InterruptedException e) {
// System.err.println("Safepoint barrier interrupted for thread " + Thread.currentThread());
if (lock.isHeldByCurrentThread()) {
barrier.reset();
} else {
// wait for the lock holder to repair the barrier
while (barrier.isBroken()) {
Thread.yield();
}
}
}
}
assumptionInvalidated(holdsGlobalLock, true);
}

private void interruptAllThreads() {