Skip to content

Commit

Permalink
Showing 10 changed files with 155 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.jruby.truffle.runtime.core.RubyFiber;
import org.jruby.truffle.runtime.core.RubyNilClass;
import org.jruby.truffle.runtime.core.RubyProc;
import org.jruby.truffle.runtime.core.RubyThread;

@CoreClass(name = "Fiber")
public abstract class FiberNodes {
@@ -57,12 +58,13 @@ protected Object transfer(RubyFiber fiber, boolean isYield, Object[] args) {
throw new RaiseException(getContext().getCoreLibrary().deadFiberCalledError(this));
}

if (fiber.getRubyThread() != getContext().getThreadManager().getCurrentThread()) {
RubyThread currentThread = getContext().getThreadManager().getCurrentThread();
if (fiber.getRubyThread() != currentThread) {
CompilerDirectives.transferToInterpreter();
throw new RaiseException(getContext().getCoreLibrary().fiberError("fiber called across threads", this));
}

final RubyFiber sendingFiber = getContext().getFiberManager().getCurrentFiber();
final RubyFiber sendingFiber = currentThread.getFiberManager().getCurrentFiber();

return singleValue(sendingFiber.transferControlTo(fiber, isYield, args));
}
@@ -129,10 +131,11 @@ public YieldNode(YieldNode prev) {

@Specialization
public Object yield(Object[] args) {
final RubyFiber yieldingFiber = getContext().getFiberManager().getCurrentFiber();
RubyThread currentThread = getContext().getThreadManager().getCurrentThread();
final RubyFiber yieldingFiber = currentThread.getFiberManager().getCurrentFiber();
final RubyFiber fiberYieldedTo = yieldingFiber.getLastResumedByFiber();

if (yieldingFiber.isTopLevel() || fiberYieldedTo == null) {
if (yieldingFiber.isRootFiber() || fiberYieldedTo == null) {
throw new RaiseException(getContext().getCoreLibrary().yieldFromRootFiberError(this));
}

Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ public ExitModuleNode(ExitModuleNode prev) {

@Specialization
public RubyNilClass exit() {
getContext().getThreadManager().getCurrentThread().exit();
getContext().getThreadManager().getCurrentThread().shutdown();
return nil();
}

@@ -100,14 +100,12 @@ public KillNode(KillNode prev) {
@Specialization
public RubyThread kill(final RubyThread thread) {
getContext().getSafepointManager().pauseAllThreadsAndExecute(this, new SafepointAction() {

@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread) {
currentThread.exit();
if (currentThread == thread && thread.isCurrentJavaThreadRootFiber()) {
thread.shutdown();
}
}

});

return thread;
@@ -266,7 +264,7 @@ public RubyNilClass raise(VirtualFrame frame, final RubyThread thread, RubyClass

@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread) {
if (currentThread == thread && thread.isCurrentJavaThreadCurrentFiber()) {
throw exceptionWrapper;
}
}
12 changes: 2 additions & 10 deletions truffle/src/main/java/org/jruby/truffle/runtime/RubyContext.java
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@

import jnr.posix.POSIX;
import jnr.posix.POSIXFactory;

import org.jcodings.Encoding;
import org.jcodings.specific.ASCIIEncoding;
import org.jcodings.specific.UTF8Encoding;
@@ -64,7 +65,6 @@ public class RubyContext extends ExecutionContext {
private final TraceManager traceManager;
private final ObjectSpaceManager objectSpaceManager;
private final ThreadManager threadManager;
private final FiberManager fiberManager;
private final AtExitManager atExitManager;
private final RubySymbol.SymbolTable symbolTable = new RubySymbol.SymbolTable(this);
private final Shape emptyShape;
@@ -118,10 +118,8 @@ public RubyContext(Ruby runtime) {
traceManager = new TraceManager();
atExitManager = new AtExitManager();

// Must initialize threads before fibers

threadManager = new ThreadManager(this);
fiberManager = new FiberManager(this);
threadManager.initialize();

rubiniusPrimitiveManager = RubiniusPrimitiveManager.create();

@@ -280,8 +278,6 @@ public void shutdown() {
instrumentationServerManager.shutdown();
}

fiberManager.shutdown();

threadManager.shutdown();
}

@@ -469,10 +465,6 @@ public ObjectSpaceManager getObjectSpaceManager() {
return objectSpaceManager;
}

public FiberManager getFiberManager() {
return fiberManager;
}

public ThreadManager getThreadManager() {
return threadManager;
}
105 changes: 71 additions & 34 deletions truffle/src/main/java/org/jruby/truffle/runtime/core/RubyFiber.java
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Represents the Ruby {@code Fiber} class. The current implementation uses Java threads and message
@@ -80,67 +82,100 @@ public RubyException getException() {
}

public static class FiberExitException extends ControlFlowException {

private static final long serialVersionUID = 1522270454305076317L;

}

private final FiberManager fiberManager;
private final ThreadManager threadManager;
private final RubyThread rubyThread;

private String name;
private final boolean topLevel;
private final BlockingQueue<FiberMessage> messageQueue = new ArrayBlockingQueue<>(1);
private final boolean isRootFiber;
// we need 2 slots when the safepoint manager sends the kill message and there is another message unprocessed
private final BlockingQueue<FiberMessage> messageQueue = new LinkedBlockingQueue<>(2);
private RubyFiber lastResumedByFiber = null;
private boolean alive = true;

public RubyFiber(RubyClass rubyClass, FiberManager fiberManager, ThreadManager threadManager, String name, boolean topLevel) {
protected volatile Thread thread;

public RubyFiber(RubyThread parent, RubyClass rubyClass, String name) {
this(parent, parent.getFiberManager(), parent.getThreadManager(), rubyClass, name, false);
}

public static RubyFiber newRootFiber(RubyThread thread, FiberManager fiberManager, ThreadManager threadManager) {
RubyContext context = thread.getContext();
return new RubyFiber(thread, fiberManager, threadManager, context.getCoreLibrary().getFiberClass(), "root Fiber for Thread", true);
}

private RubyFiber(RubyThread parent, FiberManager fiberManager, ThreadManager threadManager, RubyClass rubyClass, String name, boolean isRootFiber) {
super(rubyClass);
this.rubyThread = parent;
this.fiberManager = fiberManager;
this.threadManager = threadManager;
this.name = name;
this.topLevel = topLevel;
this.rubyThread = threadManager.getCurrentThread();
this.isRootFiber = isRootFiber;
}

public void initialize(RubyProc block) {
public void initialize(final RubyProc block) {
RubyNode.notDesignedForCompilation();

name = "Ruby Fiber@" + block.getSharedMethodInfo().getSourceSection().getShortDescription();

final RubyFiber finalFiber = this;
final RubyProc finalBlock = block;

final Thread thread = new Thread(new Runnable() {

@Override
public void run() {
fiberManager.registerFiber(finalFiber);
finalFiber.getContext().getSafepointManager().enterThread();
threadManager.enterGlobalLock(rubyThread);
handleFiberExceptions(block);
}
});
thread.setName(name);
thread.start();
}

private void handleFiberExceptions(final RubyProc block) {
run(new Runnable() {
@Override
public void run() {
try {
final Object[] args = finalFiber.waitForResume();
final Object result = finalBlock.rootCall(args);
finalFiber.resume(finalFiber.lastResumedByFiber, true, result);
} catch (FiberExitException | ThreadExitException e) { // TODO (eregon, 21 Apr. 2015): The thread should cleanly kill its fibers when dying.
// Naturally exit the thread on catching this
final Object[] args = waitForResume();
final Object result = block.rootCall(args);
resume(lastResumedByFiber, true, result);
} catch (FiberExitException e) {
assert !isRootFiber;
// Naturally exit the Java thread on catching this
} catch (ReturnException e) {
sendMessageTo(finalFiber.lastResumedByFiber, new FiberExceptionMessage(finalFiber.getContext().getCoreLibrary().unexpectedReturn(null)));
sendMessageTo(lastResumedByFiber, new FiberExceptionMessage(getContext().getCoreLibrary().unexpectedReturn(null)));
} catch (RaiseException e) {
sendMessageTo(finalFiber.lastResumedByFiber, new FiberExceptionMessage(e.getRubyException()));
} finally {
alive = false;
threadManager.leaveGlobalLock();
finalFiber.getContext().getSafepointManager().leaveThread();
fiberManager.unregisterFiber(finalFiber);
sendMessageTo(lastResumedByFiber, new FiberExceptionMessage(e.getRubyException()));
}
}

});
thread.setName(name);
thread.start();
}

protected void run(final Runnable task) {
RubyNode.notDesignedForCompilation();

start();
try {
task.run();
} finally {
cleanup();
}
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void start() {
thread = Thread.currentThread();
fiberManager.registerFiber(this);
getContext().getSafepointManager().enterThread();
threadManager.enterGlobalLock(rubyThread);
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void cleanup() {
alive = false;
threadManager.leaveGlobalLock();
getContext().getSafepointManager().leaveThread();
fiberManager.unregisterFiber(this);
thread = null;
}

public RubyThread getRubyThread() {
@@ -201,6 +236,7 @@ public Object[] transferControlTo(RubyFiber fiber, boolean yield, Object[] args)
}

public void shutdown() {
assert !isRootFiber;
RubyNode.notDesignedForCompilation();

sendMessageTo(this, new FiberExitMessage());
@@ -216,8 +252,8 @@ public RubyFiber getLastResumedByFiber() {
return lastResumedByFiber;
}

public boolean isTopLevel() {
return topLevel;
public boolean isRootFiber() {
return isRootFiber;
}

public String getName() {
@@ -228,7 +264,8 @@ public static class FiberAllocator implements Allocator {

@Override
public RubyBasicObject allocate(RubyContext context, RubyClass rubyClass, Node currentNode) {
return new RubyFiber(rubyClass, context.getFiberManager(), context.getThreadManager(), null, false);
RubyThread parent = context.getThreadManager().getCurrentThread();
return new RubyFiber(parent, rubyClass, null);
}

}
Original file line number Diff line number Diff line change
@@ -10,13 +10,15 @@
package org.jruby.truffle.runtime.core;

import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.nodes.objects.Allocator;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.control.ReturnException;
import org.jruby.truffle.runtime.control.ThreadExitException;
import org.jruby.truffle.runtime.subsystems.FiberManager;
import org.jruby.truffle.runtime.subsystems.ThreadManager;
import org.jruby.truffle.runtime.subsystems.ObjectSpaceManager.ObjectGraphVisitor;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;
@@ -41,6 +43,8 @@ public class RubyThread extends RubyBasicObject {

private final ThreadManager manager;

private final FiberManager fiberManager;

private String name;

/** We use this instead of {@link Thread#join()} since we don't always have a reference
@@ -63,6 +67,7 @@ public RubyThread(RubyClass rubyClass, ThreadManager manager) {
super(rubyClass);
this.manager = manager;
threadLocals = new RubyBasicObject(rubyClass.getContext().getCoreLibrary().getObjectClass());
fiberManager = new FiberManager(this, manager);
}

public void initialize(RubyContext context, Node currentNode, final RubyProc block) {
@@ -86,15 +91,12 @@ public void run() {

public void run(final RubyContext context, Node currentNode, String info, Runnable task) {
name = "Ruby Thread@" + info;
thread = Thread.currentThread();
thread.setName(name);

manager.registerThread(this);
context.getSafepointManager().enterThread();
manager.enterGlobalLock(this);
Thread.currentThread().setName(name);

start();
try {
task.run();
RubyFiber fiber = getRootFiber();
fiber.run(task);
} catch (ThreadExitException e) {
value = context.getCoreLibrary().getNilObject();
return;
@@ -103,15 +105,19 @@ public void run(final RubyContext context, Node currentNode, String info, Runnab
} catch (ReturnException e) {
exception = context.getCoreLibrary().unexpectedReturn(currentNode);
} finally {
cleanup(context);
cleanup();
}
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void cleanup(RubyContext context) {
public void start() {
thread = Thread.currentThread();
manager.registerThread(this);
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void cleanup() {
status = Status.ABORTING;
manager.leaveGlobalLock();
context.getSafepointManager().leaveThread();
manager.unregisterThread(this);

status = Status.DEAD;
@@ -120,8 +126,17 @@ public void cleanup(RubyContext context) {
finished.countDown();
}

public void setRootThread(Thread thread) {
this.thread = thread;
public void shutdown() {
fiberManager.shutdown();
exit();
}

public boolean isCurrentJavaThreadRootFiber() {
return Thread.currentThread() == thread;
}

public boolean isCurrentJavaThreadCurrentFiber() {
return Thread.currentThread() == fiberManager.getCurrentFiber().thread;
}

public void join() {
@@ -197,7 +212,7 @@ public RubyException getException() {
return exception;
}

public void exit() {
private void exit() {
throw new ThreadExitException();
}

@@ -209,6 +224,18 @@ public void setName(String name) {
this.name = name;
}

public ThreadManager getThreadManager() {
return manager;
}

public FiberManager getFiberManager() {
return fiberManager;
}

public RubyFiber getRootFiber() {
return fiberManager.getRootFiber();
}

public List<Runnable> getDeferredSafepointActions() {
return deferredSafepointActions;
}
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ public void handle(Signal signal) {

@Override
public void run(RubyThread thread, Node currentNode) {
if (thread == context.getThreadManager().getRootThread()) {
if (thread == context.getThreadManager().getRootThread() && thread.isCurrentJavaThreadCurrentFiber()) {
context.getThreadManager().enterGlobalLock(thread);
try {
// assumes this proc does not re-enter the SafepointManager.
Original file line number Diff line number Diff line change
@@ -9,27 +9,31 @@
*/
package org.jruby.truffle.runtime.subsystems;

import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.core.RubyFiber;
import org.jruby.truffle.runtime.core.RubyThread;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Manages Ruby {@code Fiber} objects.
* Manages Ruby {@code Fiber} objects for a given Ruby thread.
*/
public class FiberManager {

private final RubyThread rubyThread;
private final RubyFiber rootFiber;
// FIXME (eregon): per ruby thread
private RubyFiber currentFiber;

private final Set<RubyFiber> runningFibers = Collections.newSetFromMap(new ConcurrentHashMap<RubyFiber, Boolean>());

public FiberManager(RubyContext context) {
rootFiber = new RubyFiber(context.getCoreLibrary().getFiberClass(), this, context.getThreadManager(), "root", true);
currentFiber = rootFiber;
public FiberManager(RubyThread rubyThread, ThreadManager threadManager) {
this.rubyThread = rubyThread;
this.rootFiber = RubyFiber.newRootFiber(rubyThread, this, threadManager);
this.currentFiber = rootFiber;
}

public RubyFiber getRootFiber() {
return rootFiber;
}

public RubyFiber getCurrentFiber() {
@@ -50,7 +54,9 @@ public void unregisterFiber(RubyFiber fiber) {

public void shutdown() {
for (RubyFiber fiber : runningFibers) {
fiber.shutdown();
if (!fiber.isRootFiber()) {
fiber.shutdown();
}
}
}

Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ public void handle(HttpExchange httpExchange) {

@Override
public void run(RubyThread thread, final Node currentNode) {
if (thread.getName().equals("main")) {
if (thread == context.getThreadManager().getRootThread() && thread.isCurrentJavaThreadCurrentFiber()) {
thread.getDeferredSafepointActions().add(new Runnable() {

@Override
Original file line number Diff line number Diff line change
@@ -41,8 +41,6 @@ public class SafepointManager {

public SafepointManager(RubyContext context) {
this.context = context;

enterThread();
}

public void enterThread() {
Original file line number Diff line number Diff line change
@@ -10,8 +10,8 @@
package org.jruby.truffle.runtime.subsystems;

import com.oracle.truffle.api.CompilerDirectives;

import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.core.RubyThread;
@@ -38,11 +38,14 @@ public class ThreadManager {

public ThreadManager(RubyContext context) {
this.context = context;
rootThread = new RubyThread(context.getCoreLibrary().getThreadClass(), this);
this.rootThread = new RubyThread(context.getCoreLibrary().getThreadClass(), this);
rootThread.setName("main");
rootThread.setRootThread(Thread.currentThread());
runningRubyThreads.add(rootThread);
enterGlobalLock(rootThread);
}

public void initialize() {
registerThread(rootThread);
rootThread.start();
rootThread.getRootFiber().start();
}

public RubyThread getRootThread() {
@@ -149,14 +152,16 @@ public void shutdown() {
// kill all threads except main
context.getSafepointManager().pauseAllThreadsAndExecute(null, new SafepointAction() {
@Override
public void run(RubyThread thread, Node currentThread) {
if (thread != rootThread) {
thread.exit();
public synchronized void run(RubyThread thread, Node currentNode) {
if (thread != rootThread && thread.isCurrentJavaThreadRootFiber()) {
thread.shutdown();
}
}
});

rootThread.cleanup(context);
rootThread.getFiberManager().shutdown();
rootThread.getRootFiber().cleanup();
rootThread.cleanup();
}

}

0 comments on commit 5ccc678

Please sign in to comment.