Skip to content

Commit

Permalink
[Truffle] Fix location to run deferred safepoint actions.
Browse files Browse the repository at this point in the history
* Was a deadlock as threads could fight over the global lock and the safepoint lock.
* Fix Kernel#sleep to actually try sleeping the given duration.
* Implement Thread.{wakeup,run}.
* Optimize the case of interrupting the current thread.
* Fix Exception#to_s.
* Move some Thread code to Ruby.
  • Loading branch information
eregon committed Apr 23, 2015
1 parent e43d8fa commit 89c00ff
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 102 deletions.
1 change: 0 additions & 1 deletion spec/truffle/tags/core/exception/to_s_tags.txt
@@ -1,2 +1 @@
fails:Exception#to_s returns self's message if set
fails:Exception#to_s calls #to_s on the message
5 changes: 0 additions & 5 deletions spec/truffle/tags/core/thread/kill_tags.txt

This file was deleted.

3 changes: 0 additions & 3 deletions spec/truffle/tags/core/thread/run_tags.txt

This file was deleted.

11 changes: 0 additions & 11 deletions spec/truffle/tags/core/thread/terminate_tags.txt

This file was deleted.

3 changes: 0 additions & 3 deletions spec/truffle/tags/core/thread/wakeup_tags.txt

This file was deleted.

Expand Up @@ -39,7 +39,7 @@ public InitializeNode(InitializeNode prev) {
public RubyNilClass initialize(RubyException exception, UndefinedPlaceholder message) {
notDesignedForCompilation();

exception.initialize(getContext().makeString(" "));
exception.initialize(getContext().makeString(""));
return nil();
}

Expand Down Expand Up @@ -132,7 +132,11 @@ public ToSNode(ToSNode prev) {

@Specialization
public RubyString toS(RubyException exception) {
return getContext().makeString(exception.getLogicalClass().getName());
if (exception.getMessage().length() == 0) {
return getContext().makeString(exception.getLogicalClass().getName());
} else {
return getContext().makeString(exception.getMessage().getBytes());
}
}

}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.oracle.truffle.api.utilities.ConditionProfile;

import org.jcodings.Encoding;
import org.jruby.RubyThread.Status;
import org.jruby.common.IRubyWarnings;
import org.jruby.runtime.Visibility;
import org.jruby.truffle.nodes.RubyNode;
Expand Down Expand Up @@ -1790,18 +1791,33 @@ private long doSleepMillis(final long durationInMillis) {
}

final long start = System.currentTimeMillis();
final RubyThread thread = getContext().getThreadManager().getCurrentThread();

long slept = getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Long>() {
boolean shouldWakeUp = false;

getContext().getThreadManager().runOnce(new BlockingActionWithoutGlobalLock<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
Thread.sleep(durationInMillis);
return SUCCESS;
public Long block() throws InterruptedException {
long now = System.currentTimeMillis();
long slept = now - start;

if (shouldWakeUp || slept >= durationInMillis) {
return slept;
}

try {
Thread.sleep(durationInMillis - slept);
return System.currentTimeMillis() - start;
} catch (InterruptedException e) {
if (thread.getStatus() == Status.RUN) { // Thread#{wakeup,run}
shouldWakeUp = true;
}
throw e;
}
}
});

final long end = System.currentTimeMillis();

return (end - start) / 1000;
return slept / 1000;
}

}
Expand Down
59 changes: 13 additions & 46 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/ThreadNodes.java
Expand Up @@ -69,25 +69,6 @@ public RubyThread current() {

}

@CoreMethod(names = "exit", onSingleton = true)
public abstract static class ExitModuleNode extends CoreMethodNode {

public ExitModuleNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

public ExitModuleNode(ExitModuleNode prev) {
super(prev);
}

@Specialization
public RubyNilClass exit() {
getContext().getThreadManager().getCurrentThread().shutdown();
return nil();
}

}

@CoreMethod(names = { "kill", "exit", "terminate" })
public abstract static class KillNode extends CoreMethodNode {

Expand All @@ -100,14 +81,17 @@ public KillNode(KillNode prev) {
}

@Specialization
public RubyThread kill(final RubyThread thread) {
getContext().getSafepointManager().pauseThreadAndExecuteLater(thread.getRootFiberJavaThread(), this, new SafepointAction() {
public RubyThread kill(final RubyThread rubyThread) {
final Thread toKill = rubyThread.getRootFiberJavaThread();

getContext().getSafepointManager().pauseThreadAndExecuteLater(toKill, this, new SafepointAction() {
@Override
public void run(RubyThread currentThread, Node currentNode) {
currentThread.shutdown();
}
});
return thread;

return rubyThread;
}

}
Expand Down Expand Up @@ -271,28 +255,6 @@ public void run(RubyThread currentThread, Node currentNode) {

}

@CoreMethod(names = "run")
public abstract static class RunNode extends CoreMethodNode {

public RunNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

public RunNode(RunNode prev) {
super(prev);
}

@Specialization
public RubyThread run(final RubyThread thread) {
notDesignedForCompilation();

thread.interrupt();

return thread;
}

}

@CoreMethod(names = "status")
public abstract static class StatusNode extends CoreMethodNode {

Expand Down Expand Up @@ -364,7 +326,7 @@ public Object value(RubyThread self) {

}

@CoreMethod(names = "wakeup")
@CoreMethod(names = { "wakeup", "run" })
public abstract static class WakeupNode extends CoreMethodNode {

public WakeupNode(RubyContext context, SourceSection sourceSection) {
Expand All @@ -379,8 +341,13 @@ public WakeupNode(WakeupNode prev) {
public RubyThread wakeup(final RubyThread thread) {
notDesignedForCompilation();

if (thread.getStatus() == Status.DEAD) {
CompilerDirectives.transferToInterpreter();
throw new RaiseException(getContext().getCoreLibrary().threadError("killed thread", this));
}

// TODO: should only interrupt sleep
thread.interrupt();
thread.wakeup();

return thread;
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;

/**
Expand Down Expand Up @@ -129,7 +130,7 @@ public void cleanup() {

public void shutdown() {
fiberManager.shutdown();
exit();
throw new ThreadExitException();
}

public Thread getRootFiberJavaThread() {
Expand Down Expand Up @@ -169,7 +170,8 @@ public Boolean block() throws InterruptedException {
return joined;
}

public void interrupt() {
public void wakeup() {
status = Status.RUN;
Thread t = thread;
if (t != null) {
t.interrupt();
Expand Down Expand Up @@ -213,10 +215,6 @@ public RubyException getException() {
return exception;
}

private void exit() {
throw new ThreadExitException();
}

public String getName() {
return name;
}
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.oracle.truffle.api.nodes.Node;

import org.jruby.RubyThread.Status;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.core.RubyThread;

Expand Down Expand Up @@ -69,13 +70,20 @@ private void poll(Node currentNode, boolean holdsGlobalLock) {
try {
assumption.check();
} catch (InvalidAssumptionException e) {
assumptionInvalidated(currentNode, holdsGlobalLock, false);
SafepointAction deferredAction = assumptionInvalidated(currentNode, holdsGlobalLock, false);

// We're now running again normally, with the global lock, and can run deferred actions
if (deferredAction != null && holdsGlobalLock) {
deferredAction.run(context.getThreadManager().getCurrentThread(), currentNode);
}
}
}

private void assumptionInvalidated(Node currentNode, boolean holdsGlobalLock, boolean isDrivingThread) {
RubyThread thread = null;
private SafepointAction assumptionInvalidated(Node currentNode, boolean holdsGlobalLock, boolean isDrivingThread) {
// Read these while in the safepoint.
SafepointAction deferredAction = deferred ? action : null;

RubyThread thread = null;
if (holdsGlobalLock) {
thread = context.getThreadManager().leaveGlobalLock();
}
Expand All @@ -85,16 +93,13 @@ private void assumptionInvalidated(Node currentNode, boolean holdsGlobalLock, bo
try {
step(currentNode, thread, isDrivingThread);
} finally {
if (holdsGlobalLock) {
// The driving thread must acquire the global lock AFTER releasing the SafepointManager lock.
if (!isDrivingThread && holdsGlobalLock) {
context.getThreadManager().enterGlobalLock(thread);
}
}

// We're now running again normally, with the global lock, and can run deferred actions

if (deferred && holdsGlobalLock && thread != null) {
action.run(thread, currentNode);
}
return deferredAction;
}

private void step(Node currentNode, RubyThread thread, boolean isDrivingThread) {
Expand Down Expand Up @@ -123,6 +128,8 @@ public void pauseAllThreadsAndExecute(Node currentNode, boolean deferred, Safepo
throw new IllegalStateException("Re-entered SafepointManager");
}

RubyThread thread = context.getThreadManager().getCurrentThread();

// Need to lock interruptibly since we are in the registered threads.
while (true) {
try {
Expand All @@ -137,6 +144,12 @@ public void pauseAllThreadsAndExecute(Node currentNode, boolean deferred, Safepo
pauseAllThreadsAndExecute(currentNode, true, action, deferred);
} finally {
lock.unlock();
context.getThreadManager().enterGlobalLock(thread);
}

// Run deferred actions after leaving the SafepointManager lock and with the global lock.
if (deferred) {
action.run(thread, currentNode);
}
}

Expand Down Expand Up @@ -174,15 +187,21 @@ private void pauseAllThreadsAndExecute(Node currentNode, boolean holdsGlobalLock
assumptionInvalidated(currentNode, holdsGlobalLock, true);
}

public void pauseThreadAndExecuteLater(final Thread thread, Node currentNode, final SafepointAction action) {
pauseAllThreadsAndExecute(currentNode, true, new SafepointAction() {
@Override
public void run(RubyThread rubyThread, Node currentNode) {
if (Thread.currentThread() == thread) {
action.run(rubyThread, currentNode);
public void pauseThreadAndExecuteLater(final Thread thread, RubyNode currentNode, final SafepointAction action) {
if (Thread.currentThread() == thread) {
// fast path if we are already the right thread
RubyThread rubyThread = context.getThreadManager().getCurrentThread();
action.run(rubyThread, currentNode);
} else {
pauseAllThreadsAndExecute(currentNode, true, new SafepointAction() {
@Override
public void run(RubyThread rubyThread, Node currentNode) {
if (Thread.currentThread() == thread) {
action.run(rubyThread, currentNode);
}
}
}
});
});
}
}

public void pauseMainThreadAndExecuteLaterFromNonRubyThread(final Thread thread, final SafepointAction action) {
Expand Down
2 changes: 1 addition & 1 deletion truffle/src/main/ruby/core.rb
Expand Up @@ -73,7 +73,7 @@
require_relative 'core/rubinius/bootstrap/stat'
require_relative 'core/rubinius/bootstrap/string'
require_relative 'core/rubinius/bootstrap/symbol'
#require_relative 'core/rubinius/bootstrap/thread'
require_relative 'core/rubinius/bootstrap/thread'
#require_relative 'core/rubinius/bootstrap/thunk'
require_relative 'core/rubinius/bootstrap/time'
require_relative 'core/rubinius/bootstrap/true'
Expand Down
44 changes: 44 additions & 0 deletions truffle/src/main/ruby/core/rubinius/bootstrap/thread.rb
@@ -0,0 +1,44 @@
# Copyright (c) 2007-2014, Evan Phoenix and contributors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Rubinius nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# Only part of Rubinius' thread.rb

class Thread

def self.stop
sleep
nil
end

def self.exit
Thread.current.kill
end

def self.kill(thread)
thread.kill
end

end

0 comments on commit 89c00ff

Please sign in to comment.