Skip to content

Commit

Permalink
Performance improvements for IO.select.
Browse files Browse the repository at this point in the history
* Avoid HashMap for small sets of items.
* Avoid HashSet for small sets of items.

HashMap and HashSet are very memory-inefficient for small numbers
of items. HashMap::Node was highest in early alloc profile.

* Avoid Java 5 iteration when numeric iteration will do.

Java 5 iteration creates at least an Iterator and often other
support structures to support it.

* Use builtin channels when possible (pipe fixed in this commit).

ENXIO selectors cannot be combined with builtin selectors,
requiring us to select them in a separate thread. We're using our
executor, but there's other overhead for native select like an
additional pipe per selector, native memory for signaling the pipe
on the other end, etc. The threading may be the biggest hit, since
it requires more communication.

* Eliminate more unnecessary collections from MRI logic.

We do not need to turn our lists of keys into primitive arrays.
  • Loading branch information
headius committed Nov 6, 2014
1 parent 8dd4662 commit abf9b02
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 96 deletions.
25 changes: 11 additions & 14 deletions core/src/main/java/org/jruby/util/io/PosixShim.java
Expand Up @@ -246,22 +246,19 @@ public int close(Closeable closeable) {

public Channel[] pipe() {
clear();
if (posix.isNative()) {
int[] fds = new int[2];
int ret = posix.pipe(fds);
if (ret == -1) {
errno = Errno.valueOf(posix.errno());
return null;
}
setCloexec(fds[0], true);
setCloexec(fds[1], true);
return new Channel[]{new NativeDeviceChannel(fds[0]), new NativeDeviceChannel(fds[1])};
}

// otherwise, Java pipe. Note Java pipe is not FD_CLOEXEC, but we can't use posix_spawn anyway
try {
Pipe pipe = Pipe.open();
return new Channel[]{pipe.source(), pipe.sink()};
Channel source = pipe.source(), sink = pipe.sink();

if (posix.isNative()) {
// set cloexec if possible
int read = FilenoUtil.filenoFrom(source);
int write = FilenoUtil.filenoFrom(sink);
setCloexec(read, true);
setCloexec(write, true);
}

return new Channel[]{source, sink};
} catch (IOException ioe) {
errno = Helpers.errnoFromException(ioe);
return null;
Expand Down
180 changes: 98 additions & 82 deletions core/src/main/java/org/jruby/util/io/SelectExecutor.java
Expand Up @@ -16,13 +16,8 @@
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -58,20 +53,24 @@ IRubyObject selectEnd(ThreadContext context) throws IOException {
fdTerm(writeKeyList);
fdTerm(errorKeyList);

for (Selector selector : selectors.values()) {
// if it is a JDK selector, cache it
if (selector.provider() == SelectorProvider.provider()) {
// clear cancelled keys (with selectNow) and return to pool
selector.selectNow();
context.runtime.getSelectorPool().put(selector);
} else {
selector.close();
if (selectors != null) {
for (int i = 0; i < selectors.size(); i++) {
Selector selector = selectors.get(i);
// if it is a JDK selector, cache it
if (selector.provider() == SelectorProvider.provider()) {
// clear cancelled keys (with selectNow) and return to pool
selector.selectNow();
context.runtime.getSelectorPool().put(selector);
} else {
selector.close();
}
}
}

for (ENXIOSelector enxioSelector : enxioSelectors) {
enxioSelector.pipe.sink().close();
enxioSelector.pipe.source().close();
// TODO: pool ENXIOSelector impls
for (ENXIOSelector enxioSelector : enxioSelectors) {
enxioSelector.pipe.sink().close();
enxioSelector.pipe.source().close();
}
}

// TODO: reset blocking status
Expand Down Expand Up @@ -112,7 +111,7 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
readAry = read.convertToArray();
for (i = 0; i < readAry.size(); i++) {
fptr = TypeConverter.ioGetIO(runtime, readAry.eltOk(i)).getOpenFileChecked();
fdSetRead(context, fptr.fd());
fdSetRead(context, fptr.fd(), readAry.size());
if (fptr.READ_DATA_PENDING() || fptr.READ_CHAR_PENDING()) { /* check for buffered data */
if (pendingReadFDs == null) pendingReadFDs = new ArrayList(1);
pendingReadFDs.add(fptr.fd());
Expand All @@ -121,9 +120,6 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
if (pendingReadFDs != null) { /* no blocking if there's buffered data */
timeout = (long) 0;
}
readKeys = keyListToArray(readKeyList);
} else {
readKeys = null;
}

RubyArray writeAry = null;
Expand All @@ -132,16 +128,14 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
for (i = 0; i < writeAry.size(); i++) {
RubyIO write_io = TypeConverter.ioGetIO(runtime, writeAry.eltOk(i)).GetWriteIO();
fptr = write_io.getOpenFileChecked();
fdSetWrite(context, fptr.fd());
fdSetWrite(context, fptr.fd(), writeAry.size());
}
writeKeys = keyListToArray(writeKeyList);
} else {
writeKeys = null;
}

RubyArray exceptAry = null;
if (!except.isNil()) {
// most of this is commented out since we can't select for error with JDK
// This does not actually register anything because we do not have a way to select for error on JDK.
// We make the calls for their side effects.
exceptAry = except.convertToArray();
for (i = 0; i < exceptAry.size(); i++) {
RubyIO io = TypeConverter.ioGetIO(runtime, exceptAry.eltOk(i));
Expand All @@ -151,21 +145,19 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
fptr = write_io.getOpenFileChecked();
}
}
errorKeys = keyListToArray(errorKeyList);
} else {
errorKeys = null;
}

int n = threadFdSelect(context);

if (pendingReadFDs == null && n == 0 && unselectableReadFDs == null && unselectableWriteFDs == null) return context.nil; /* returns nil on timeout */
if (n == 0 && pendingReadFDs == null && n == 0 && unselectableReadFDs == null && unselectableWriteFDs == null) return context.nil; /* returns nil on timeout */

res = RubyArray.newArray(runtime, 3);
res.push(runtime.newArray());
res.push(runtime.newArray());
res.push(runtime.newArray());
res.push(runtime.newArray(Math.min(n, maxReadReadySize())));
res.push(runtime.newArray(Math.min(n, maxWriteReadySize())));
// we never add anything for error since JDK does not provide a way to select for error
res.push(runtime.newArray(0));

if (readKeys != null) {
if (readKeyList != null) {
list = (RubyArray) res.eltOk(0);
for (i = 0; i < readAry.size(); i++) {
IRubyObject obj = readAry.eltOk(i);
Expand All @@ -188,7 +180,7 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
}
}

if (writeKeys != null) {
if (writeKeyList != null) {
list = (RubyArray) res.eltOk(1);
for (i = 0; i < writeAry.size(); i++) {
IRubyObject obj = writeAry.eltOk(i);
Expand All @@ -212,7 +204,7 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
}
}

if (errorKeys != null) {
if (errorKeyList != null) {
list = (RubyArray) res.eltOk(2);
for (i = 0; i < exceptAry.size(); i++) {
IRubyObject obj = exceptAry.eltOk(i);
Expand All @@ -233,7 +225,21 @@ IRubyObject selectInternal(ThreadContext context) throws IOException {
return res; /* returns an empty array on interrupt */
}

private void fdSetRead(ThreadContext context, ChannelFD fd) throws IOException {
/**
 * Upper bound on how many IOs can be reported as read-ready: the number of
 * registered read selection keys plus the number of unselectable read FDs.
 * A list that was never allocated (null) contributes zero.
 *
 * @return the combined size of {@code readKeyList} and {@code unselectableReadFDs}
 */
private int maxReadReadySize() {
    final int selectableCount = (readKeyList == null) ? 0 : readKeyList.size();
    final int unselectableCount = (unselectableReadFDs == null) ? 0 : unselectableReadFDs.size();
    return selectableCount + unselectableCount;
}

/**
 * Upper bound on how many IOs can be reported as write-ready: the number of
 * registered write selection keys plus the number of unselectable write FDs.
 * A list that was never allocated (null) contributes zero.
 *
 * @return the combined size of {@code writeKeyList} and {@code unselectableWriteFDs}
 */
private int maxWriteReadySize() {
    int size = 0;
    // Must measure writeKeyList here: the earlier version read readKeyList.size(),
    // which throws NullPointerException when only write IOs were registered
    // (readKeyList still null) and reports the wrong bound otherwise.
    if (writeKeyList != null) size += writeKeyList.size();
    if (unselectableWriteFDs != null) size += unselectableWriteFDs.size();
    return size;
}

private void fdSetRead(ThreadContext context, ChannelFD fd, int maxSize) throws IOException {
if (fd.chFile != null || fd.isNativeFile) {
// files are not selectable, so we treat them as ready
if (unselectableReadFDs == null) unselectableReadFDs = new ArrayList(1);
Expand All @@ -246,7 +252,7 @@ private void fdSetRead(ThreadContext context, ChannelFD fd) throws IOException {
readKeyList.add(key);
}

private void fdSetWrite(ThreadContext context, ChannelFD fd) throws IOException {
private void fdSetWrite(ThreadContext context, ChannelFD fd, int maxSize) throws IOException {
if (fd.chFile != null || fd.isNativeFile) {
// files are not selectable, so we treat them as ready
if (unselectableWriteFDs == null) unselectableWriteFDs = new ArrayList(1);
Expand All @@ -263,7 +269,7 @@ private boolean fdIsSet(List<SelectionKey> fds, ChannelFD fd, int operations) {
if (fds == null) return false;

for (SelectionKey key : fds) {
if (key.isValid() && (key.readyOps() & operations) != 0 && ((Set<ChannelFD>)key.attachment()).contains(fd)) return true;
if (key.isValid() && (key.readyOps() & operations) != 0 && ((List<ChannelFD>)key.attachment()).contains(fd)) return true;
}
return false;
}
Expand Down Expand Up @@ -297,13 +303,23 @@ private SelectionKey trySelectWrite(ThreadContext context, ChannelFD fd) throws
}

private Selector getSelector(ThreadContext context, SelectableChannel channel) throws IOException {
Selector selector = selectors.get(channel.provider());
Selector selector = null;
// using a linear search because there should never be more than a couple selector providers in flight
if (selectors == null) {
selectors = new ArrayList<>(1);
} else {
for (int i = 0; i < selectors.size(); i++) {
Selector sel = selectors.get(i);
if (sel.provider() == channel.provider()) {
selector = sel;
break;
}
}
}

if (selector == null) {
selector = context.runtime.getSelectorPool().get(channel.provider());
if (selectors.isEmpty()) {
selectors = new HashMap<SelectorProvider, Selector>();
}
selectors.put(channel.provider(), selector);
selectors.add(selector);

if (!selector.provider().equals(SelectorProvider.provider())) {
// need to create pipe between alt impl selector and native NIO selector
Expand All @@ -327,34 +343,22 @@ private static SelectionKey registerSelect(Selector selector, ChannelFD attachme
int real_ops = channel.validOps() & ops;

SelectionKey key = channel.keyFor(selector);
List<ChannelFD> attachmentSet;
if (key != null) {
key.interestOps(key.interestOps() | real_ops);
((Set<ChannelFD>)key.attachment()).add(attachment);
attachmentSet = (List<ChannelFD>)key.attachment();
if (!attachmentSet.contains(attachment)) attachmentSet.add(attachment);
return key;
} else {
Set<ChannelFD> attachmentSet = new HashSet<>();
attachmentSet = new ArrayList(1);
attachmentSet.add(attachment);
return channel.register(selector, real_ops, attachmentSet);
}

// } else {
// key.interestOps(key.interestOps() | real_ops);
// Map<Character,Integer> att = (Map<Character,Integer>)key.attachment();
// att.putAll(attachment);
// key.attach(att);
// }
//
// return true;
}

private static SelectionKey[] keyListToArray(List<SelectionKey> fds) {
if (fds == null) return null;
return fds.toArray(new SelectionKey[fds.size()]);
}

// MRI: rb_thread_fd_select
private int threadFdSelect(ThreadContext context) throws IOException {
if (readKeys == null && writeKeys == null && errorKeys == null) {
if (readKeyList == null && writeKeyList == null && errorKeyList == null) {
if (timeout == null) { // sleep forever
try {
context.getThread().sleep(0);
Expand All @@ -371,13 +375,13 @@ private int threadFdSelect(ThreadContext context) throws IOException {
return 0;
}

if (readKeys != null) {
if (readKeyList != null) {
// rb_fd_resize(max - 1, read);
}
if (writeKeys != null) {
if (writeKeyList != null) {
// rb_fd_resize(max - 1, write);
}
if (errorKeys != null) {
if (errorKeyList != null) {
// rb_fd_resize(max - 1, except);
}
return doSelect(context);
Expand Down Expand Up @@ -440,12 +444,12 @@ private int doSelect(ThreadContext context) {
// }

// we don't kill the keys here because that breaks them
// if (readKeys != null)
// fdTerm(readKeys);
// if (writeKeys != null)
// fdTerm(writeKeys);
// if (errorKeys != null);
// fdTerm(errorKeys);
// if (readKeyList != null)
// fdTerm(readKeyList);
// if (writeKeyList != null)
// fdTerm(writeKeyList);
// if (errorKeyList != null);
// fdTerm(errorKeyList);

return result;
}
Expand All @@ -458,21 +462,29 @@ public Integer run(ThreadContext context, SelectExecutor s) throws InterruptedEx
if (s.mainSelector != null) {
if (s.pendingReadFDs == null) {
if (s.timeout != null && s.timeout == 0) {
for (Selector selector : s.selectors.values()) ready += selector.selectNow();
for (int i = 0; i < s.selectors.size(); i++) {
Selector selector = s.selectors.get(i);
ready += selector.selectNow();
}
} else {
List<Future> futures = new ArrayList<Future>(s.enxioSelectors.size());
for (ENXIOSelector enxioSelector : s.enxioSelectors) {
for (int i = 0; i < s.enxioSelectors.size(); i++) {
ENXIOSelector enxioSelector = s.enxioSelectors.get(i);
futures.add(context.runtime.getExecutor().submit(enxioSelector));
}

ready += s.mainSelector.select(s.timeout == null ? 0 : s.timeout);

// enxio selectors use a thread pool, since we can't select on multiple types
// of selector at once.
for (ENXIOSelector enxioSelector : s.enxioSelectors) enxioSelector.selector.wakeup();
for (int i = 0; i < s.enxioSelectors.size(); i++) {
ENXIOSelector enxioSelector = s.enxioSelectors.get(i);
enxioSelector.selector.wakeup();
}

// ensure all the enxio threads have finished
for (Future f : futures) try {
for (int i = 0; i < futures.size(); i++) try {
Future f = futures.get(i);
f.get();
} catch (InterruptedException iex) {
} catch (ExecutionException eex) {
Expand All @@ -482,12 +494,16 @@ public Integer run(ThreadContext context, SelectExecutor s) throws InterruptedEx
}
}
} else {
for (Selector selector : s.selectors.values()) ready += selector.selectNow();
for (int i = 0; i < s.selectors.size(); i++) {
Selector selector = s.selectors.get(i);
ready += selector.selectNow();
}
}
}

// If any enxio selectors woke up, remove them from the selected key set of the main selector
for (ENXIOSelector enxioSelector : s.enxioSelectors) {
for (int i = 0; i < s.enxioSelectors.size(); i++) {
ENXIOSelector enxioSelector = s.enxioSelectors.get(i);
Pipe.SourceChannel source = enxioSelector.pipe.source();
SelectionKey key = source.keyFor(s.mainSelector);
if (key != null && s.mainSelector.selectedKeys().contains(key)) {
Expand All @@ -509,7 +525,10 @@ public void wakeup(RubyThread thread, SelectExecutor selectExecutor) {
thread.getNativeThread().interrupt();

// wake up all selectors too
for (Selector selector : selectExecutor.selectors.values()) selector.wakeup();
for (int i = 0; i < selectExecutor.selectors.size(); i++) {
Selector selector = selectExecutor.selectors.get(i);
selector.wakeup();
}
}
};

Expand Down Expand Up @@ -544,12 +563,9 @@ public Object call() throws Exception {
List<ChannelFD> unselectableWriteFDs;
List<ChannelFD> pendingReadFDs;
Selector mainSelector = null;
Map<SelectorProvider, Selector> selectors = Collections.emptyMap();
Collection<ENXIOSelector> enxioSelectors = Collections.emptyList();
List<Selector> selectors = null;
List<ENXIOSelector> enxioSelectors = Collections.emptyList();

SelectionKey[] readKeys;
SelectionKey[] writeKeys;
SelectionKey[] errorKeys;
Long timeout;
final Ruby runtime;

Expand Down

0 comments on commit abf9b02

Please sign in to comment.