Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cfd7f7446554
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 71f6aa91f30b
Choose a head ref
  • 2 commits
  • 11 files changed
  • 1 contributor

Commits on Aug 18, 2015

  1. Copy the full SHA
    390a646 View commit details
  2. Copy the full SHA
    71f6aa9 View commit details
86 changes: 27 additions & 59 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/FiberNodes.java
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@
import org.jruby.truffle.runtime.subsystems.FiberManager;
import org.jruby.truffle.runtime.subsystems.ThreadManager;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@CoreClass(name = "Fiber")
@@ -43,14 +42,15 @@ public static DynamicObject newRootFiber(DynamicObject thread, FiberManager fibe
public static void initialize(final DynamicObject fiber, final DynamicObject block) {
assert RubyGuards.isRubyFiber(fiber);
assert RubyGuards.isRubyProc(block);
Layouts.FIBER.getFields(fiber).name = "Ruby Fiber@" + Layouts.PROC.getSharedMethodInfo(block).getSourceSection().getShortDescription();
final String name = "Ruby Fiber@" + Layouts.PROC.getSharedMethodInfo(block).getSourceSection().getShortDescription();
Layouts.FIBER.setName(fiber, name);
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
handleFiberExceptions(fiber, block);
}
});
thread.setName(Layouts.FIBER.getFields(fiber).name);
thread.setName(name);
thread.start();
}

@@ -68,16 +68,16 @@ public void run() {
result = ProcNodes.rootCall(block, args);
} finally {
// Make sure that other fibers notice we are dead before they gain control back
Layouts.FIBER.getFields(fiber).alive = false;
Layouts.FIBER.setAlive(fiber, false);
}
resume(fiber, Layouts.FIBER.getFields(fiber).lastResumedByFiber, true, result);
resume(fiber, Layouts.FIBER.getLastResumedByFiber(fiber), true, result);
} catch (FiberExitException e) {
assert !Layouts.FIBER.getFields(fiber).isRootFiber;
assert !Layouts.FIBER.getRootFiber(fiber);
// Naturally exit the Java thread on catching this
} catch (ReturnException e) {
sendMessageTo(Layouts.FIBER.getFields(fiber).lastResumedByFiber, new FiberExceptionMessage(Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getCoreLibrary().unexpectedReturn(null)));
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage(Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getCoreLibrary().unexpectedReturn(null)));
} catch (RaiseException e) {
sendMessageTo(Layouts.FIBER.getFields(fiber).lastResumedByFiber, new FiberExceptionMessage((DynamicObject) e.getRubyException()));
Layouts.FIBER.getMessageQueue(Layouts.FIBER.getLastResumedByFiber(fiber)).add(new FiberExceptionMessage((DynamicObject) e.getRubyException()));
}
}
});
@@ -97,24 +97,19 @@ public static void run(DynamicObject fiber, final Runnable task) {
// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public static void start(DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
Layouts.FIBER.getFields(fiber).thread = Thread.currentThread();
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().initializeCurrentThread(Layouts.FIBER.getFields(fiber).rubyThread);
Layouts.THREAD.getFiberManager(Layouts.FIBER.getFields(fiber).rubyThread).registerFiber(fiber);
Layouts.FIBER.setThread(fiber, Thread.currentThread());
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().initializeCurrentThread(Layouts.FIBER.getRubyThread(fiber));
Layouts.THREAD.getFiberManager(Layouts.FIBER.getRubyThread(fiber)).registerFiber(fiber);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getSafepointManager().enterThread();
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public static void cleanup(DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
Layouts.FIBER.getFields(fiber).alive = false;
Layouts.FIBER.setAlive(fiber, false);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getSafepointManager().leaveThread();
Layouts.THREAD.getFiberManager(Layouts.FIBER.getFields(fiber).rubyThread).unregisterFiber(fiber);
Layouts.FIBER.getFields(fiber).thread = null;
}

private static void sendMessageTo(DynamicObject fiber, FiberMessage message) {
assert RubyGuards.isRubyFiber(fiber);
Layouts.FIBER.getFields(fiber).messageQueue.add(message);
Layouts.THREAD.getFiberManager(Layouts.FIBER.getRubyThread(fiber)).unregisterFiber(fiber);
Layouts.FIBER.setThread(fiber, null);
}

/**
@@ -128,21 +123,21 @@ private static Object[] waitForResume(final DynamicObject fiber) {
final FiberMessage message = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<FiberMessage>() {
@Override
public FiberMessage block() throws InterruptedException {
return Layouts.FIBER.getFields(fiber).messageQueue.take();
return Layouts.FIBER.getMessageQueue(fiber).take();
}
});

Layouts.THREAD.getFiberManager(Layouts.FIBER.getFields(fiber).rubyThread).setCurrentFiber(fiber);
Layouts.THREAD.getFiberManager(Layouts.FIBER.getRubyThread(fiber)).setCurrentFiber(fiber);

if (message instanceof FiberExitMessage) {
throw new FiberExitException();
} else if (message instanceof FiberExceptionMessage) {
throw new RaiseException(((FiberExceptionMessage) message).getException());
} else if (message instanceof FiberResumeMessage) {
final FiberResumeMessage resumeMessage = (FiberResumeMessage) message;
assert Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().getCurrentThread() == Layouts.FIBER.getFields(resumeMessage.getSendingFiber()).rubyThread;
assert Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().getCurrentThread() == Layouts.FIBER.getRubyThread(resumeMessage.getSendingFiber());
if (!(resumeMessage.isYield())) {
Layouts.FIBER.getFields(fiber).lastResumedByFiber = resumeMessage.getSendingFiber();
Layouts.FIBER.setLastResumedByFiber(fiber, resumeMessage.getSendingFiber());
}
return resumeMessage.getArgs();
} else {
@@ -159,7 +154,7 @@ private static void resume(DynamicObject fromFiber, DynamicObject fiber, boolean
assert RubyGuards.isRubyFiber(fromFiber);
assert RubyGuards.isRubyFiber(fiber);

sendMessageTo(fiber, new FiberResumeMessage(yield, fromFiber, args));
Layouts.FIBER.getMessageQueue(fiber).add(new FiberResumeMessage(yield, fromFiber, args));
}

public static Object[] transferControlTo(DynamicObject fromFiber, DynamicObject fiber, boolean yield, Object[] args) {
@@ -173,25 +168,16 @@ public static Object[] transferControlTo(DynamicObject fromFiber, DynamicObject

public static void shutdown(DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
assert !Layouts.FIBER.getFields(fiber).isRootFiber;
sendMessageTo(fiber, new FiberExitMessage());
}

public static boolean isAlive(DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);
return Layouts.FIBER.getFields(fiber).alive;
assert !Layouts.FIBER.getRootFiber(fiber);
Layouts.FIBER.getMessageQueue(fiber).add(new FiberExitMessage());
}

public static DynamicObject createRubyFiber(DynamicObject parent, DynamicObject rubyClass, String name) {
final FiberFields fields = new FiberNodes.FiberFields(parent, false);
fields.name = name;
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), fields);
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), false, new LinkedBlockingQueue<FiberMessage>(2), parent, name, null, true, null);
}

public static DynamicObject createRubyFiber(DynamicObject parent, FiberManager fiberManager, ThreadManager threadManager, DynamicObject rubyClass, String name, boolean isRootFiber) {
final FiberFields fields = new FiberNodes.FiberFields(parent, isRootFiber);
fields.name = name;
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), fields);
return Layouts.FIBER.createFiber(Layouts.CLASS.getInstanceFactory(rubyClass), false, new LinkedBlockingQueue<FiberMessage>(2), parent, name, null, true, null);
}

public interface FiberMessage {
@@ -219,12 +205,12 @@ protected Object singleValue(VirtualFrame frame, Object[] args) {
protected Object transfer(VirtualFrame frame, DynamicObject fiber, boolean isYield, Object[] args) {
CompilerDirectives.transferToInterpreter();

if (!isAlive(fiber)) {
if (!Layouts.FIBER.getAlive(fiber)) {
throw new RaiseException(getContext().getCoreLibrary().deadFiberCalledError(this));
}

DynamicObject currentThread = getContext().getThreadManager().getCurrentThread();
if (Layouts.FIBER.getFields(fiber).rubyThread != currentThread) {
if (Layouts.FIBER.getRubyThread(fiber) != currentThread) {
CompilerDirectives.transferToInterpreter();
throw new RaiseException(getContext().getCoreLibrary().fiberError("fiber called across threads", this));
}
@@ -284,9 +270,9 @@ public YieldNode(RubyContext context, SourceSection sourceSection) {
public Object yield(VirtualFrame frame, Object[] args) {
final DynamicObject currentThread = getContext().getThreadManager().getCurrentThread();
final DynamicObject yieldingFiber = Layouts.THREAD.getFiberManager(currentThread).getCurrentFiber();
final DynamicObject fiberYieldedTo = Layouts.FIBER.getFields(yieldingFiber).lastResumedByFiber;
final DynamicObject fiberYieldedTo = Layouts.FIBER.getLastResumedByFiber(yieldingFiber);

if (Layouts.FIBER.getFields(yieldingFiber).isRootFiber || fiberYieldedTo == null) {
if (Layouts.FIBER.getRootFiber(yieldingFiber) || fiberYieldedTo == null) {
throw new RaiseException(getContext().getCoreLibrary().yieldFromRootFiberError(this));
}

@@ -358,22 +344,4 @@ public DynamicObject allocate(DynamicObject rubyClass) {

}

public static class FiberFields {
public final DynamicObject rubyThread;
@CompilerDirectives.CompilationFinal
public String name;
public final boolean isRootFiber;
// we need 2 slots when the FiberManager sends the kill message and there is another message unprocessed
public final BlockingQueue<FiberNodes.FiberMessage> messageQueue = new LinkedBlockingQueue<>(2);
public volatile DynamicObject lastResumedByFiber = null;
public volatile boolean alive = true;
public volatile Thread thread;

public FiberFields(DynamicObject rubyThread, boolean isRootFiber) {
assert RubyGuards.isRubyThread(rubyThread);
this.rubyThread = rubyThread;
this.isRootFiber = isRootFiber;
}
}

}
Loading