Skip to content

Commit

Permalink
Showing 5 changed files with 56 additions and 54 deletions.
67 changes: 33 additions & 34 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/FiberNodes.java
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
import com.oracle.truffle.api.nodes.ControlFlowException;
import com.oracle.truffle.api.object.DynamicObject;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.truffle.nodes.RubyGuards;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.nodes.cast.SingleValueCastNode;
@@ -26,44 +27,50 @@
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.control.ReturnException;
import org.jruby.truffle.runtime.layouts.Layouts;
import org.jruby.truffle.runtime.subsystems.FiberManager;
import org.jruby.truffle.runtime.subsystems.ThreadManager;

import java.util.concurrent.LinkedBlockingQueue;

@CoreClass(name = "Fiber")
public abstract class FiberNodes {

// NOTE(review): this span is a rendered diff; the newRootFiber signature and the
// createRubyFiber call below appear to be the superseded versions of the code that follows.
public static DynamicObject newRootFiber(DynamicObject thread, FiberManager fiberManager, ThreadManager threadManager) {
// Creates a non-root fiber of class rubyClass belonging to the given Ruby thread.
public static DynamicObject createFiber(DynamicObject thread, DynamicObject rubyClass, String name) {
return createFiber(thread, rubyClass, name, false);
}

// Creates the root fiber of a Ruby thread, using the core library's Fiber class and a fixed name.
public static DynamicObject createRootFiber(RubyContext context, DynamicObject thread) {
return createFiber(thread, context.getCoreLibrary().getFiberClass(), "root Fiber for Thread", true);
}

// Shared factory for both public entry points; isRootFiber is forwarded to the layout.
private static DynamicObject createFiber(DynamicObject thread, DynamicObject rubyClass, String name, boolean isRootFiber) {
assert RubyGuards.isRubyThread(thread);
RubyContext context = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(thread)).getContext();
return createRubyFiber(thread, fiberManager, threadManager, context.getCoreLibrary().getFiberClass(), "root Fiber for Thread", true);
// The fiber's message queue is bounded to a capacity of two messages.
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), isRootFiber, new LinkedBlockingQueue<FiberMessage>(2), thread, name, null, true, null);
}

// Names the fiber after the source location of its block and starts a new Java thread,
// carrying the same name, that runs the block through handleFiberExceptions.
// NOTE(review): rendered diff; changed statements appear twice (without, then with, context).
public static void initialize(final DynamicObject fiber, final DynamicObject block) {
public static void initialize(final RubyContext context, final DynamicObject fiber, final DynamicObject block) {
assert RubyGuards.isRubyFiber(fiber);
assert RubyGuards.isRubyProc(block);
final String name = "Ruby Fiber@" + Layouts.PROC.getSharedMethodInfo(block).getSourceSection().getShortDescription();
Layouts.FIBER.setName(fiber, name);
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
handleFiberExceptions(fiber, block);
handleFiberExceptions(context, fiber, block);
}
});
thread.setName(name);
thread.start();
}

private static void handleFiberExceptions(final DynamicObject fiber, final DynamicObject block) {
private static void handleFiberExceptions(final RubyContext context, final DynamicObject fiber, final DynamicObject block) {
assert RubyGuards.isRubyFiber(fiber);
assert RubyGuards.isRubyProc(block);

run(fiber, new Runnable() {
run(context, fiber, new Runnable() {
@Override
public void run() {
try {
final Object[] args = waitForResume(fiber);
final Object[] args = waitForResume(context, fiber);
final Object result;
try {
result = ProcNodes.rootCall(block, args);
@@ -76,54 +83,54 @@ public void run() {
assert !Layouts.FIBER.getRootFiber(fiber);
// Naturally exit the Java thread on catching this
} catch (BreakException e) {
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage(Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getCoreLibrary().breakFromProcClosure(null)));
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage(context.getCoreLibrary().breakFromProcClosure(null)));
} catch (ReturnException e) {
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage(Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getCoreLibrary().unexpectedReturn(null)));
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage(context.getCoreLibrary().unexpectedReturn(null)));
} catch (RaiseException e) {
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage((DynamicObject) e.getRubyException()));
}
}
});
}

// Runs task on the calling Java thread as the body of the given fiber: performs the fiber
// start-up first and always performs the fiber clean-up afterwards.
// NOTE(review): rendered diff; changed statements appear twice (without, then with, context).
public static void run(DynamicObject fiber, final Runnable task) {
public static void run(RubyContext context, DynamicObject fiber, final Runnable task) {
assert RubyGuards.isRubyFiber(fiber);

start(fiber);
start(context, fiber);
try {
task.run();
} finally {
// Clean-up sits in finally so it also happens when the task throws.
cleanup(fiber);
cleanup(context, fiber);
}
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
// (Other fibers reach this through run(), which pairs it with cleanup() in a finally block.)
// Binds the fiber to the calling Java thread, initializes that Java thread for the fiber's
// Ruby thread, registers the fiber with the Ruby thread's FiberManager and enters the
// safepoint manager.
public static void start(DynamicObject fiber) {
public static void start(RubyContext context, DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
Layouts.FIBER.setThread(fiber, Thread.currentThread());
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().initializeCurrentThread(Layouts.FIBER.getRubyThread(fiber));
context.getThreadManager().initializeCurrentThread(Layouts.FIBER.getRubyThread(fiber));
Layouts.THREAD.getFiberManager(Layouts.FIBER.getRubyThread(fiber)).registerFiber(fiber);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getSafepointManager().enterThread();
context.getSafepointManager().enterThread();
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
// (Other fibers reach this through the finally block of run().)
// Marks the fiber as no longer alive, leaves the safepoint manager, unregisters the fiber
// from its Ruby thread's FiberManager and clears the fiber's Java thread reference.
public static void cleanup(DynamicObject fiber) {
public static void cleanup(RubyContext context, DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
Layouts.FIBER.setAlive(fiber, false);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getSafepointManager().leaveThread();
context.getSafepointManager().leaveThread();
Layouts.THREAD.getFiberManager(Layouts.FIBER.getRubyThread(fiber)).unregisterFiber(fiber);
Layouts.FIBER.setThread(fiber, null);
}

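/**
* Send the Java thread that represents this fiber to sleep until it receives a resume or exit
* message.
* @param context the context whose thread manager runs the blocking wait
* @param fiber the fiber whose Java thread waits for the message
*/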
private static Object[] waitForResume(final DynamicObject fiber) {
private static Object[] waitForResume(RubyContext context, final DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);

final RubyContext context = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext();
final FiberMessage message = context.getThreadManager().runUntilResult(null, new ThreadManager.BlockingAction<FiberMessage>() {
@Override
public FiberMessage block() throws InterruptedException {
@@ -161,13 +168,13 @@ private static void resume(DynamicObject fromFiber, DynamicObject fiber, boolean
Layouts.FIBER.getMessageQueue(fiber).add(new FiberResumeMessage(yield, fromFiber, args));
}

// Hands control from fromFiber to fiber: passes the yield flag and args to resume() for the
// target fiber, then makes fromFiber wait in waitForResume and returns what that call returns.
// NOTE(review): rendered diff; changed statements appear twice (without, then with, context).
public static Object[] transferControlTo(DynamicObject fromFiber, DynamicObject fiber, boolean yield, Object[] args) {
public static Object[] transferControlTo(RubyContext context, DynamicObject fromFiber, DynamicObject fiber, boolean yield, Object[] args) {
assert RubyGuards.isRubyFiber(fromFiber);
assert RubyGuards.isRubyFiber(fiber);

resume(fromFiber, fiber, yield, args);

return waitForResume(fromFiber);
return waitForResume(context, fromFiber);
}

public static void shutdown(DynamicObject fiber) {
@@ -176,14 +183,6 @@ public static void shutdown(DynamicObject fiber) {
Layouts.FIBER.getMessageQueue(fiber).add(new FiberExitMessage());
}

public static DynamicObject createRubyFiber(DynamicObject parent, DynamicObject rubyClass, String name) {
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), false, new LinkedBlockingQueue<FiberMessage>(2), parent, name, null, true, null);
}

public static DynamicObject createRubyFiber(DynamicObject parent, FiberManager fiberManager, ThreadManager threadManager, DynamicObject rubyClass, String name, boolean isRootFiber) {
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), false, new LinkedBlockingQueue<FiberMessage>(2), parent, name, null, true, null);
}

public interface FiberMessage {
}

@@ -221,7 +220,7 @@ protected Object transfer(VirtualFrame frame, DynamicObject fiber, boolean isYie

final DynamicObject sendingFiber = Layouts.THREAD.getFiberManager(currentThread).getCurrentFiber();

return singleValue(frame, transferControlTo(sendingFiber, fiber, isYield, args));
return singleValue(frame, transferControlTo(getContext(), sendingFiber, fiber, isYield, args));
}

}
@@ -237,7 +236,7 @@ public InitializeNode(RubyContext context, SourceSection sourceSection) {
// Node entry point for initializing a fiber with its block: delegates to
// FiberNodes.initialize and returns nil.
// NOTE(review): rendered diff; the delegate call appears twice (without, then with, context).
public DynamicObject initialize(DynamicObject fiber, DynamicObject block) {
CompilerDirectives.transferToInterpreter();

FiberNodes.initialize(fiber, block);
FiberNodes.initialize(getContext(), fiber, block);
return nil();
}

@@ -343,7 +342,7 @@ public AllocateNode(RubyContext context, SourceSection sourceSection) {
// Allocates a new fiber of the given class, owned by the Ruby thread that is current
// according to the context's thread manager; the fiber is created without a name.
// NOTE(review): rendered diff; the return statement appears in its old and new form.
@Specialization
public DynamicObject allocate(DynamicObject rubyClass) {
DynamicObject parent = getContext().getThreadManager().getCurrentThread();
return createRubyFiber(parent, rubyClass, null);
return createFiber(parent, rubyClass, null);
}

}
25 changes: 13 additions & 12 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/ThreadNodes.java
Original file line number Diff line number Diff line change
@@ -40,10 +40,11 @@
@CoreClass(name = "Thread")
public abstract class ThreadNodes {

// Creates a Ruby thread object of the given class with a fresh basic object of the core
// Object class, an empty lock list, InterruptMode.IMMEDIATE and Status.RUN, then installs a
// new FiberManager on it and returns it.
// NOTE(review): rendered diff; the first four lines look like the superseded version that
// looked the context up from rubyClass instead of receiving it as a parameter.
public static DynamicObject createRubyThread(DynamicObject rubyClass) {
final DynamicObject objectClass = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(rubyClass)).getContext().getCoreLibrary().getObjectClass();
final DynamicObject object = Layouts.THREAD.createThread(Layouts.CLASS.getInstanceFactory(rubyClass), null, null, new CountDownLatch(1), Layouts.BASIC_OBJECT.createBasicObject(Layouts.CLASS.getInstanceFactory(objectClass)), new ArrayList<Lock>(), false,InterruptMode.IMMEDIATE, null, Status.RUN, null, null, new AtomicBoolean(false), 0);
Layouts.THREAD.setFiberManagerUnsafe(object, new FiberManager(object));
public static DynamicObject createRubyThread(RubyContext context, DynamicObject rubyClass) {
final DynamicObject objectClass = context.getCoreLibrary().getObjectClass();
final DynamicObject threadLocals = Layouts.BASIC_OBJECT.createBasicObject(Layouts.CLASS.getInstanceFactory(objectClass));
final DynamicObject object = Layouts.THREAD.createThread(Layouts.CLASS.getInstanceFactory(rubyClass), null, null, new CountDownLatch(1), threadLocals, new ArrayList<Lock>(), false,InterruptMode.IMMEDIATE, null, Status.RUN, null, null, new AtomicBoolean(false), 0);
// The FiberManager is attached after creation because its constructor needs the thread object.
Layouts.THREAD.setFiberManagerUnsafe(object, new FiberManager(context, object));
return object;
}

@@ -76,10 +77,10 @@ public static void run(DynamicObject thread, final RubyContext context, Node cur
Layouts.THREAD.setName(thread, name);
Thread.currentThread().setName(name);

start(thread);
start(context, thread);
try {
DynamicObject fiber = Layouts.THREAD.getFiberManager(thread).getRootFiber();
FiberNodes.run(fiber, task);
FiberNodes.run(context, fiber, task);
} catch (ThreadExitException e) {
Layouts.THREAD.setValue(thread, context.getCoreLibrary().getNilObject());
return;
@@ -88,23 +89,23 @@ public static void run(DynamicObject thread, final RubyContext context, Node cur
} catch (ReturnException e) {
Layouts.THREAD.setException(thread, context.getCoreLibrary().unexpectedReturn(currentNode));
} finally {
cleanup(thread);
cleanup(context, thread);
}
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
// Binds the Ruby thread object to the calling Java thread and registers it with the
// context's thread manager.
// NOTE(review): rendered diff; changed statements appear twice (without, then with, context).
public static void start(DynamicObject thread) {
public static void start(RubyContext context, DynamicObject thread) {
assert RubyGuards.isRubyThread(thread);
Layouts.THREAD.setThread(thread, Thread.currentThread());
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getMetaClass(thread)).getContext().getThreadManager().registerThread(thread);
context.getThreadManager().registerThread(thread);
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public static void cleanup(DynamicObject thread) {
public static void cleanup(RubyContext context, DynamicObject thread) {
assert RubyGuards.isRubyThread(thread);

Layouts.THREAD.setStatus(thread, Status.ABORTING);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getMetaClass(thread)).getContext().getThreadManager().unregisterThread(thread);
context.getThreadManager().unregisterThread(thread);

Layouts.THREAD.setStatus(thread, Status.DEAD);
Layouts.THREAD.setThread(thread, null);
@@ -451,7 +452,7 @@ public AllocateNode(RubyContext context, SourceSection sourceSection) {
// Allocates a new Ruby thread object of the given class by delegating to createRubyThread.
// NOTE(review): rendered diff; the return statement appears in its old and new form.
@TruffleBoundary
@Specialization
public DynamicObject allocate(DynamicObject rubyClass) {
return createRubyThread(rubyClass);
return createRubyThread(getContext(), rubyClass);
}

}
Original file line number Diff line number Diff line change
@@ -10,8 +10,10 @@
package org.jruby.truffle.runtime.subsystems;

import com.oracle.truffle.api.object.DynamicObject;

import org.jruby.truffle.nodes.RubyGuards;
import org.jruby.truffle.nodes.core.FiberNodes;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.layouts.Layouts;

import java.util.Collections;
@@ -27,8 +29,8 @@ public class FiberManager {
private DynamicObject currentFiber;
private final Set<DynamicObject> runningFibers = Collections.newSetFromMap(new ConcurrentHashMap<DynamicObject, Boolean>());

// Creates the manager for one Ruby thread: builds that thread's root fiber and makes it the
// current fiber.
// NOTE(review): rendered diff; the first signature and assignment look like the superseded
// version that derived the context from rubyThread.
public FiberManager(DynamicObject rubyThread) {
this.rootFiber = FiberNodes.newRootFiber(rubyThread, this, Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getMetaClass(rubyThread)).getContext().getThreadManager());
public FiberManager(RubyContext context, DynamicObject rubyThread) {
this.rootFiber = FiberNodes.createRootFiber(context, rubyThread);
this.currentFiber = rootFiber;
}

Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ public synchronized void defineFinalizer(DynamicObject object, Object callable)
if (finalizerThread == null) {
// TODO(CS): should we be running this in a real Ruby thread?

finalizerThread = ThreadNodes.createRubyThread(context.getCoreLibrary().getThreadClass());
finalizerThread = ThreadNodes.createRubyThread(context, context.getCoreLibrary().getThreadClass());
ThreadNodes.initialize(finalizerThread, context, null, "finalizer", new Runnable() {
@Override
public void run() {
Original file line number Diff line number Diff line change
@@ -39,13 +39,13 @@ public class ThreadManager {

// Stores the context and creates the root Ruby thread (an instance of the core Thread
// class), naming it "main".
// NOTE(review): rendered diff; the rootThread assignment appears in its old and new form.
public ThreadManager(RubyContext context) {
this.context = context;
this.rootThread = ThreadNodes.createRubyThread(context.getCoreLibrary().getThreadClass());
this.rootThread = ThreadNodes.createRubyThread(context, context.getCoreLibrary().getThreadClass());
Layouts.THREAD.setName(rootThread, "main");
}

// Starts the root thread and then its root fiber on the calling Java thread. These are the
// direct start() calls that are not wrapped in a try/finally.
// NOTE(review): rendered diff; each call appears twice (without, then with, context).
public void initialize() {
ThreadNodes.start(rootThread);
FiberNodes.start(Layouts.THREAD.getFiberManager(rootThread).getRootFiber());
ThreadNodes.start(context, rootThread);
FiberNodes.start(context, Layouts.THREAD.getFiberManager(rootThread).getRootFiber());
}

public DynamicObject getRootThread() {
@@ -119,8 +119,8 @@ public void shutdown() {
}
} finally {
Layouts.THREAD.getFiberManager(rootThread).shutdown();
FiberNodes.cleanup(Layouts.THREAD.getFiberManager(rootThread).getRootFiber());
ThreadNodes.cleanup(rootThread);
FiberNodes.cleanup(context, Layouts.THREAD.getFiberManager(rootThread).getRootFiber());
ThreadNodes.cleanup(context, rootThread);
}
}

0 comments on commit 1710f02

Please sign in to comment.