Comparing changes

base repository: jruby/jruby
base: d4cbfd2d8cdb
...
head repository: jruby/jruby
compare: c0591385a025
  • 3 commits
  • 6 files changed
  • 1 contributor

Commits on Jul 12, 2016

  1. Reopen stdio streams when run from Drip, to pick up its FIFOs.

    This fixes #2690, mostly.
    
    Previous situation
    ==================
    
    On startup, we assume stdio will be at the usual 0, 1, 2 descriptors, and we use those directly to open native stdio streams for Ruby. This conflicts with Drip background processes: after the "drip main" has run, Drip switches System.in/out/err to point at FIFO file streams that proxy the client process. If we assume stdio is always at 0, 1, 2, we wire up the wrong descriptors.
    
    First fix
    =========
    
    My first fix was to just skip the native stream logic and use our tried-and-true unwrapping logic to take System.in/out/err and unwrap them down to their innermost File*Stream. When running under Drip, one of its Switchable streams will be in the way, and we will fall back on using wrapped-stream logic for stdio. That fixes this issue at least well enough for stdio to function.
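
The unwrapping step described above can be sketched as a loop that peels wrapper streams until it reaches one that is not a wrapper. This is only an illustration: the helper in this change reads the protected `in`/`out` field of `FilterInputStream`/`FilterOutputStream` reflectively, whereas the sketch below uses a hypothetical `ExposedFilterInputStream` subclass (not part of JRuby or the JDK) that exposes its delegate, so it runs without reflective access to JDK internals.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;

public class UnwrapSketch {
    /** Hypothetical wrapper: FilterInputStream keeps its delegate in the protected field `in`. */
    public static class ExposedFilterInputStream extends FilterInputStream {
        public ExposedFilterInputStream(InputStream in) { super(in); }
        public InputStream delegate() { return in; }
    }

    /** Peel wrappers until we reach a stream that is not one of our wrappers. */
    public static InputStream unwrap(InputStream stream) {
        while (stream instanceof ExposedFilterInputStream) {
            InputStream inner = ((ExposedFilterInputStream) stream).delegate();
            if (inner == null) break; // nothing further to unwrap
            stream = inner;
        }
        return stream;
    }

    public static void main(String[] args) {
        InputStream base = new ByteArrayInputStream(new byte[] {1, 2, 3});
        InputStream wrapped = new ExposedFilterInputStream(new ExposedFilterInputStream(base));
        System.out.println(unwrap(wrapped) == base); // prints "true"
    }
}
```

If the loop ends on a `FileInputStream`, its channel can be used directly; if it ends on any other type (as with Drip's Switchable streams), the caller has to fall back to the wrapped stream.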
    
    Unfortunately this fix has a problem: by not unwrapping all the way to the FileChannel, we lose some interactivity at a console. Granted, we already lose some interactivity due to Drip's FIFO being in the way, but on top of that we also have Java's stream buffering interfering with interactive use.
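
The buffering effect mentioned above can be reproduced in isolation. The sketch below is not JRuby code; it uses a `ByteArrayOutputStream` as a stand-in for the terminal and shows that a short prompt written through a `BufferedOutputStream` does not reach the sink until something flushes it.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class BufferingSketch {
    /** Returns how many bytes reached the sink before and after an explicit flush. */
    public static int[] bytesSeenBeforeAndAfterFlush() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the terminal
        BufferedOutputStream buffered = new BufferedOutputStream(sink);
        try {
            buffered.write(new byte[] {'>', ' '}); // a short prompt, far below the buffer size
            int before = sink.size();              // still held in the buffer
            buffered.flush();
            int after = sink.size();               // delivered only after the flush
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] seen = bytesSeenBeforeAndAfterFlush();
        System.out.println(seen[0] + " -> " + seen[1]); // prints "0 -> 2"
    }
}
```

Forcing a flush after every write hides part of the problem on the output side, but it does nothing for input-side behavior at a console.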
    
    Additional fix
    ==============
    
    I continued with the above fix and added smarts to further unwrap any Switchable streams. After much fussing around, I have the logic properly getting Drip's FIFO file descriptors for stdio, and fully unwrapping them.
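
Getting a descriptor out of a stream whose class is only known by name can be sketched with a reflective call to a public `getFD()` method, which is the approach `unwrapDripStream` in this diff takes. The `SwitchableLikeStream` class below is a hypothetical stand-in for Drip's stream type, and the sketch stops at the `FileDescriptor` rather than converting it to a fileno, since that conversion relies on JRuby's internal `FilenoUtil`.

```java
import java.io.FileDescriptor;
import java.lang.reflect.InvocationTargetException;

public class DripFdSketch {
    /** Hypothetical stand-in for a Drip stream; only the public getFD() method matters. */
    public static class SwitchableLikeStream {
        public FileDescriptor getFD() { return FileDescriptor.out; }
    }

    /** Calls getFD() reflectively; returns null if the object has no usable getFD(). */
    public static FileDescriptor descriptorOf(Object stream) {
        try {
            Object fd = stream.getClass().getMethod("getFD").invoke(stream);
            return (fd instanceof FileDescriptor) ? (FileDescriptor) fd : null;
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            return null; // not a stream we know how to unwrap
        }
    }

    public static void main(String[] args) {
        System.out.println(descriptorOf(new SwitchableLikeStream()) == FileDescriptor.out); // prints "true"
        System.out.println(descriptorOf(new Object()) == null);                             // prints "true"
    }
}
```

Returning a sentinel for "not found" lets the caller fall through to the wrapped-stream path instead of failing at startup.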
    
    Unfortunately, by default when Drip boots JRuby, it attempts to use our org.jruby.main.DripMain to start up a background JRuby instance. This instance sees the stdio streams *before* they have been switched to the FIFOs, and so that prebooted instance has stdio pointing at the wrong descriptors.
    
    We have addressed this for other Drip laziness by adding Ruby.reinitialize, which re-inits anything we need to defer until the process "really" starts. For example, ARGV is re-defined to point at the actual arguments passed in by the Drip client.
    
    I extended this so that the stdio streams are reinitialized as well, replacing the existing descriptors in the stdio streams with the new ones from Drip. However, any code that grabs the Ruby stdio file descriptors before the "true" process start will continue to point at the old, incorrect descriptors.
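
The stale-descriptor problem can be illustrated with a toy model. `StdioHandle` is a hypothetical stand-in for a Ruby IO object whose descriptor is replaced in place, and the fileno 7 is made up; the point is only that a value copied out before reinitialization does not follow the swap.

```java
public class StaleFilenoSketch {
    /** Hypothetical stand-in for an IO object whose descriptor can be replaced in place. */
    public static class StdioHandle {
        private int fileno;
        public StdioHandle(int fileno) { this.fileno = fileno; }
        public int fileno() { return fileno; }
        public void setFileno(int fileno) { this.fileno = fileno; }
    }

    public static void main(String[] args) {
        StdioHandle stdout = new StdioHandle(1); // prebooted instance sees plain stdio
        int captured = stdout.fileno();          // early code copies the raw descriptor

        stdout.setFileno(7);                     // reinitialization swaps in the FIFO

        System.out.println(stdout.fileno());     // prints "7"
        System.out.println(captured);            // prints "1"
    }
}
```

Code that always goes through the handle sees the new descriptor; code that cached the number keeps writing to the old one.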
    
    Possible solutions
    ==================
    
    * We could ignore the problem. Most of the time, people don't grab and hold onto the file descriptors for `$stdin` and friends; they just use top-level IO functions that write to those streams. But there are always troublemakers...
    * Perhaps we could persuade @ninjudd that the Drip daemon should actually dup the appropriate side of the FIFOs directly into stdio. This would mean every access from DripMain forward would see plain old stdio file descriptors without requiring any switching.
    
    A side effect of my changes here is that stdio.fileno will now always reflect the actual fileno. Previously I forced stdio streams to always report their fileno as 0, 1, or 2, which doesn't make sense if they're not actually using those filenos.
    headius committed Jul 12, 2016
    7e024a7

Commits on Aug 11, 2016

  1. bf2b852
  2. Merge pull request #4010 from headius/reopen_stdio_for_drip

    Reopen stdio streams when run from Drip, to pick up its FIFOs.
    headius authored Aug 11, 2016
    c059138
1 change: 1 addition & 0 deletions core/src/main/java/org/jruby/Ruby.java
@@ -315,6 +315,7 @@ void reinitialize(boolean reinitCore) {

if (reinitCore) {
RubyGlobal.initARGV(this);
RubyGlobal.initSTDIO(this, globalVariables);
}
}

113 changes: 87 additions & 26 deletions core/src/main/java/org/jruby/RubyGlobal.java
@@ -53,19 +53,24 @@
import org.jruby.runtime.ReadonlyGlobalVariable;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.jruby.util.KCode;
import org.jruby.util.OSEnvironment;
import org.jruby.util.RegexpOptions;
import org.jruby.util.cli.OutputStrings;
import org.jruby.util.io.ChannelHelper;
import org.jruby.util.io.EncodingUtils;
import org.jruby.util.io.FilenoUtil;
import org.jruby.util.io.OpenFile;
import org.jruby.util.io.STDIO;

import static org.jruby.internal.runtime.GlobalVariable.Scope.*;

import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.util.Map;
@@ -191,21 +196,7 @@ public static void createGlobals(ThreadContext context, Ruby runtime) {

runtime.defineVariable(new BacktraceGlobalVariable(runtime, "$@"), THREAD);

IRubyObject stdin = RubyIO.prepStdio(
runtime, runtime.getIn(), prepareStdioChannel(runtime, STDIO.IN, runtime.getIn()), OpenFile.READABLE, runtime.getIO(), "<STDIN>");
IRubyObject stdout = RubyIO.prepStdio(
runtime, runtime.getOut(), prepareStdioChannel(runtime, STDIO.OUT, runtime.getOut()), OpenFile.WRITABLE, runtime.getIO(), "<STDOUT>");
IRubyObject stderr = RubyIO.prepStdio(
runtime, runtime.getErr(), prepareStdioChannel(runtime, STDIO.ERR, runtime.getErr()), OpenFile.WRITABLE | OpenFile.SYNC, runtime.getIO(), "<STDERR>");

runtime.defineVariable(new InputGlobalVariable(runtime, "$stdin", stdin), GLOBAL);
runtime.defineVariable(new OutputGlobalVariable(runtime, "$stdout", stdout), GLOBAL);
globals.alias("$>", "$stdout");
runtime.defineVariable(new OutputGlobalVariable(runtime, "$stderr", stderr), GLOBAL);

runtime.defineGlobalConstant("STDIN", stdin);
runtime.defineGlobalConstant("STDOUT", stdout);
runtime.defineGlobalConstant("STDERR", stderr);
initSTDIO(runtime, globals);

runtime.defineVariable(new LoadedFeatures(runtime, "$\""), GLOBAL);
runtime.defineVariable(new LoadedFeatures(runtime, "$LOADED_FEATURES"), GLOBAL);
@@ -276,20 +267,90 @@ public static void createGlobals(ThreadContext context, Ruby runtime) {
globals.alias("$LAST_PAREN_MATCH", "$+");
}

public static void initSTDIO(Ruby runtime, GlobalVariables globals) {
RubyIO stdin = RubyIO.prepStdio(
runtime, runtime.getIn(), prepareStdioChannel(runtime, STDIO.IN, runtime.getIn()), OpenFile.READABLE, runtime.getIO(), "<STDIN>");
RubyIO stdout = RubyIO.prepStdio(
runtime, runtime.getOut(), prepareStdioChannel(runtime, STDIO.OUT, runtime.getOut()), OpenFile.WRITABLE, runtime.getIO(), "<STDOUT>");
RubyIO stderr = RubyIO.prepStdio(
runtime, runtime.getErr(), prepareStdioChannel(runtime, STDIO.ERR, runtime.getErr()), OpenFile.WRITABLE | OpenFile.SYNC, runtime.getIO(), "<STDERR>");

if (runtime.getObject().getConstantFromNoConstMissing("STDIN") == null) {
runtime.defineVariable(new InputGlobalVariable(runtime, "$stdin", stdin), GLOBAL);
runtime.defineVariable(new OutputGlobalVariable(runtime, "$stdout", stdout), GLOBAL);
globals.alias("$>", "$stdout");
runtime.defineVariable(new OutputGlobalVariable(runtime, "$stderr", stderr), GLOBAL);

runtime.defineGlobalConstant("STDIN", stdin);
runtime.defineGlobalConstant("STDOUT", stdout);
runtime.defineGlobalConstant("STDERR", stderr);
} else {
((RubyIO) runtime.getObject().getConstant("STDIN")).getOpenFile().setFD(stdin.getOpenFile().fd());
((RubyIO) runtime.getObject().getConstant("STDOUT")).getOpenFile().setFD(stdout.getOpenFile().fd());
((RubyIO) runtime.getObject().getConstant("STDERR")).getOpenFile().setFD(stderr.getOpenFile().fd());
}
}

private static Channel prepareStdioChannel(Ruby runtime, STDIO stdio, Object stream) {
if (runtime.getPosix().isNative() && stdio.isJVMDefault(stream) && !Platform.IS_WINDOWS) {
// use real native channel for stdio
return new NativeDeviceChannel(stdio.fileno());
} else {
switch (stdio) {
case IN:
return Channels.newChannel((InputStream)stream);
case OUT:
case ERR:
return Channels.newChannel((OutputStream)stream);
default: throw new RuntimeException("invalid stdio: " + stdio);
// use real native fileno for stdio, if possible

// try typical stdio stream and channel types
int fileno = -1;
Channel channel = null;

if (stream instanceof Channel) {
channel = (Channel) stream;
fileno = FilenoUtil.filenoFrom(channel);
} else if (stream instanceof InputStream) {
InputStream unwrappedStream = ChannelHelper.unwrapFilterInputStream((InputStream) stream);
if (unwrappedStream instanceof FileInputStream) {
fileno = FilenoUtil.filenoFrom(((FileInputStream) unwrappedStream).getChannel());
}
} else if (stream instanceof OutputStream) {
OutputStream unwrappedStream = ChannelHelper.unwrapFilterOutputStream((OutputStream) stream);
if (unwrappedStream instanceof FileOutputStream) {
fileno = FilenoUtil.filenoFrom(((FileOutputStream) unwrappedStream).getChannel());
}
}

if (fileno >= 0) {
// We got a real fileno, use it
return new NativeDeviceChannel(fileno);
}
}

// fall back on non-direct stdio
// NOTE (CON): This affects interactivity in any case where we cannot determine the real fileno and use native.
// We do force flushing of stdout and stderr, but we can't provide all the interactive niceties
// without a proper native channel. See https://github.com/jruby/jruby/issues/2690
switch (stdio) {
case IN:
return Channels.newChannel((InputStream)stream);
case OUT:
case ERR:
return Channels.newChannel((OutputStream)stream);
default: throw new RuntimeException("invalid stdio: " + stdio);
}
}

// TODO: gross...see https://github.com/ninjudd/drip/issues/96
private static int unwrapDripStream(Object stream) {
if (stream.getClass().getName().startsWith("org.flatland.drip.Switchable")) {

try {
FileDescriptor fd = (FileDescriptor) stream.getClass().getMethod("getFD").invoke(stream);
return FilenoUtil.filenoFrom(fd);
} catch (NoSuchMethodException nsme) {
nsme.printStackTrace(System.err);
} catch (IllegalAccessException iae) {
iae.printStackTrace(System.err);
} catch (InvocationTargetException ite) {
ite.printStackTrace(System.err);
}
}

return -1;
}


12 changes: 0 additions & 12 deletions core/src/main/java/org/jruby/RubyIO.java
@@ -187,11 +187,6 @@ public static RubyIO prepStdio(Ruby runtime, InputStream f, Channel c, int fmode

fptr = io.getOpenFileChecked();

// Use standard stdio filenos if we're using System.in et al.
if (f == System.in) {
fptr.fd().realFileno = 0;
}

prepStdioEcflags(fptr, fmode);
fptr.stdio_file = f;

@@ -206,13 +201,6 @@ public static RubyIO prepStdio(Ruby runtime, OutputStream f, Channel c, int fmod

fptr = io.getOpenFileChecked();

// Use standard stdio filenos if we're using System.in et al.
if (f == System.out) {
fptr.fd().realFileno = 1;
} else if (f == System.err) {
fptr.fd().realFileno = 2;
}

prepStdioEcflags(fptr, fmode);
fptr.stdio_file = f;

128 changes: 28 additions & 100 deletions core/src/main/java/org/jruby/util/ShellLauncher.java
@@ -33,8 +33,6 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -63,13 +61,12 @@
import org.jruby.RubyInstanceConfig;
import org.jruby.RubyModule;
import org.jruby.RubyString;
import jnr.posix.util.FieldAccess;
import jnr.posix.util.Platform;
import org.jruby.runtime.Helpers;
import org.jruby.ext.rbconfig.RbConfigLibrary;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.cli.Options;
import org.jruby.util.io.ChannelHelper;
import org.jruby.util.io.IOOptions;
import org.jruby.util.io.ModeFlags;

@@ -840,95 +837,6 @@ private static Process popenShared(Ruby runtime, IRubyObject[] strings, Map env,
return childProcess;
}

/**
* Unwrap all filtering streams between the given stream and its actual
* unfiltered stream. This is primarily to unwrap streams that have
* buffers that would interfere with interactivity.
*
* @param filteredStream The stream to unwrap
* @return An unwrapped stream, presumably unbuffered
*/
public static OutputStream unwrapBufferedStream(OutputStream filteredStream) {
if (RubyInstanceConfig.NO_UNWRAP_PROCESS_STREAMS) return filteredStream;

return unwrapFilterOutputStream(filteredStream);
}

/**
* Unwrap all filtering streams between the given stream and its actual
* unfiltered stream. This is primarily to unwrap streams that have
* buffers that would interfere with interactivity.
*
* @param filteredStream The stream to unwrap
* @return An unwrapped stream, presumably unbuffered
*/
public static InputStream unwrapBufferedStream(InputStream filteredStream) {
if (RubyInstanceConfig.NO_UNWRAP_PROCESS_STREAMS) return filteredStream;

// Java 7+ uses a stream that drains the child on exit, which when
// unwrapped breaks because the channel gets drained prematurely.
// System.out.println("class is :" + filteredStream.getClass().getName());
if (filteredStream.getClass().getName().indexOf("ProcessPipeInputStream") != 1) {
return filteredStream;
}

return unwrapFilterInputStream((FilterInputStream)filteredStream);
}

/**
* Unwrap the given stream to its first non-FilterOutputStream. If the stream is not
* a FilterOutputStream it is returned immediately.
*
* Note that this version is used when you are absolutely sure you want to unwrap;
* the unwrapBufferedStream version will perform checks for certain types of
* process-related streams that should not be unwrapped (Java 7+ Process, e.g.).
*
* @param filteredStream a stream to be unwrapped, if it is a FilterOutputStream
* @return the deepest non-FilterOutputStream stream, or filteredStream if it is
* not a FilterOutputStream to begin with.
*/
public static OutputStream unwrapFilterOutputStream(OutputStream filteredStream) {
while (filteredStream instanceof FilterOutputStream) {
try {
OutputStream tmpStream = (OutputStream)
FieldAccess.getProtectedFieldValue(FilterOutputStream.class,
"out", filteredStream);
if (tmpStream == null) break;
filteredStream = tmpStream;
} catch (Exception e) {
break; // break out if we've dug as deep as we can
}
}
return filteredStream;
}

/**
* Unwrap the given stream to its first non-FilterInputStream. If the stream is not
* a FilterInputStream it is returned immediately.
*
* Note that this version is used when you are absolutely sure you want to unwrap;
* the unwrapBufferedStream version will perform checks for certain types of
* process-related streams that should not be unwrapped (Java 7+ Process, e.g.).
*
* @param filteredStream a stream to be unwrapped, if it is a FilterInputStream
* @return the deepest non-FilterInputStream stream, or filteredStream if it is
* not a FilterInputStream to begin with.
*/
public static InputStream unwrapFilterInputStream(InputStream filteredStream) {
while (filteredStream instanceof FilterInputStream) {
try {
InputStream tmpStream = (InputStream)
FieldAccess.getProtectedFieldValue(FilterInputStream.class,
"in", filteredStream);
if (tmpStream == null) break;
filteredStream = tmpStream;
} catch (Exception e) {
break; // break out if we've dug as deep as we can
}
}
return filteredStream;
}

public static class POpenProcess extends Process {
private final Process child;
private final boolean waitForChild;
@@ -1080,7 +988,7 @@ private void prepareInerr(Process child) {
private void prepareOutput(Process child) {
// popen caller wants to be able to write, provide subprocess out directly
realOutput = child.getOutputStream();
- output = unwrapBufferedStream(realOutput);
+ output = ChannelHelper.unwrapBufferedStream(realOutput);
if (output instanceof FileOutputStream) {
outputChannel = ((FileOutputStream) output).getChannel();
} else {
@@ -1090,12 +998,12 @@ private void prepareOutput(Process child) {

private void pumpInput(Process child, Ruby runtime) {
// no read requested, hook up read to parents output
- InputStream childIn = unwrapBufferedStream(child.getInputStream());
+ InputStream childIn = ChannelHelper.unwrapBufferedStream(child.getInputStream());
FileChannel childInChannel = null;
if (childIn instanceof FileInputStream) {
childInChannel = ((FileInputStream) childIn).getChannel();
}
- OutputStream parentOut = unwrapBufferedStream(runtime.getOut());
+ OutputStream parentOut = ChannelHelper.unwrapBufferedStream(runtime.getOut());
FileChannel parentOutChannel = null;
if (parentOut instanceof FileOutputStream) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
@@ -1112,12 +1020,12 @@ private void pumpInput(Process child, Ruby runtime) {

private void pumpInerr(Process child, Ruby runtime) {
// no read requested, hook up read to parents output
- InputStream childIn = unwrapBufferedStream(child.getErrorStream());
+ InputStream childIn = ChannelHelper.unwrapBufferedStream(child.getErrorStream());
FileChannel childInChannel = null;
if (childIn instanceof FileInputStream) {
childInChannel = ((FileInputStream) childIn).getChannel();
}
- OutputStream parentOut = unwrapBufferedStream(runtime.getOut());
+ OutputStream parentOut = ChannelHelper.unwrapBufferedStream(runtime.getOut());
FileChannel parentOutChannel = null;
if (parentOut instanceof FileOutputStream) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
@@ -1487,8 +1395,8 @@ private static class StreamPumper extends Thread implements Pumper {
private final Ruby runtime;

StreamPumper(Ruby runtime, InputStream in, OutputStream out, boolean avail, Slave slave, Object sync) {
- this.in = unwrapBufferedStream(in);
- this.out = unwrapBufferedStream(out);
+ this.in = ChannelHelper.unwrapBufferedStream(in);
+ this.out = ChannelHelper.unwrapBufferedStream(out);
this.onlyIfAvailable = avail;
this.slave = slave;
this.sync = sync;
@@ -1733,4 +1641,24 @@ static void log(Ruby runtime, String msg) {
runtime.getErr().println("ShellLauncher: " + msg);
}
}

@Deprecated
public static OutputStream unwrapBufferedStream(OutputStream filteredStream) {
return ChannelHelper.unwrapBufferedStream(filteredStream);
}

@Deprecated
public static InputStream unwrapBufferedStream(InputStream filteredStream) {
return ChannelHelper.unwrapBufferedStream(filteredStream);
}

@Deprecated
public static OutputStream unwrapFilterOutputStream(OutputStream filteredStream) {
return ChannelHelper.unwrapFilterOutputStream(filteredStream);
}

@Deprecated
public static InputStream unwrapFilterInputStream(InputStream filteredStream) {
return ChannelHelper.unwrapFilterInputStream(filteredStream);
}
}