Skip to content

Commit

Permalink
Showing 3 changed files with 80 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -45,7 +45,9 @@
import com.oracle.truffle.api.source.SourceSection;
import jnr.constants.platform.Errno;
import jnr.constants.platform.Fcntl;
import jnr.ffi.Pointer;
import jnr.ffi.*;
import jnr.posix.DefaultNativeTimeval;
import jnr.posix.Timeval;
import org.jruby.truffle.nodes.RubyGuards;
import org.jruby.truffle.nodes.dispatch.CallDispatchHeadNode;
import org.jruby.truffle.nodes.dispatch.DispatchHeadNodeFactory;
@@ -100,7 +102,7 @@ public IOConnectPipeNode(RubyContext context, SourceSection sourceSection) {
}

@Specialization
public boolean connectPipe(VirtualFrame frame, DynamicObject lhs, DynamicObject rhs) {
public boolean connectPipe(DynamicObject lhs, DynamicObject rhs) {
final int[] fds = new int[2];

if (posix().pipe(fds) == -1) {
@@ -269,13 +271,11 @@ public Object readIfAvailable(VirtualFrame frame, DynamicObject file, int number
final FDSet fdSet = fdSetFactory.create();
fdSet.set(fd);

// TODO CS 2-Sep-15 why are longs 8 bytes? Is that always the case?
final Pointer timeout = getContext().getMemoryManager().allocateDirect(8 * 2); // Needs to be two longs.
timeout.putLong(0, 0);
timeout.putLong(8, 0);
final Timeval timeoutObject = new DefaultNativeTimeval(jnr.ffi.Runtime.getSystemRuntime());
timeoutObject.setTime(new long[]{0, 0});

final int res = nativeSockets().select(fd + 1, fdSet.getPointer(),
PointerNodes.NULL_POINTER, PointerNodes.NULL_POINTER, timeout);
PointerNodes.NULL_POINTER, PointerNodes.NULL_POINTER, timeoutObject);

if (res == 0) {
CompilerDirectives.transferToInterpreter();
@@ -594,19 +594,25 @@ public Object select(DynamicObject readables, DynamicObject writables, DynamicOb
readableSet.set(fd);
}

final int result = getContext().getThreadManager().runUntilResult(this, new ThreadManager.BlockingAction<Integer>() {
final int result = getContext().getThreadManager().runUntilTimeout(this, timeout, new ThreadManager.BlockingTimeoutAction<Integer>() {
@Override
public Integer block() throws InterruptedException {
return nativeSockets().select(
public Integer block(Timeval timeoutToUse) throws InterruptedException {
final int result = nativeSockets().select(
max(readableFds) + 1,
readableSet.getPointer(),
PointerNodes.NULL_POINTER,
PointerNodes.NULL_POINTER,
PointerNodes.NULL_POINTER);
timeoutToUse);

if (result == 0) {
return null;
}

return result;
}
});

if (result == -1) {
if (result == -1 || result == 0) {
return nil();
}

@@ -637,7 +643,7 @@ public Integer block() throws InterruptedException {
PointerNodes.NULL_POINTER,
writableSet.getPointer(),
PointerNodes.NULL_POINTER,
PointerNodes.NULL_POINTER);
null);
}
});

Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
package org.jruby.truffle.runtime.sockets;

import jnr.ffi.Pointer;
import jnr.posix.Timeval;

public interface NativeSockets {

@@ -86,7 +87,7 @@ public interface NativeSockets {
* fd_set *restrict errorfds, struct timeval *restrict timeout);
*/

int select(int nfds, Pointer readfds, Pointer writefds, Pointer errorfds, Pointer timeout);
int select(int nfds, Pointer readfds, Pointer writefds, Pointer errorfds, Timeval timeout);

/*
* int
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
import com.oracle.truffle.api.nodes.Node;
import com.oracle.truffle.api.object.DynamicObject;
import jnr.posix.DefaultNativeTimeval;
import jnr.posix.Timeval;
import org.jruby.RubyThread.Status;
import org.jruby.truffle.nodes.RubyGuards;
import org.jruby.truffle.nodes.core.FiberNodes;
@@ -52,13 +54,16 @@ public DynamicObject getRootThread() {
return rootThread;
}


public static interface BlockingAction<T> {
public static boolean SUCCESS = true;
public interface BlockingAction<T> {
boolean SUCCESS = true;

T block() throws InterruptedException;
}

public interface BlockingTimeoutAction<T> {
T block(Timeval timeoutToUse) throws InterruptedException;
}

/**
* Runs {@code action} until it returns a non-null value.
* The action might be {@link Thread#interrupted()}, for instance by
@@ -90,6 +95,57 @@ public <T> T runUntilResult(Node currentNode, BlockingAction<T> action) {
return result;
}

public <T> T runUntilTimeout(Node currentNode, int timeout, final BlockingTimeoutAction<T> action) {
final Timeval timeoutToUse = new DefaultNativeTimeval(jnr.ffi.Runtime.getSystemRuntime());

if (timeout == 0) {
timeoutToUse.setTime(new long[]{0, 0});

return runUntilResult(currentNode, new BlockingAction<T>() {

@Override
public T block() throws InterruptedException {
return action.block(timeoutToUse);
}

});
} else {
final int pollTime = 500_000_000;
final long requestedTimeoutAt = System.nanoTime() + timeout * 1_000;

return runUntilResult(currentNode, new BlockingAction<T>() {

@Override
public T block() throws InterruptedException {
final long timeUntilRequestedTimeout = requestedTimeoutAt - System.nanoTime();

final boolean timeoutForPoll;
final long effectiveTimeout;

if (timeUntilRequestedTimeout < pollTime) {
timeoutForPoll = false;
effectiveTimeout = timeUntilRequestedTimeout;
} else {
timeoutForPoll = true;
effectiveTimeout = pollTime;
}

final long effectiveTimeoutMicros = effectiveTimeout / 1_000;
timeoutToUse.setTime(new long[]{effectiveTimeoutMicros / 1_000_000, effectiveTimeoutMicros % 1_000_000});

final T result = action.block(timeoutToUse);

if (result == null && timeoutForPoll && requestedTimeoutAt - System.nanoTime() > 0) {
throw new InterruptedException();
}

return result;
}

});
}
}

public void initializeCurrentThread(DynamicObject thread) {
assert RubyGuards.isRubyThread(thread);
currentThread.set(thread);

4 comments on commit 14a7a31

@chrisseaton
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eregon please review

@chrisseaton
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should say this solution has some limitations. All threads which would normally be blocked on a select now wake up every 500ms (too frequently), and we've also increased the latency for safe points where threads are blocked on a select to 500s (not frequent enough).

Sorry, something went wrong.

@eregon
Copy link
Member

@eregon eregon commented on 14a7a31 Oct 15, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean 500 ms for the second duration, right?

This looks fine but fairly complex.
I think we have similar logic in Kernel#sleep. We should settle on some well known time unit, probably milliseconds or nanos. It seems IO#kernel takes seconds but this code:
final long requestedTimeoutAt = System.nanoTime() + timeout * 1_000;
then seems strange.
(Using monotonic time is good, I guess it was a currentTimeMillis() there before).

I'll give it a try to refactor later and maybe merge some logic with Kernel#sleep.

Sorry, something went wrong.

@chrisseaton
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah 500ms.

The time unit is microseconds, as that's what the kernel uses for timeouts.

Please sign in to comment.