Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: jruby/jruby
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 7e896105bdd7
Choose a base ref
...
head repository: jruby/jruby
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b994c1c0fd19
Choose a head ref
  • 4 commits
  • 6 files changed
  • 1 contributor

Commits on Jul 14, 2015

  1. [Truffle] Use a separate flag for waking up a thread.

    * Thread#{wakeup,run} only interrupts #sleep and Thread.stop.
    * Greatly simplifies the interactions between the status and blocking actions.
    * Fixes #3131.
    eregon committed Jul 14, 2015
    Copy the full SHA
    dfc5f8a View commit details
  2. Copy the full SHA
    2eca2cb View commit details
  3. Copy the full SHA
    27d7722 View commit details
  4. Copy the full SHA
    b994c1c View commit details
2 changes: 2 additions & 0 deletions spec/truffle/tags/core/kernel/require_tags.txt
Original file line number Diff line number Diff line change
@@ -50,6 +50,8 @@ fails:Kernel#require stores the missing path in a LoadError object
fails:Kernel.require stores the missing path in a LoadError object
fails:Kernel#require (concurrently) blocks a second thread from returning while the 1st is still requiring
fails:Kernel.require (concurrently) blocks a second thread from returning while the 1st is still requiring
fails:Kernel#require (concurrently) blocks based on the path
fails:Kernel.require (concurrently) blocks based on the path
fails:Kernel#require (concurrently) allows a 2nd require if the 1st raised an exception
fails:Kernel.require (concurrently) allows a 2nd require if the 1st raised an exception
fails:Kernel#require (concurrently) blocks a 3rd require if the 1st raises an exception and the 2nd is still running
30 changes: 17 additions & 13 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/KernelNodes.java
Original file line number Diff line number Diff line change
@@ -1807,30 +1807,34 @@ private long doSleepMillis(final long durationInMillis) {
throw new RaiseException(getContext().getCoreLibrary().argumentError("time interval must be positive", this));
}

final long start = System.currentTimeMillis();
final RubyThread thread = getContext().getThreadManager().getCurrentThread();

long slept = getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Long>() {
boolean shouldWakeUp = false;
// Clear the wakeUp flag, following Ruby semantics:
// it should only be considered if we are inside the sleep when Thread#{run,wakeup} is called.
thread.shouldWakeUp();

return sleepFor(getContext(), durationInMillis);
}

public static long sleepFor(RubyContext context, final long durationInMillis) {
assert durationInMillis >= 0;

final RubyThread thread = context.getThreadManager().getCurrentThread();

final long start = System.currentTimeMillis();

long slept = context.getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Long>() {
@Override
public Long block() throws InterruptedException {
long now = System.currentTimeMillis();
long slept = now - start;

if (shouldWakeUp || slept >= durationInMillis) {
if (slept >= durationInMillis || thread.shouldWakeUp()) {
return slept;
}
Thread.sleep(durationInMillis - slept);

try {
Thread.sleep(durationInMillis - slept);
return System.currentTimeMillis() - start;
} catch (InterruptedException e) {
if (thread.getStatus() == Status.RUN) { // Thread#{wakeup,run}
shouldWakeUp = true;
}
throw e;
}
return System.currentTimeMillis() - start;
}
});

86 changes: 76 additions & 10 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/MutexNodes.java
Original file line number Diff line number Diff line change
@@ -14,7 +14,10 @@
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.*;
import com.oracle.truffle.api.source.SourceSection;

import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.nodes.objects.Allocator;
import org.jruby.truffle.runtime.NotProvided;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.control.RaiseException;
import org.jruby.truffle.runtime.core.RubyBasicObject;
@@ -68,24 +71,29 @@ public LockNode(RubyContext context, SourceSection sourceSection) {
/**
 * Locks {@code mutex} for the current Ruby thread by delegating to the static
 * {@code lock(ReentrantLock, RubyThread, RubyNode)} helper, then hands the mutex back to the caller.
 */
@Specialization
public RubyBasicObject lock(RubyBasicObject mutex) {
    // Arguments are evaluated left to right: the underlying lock is looked up first,
    // then the current thread is fetched from the thread manager.
    lock(getLock(mutex), getContext().getThreadManager().getCurrentThread(), this);
    return mutex;
}

/**
 * Acquires {@code lock} on behalf of {@code thread}.
 *
 * If the calling Java thread already holds the lock, a Ruby thread error with the message
 * "deadlock; recursive locking" is raised. Otherwise the acquisition is run through the
 * thread manager's {@code runUntilResult} as a {@code BlockingActionWithoutGlobalLock}, and the
 * acquired lock is recorded on {@code thread} via {@code acquiredLock}.
 *
 * @param lock        the lock to acquire
 * @param thread      the Ruby thread that is told about the acquired lock
 * @param currentNode the node supplying the context and the location used when raising the error
 */
protected static void lock(final ReentrantLock lock, final RubyThread thread, RubyNode currentNode) {
final RubyContext context = currentNode.getContext();

// Re-locking from the owning thread is reported as an error instead of being allowed to nest.
if (lock.isHeldByCurrentThread()) {
CompilerDirectives.transferToInterpreter();
throw new RaiseException(getContext().getCoreLibrary().threadError("deadlock; recursive locking", this));
throw new RaiseException(context.getCoreLibrary().threadError("deadlock; recursive locking", currentNode));
}

final RubyThread thread = getContext().getThreadManager().getCurrentThread();

getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Boolean>() {
context.getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
// lockInterruptibly() lets an interrupt abort the wait with InterruptedException.
lock.lockInterruptibly();
thread.acquiredLock(lock);
return SUCCESS;
}
});

return mutex;
}

}
@@ -154,22 +162,80 @@ public UnlockNode(RubyContext context, SourceSection sourceSection) {
/**
 * Unlocks {@code mutex} for the current Ruby thread by delegating to the static
 * {@code unlock(ReentrantLock, RubyThread, RubyNode)} helper, then hands the mutex back to the caller.
 */
@Specialization
public RubyBasicObject unlock(RubyBasicObject mutex) {
    // Arguments are evaluated left to right: the underlying lock is looked up first,
    // then the current thread is fetched from the thread manager.
    unlock(getLock(mutex), getContext().getThreadManager().getCurrentThread(), this);
    return mutex;
}

/**
 * Releases {@code lock} and records the release on {@code thread} via {@code releasedLock}.
 *
 * When {@code lock.unlock()} throws {@link IllegalMonitorStateException}, it is translated into a
 * Ruby thread error whose message depends on {@code lock.isLocked()}: "not locked" when the lock is
 * free, "locked by another thread" otherwise. In that case {@code releasedLock} is not reached.
 *
 * @param lock        the lock to release
 * @param thread      the Ruby thread that is told about the released lock
 * @param currentNode the node supplying the context and the location used when raising the error
 */
protected static void unlock(ReentrantLock lock, RubyThread thread, RubyNode currentNode) {
final RubyContext context = currentNode.getContext();

try {
lock.unlock();
} catch (IllegalMonitorStateException e) {
// The calling thread is not the owner: distinguish "nobody holds it" from "someone else holds it".
if (!lock.isLocked()) {
throw new RaiseException(getContext().getCoreLibrary().threadError("Attempt to unlock a mutex which is not locked", this));
throw new RaiseException(context.getCoreLibrary().threadError("Attempt to unlock a mutex which is not locked", currentNode));
} else {
throw new RaiseException(getContext().getCoreLibrary().threadError("Attempt to unlock a mutex which is locked by another thread", this));
throw new RaiseException(context.getCoreLibrary().threadError("Attempt to unlock a mutex which is locked by another thread", currentNode));
}
}

thread.releasedLock(lock);
}

return mutex;
}

/**
 * Core method node for {@code sleep} on a mutex: releases the mutex, sleeps through
 * {@code KernelNodes.SleepNode.sleepFor}, and re-acquires the mutex before returning.
 *
 * The optional argument is a duration in seconds (integer or floating point); when it is
 * omitted or {@code nil} a very long sleep is used instead.
 */
@CoreMethod(names = "sleep", optional = 1)
public abstract static class SleepNode extends CoreMethodArrayArgumentsNode {

    /** Number of milliseconds in one second, used to convert the Ruby-level duration. */
    private static final long MILLIS_PER_SECOND = 1000;

    public SleepNode(RubyContext context, SourceSection sourceSection) {
        super(context, sourceSection);
    }

    /** No duration given: sleeps for {@code Integer.MAX_VALUE} milliseconds. */
    @Specialization
    public long sleep(RubyBasicObject mutex, NotProvided duration) {
        // TODO: this should actually be "forever".
        return doSleepMillis(mutex, Integer.MAX_VALUE);
    }

    /** A {@code nil} duration is handled exactly like a missing one. */
    @Specialization(guards = "isNil(duration)")
    public long sleep(RubyBasicObject mutex, RubyBasicObject duration) {
        return sleep(mutex, NotProvided.INSTANCE);
    }

    /** Integer duration in seconds. */
    @Specialization
    public long sleep(RubyBasicObject mutex, long duration) {
        // A plain "duration * 1000" silently wraps for large values, which could turn a huge
        // positive duration into a negative one (bogus ArgumentError) or into a short sleep.
        return doSleepMillis(mutex, secondsToMillis(duration));
    }

    /** Floating-point duration in seconds; the narrowing cast truncates toward zero. */
    @Specialization
    public long sleep(RubyBasicObject mutex, double duration) {
        return doSleepMillis(mutex, (long) (duration * 1000.0));
    }

    /**
     * Converts whole seconds to milliseconds, clamping to the {@code long} range instead of
     * overflowing, so the sign of the duration is always preserved.
     */
    private static long secondsToMillis(long seconds) {
        if (seconds > Long.MAX_VALUE / MILLIS_PER_SECOND) {
            return Long.MAX_VALUE;
        }
        if (seconds < Long.MIN_VALUE / MILLIS_PER_SECOND) {
            return Long.MIN_VALUE;
        }
        return seconds * MILLIS_PER_SECOND;
    }

    /**
     * Releases the mutex, sleeps, and re-acquires the mutex.
     *
     * @param mutex            the mutex to release for the duration of the sleep
     * @param durationInMillis how long to sleep, in milliseconds; must not be negative
     * @return the value returned by {@code KernelNodes.SleepNode.sleepFor}
     * @throws RaiseException with a Ruby argument error when {@code durationInMillis} is negative
     */
    public long doSleepMillis(RubyBasicObject mutex, long durationInMillis) {
        if (durationInMillis < 0) {
            throw new RaiseException(getContext().getCoreLibrary().argumentError("time interval must be positive", this));
        }

        final ReentrantLock lock = getLock(mutex);
        final RubyThread thread = getContext().getThreadManager().getCurrentThread();

        // Clear the wakeUp flag, following Ruby semantics:
        // it should only be considered if we are inside the sleep when Thread#{run,wakeup} is called.
        // Here we do it before unlocking for providing nice semantics for
        // thread1: mutex.sleep
        // thread2: mutex.synchronize { <ensured that thread1 is sleeping and thread1.wakeup will wake it up> }
        thread.shouldWakeUp();

        // If unlocking fails (raises), the lock below is never attempted because the try has not been entered.
        UnlockNode.unlock(lock, thread, this);
        try {
            return KernelNodes.SleepNode.sleepFor(getContext(), durationInMillis);
        } finally {
            // Always take the mutex back, even when the sleep exits with an exception.
            LockNode.lock(lock, thread, this);
        }
    }

}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;

/**
@@ -48,6 +49,7 @@ public class RubyThread extends RubyBasicObject {

private volatile Thread thread;
private volatile Status status = Status.RUN;
// Set by wakeup() and consumed by shouldWakeUp(). The AtomicBoolean itself provides the
// cross-thread visibility, so the reference only needs to be final, not volatile.
private final AtomicBoolean wakeUp = new AtomicBoolean(false);

private volatile RubyException exception;
private volatile Object value;
@@ -178,7 +180,7 @@ public Boolean block() throws InterruptedException {
}

public void wakeup() {
status = Status.RUN;
wakeUp.set(true);
Thread t = thread;
if (t != null) {
t.interrupt();
@@ -256,6 +258,11 @@ public void setInterruptMode(InterruptMode interruptMode) {
this.interruptMode = interruptMode;
}

/**
 * Reads and resets the wakeup flag in a single atomic step.
 *
 * @return {@code true} if the flag was set (i.e. Thread#{run,wakeup} was called) since it was
 *         last cleared, {@code false} otherwise
 */
public boolean shouldWakeUp() {
    final boolean wakeUpRequested = wakeUp.getAndSet(false);
    return wakeUpRequested;
}

public static class ThreadAllocator implements Allocator {

@Override
1 change: 0 additions & 1 deletion truffle/src/main/ruby/core.rb
Original file line number Diff line number Diff line change
@@ -207,7 +207,6 @@ def self.omit(reason)
#require_relative 'core/rubinius/common/sprinter'
require_relative 'core/rubinius/common/symbol'
require_relative 'core/rubinius/common/mutex'
require_relative 'core/rubinius/api/shims/mutex'
require_relative 'core/rubinius/common/thread'
#require_relative 'core/rubinius/common/thread_group'
require_relative 'core/rubinius/common/throw_catch'
42 changes: 0 additions & 42 deletions truffle/src/main/ruby/core/rubinius/api/shims/mutex.rb

This file was deleted.