Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 573752a57af4
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0691ff2ec876
Choose a head ref
  • 4 commits
  • 15 files changed
  • 1 contributor

Commits on Aug 31, 2015

  1. Copy the full SHA
    bf175cf View commit details
  2. Copy the full SHA
    769e023 View commit details
  3. Copy the full SHA
    fdafb5a View commit details
  4. Copy the full SHA
    0691ff2 View commit details
24 changes: 7 additions & 17 deletions spec/ruby/core/thread/raise_spec.rb
Original file line number Diff line number Diff line change
@@ -9,23 +9,6 @@
lambda {t.raise("Kill the thread")}.should_not raise_error
lambda {t.value}.should_not raise_error
end

it "has backtrace" do
backtrace = nil

thread = Thread.new do
begin
sleep
rescue => e
backtrace = e.backtrace
end
end
sleep 1
thread.raise
thread.join
backtrace.should_not be_nil
end

end

describe "Thread#raise on a sleeping thread" do
@@ -59,6 +42,13 @@
ScratchPad.recorded.message.should == "get to work"
end

it "raises the given exception and the backtrace is the one of the interrupted thread" do
@thr.raise Exception
Thread.pass while @thr.status
ScratchPad.recorded.should be_kind_of(Exception)
ScratchPad.recorded.backtrace[0].should include("sleep")
end

it "is captured and raised by Thread#value" do
t = Thread.new do
sleep
Original file line number Diff line number Diff line change
@@ -123,7 +123,8 @@ public static void cleanup(DynamicObject fiber) {
private static Object[] waitForResume(final DynamicObject fiber) {
assert RubyGuards.isRubyFiber(fiber);

final FiberMessage message = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<FiberMessage>() {
final RubyContext context = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext();
final FiberMessage message = context.getThreadManager().runUntilResult(null, new ThreadManager.BlockingAction<FiberMessage>() {
@Override
public FiberMessage block() throws InterruptedException {
return Layouts.FIBER.getMessageQueue(fiber).take();
@@ -138,7 +139,7 @@ public FiberMessage block() throws InterruptedException {
throw new RaiseException(((FiberExceptionMessage) message).getException());
} else if (message instanceof FiberResumeMessage) {
final FiberResumeMessage resumeMessage = (FiberResumeMessage) message;
assert Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getLogicalClass(fiber)).getContext().getThreadManager().getCurrentThread() == Layouts.FIBER.getRubyThread(resumeMessage.getSendingFiber());
assert context.getThreadManager().getCurrentThread() == Layouts.FIBER.getRubyThread(resumeMessage.getSendingFiber());
if (!(resumeMessage.isYield())) {
Layouts.FIBER.setLastResumedByFiber(fiber, resumeMessage.getSendingFiber());
}
Original file line number Diff line number Diff line change
@@ -19,12 +19,14 @@
import com.oracle.truffle.api.frame.FrameInstance.FrameAccess;
import com.oracle.truffle.api.nodes.DirectCallNode;
import com.oracle.truffle.api.nodes.IndirectCallNode;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.nodes.UnexpectedResultException;
import com.oracle.truffle.api.object.DynamicObject;
import com.oracle.truffle.api.object.Property;
import com.oracle.truffle.api.source.Source;
import com.oracle.truffle.api.source.SourceSection;
import com.oracle.truffle.api.utilities.ConditionProfile;

import org.jcodings.Encoding;
import org.jcodings.specific.USASCIIEncoding;
import org.jcodings.specific.UTF8Encoding;
@@ -680,7 +682,7 @@ public Object exec(VirtualFrame frame, Object command, Object[] args) {
}

@TruffleBoundary
private static void exec(RubyContext context, DynamicObject envAsHash, String[] commandLine) {
private void exec(RubyContext context, DynamicObject envAsHash, String[] commandLine) {
final ProcessBuilder builder = new ProcessBuilder(commandLine);
builder.inheritIO();

@@ -697,7 +699,7 @@ private static void exec(RubyContext context, DynamicObject envAsHash, String[]
throw new RuntimeException(e);
}

int exitCode = context.getThreadManager().runUntilResult(new BlockingAction<Integer>() {
int exitCode = context.getThreadManager().runUntilResult(this, new BlockingAction<Integer>() {
@Override
public Integer block() throws InterruptedException {
return process.waitFor();
@@ -838,7 +840,7 @@ public DynamicObject gets(VirtualFrame frame) {

final BufferedReader reader = new BufferedReader(new InputStreamReader(in, encoding.getCharset()));

final String line = getContext().getThreadManager().runUntilResult(new BlockingAction<String>() {
final String line = getContext().getThreadManager().runUntilResult(this, new BlockingAction<String>() {
@Override
public String block() throws InterruptedException {
return gets(reader);
@@ -1839,17 +1841,17 @@ private long doSleepMillis(final long durationInMillis) {
// it should only be considered if we are inside the sleep when Thread#{run,wakeup} is called.
Layouts.THREAD.getWakeUp(thread).getAndSet(false);

return sleepFor(getContext(), durationInMillis);
return sleepFor(this, getContext(), durationInMillis);
}

public static long sleepFor(RubyContext context, final long durationInMillis) {
public static long sleepFor(Node currentNode, RubyContext context, final long durationInMillis) {
assert durationInMillis >= 0;

final DynamicObject thread = context.getThreadManager().getCurrentThread();

final long start = System.currentTimeMillis();

long slept = context.getThreadManager().runUntilResult(new BlockingAction<Long>() {
long slept = context.getThreadManager().runUntilResult(currentNode, new BlockingAction<Long>() {
@Override
public Long block() throws InterruptedException {
long now = System.currentTimeMillis();
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ protected static void lock(final ReentrantLock lock, final DynamicObject thread,
throw new RaiseException(context.getCoreLibrary().threadError("deadlock; recursive locking", currentNode));
}

context.getThreadManager().runUntilResult(new BlockingAction<Boolean>() {
context.getThreadManager().runUntilResult(currentNode, new BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
lock.lockInterruptibly();
@@ -214,7 +214,7 @@ public long doSleepMillis(DynamicObject mutex, long durationInMillis) {

UnlockNode.unlock(lock, thread, this);
try {
return KernelNodes.SleepNode.sleepFor(getContext(), durationInMillis);
return KernelNodes.SleepNode.sleepFor(this, getContext(), durationInMillis);
} finally {
LockNode.lock(lock, thread, this);
}
Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ public RubyNode coerceToBoolean(RubyNode nonBlocking) {
public Object popBlocking(DynamicObject self, boolean nonBlocking) {
final BlockingQueue<Object> queue = Layouts.QUEUE.getQueue(self);

return getContext().getThreadManager().runUntilResult(new BlockingAction<Object>() {
return getContext().getThreadManager().runUntilResult(this, new BlockingAction<Object>() {
@Override
public Object block() throws InterruptedException {
return queue.take();
@@ -136,7 +136,7 @@ public Object receiveTimeout(DynamicObject self, double duration) {
final long durationInMillis = (long) (duration * 1000.0);
final long start = System.currentTimeMillis();

return getContext().getThreadManager().runUntilResult(new BlockingAction<Object>() {
return getContext().getThreadManager().runUntilResult(this, new BlockingAction<Object>() {
@Override
public Object block() throws InterruptedException {
long now = System.currentTimeMillis();
@@ -246,7 +246,7 @@ public int num_waiting(DynamicObject self) {
final ReentrantLock lock = (ReentrantLock) UnsafeHolder.U.getObject(linkedBlockingQueue, LOCK_FIELD_OFFSET);
final Condition notEmptyCondition = (Condition) UnsafeHolder.U.getObject(linkedBlockingQueue, NOT_EMPTY_CONDITION_FIELD_OFFSET);

getContext().getThreadManager().runUntilResult(new BlockingAction<Boolean>() {
getContext().getThreadManager().runUntilResult(this, new BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
lock.lockInterruptibly();
Original file line number Diff line number Diff line change
@@ -142,7 +142,7 @@ public RubyNode coerceToBoolean(RubyNode nonBlocking) {
public DynamicObject pushBlocking(DynamicObject self, final Object value, boolean nonBlocking) {
final BlockingQueue<Object> queue = Layouts.SIZED_QUEUE.getQueue(self);

getContext().getThreadManager().runUntilResult(new BlockingAction<Boolean>() {
getContext().getThreadManager().runUntilResult(this, new BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
queue.put(value);
@@ -189,7 +189,7 @@ public RubyNode coerceToBoolean(RubyNode nonBlocking) {
public Object popBlocking(DynamicObject self, boolean nonBlocking) {
final BlockingQueue<Object> queue = Layouts.SIZED_QUEUE.getQueue(self);

return getContext().getThreadManager().runUntilResult(new BlockingAction<Object>() {
return getContext().getThreadManager().runUntilResult(this, new BlockingAction<Object>() {
@Override
public Object block() throws InterruptedException {
return queue.take();
@@ -282,7 +282,7 @@ public int num_waiting(DynamicObject self) {
final Condition notEmptyCondition = (Condition) UnsafeHolder.U.getObject(arrayBlockingQueue, NOT_EMPTY_CONDITION_FIELD_OFFSET);
final Condition notFullCondition = (Condition) UnsafeHolder.U.getObject(arrayBlockingQueue, NOT_FULL_CONDITION_FIELD_OFFSET);

getContext().getThreadManager().runUntilResult(new BlockingAction<Boolean>() {
getContext().getThreadManager().runUntilResult(this, new BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
lock.lockInterruptibly();
86 changes: 43 additions & 43 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/ThreadNodes.java
Original file line number Diff line number Diff line change
@@ -16,9 +16,11 @@
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.DynamicObject;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.RubyThread.Status;
import org.jruby.runtime.Visibility;
import org.jruby.truffle.nodes.RubyGuards;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.runtime.NotProvided;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.control.RaiseException;
@@ -120,44 +122,6 @@ public static void shutdown(DynamicObject thread) {
throw new ThreadExitException();
}

public static void join(final DynamicObject thread) {
assert RubyGuards.isRubyThread(thread);
Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getMetaClass(thread)).getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
Layouts.THREAD.getFinishedLatch(thread).await();
return SUCCESS;
}
});

if (Layouts.THREAD.getException(thread) != null) {
throw new RaiseException(Layouts.THREAD.getException(thread));
}
}

public static boolean join(final DynamicObject thread, final int timeoutInMillis) {
assert RubyGuards.isRubyThread(thread);
final long start = System.currentTimeMillis();
final boolean joined = Layouts.MODULE.getFields(Layouts.BASIC_OBJECT.getMetaClass(thread)).getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
long now = System.currentTimeMillis();
long waited = now - start;
if (waited >= timeoutInMillis) {
// We need to know whether countDown() was called and we do not want to block.
return Layouts.THREAD.getFinishedLatch(thread).getCount() == 0;
}
return Layouts.THREAD.getFinishedLatch(thread).await(timeoutInMillis - waited, TimeUnit.MILLISECONDS);
}
});

if (joined && Layouts.THREAD.getException(thread) != null) {
throw new RaiseException(Layouts.THREAD.getException(thread));
}

return joined;
}

@CoreMethod(names = "alive?")
public abstract static class AliveNode extends CoreMethodArrayArgumentsNode {

@@ -275,7 +239,7 @@ public JoinNode(RubyContext context, SourceSection sourceSection) {

@Specialization
public DynamicObject join(DynamicObject thread, NotProvided timeout) {
ThreadNodes.join(thread);
doJoin(this, thread);
return thread;
}

@@ -295,15 +259,51 @@ public Object join(DynamicObject thread, double timeout) {
}

private Object joinMillis(DynamicObject self, int timeoutInMillis) {
assert RubyGuards.isRubyThread(self);

if (ThreadNodes.join(self, timeoutInMillis)) {
if (doJoinMillis(self, timeoutInMillis)) {
return self;
} else {
return nil();
}
}

@TruffleBoundary
public static void doJoin(RubyNode currentNode, final DynamicObject thread) {
currentNode.getContext().getThreadManager().runUntilResult(currentNode, new ThreadManager.BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
Layouts.THREAD.getFinishedLatch(thread).await();
return SUCCESS;
}
});

if (Layouts.THREAD.getException(thread) != null) {
throw new RaiseException(Layouts.THREAD.getException(thread));
}
}

@TruffleBoundary
private boolean doJoinMillis(final DynamicObject thread, final int timeoutInMillis) {
final long start = System.currentTimeMillis();
final boolean joined = getContext().getThreadManager().runUntilResult(this, new ThreadManager.BlockingAction<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
long now = System.currentTimeMillis();
long waited = now - start;
if (waited >= timeoutInMillis) {
// We need to know whether countDown() was called and we do not want to block.
return Layouts.THREAD.getFinishedLatch(thread).getCount() == 0;
}
return Layouts.THREAD.getFinishedLatch(thread).await(timeoutInMillis - waited, TimeUnit.MILLISECONDS);
}
});

if (joined && Layouts.THREAD.getException(thread) != null) {
throw new RaiseException(Layouts.THREAD.getException(thread));
}

return joined;
}

}

@CoreMethod(names = "main", onSingleton = true)
@@ -381,7 +381,7 @@ public ValueNode(RubyContext context, SourceSection sourceSection) {

@Specialization
public Object value(DynamicObject self) {
join(self);
JoinNode.doJoin(this, self);
return Layouts.THREAD.getValue(self);
}

Original file line number Diff line number Diff line change
@@ -582,7 +582,7 @@ public Object select(DynamicObject readables, DynamicObject writables, DynamicOb
readableSet.set(fd);
}

final int result = getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<Integer>() {
final int result = getContext().getThreadManager().runUntilResult(this, new ThreadManager.BlockingAction<Integer>() {
@Override
public Integer block() throws InterruptedException {
return nativeSockets().select(
@@ -617,7 +617,7 @@ public Object selectNilReadables(DynamicObject readables, DynamicObject writable
writableSet.set(fd);
}

final int result = getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<Integer>() {
final int result = getContext().getThreadManager().runUntilResult(this, new ThreadManager.BlockingAction<Integer>() {
@Override
public Integer block() throws InterruptedException {
return nativeSockets().select(
Original file line number Diff line number Diff line change
@@ -13,7 +13,10 @@
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.DynamicObject;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.truffle.runtime.RubyCallStack;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.backtrace.Backtrace;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.layouts.Layouts;
import org.jruby.truffle.runtime.subsystems.SafepointAction;
@@ -34,16 +37,18 @@ public ThreadRaisePrimitiveNode(RubyContext context, SourceSection sourceSection

@Specialization(guards = { "isRubyThread(thread)", "isRubyException(exception)" })
public DynamicObject raise(DynamicObject thread, final DynamicObject exception) {
getContext().getSafepointManager().pauseThreadAndExecuteLater(
Layouts.FIBER.getThread((Layouts.THREAD.getFiberManager(thread).getCurrentFiber())),
this,
new SafepointAction() {
@Override
public void run(DynamicObject currentThread, Node currentNode) {
throw new RaiseException(exception);
}
});
final Thread javaThread = Layouts.FIBER.getThread((Layouts.THREAD.getFiberManager(thread).getCurrentFiber()));

getContext().getSafepointManager().pauseThreadAndExecuteLater(javaThread, this, new SafepointAction() {
@Override
public void run(DynamicObject currentThread, Node currentNode) {
if (Layouts.EXCEPTION.getBacktrace(exception) == null) {
Backtrace backtrace = RubyCallStack.getBacktrace(currentNode);
Layouts.EXCEPTION.setBacktrace(exception, backtrace);
}
throw new RaiseException(exception);
}
});
return nil();
}

Original file line number Diff line number Diff line change
@@ -492,7 +492,7 @@ public Object waitPID(final int input_pid, boolean no_hang) {
final int finalOptions = options;

// retry:
pid = getContext().getThreadManager().runUntilResult(new ThreadManager.BlockingAction<Integer>() {
pid = getContext().getThreadManager().runUntilResult(this, new ThreadManager.BlockingAction<Integer>() {
@Override
public Integer block() throws InterruptedException {
return posix().waitpid(input_pid, statusReference, finalOptions);
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ private void runFinalizers() {

while (true) {
// Wait on the finalizer queue
FinalizerReference finalizerReference = context.getThreadManager().runUntilResult(new BlockingAction<FinalizerReference>() {
FinalizerReference finalizerReference = context.getThreadManager().runUntilResult(null, new BlockingAction<FinalizerReference>() {
@Override
public FinalizerReference block() throws InterruptedException {
return (FinalizerReference) finalizerQueue.remove();
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ public static interface BlockingAction<T> {
* @return the first non-null return value from {@code action}
*/
@TruffleBoundary
public <T> T runUntilResult(BlockingAction<T> action) {
public <T> T runUntilResult(Node currentNode, BlockingAction<T> action) {
T result = null;

do {
@@ -83,7 +83,7 @@ public <T> T runUntilResult(BlockingAction<T> action) {
}
} catch (InterruptedException e) {
// We were interrupted, possibly by the SafepointManager.
context.getSafepointManager().pollFromBlockingCall(null);
context.getSafepointManager().pollFromBlockingCall(currentNode);
}
} while (result == null);

Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ public class FileUtils {
public static byte[] readAllBytesInterruptedly(RubyContext context, String file) {
final Path path = Paths.get(file);

return context.getThreadManager().runUntilResult(new BlockingAction<byte[]>() {
return context.getThreadManager().runUntilResult(null, new BlockingAction<byte[]>() {
@Override
public byte[] block() throws InterruptedException {
try {
2 changes: 1 addition & 1 deletion truffle/src/main/ruby/core.rb
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@ def self.omit(reason)
require_relative 'core/rubinius/bootstrap/string'
require_relative 'core/rubinius/bootstrap/symbol'
require_relative 'core/rubinius/bootstrap/thread'
require_relative 'core/rubinius/api/shims/thread_bootstrap'
#require_relative 'core/rubinius/bootstrap/thunk'
require_relative 'core/rubinius/bootstrap/time'
require_relative 'core/rubinius/bootstrap/true'
@@ -209,7 +210,6 @@ def self.omit(reason)
require_relative 'core/rubinius/common/symbol'
require_relative 'core/rubinius/common/mutex'
require_relative 'core/rubinius/common/thread'
require_relative 'core/rubinius/api/shims/thread_common'
#require_relative 'core/rubinius/common/thread_group'
require_relative 'core/rubinius/common/throw_catch'
require_relative 'core/rubinius/common/time'
Original file line number Diff line number Diff line change
@@ -62,7 +62,6 @@ def raise(exc=undefined, msg=nil, trace=nil)
if self == Thread.current
Kernel.raise exc
else
exc.capture_backtrace! 2 unless exc.backtrace?
raise_prim exc
end
end