Skip to content

Commit

Permalink
[Truffle] Use a Phaser instead of a CyclicBarrier. [Try #2]
Browse files Browse the repository at this point in the history
* No need to reconstruct the barrier: unlike a CyclicBarrier, a Phaser does not break on interrupt.
* No need for tricky locking scheme.
* Entirely unsynchronized leaveThread.
* Fix: main thread needs to call enterThread().
  • Loading branch information
eregon committed Feb 11, 2015
1 parent 983d3f7 commit f3756b7
Showing 1 changed file with 32 additions and 73 deletions.
Expand Up @@ -20,53 +20,39 @@
import org.jruby.truffle.runtime.core.RubyThread;
import org.jruby.truffle.runtime.util.Consumer;

import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;

public class SafepointManager {

// NOTE(review): this listing looks like a rendered diff whose +/- markers were lost, so
// removed and added declarations appear side by side. Judging by the commit title
// ("Use a Phaser instead of a CyclicBarrier"), the HashSet, ReentrantLock and CyclicBarrier
// lines are presumably the removed ones — confirm against the real file.

// Context whose thread manager is used to leave/re-enter the global lock.
private final RubyContext context;

// Threads currently registered with this manager (added in enterThread, removed in leaveThread).
private final Set<Thread> runningThreads = new HashSet<Thread>();
private final Set<Thread> runningThreads = Collections.newSetFromMap(new ConcurrentHashMap<Thread, Boolean>());

// Checked in poll(); invalidated to request a safepoint and re-created in step().
@CompilerDirectives.CompilationFinal private Assumption assumption = Truffle.getRuntime().createAssumption();
private final ReentrantLock lock = new ReentrantLock();
private CyclicBarrier barrier;
// Synchronization point on which registered threads wait in step().
private final Phaser phaser = new Phaser();
// Action run for each thread in step(); assigned by pauseAllThreadsAndExecute().
private Consumer<RubyThread> action;

/**
 * Creates the manager for the given context and registers the constructing thread.
 *
 * @param context the context this manager belongs to
 */
public SafepointManager(RubyContext context) {
this.context = context;
// NOTE(review): diff view without +/- markers. The direct add() below is presumably the
// removed line and enterThread() its replacement (commit message: "main thread needs to
// call enterThread()") — confirm.
runningThreads.add(Thread.currentThread());

enterThread();
}

/**
 * Registers the calling thread with this manager so that it takes part in safepoints.
 * Must not be reached from compiled code (see the CompilerAsserts call).
 *
 * NOTE(review): this is a diff view without +/- markers, so two method headers and two
 * bodies are interleaved. The lock-based body is presumably the removed version and the
 * synchronized/phaser-based one the added version — confirm.
 */
public void enterThread() {
public synchronized void enterThread() {
CompilerAsserts.neverPartOfCompilation();

// Waits for lock to become available
lock.lock();
try {
runningThreads.add(Thread.currentThread());
} finally {
lock.unlock();
}
// Add this thread as a party of the phaser, then record it as running.
phaser.register();
runningThreads.add(Thread.currentThread());
}

/**
 * Unregisters the calling thread from this manager.
 * Must not be reached from compiled code (see the CompilerAsserts call).
 *
 * NOTE(review): this is a diff view without +/- markers. The tryLock()/poll() loop is
 * presumably the removed body and the phaser-based lines the added body — confirm.
 */
public void leaveThread() {
CompilerAsserts.neverPartOfCompilation();

// Leave only when there is no more running safepoint action.
while (!lock.tryLock()) {
poll(false);
}
// SafepointManager lock acquired
try {
runningThreads.remove(Thread.currentThread());
} finally {
lock.unlock();
}
// Arrive at the phaser and drop this thread as a party, then forget it as running.
phaser.arriveAndDeregister();
runningThreads.remove(Thread.currentThread());
}

public void poll() {
Expand All @@ -77,47 +63,47 @@ private void poll(boolean holdsGlobalLock) {
try {
assumption.check();
} catch (InvalidAssumptionException e) {
assumptionInvalidated(holdsGlobalLock);
assumptionInvalidated(holdsGlobalLock, false);
}
}

/**
 * Runs the safepoint steps for the calling thread. If the caller holds the global lock,
 * it is released before the steps and re-acquired afterwards, even if a step throws.
 *
 * NOTE(review): this is a diff view without +/- markers. The one-argument header and the
 * one-argument step() call are presumably the removed lines — confirm.
 *
 * @param holdsGlobalLock whether the calling thread currently holds the global lock
 * @param isDrivingThread forwarded to step(); true is passed from pauseAllThreadsAndExecute(),
 *                        false from poll()
 */
private void assumptionInvalidated(boolean holdsGlobalLock) {
private void assumptionInvalidated(boolean holdsGlobalLock, boolean isDrivingThread) {
// Stays null when the global lock is not held; step() skips the action in that case.
RubyThread thread = null;

if (holdsGlobalLock) {
thread = context.getThreadManager().leaveGlobalLock();
}

try {
step(thread);
step(thread, isDrivingThread);
} finally {
if (holdsGlobalLock) {
context.getThreadManager().enterGlobalLock(thread);
}
}
}

/**
 * Performs the three synchronization phases of a safepoint for the calling thread:
 * wait for all threads to arrive, wait for the assumption to be renewed, run the action,
 * then wait for all threads to finish.
 *
 * NOTE(review): this is a diff view without +/- markers. The one-argument header, the
 * waitOnBarrier() calls and the lock.isHeldByCurrentThread() test are presumably the
 * removed lines; the phaser calls and the isDrivingThread test the added ones — confirm.
 *
 * @param thread          the thread passed to the action; the action is skipped when null
 *                        or when its status is ABORTING
 * @param isDrivingThread whether this thread re-creates the assumption
 */
private void step(RubyThread thread) {
// Clear the interrupted status, which may have been set by interruptAllThreads().
Thread.interrupted();

private void step(RubyThread thread, boolean isDrivingThread) {
// Wait for the other threads to reach their safepoint.
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

if (lock.isHeldByCurrentThread()) {
if (isDrivingThread) {
assumption = Truffle.getRuntime().createAssumption();
}

// Wait for the assumption to be renewed.
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

try {
if (thread != null && thread.getStatus() != Status.ABORTING) {
action.accept(thread);
}
} finally {
// Wait for the other threads to finish their action.
waitOnBarrier();
phaser.arriveAndAwaitAdvance();

// Clear the interrupted status, which may have been set by interruptAllThreads().
Thread.interrupted();
}
}

Expand All @@ -134,46 +120,19 @@ public void pauseAllThreadsAndExecuteFromNonRubyThread(Consumer<RubyThread> acti
}
}

/**
 * Requests a safepoint: stores the action, invalidates the assumption, interrupts all
 * threads, and then takes part in the safepoint steps itself.
 *
 * NOTE(review): this is a diff view without +/- markers. The lock/CyclicBarrier-based body
 * and the whole waitOnBarrier() helper embedded below are presumably removed code; the
 * synchronized header and the trailing invalidate/interrupt/assumptionInvalidated(…, true)
 * sequence are presumably the added version — confirm against the real file.
 *
 * @param holdsGlobalLock whether the calling thread currently holds the global lock
 * @param action          the action stored in the {@code action} field for step() to run
 */
private void pauseAllThreadsAndExecute(boolean holdsGlobalLock, Consumer<RubyThread> action) {
private synchronized void pauseAllThreadsAndExecute(boolean holdsGlobalLock, Consumer<RubyThread> action) {
CompilerDirectives.transferToInterpreter();

assert !lock.isHeldByCurrentThread() : "reentering pauseAllThreadsAndExecute";
lock.lock();
try {
this.action = action;

// One barrier party per currently registered thread.
barrier = new CyclicBarrier(runningThreads.size());

/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
* see the invalidation in poll() in their catch(InterruptedException) clause
* and wait on the barrier instead of retrying their blocking action. */
assumption.invalidate();
interruptAllThreads();
this.action = action;

assumptionInvalidated(holdsGlobalLock);
} finally {
lock.unlock();
}
}
/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
* see the invalidation in poll() in their catch(InterruptedException) clause
* and wait on the barrier instead of retrying their blocking action. */
assumption.invalidate();
interruptAllThreads();

// Retries barrier.await() until it completes; the lock holder resets a broken barrier
// while the other threads spin until it is repaired.
private void waitOnBarrier() {
while (true) {
try {
barrier.await();
break;
} catch (BrokenBarrierException | InterruptedException e) {
// System.err.println("Safepoint barrier interrupted for thread " + Thread.currentThread());
if (lock.isHeldByCurrentThread()) {
barrier.reset();
} else {
// wait for the lock holder to repair the barrier
while (barrier.isBroken()) {
Thread.yield();
}
}
}
}
// The requesting thread joins the safepoint as the driving thread.
assumptionInvalidated(holdsGlobalLock, true);
}

private void interruptAllThreads() {
Expand Down

0 comments on commit f3756b7

Please sign in to comment.