Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a54f0997c0b0
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 5478943da163
Choose a head ref
  • 3 commits
  • 9 files changed
  • 1 contributor

Commits on Apr 22, 2015

  1. Copy the full SHA
    2b230b1 View commit details
  2. Copy the full SHA
    1409c2e View commit details
  3. [Truffle] Introduce SafepointManager.pauseThreadAndExecuteLater().

    * Make the deferred action just a boolean parameter.
    * Affect the right thread by choosing which Fiber to affect.
    eregon committed Apr 22, 2015
    Copy the full SHA
    5478943 View commit details
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
import com.oracle.truffle.api.frame.VirtualFrame;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.nodes.dispatch.CallDispatchHeadNode;
import org.jruby.truffle.nodes.dispatch.DispatchHeadNode;
@@ -24,6 +25,7 @@
import org.jruby.truffle.runtime.control.ThreadExitException;
import org.jruby.truffle.runtime.core.*;
import org.jruby.truffle.runtime.subsystems.SafepointAction;
import org.jruby.truffle.runtime.subsystems.SafepointManager;
import org.jruby.truffle.runtime.util.Consumer;

@CoreClass(name = "Thread")
@@ -99,15 +101,12 @@ public KillNode(KillNode prev) {

@Specialization
public RubyThread kill(final RubyThread thread) {
getContext().getSafepointManager().pauseAllThreadsAndExecute(this, new SafepointAction() {
getContext().getSafepointManager().pauseThreadAndExecuteLater(thread.getRootFiberJavaThread(), this, new SafepointAction() {
@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread && thread.isCurrentJavaThreadRootFiber()) {
thread.shutdown();
}
currentThread.shutdown();
}
});

return thread;
}

@@ -260,15 +259,11 @@ public RubyNilClass raise(VirtualFrame frame, final RubyThread thread, RubyClass

final RaiseException exceptionWrapper = new RaiseException((RubyException) exception);

getContext().getSafepointManager().pauseAllThreadsAndExecute(this, new SafepointAction() {

getContext().getSafepointManager().pauseThreadAndExecuteLater(thread.getCurrentFiberJavaThread(), this, new SafepointAction() {
@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread && thread.isCurrentJavaThreadCurrentFiber()) {
throw exceptionWrapper;
}
throw exceptionWrapper;
}

});

return nil();
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ public static class FiberExitException extends ControlFlowException {
private RubyFiber lastResumedByFiber = null;
private boolean alive = true;

protected volatile Thread thread;
private volatile Thread thread;

public RubyFiber(RubyThread parent, RubyClass rubyClass, String name) {
this(parent, parent.getFiberManager(), parent.getThreadManager(), rubyClass, name, false);
@@ -178,6 +178,10 @@ public void cleanup() {
thread = null;
}

/**
 * Returns the Java thread currently stored in this fiber's {@code thread} field.
 *
 * @return the value of the {@code thread} field; {@code cleanup()} resets that
 *         field to {@code null}, so callers may observe {@code null}
 */
public Thread getJavaThread() {
return thread;
}

/**
 * Returns the {@code RubyThread} stored in this fiber's {@code rubyThread} field.
 *
 * @return the value of the {@code rubyThread} field
 */
public RubyThread getRubyThread() {
return rubyThread;
}
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import org.jruby.truffle.runtime.control.ReturnException;
import org.jruby.truffle.runtime.control.ThreadExitException;
import org.jruby.truffle.runtime.subsystems.FiberManager;
import org.jruby.truffle.runtime.subsystems.SafepointAction;
import org.jruby.truffle.runtime.subsystems.ThreadManager;
import org.jruby.truffle.runtime.subsystems.ObjectSpaceManager.ObjectGraphVisitor;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;
@@ -30,7 +31,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@@ -61,8 +64,6 @@ public class RubyThread extends RubyBasicObject {

private final List<Lock> ownedLocks = new ArrayList<>(); // Always accessed by the same underlying Java thread.

private final List<Runnable> deferredSafepointActions = new ArrayList<>();

public RubyThread(RubyClass rubyClass, ThreadManager manager) {
super(rubyClass);
this.manager = manager;
@@ -131,12 +132,12 @@ public void shutdown() {
exit();
}

public boolean isCurrentJavaThreadRootFiber() {
return Thread.currentThread() == thread;
public Thread getRootFiberJavaThread() {
return thread;
}

public boolean isCurrentJavaThreadCurrentFiber() {
return Thread.currentThread() == fiberManager.getCurrentFiber().thread;
public Thread getCurrentFiberJavaThread() {
return fiberManager.getCurrentFiber().getJavaThread();
}

public void join() {
@@ -236,10 +237,6 @@ public RubyFiber getRootFiber() {
return fiberManager.getRootFiber();
}

public List<Runnable> getDeferredSafepointActions() {
return deferredSafepointActions;
}

public static class ThreadAllocator implements Allocator {

@Override
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
package org.jruby.truffle.runtime.signal;

import com.oracle.truffle.api.nodes.Node;

import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.core.RubyProc;
import org.jruby.truffle.runtime.core.RubyThread;
@@ -32,22 +33,12 @@ public ProcSignalHandler(RubyContext context, RubyProc proc) {

@Override
public void handle(Signal signal) {
// TODO: just make this a normal Ruby thread once we don't have the global lock anymore
context.getSafepointManager().pauseAllThreadsAndExecuteFromNonRubyThread(null, new SafepointAction() {

Thread mainThread = context.getThreadManager().getRootThread().getCurrentFiberJavaThread();
context.getSafepointManager().pauseMainThreadAndExecuteLaterFromNonRubyThread(mainThread, new SafepointAction() {
@Override
public void run(RubyThread thread, Node currentNode) {
if (thread == context.getThreadManager().getRootThread() && thread.isCurrentJavaThreadCurrentFiber()) {
context.getThreadManager().enterGlobalLock(thread);
try {
// assumes this proc does not re-enter the SafepointManager.
proc.rootCall();
} finally {
context.getThreadManager().leaveGlobalLock();
}
}
proc.rootCall();
}

});
}

Original file line number Diff line number Diff line change
@@ -21,13 +21,11 @@
*/
public class FiberManager {

private final RubyThread rubyThread;
private final RubyFiber rootFiber;
private RubyFiber currentFiber;
private final Set<RubyFiber> runningFibers = Collections.newSetFromMap(new ConcurrentHashMap<RubyFiber, Boolean>());

public FiberManager(RubyThread rubyThread, ThreadManager threadManager) {
this.rubyThread = rubyThread;
this.rootFiber = RubyFiber.newRootFiber(rubyThread, this, threadManager);
this.currentFiber = rootFiber;
}
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ public void handle(HttpExchange httpExchange) {
try {
final StringBuilder builder = new StringBuilder();

context.getSafepointManager().pauseAllThreadsAndExecuteFromNonRubyThread(null, new SafepointAction() {
context.getSafepointManager().pauseAllThreadsAndExecuteFromNonRubyThread(false, new SafepointAction() {

@Override
public void run(RubyThread thread, Node currentNode) {
@@ -104,23 +104,13 @@ public void run(RubyThread thread, Node currentNode) {
@Override
public void handle(HttpExchange httpExchange) {
try {
context.getSafepointManager().pauseAllThreadsAndExecuteFromNonRubyThread(null, new SafepointAction() {

Thread mainThread = context.getThreadManager().getRootThread().getCurrentFiberJavaThread();
context.getSafepointManager().pauseMainThreadAndExecuteLaterFromNonRubyThread(mainThread, new SafepointAction() {
@Override
public void run(RubyThread thread, final Node currentNode) {
if (thread == context.getThreadManager().getRootThread() && thread.isCurrentJavaThreadCurrentFiber()) {
thread.getDeferredSafepointActions().add(new Runnable() {

@Override
public void run() {
new SimpleShell(context).run(Truffle.getRuntime().getCurrentFrame()
.getFrame(FrameInstance.FrameAccess.MATERIALIZE, true).materialize(), currentNode);
}

});
}
new SimpleShell(context).run(Truffle.getRuntime().getCurrentFrame()
.getFrame(FrameInstance.FrameAccess.MATERIALIZE, true).materialize(), currentNode);
}

});

httpExchange.getResponseHeaders().set("Content-Type", "text/plain");
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ public boolean visit(RubyBasicObject object) {

};

context.getSafepointManager().pauseAllThreadsAndExecute(null, new SafepointAction() {
context.getSafepointManager().pauseAllThreadsAndExecute(null, false, new SafepointAction() {

@Override
public void run(RubyThread currentThread, Node currentNode) {
Original file line number Diff line number Diff line change
@@ -14,15 +14,13 @@
import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.Truffle;
import com.oracle.truffle.api.nodes.InvalidAssumptionException;

import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.core.RubyThread;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
@@ -38,6 +36,7 @@ public class SafepointManager {
private final ReentrantLock lock = new ReentrantLock();
private final Phaser phaser = new Phaser();
private volatile SafepointAction action;
private volatile boolean deferred;

public SafepointManager(RubyContext context) {
this.context = context;
@@ -93,13 +92,8 @@ private void assumptionInvalidated(Node currentNode, boolean holdsGlobalLock, bo

// We're now running again normally, with the global lock, and can run deferred actions

if (thread != null) {
final List<Runnable> deferredActions = new ArrayList<>(thread.getDeferredSafepointActions());
thread.getDeferredSafepointActions().clear();

for (Runnable action : deferredActions) {
action.run();
}
if (deferred && holdsGlobalLock && thread != null) {
action.run(thread, currentNode);
}
}

@@ -115,7 +109,7 @@ private void step(Node currentNode, RubyThread thread, boolean isDrivingThread)
phaser.arriveAndAwaitAdvance();

try {
if (thread != null && thread.getStatus() != Status.ABORTING) {
if (!deferred && thread != null && thread.getStatus() != Status.ABORTING) {
action.run(thread, currentNode);
}
} finally {
@@ -124,7 +118,7 @@ private void step(Node currentNode, RubyThread thread, boolean isDrivingThread)
}
}

public void pauseAllThreadsAndExecute(Node currentNode, SafepointAction action) {
public void pauseAllThreadsAndExecute(Node currentNode, boolean deferred, SafepointAction action) {
if (lock.isHeldByCurrentThread()) {
throw new IllegalStateException("Re-entered SafepointManager");
}
@@ -140,13 +134,13 @@ public void pauseAllThreadsAndExecute(Node currentNode, SafepointAction action)
}

try {
pauseAllThreadsAndExecute(currentNode, true, action);
pauseAllThreadsAndExecute(currentNode, true, action, deferred);
} finally {
lock.unlock();
}
}

public void pauseAllThreadsAndExecuteFromNonRubyThread(Node currentNode, SafepointAction action) {
public void pauseAllThreadsAndExecuteFromNonRubyThread(boolean deferred, SafepointAction action) {
if (lock.isHeldByCurrentThread()) {
throw new IllegalStateException("Re-entered SafepointManager");
}
@@ -157,7 +151,7 @@ public void pauseAllThreadsAndExecuteFromNonRubyThread(Node currentNode, Safepoi
try {
enterThread();
try {
pauseAllThreadsAndExecute(currentNode, false, action);
pauseAllThreadsAndExecute(null, false, action, deferred);
} finally {
leaveThread();
}
@@ -166,8 +160,9 @@ public void pauseAllThreadsAndExecuteFromNonRubyThread(Node currentNode, Safepoi
}
}

private void pauseAllThreadsAndExecute(Node currentNode, boolean holdsGlobalLock, SafepointAction action) {
private void pauseAllThreadsAndExecute(Node currentNode, boolean holdsGlobalLock, SafepointAction action, boolean deferred) {
this.action = action;
this.deferred = deferred;

/* this is a potential cause for race conditions,
* but we need to invalidate first so the interrupted threads
@@ -179,6 +174,28 @@ private void pauseAllThreadsAndExecute(Node currentNode, boolean holdsGlobalLock
assumptionInvalidated(currentNode, holdsGlobalLock, true);
}

/**
 * Requests a safepoint via {@code pauseAllThreadsAndExecute(currentNode, true, ...)},
 * i.e. with the deferred flag set, and restricts {@code action} to a single Java thread.
 *
 * The supplied action is wrapped so that it only runs when the wrapper is invoked
 * on the Java thread identical to {@code thread}; on any other thread the wrapper
 * does nothing.
 *
 * @param thread      the Java thread on which {@code action} is allowed to run
 * @param currentNode the node forwarded to the safepoint request
 * @param action      the action to run on the selected thread
 */
public void pauseThreadAndExecuteLater(final Thread thread, Node currentNode, final SafepointAction action) {
    final SafepointAction onlyOnTarget = new SafepointAction() {
        @Override
        public void run(RubyThread pausedThread, Node pausedNode) {
            // Identity comparison: skip every thread except the requested one.
            if (Thread.currentThread() != thread) {
                return;
            }
            action.run(pausedThread, pausedNode);
        }
    };

    pauseAllThreadsAndExecute(currentNode, true, onlyOnTarget);
}

/**
 * Requests a safepoint via {@code pauseAllThreadsAndExecuteFromNonRubyThread(true, ...)},
 * i.e. with the deferred flag set, and restricts {@code action} to a single Java thread.
 *
 * The supplied action is wrapped so that it only runs when the wrapper is invoked
 * on the Java thread identical to {@code thread}; on any other thread the wrapper
 * does nothing. Note that this method does not itself check that {@code thread}
 * is the main thread; it targets whichever thread the caller passes in.
 *
 * @param thread the Java thread on which {@code action} is allowed to run
 * @param action the action to run on the selected thread
 */
public void pauseMainThreadAndExecuteLaterFromNonRubyThread(final Thread thread, final SafepointAction action) {
    final SafepointAction onlyOnTarget = new SafepointAction() {
        @Override
        public void run(RubyThread pausedThread, Node pausedNode) {
            // Identity comparison: skip every thread except the requested one.
            if (Thread.currentThread() != thread) {
                return;
            }
            action.run(pausedThread, pausedNode);
        }
    };

    pauseAllThreadsAndExecuteFromNonRubyThread(true, onlyOnTarget);
}

private void interruptOtherThreads() {
Thread current = Thread.currentThread();
for (Thread thread : runningThreads) {
Original file line number Diff line number Diff line change
@@ -150,10 +150,10 @@ public synchronized void unregisterThread(RubyThread thread) {

public void shutdown() {
// kill all threads except main
context.getSafepointManager().pauseAllThreadsAndExecute(null, new SafepointAction() {
context.getSafepointManager().pauseAllThreadsAndExecute(null, false, new SafepointAction() {
@Override
public synchronized void run(RubyThread thread, Node currentNode) {
if (thread != rootThread && thread.isCurrentJavaThreadRootFiber()) {
if (thread != rootThread && Thread.currentThread() == thread.getRootFiberJavaThread()) {
thread.shutdown();
}
}