Skip to content

Commit

Permalink
Restructure Windows monkey patch for Open3. #4943
Browse files Browse the repository at this point in the history
headius committed Jan 11, 2018
1 parent e55ba79 commit f58ddad
Showing 4 changed files with 240 additions and 901 deletions.
181 changes: 11 additions & 170 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@
import jnr.posix.POSIX;
import org.jcodings.transcode.EConvFlags;
import org.jruby.api.API;
import org.jruby.ext.jruby.JRubyWindowsLibrary;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.JavaSites.IOSites;
import org.jruby.runtime.callsite.RespondToCallSite;
@@ -4737,6 +4738,14 @@ private static int rb_io_fptr_finalize(Ruby runtime, OpenFile fptr) {

private static final byte[] NEWLINE_BYTES = { (byte) '\n' };

public boolean isPopenSpecial() {
return popenSpecial;
}

public void setPopenSpecial(boolean popenSpecial) {
this.popenSpecial = popenSpecial;
}

private static IOSites sites(ThreadContext context) {
return context.sites.IO;
}
@@ -4835,180 +4844,12 @@ public static IRubyObject writeStatic(ThreadContext context, IRubyObject recv, I
@Deprecated
@JRubyMethod(name = "popen3", rest = true, meta = true)
public static IRubyObject popen3(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
final Ruby runtime = context.runtime;

// TODO: handle opts
if (args.length > 0 && args[args.length - 1] instanceof RubyHash) {
args = Arrays.copyOf(args, args.length - 1);
}

final POpenTuple tuple = popenSpecial(context, args);
final long pid = ShellLauncher.getPidFromProcess(tuple.process);

// array trick to be able to reference enclosing RubyThread
final RubyThread[] waitThread = new RubyThread[1];
waitThread[0] = new RubyThread(
runtime,
(RubyClass) runtime.getClassFromPath("Process::WaitThread"),
new ThreadedRunnable() {

volatile Thread javaThread;

@Override
public Thread getJavaThread() {
return javaThread;
}

@Override
public void run() {
javaThread = Thread.currentThread();
RubyThread rubyThread;
// spin a bit until this happens; should almost never spin
while ((rubyThread = waitThread[0]) == null) {
Thread.yield();
}

runtime.getThreadService().registerNewThread(rubyThread);

rubyThread.op_aset(runtime.newSymbol("pid"), runtime.newFixnum(pid));

try {
int exitValue = tuple.process.waitFor();

// RubyStatus uses real native status now, so we unshift Java's shifted exit status
RubyProcess.RubyStatus status = RubyProcess.RubyStatus.newProcessStatus(
runtime,
exitValue << 8,
pid);

rubyThread.cleanTerminate(status);
} catch (Throwable t) {
rubyThread.exceptionRaised(t);
} finally {
rubyThread.dispose();
}
}

});

RubyArray yieldArgs = RubyArray.newArrayLight(runtime,
tuple.output,
tuple.input,
tuple.error,
waitThread[0]);

if (block.isGiven()) {
try {
return block.yield(context, yieldArgs);
} finally {
cleanupPOpen(tuple);

IRubyObject status = waitThread[0].join(IRubyObject.NULL_ARRAY);
context.setLastExitStatus(status);
}
}

return yieldArgs;
return JRubyWindowsLibrary.popen3(context, recv, args, block);
}

@Deprecated
public static IRubyObject popen4(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
Ruby runtime = context.runtime;

try {
POpenTuple tuple = popenSpecial(context, args);

RubyArray yieldArgs = RubyArray.newArrayLight(runtime,
runtime.newFixnum(ShellLauncher.getPidFromProcess(tuple.process)),
tuple.output,
tuple.input,
tuple.error);

if (block.isGiven()) {
try {
return block.yield(context, yieldArgs);
} finally {
cleanupPOpen(tuple);
// RubyStatus uses real native status now, so we unshift Java's shifted exit status
context.setLastExitStatus(RubyProcess.RubyStatus.newProcessStatus(runtime, tuple.process.waitFor() << 8, ShellLauncher.getPidFromProcess(tuple.process)));
}
}
return yieldArgs;
} catch (InterruptedException e) {
throw runtime.newThreadError("unexpected interrupt");
}
}

@Deprecated
private static void cleanupPOpen(POpenTuple tuple) {
if (tuple.input.openFile.isOpen()) {
try {
tuple.input.close();
} catch (RaiseException re) {}
}
if (tuple.output.openFile.isOpen()) {
try {
tuple.output.close();
} catch (RaiseException re) {}
}
if (tuple.error.openFile.isOpen()) {
try {
tuple.error.close();
} catch (RaiseException re) {}
}
}

@Deprecated
private static class POpenTuple {
public POpenTuple(RubyIO i, RubyIO o, RubyIO e, Process p) {
input = i; output = o; error = e; process = p;
}
public final RubyIO input;
public final RubyIO output;
public final RubyIO error;
public final Process process;
}

@Deprecated
public static POpenTuple popenSpecial(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;

try {
ShellLauncher.POpenProcess process = ShellLauncher.popen3(runtime, args, false);
RubyIO input = process.getInput() != null ?
new RubyIO(runtime, process.getInput()) :
new RubyIO(runtime, process.getInputStream());
RubyIO output = process.getOutput() != null ?
new RubyIO(runtime, process.getOutput()) :
new RubyIO(runtime, process.getOutputStream());
RubyIO error = process.getError() != null ?
new RubyIO(runtime, process.getError()) :
new RubyIO(runtime, process.getErrorStream());

// ensure the OpenFile knows it's a process; see OpenFile#finalize
input.getOpenFile().setProcess(process);
output.getOpenFile().setProcess(process);
error.getOpenFile().setProcess(process);

// set all streams as popenSpecial streams, so we don't shut down process prematurely
input.popenSpecial = true;
output.popenSpecial = true;
error.popenSpecial = true;

// process streams are not seekable
// input.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);
// output.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);
// error.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);

return new POpenTuple(input, output, error, process);
// } catch (BadDescriptorException e) {
// throw runtime.newErrnoEBADFError();
} catch (IOException e) {
throw runtime.newIOErrorFromException(e);
}
return JRubyWindowsLibrary.popen4(context, recv, args, block);
}

@Deprecated
220 changes: 220 additions & 0 deletions core/src/main/java/org/jruby/ext/jruby/JRubyWindowsLibrary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package org.jruby.ext.jruby;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyIO;
import org.jruby.RubyModule;
import org.jruby.RubyProcess;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.RaiseException;
import org.jruby.internal.runtime.ThreadedRunnable;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.load.Library;
import org.jruby.util.ShellLauncher;

import java.io.IOException;
import java.util.Arrays;

/**
* Special cases and hacks to better support Windows
*/
public class JRubyWindowsLibrary implements Library {

@Deprecated
public static POpenTuple popenSpecial(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;

try {
ShellLauncher.POpenProcess process = ShellLauncher.popen3(runtime, args, false);
RubyIO input = process.getInput() != null ?
new RubyIO(runtime, process.getInput()) :
new RubyIO(runtime, process.getInputStream());
RubyIO output = process.getOutput() != null ?
new RubyIO(runtime, process.getOutput()) :
new RubyIO(runtime, process.getOutputStream());
RubyIO error = process.getError() != null ?
new RubyIO(runtime, process.getError()) :
new RubyIO(runtime, process.getErrorStream());

// ensure the OpenFile knows it's a process; see OpenFile#finalize
input.getOpenFile().setProcess(process);
output.getOpenFile().setProcess(process);
error.getOpenFile().setProcess(process);

// set all streams as popenSpecial streams, so we don't shut down process prematurely
input.setPopenSpecial(true);
output.setPopenSpecial(true);
error.setPopenSpecial(true);

// process streams are not seekable
// input.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);
// output.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);
// error.getOpenFile().getMainStreamSafe().getDescriptor().
// setCanBeSeekable(false);

return new POpenTuple(input, output, error, process);
// } catch (BadDescriptorException e) {
// throw runtime.newErrnoEBADFError();
} catch (IOException e) {
throw runtime.newIOErrorFromException(e);
}
}

public void load(Ruby runtime, boolean wrap) {
RubyModule windows = runtime.defineModuleUnder("Windows", runtime.getModule("JRuby"));
windows.defineAnnotatedMethods(JRubyWindowsLibrary.class);
}

@JRubyMethod(name = "popen3", rest = true, meta = true)
public static IRubyObject popen3(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
final Ruby runtime = context.runtime;

// TODO: handle opts
if (args.length > 0 && args[args.length - 1] instanceof RubyHash) {
args = Arrays.copyOf(args, args.length - 1);
}

final POpenTuple tuple = popenSpecial(context, args);
final long pid = ShellLauncher.getPidFromProcess(tuple.process);

// array trick to be able to reference enclosing RubyThread
final RubyThread[] waitThread = new RubyThread[1];
waitThread[0] = new RubyThread(
runtime,
(RubyClass) runtime.getClassFromPath("Process::WaitThread"),
new ProcessWaitThread(waitThread, runtime, pid, tuple));

RubyArray yieldArgs = RubyArray.newArrayLight(runtime,
tuple.output,
tuple.input,
tuple.error,
waitThread[0]);

if (block.isGiven()) {
try {
return block.yield(context, yieldArgs);
} finally {
cleanupPOpen(tuple);

IRubyObject status = waitThread[0].join(IRubyObject.NULL_ARRAY);
context.setLastExitStatus(status);
}
}

return yieldArgs;
}

public static IRubyObject popen4(ThreadContext context, IRubyObject recv, IRubyObject[] args, Block block) {
Ruby runtime = context.runtime;

try {
JRubyWindowsLibrary.POpenTuple tuple = JRubyWindowsLibrary.popenSpecial(context, args);

RubyArray yieldArgs = RubyArray.newArrayLight(runtime,
runtime.newFixnum(ShellLauncher.getPidFromProcess(tuple.process)),
tuple.output,
tuple.input,
tuple.error);

if (block.isGiven()) {
try {
return block.yield(context, yieldArgs);
} finally {
cleanupPOpen(tuple);
// RubyStatus uses real native status now, so we unshift Java's shifted exit status
context.setLastExitStatus(RubyProcess.RubyStatus.newProcessStatus(runtime, tuple.process.waitFor() << 8, ShellLauncher.getPidFromProcess(tuple.process)));
}
}
return yieldArgs;
} catch (InterruptedException e) {
throw runtime.newThreadError("unexpected interrupt");
}
}

private static void cleanupPOpen(JRubyWindowsLibrary.POpenTuple tuple) {
if (tuple.input.getOpenFile().isOpen()) {
try {
tuple.input.close();
} catch (RaiseException re) {}
}
if (tuple.output.getOpenFile().isOpen()) {
try {
tuple.output.close();
} catch (RaiseException re) {}
}
if (tuple.error.getOpenFile().isOpen()) {
try {
tuple.error.close();
} catch (RaiseException re) {}
}
}

private static class POpenTuple {
public POpenTuple(RubyIO i, RubyIO o, RubyIO e, Process p) {
input = i; output = o; error = e; process = p;
}
public final RubyIO input;
public final RubyIO output;
public final RubyIO error;
public final Process process;
}

private static class ProcessWaitThread implements ThreadedRunnable {

private final RubyThread[] waitThread;
private final Ruby runtime;
private final long pid;
private final POpenTuple tuple;
volatile Thread javaThread;

public ProcessWaitThread(RubyThread[] waitThread, Ruby runtime, long pid, POpenTuple tuple) {
this.waitThread = waitThread;
this.runtime = runtime;
this.pid = pid;
this.tuple = tuple;
}

@Override
public Thread getJavaThread() {
return javaThread;
}

@Override
public void run() {
javaThread = Thread.currentThread();
RubyThread rubyThread;
// spin a bit until this happens; should almost never spin
while ((rubyThread = waitThread[0]) == null) {
Thread.yield();
}

runtime.getThreadService().registerNewThread(rubyThread);

rubyThread.op_aset(runtime.newSymbol("pid"), runtime.newFixnum(pid));

try {
int exitValue = tuple.process.waitFor();

// RubyStatus uses real native status now, so we unshift Java's shifted exit status
RubyProcess.RubyStatus status = RubyProcess.RubyStatus.newProcessStatus(
runtime,
exitValue << 8,
pid);

rubyThread.cleanTerminate(status);
} catch (Throwable t) {
rubyThread.exceptionRaised(t);
} finally {
rubyThread.dispose();
}
}

}
}
726 changes: 3 additions & 723 deletions lib/ruby/stdlib/jruby/open3_windows.rb
Original file line number Diff line number Diff line change
@@ -1,731 +1,11 @@
#
# = open3.rb: Popen, but with stderr, too
#
# Author:: Yukihiro Matsumoto
# Documentation:: Konrad Meyer
#
# Open3 gives you access to stdin, stdout, and stderr when running other
# programs.
#

#
# Open3 grants you access to stdin, stdout, stderr and a thread to wait the
# child process when running another program.
# You can specify various attributes, redirections, current directory, etc., of
# the program as Process.spawn.
#
# - Open3.popen3 : pipes for stdin, stdout, stderr
# - Open3.popen2 : pipes for stdin, stdout
# - Open3.popen2e : pipes for stdin, merged stdout and stderr
# - Open3.capture3 : give a string for stdin. get strings for stdout, stderr
# - Open3.capture2 : give a string for stdin. get a string for stdout
# - Open3.capture2e : give a string for stdin. get a string for merged stdout and stderr
# - Open3.pipeline_rw : pipes for first stdin and last stdout of a pipeline
# - Open3.pipeline_r : pipe for last stdout of a pipeline
# - Open3.pipeline_w : pipe for first stdin of a pipeline
# - Open3.pipeline_start : a pipeline
# - Open3.pipeline : run a pipline and wait
#
# This is a modified Open3 that overwrites methods not supported on Windows
# with versions that work.

module Open3

# Open stdin, stdout, and stderr streams and start external executable.
# In addition, a thread for waiting the started process is noticed.
# The thread has a pid method and thread variable :pid which is the pid of
# the started process.
#
# Block form:
#
# Open3.popen3([env,] cmd... [, opts]) {|stdin, stdout, stderr, wait_thr|
# pid = wait_thr.pid # pid of the started process.
# ...
# exit_status = wait_thr.value # Process::Status object returned.
# }
#
# Non-block form:
#
# stdin, stdout, stderr, wait_thr = Open3.popen3([env,] cmd... [, opts])
# pid = wait_thr[:pid] # pid of the started process.
# ...
# stdin.close # stdin, stdout and stderr should be closed explicitly in this form.
# stdout.close
# stderr.close
# exit_status = wait_thr.value # Process::Status object returned.
#
# The parameters +cmd...+ is passed to Process.spawn.
# So a commandline string and list of argument strings can be accepted as follows.
#
# Open3.popen3("echo a") {|i, o, e, t| ... }
# Open3.popen3("echo", "a") {|i, o, e, t| ... }
# Open3.popen3(["echo", "argv0"], "a") {|i, o, e, t| ... }
#
# If the last parameter, opts, is a Hash, it is recognized as an option for Process.spawn.
#
# Open3.popen3("pwd", :chdir=>"/") {|i,o,e,t|
# p o.read.chomp #=> "/"
# }
#
# wait_thr.value waits the termination of the process.
# The block form also waits the process when it returns.
#
# Closing stdin, stdout and stderr does not wait the process.
#
def popen3(*cmd, &block)
return IO::popen3(*cmd, &block) if RUBY_ENGINE == 'jruby'

if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

in_r, in_w = IO.pipe
opts[:in] = in_r
in_w.sync = true

out_r, out_w = IO.pipe
opts[:out] = out_w

err_r, err_w = IO.pipe
opts[:err] = err_w

popen_run(cmd, opts, [in_r, out_w, err_w], [in_w, out_r, err_r], &block)
IO::popen3(*cmd, &block)
end
module_function :popen3

# Open3.popen2 is similer to Open3.popen3 except it doesn't make a pipe for
# the standard error stream.
#
# Block form:
#
# Open3.popen2([env,] cmd... [, opts]) {|stdin, stdout, wait_thr|
# pid = wait_thr.pid # pid of the started process.
# ...
# exit_status = wait_thr.value # Process::Status object returned.
# }
#
# Non-block form:
#
# stdin, stdout, wait_thr = Open3.popen2([env,] cmd... [, opts])
# ...
# stdin.close # stdin and stdout should be closed explicitly in this form.
# stdout.close
#
# See Process.spawn for the optional hash arguments _env_ and _opts_.
#
# Example:
#
# Open3.popen2("wc -c") {|i,o,t|
# i.print "answer to life the universe and everything"
# i.close
# p o.gets #=> "42\n"
# }
#
# Open3.popen2("bc -q") {|i,o,t|
# i.puts "obase=13"
# i.puts "6 * 9"
# p o.gets #=> "42\n"
# }
#
# Open3.popen2("dc") {|i,o,t|
# i.print "42P"
# i.close
# p o.read #=> "*"
# }
#
def popen2(*cmd, &block)
if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

in_r, in_w = IO.pipe
opts[:in] = in_r
in_w.sync = true

out_r, out_w = IO.pipe
opts[:out] = out_w

popen_run(cmd, opts, [in_r, out_w], [in_w, out_r], &block)
end
module_function :popen2

# Open3.popen2e is similer to Open3.popen3 except it merges
# the standard output stream and the standard error stream.
#
# Block form:
#
# Open3.popen2e([env,] cmd... [, opts]) {|stdin, stdout_and_stderr, wait_thr|
# pid = wait_thr.pid # pid of the started process.
# ...
# exit_status = wait_thr.value # Process::Status object returned.
# }
#
# Non-block form:
#
# stdin, stdout_and_stderr, wait_thr = Open3.popen2e([env,] cmd... [, opts])
# ...
# stdin.close # stdin and stdout_and_stderr should be closed explicitly in this form.
# stdout_and_stderr.close
#
# See Process.spawn for the optional hash arguments _env_ and _opts_.
#
# Example:
# # check gcc warnings
# source = "foo.c"
# Open3.popen2e("gcc", "-Wall", source) {|i,oe,t|
# oe.each {|line|
# if /warning/ =~ line
# ...
# end
# }
# }
#
def popen2e(*cmd, &block)
if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

in_r, in_w = IO.pipe
opts[:in] = in_r
in_w.sync = true

out_r, out_w = IO.pipe
opts[[:out, :err]] = out_w

popen_run(cmd, opts, [in_r, out_w], [in_w, out_r], &block)
end
module_function :popen2e

def popen_run(cmd, opts, child_io, parent_io) # :nodoc:
pid = spawn(*cmd, opts)
wait_thr = Process.detach(pid)
child_io.each {|io| io.close }
result = [*parent_io, wait_thr]
if defined? yield
begin
return yield(*result)
ensure
parent_io.each{|io| io.close unless io.closed?}
wait_thr.join
end
end
result
end
module_function :popen_run
class << self
private :popen_run
end

# Open3.capture3 captures the standard output and the standard error of a command.
#
# stdout_str, stderr_str, status = Open3.capture3([env,] cmd... [, opts])
#
# The arguments env, cmd and opts are passed to Open3.popen3 except
# opts[:stdin_data] and opts[:binmode]. See Process.spawn.
#
# If opts[:stdin_data] is specified, it is sent to the command's standard input.
#
# If opts[:binmode] is true, internal pipes are set to binary mode.
#
# Example:
#
# # dot is a command of graphviz.
# graph = <<'End'
# digraph g {
# a -> b
# }
# End
# layouted_graph, dot_log = Open3.capture3("dot -v", :stdin_data=>graph)
#
# o, e, s = Open3.capture3("echo a; sort >&2", :stdin_data=>"foo\nbar\nbaz\n")
# p o #=> "a\n"
# p e #=> "bar\nbaz\nfoo\n"
# p s #=> #<Process::Status: pid 32682 exit 0>
#
# # generate a thumnail image using the convert command of ImageMagick.
# # However, if the image stored really in a file,
# # system("convert", "-thumbnail", "80", "png:#{filename}", "png:-") is better
# # because memory consumption.
# # But if the image is stored in a DB or generated by gnuplot Open3.capture2 example,
# # Open3.capture3 is considerable.
# #
# image = File.read("/usr/share/openclipart/png/animals/mammals/sheep-md-v0.1.png", :binmode=>true)
# thumnail, err, s = Open3.capture3("convert -thumbnail 80 png:- png:-", :stdin_data=>image, :binmode=>true)
# if s.success?
# STDOUT.binmode; print thumnail
# end
#
def capture3(*cmd)
if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

stdin_data = opts.delete(:stdin_data) || ''
binmode = opts.delete(:binmode)

popen3(*cmd, opts) {|i, o, e, t|
if binmode
i.binmode
o.binmode
e.binmode
end
out_reader = Thread.new { o.read }
err_reader = Thread.new { e.read }
i.write stdin_data
i.close
[out_reader.value, err_reader.value, t.value]
}
end
module_function :capture3

# Open3.capture2 captures the standard output of a command.
#
# stdout_str, status = Open3.capture2([env,] cmd... [, opts])
#
# The arguments env, cmd and opts are passed to Open3.popen3 except
# opts[:stdin_data] and opts[:binmode]. See Process.spawn.
#
# If opts[:stdin_data] is specified, it is sent to the command's standard input.
#
# If opts[:binmode] is true, internal pipes are set to binary mode.
#
# Example:
#
# # factor is a command for integer factorization.
# o, s = Open3.capture2("factor", :stdin_data=>"42")
# p o #=> "42: 2 3 7\n"
#
# # generate x**2 graph in png using gnuplot.
# gnuplot_commands = <<"End"
# set terminal png
# plot x**2, "-" with lines
# 1 14
# 2 1
# 3 8
# 4 5
# e
# End
# image, s = Open3.capture2("gnuplot", :stdin_data=>gnuplot_commands, :binmode=>true)
#
def capture2(*cmd)
if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

stdin_data = opts.delete(:stdin_data) || ''
binmode = opts.delete(:binmode)

popen2(*cmd, opts) {|i, o, t|
if binmode
i.binmode
o.binmode
end
out_reader = Thread.new { o.read }
i.write stdin_data
i.close
[out_reader.value, t.value]
}
end
module_function :capture2

# Open3.capture2e captures the standard output and the standard error of a command.
#
# stdout_and_stderr_str, status = Open3.capture2e([env,] cmd... [, opts])
#
# The arguments env, cmd and opts are passed to Open3.popen3 except
# opts[:stdin_data] and opts[:binmode]. See Process.spawn.
#
# If opts[:stdin_data] is specified, it is sent to the command's standard input.
#
# If opts[:binmode] is true, internal pipes are set to binary mode.
#
# Example:
#
# # capture make log
# make_log, s = Open3.capture2e("make")
#
def capture2e(*cmd)
if Hash === cmd.last
opts = cmd.pop.dup
else
opts = {}
end

stdin_data = opts.delete(:stdin_data) || ''
binmode = opts.delete(:binmode)

popen2e(*cmd, opts) {|i, oe, t|
if binmode
i.binmode
oe.binmode
end
outerr_reader = Thread.new { oe.read }
i.write stdin_data
i.close
[outerr_reader.value, t.value]
}
end
module_function :capture2e

# Open3.pipeline_rw starts a list of commands as a pipeline with pipes
# which connects stdin of the first command and stdout of the last command.
#
# Open3.pipeline_rw(cmd1, cmd2, ... [, opts]) {|first_stdin, last_stdout, wait_threads|
# ...
# }
#
# first_stdin, last_stdout, wait_threads = Open3.pipeline_rw(cmd1, cmd2, ... [, opts])
# ...
# first_stdin.close
# last_stdout.close
#
# Each cmd is a string or an array.
# If it is an array, the elements are passed to Process.spawn.
#
# cmd:
# commandline command line string which is passed to a shell
# [env, commandline, opts] command line string which is passed to a shell
# [env, cmdname, arg1, ..., opts] command name and one or more arguments (no shell)
# [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)
#
# Note that env and opts are optional, as Process.spawn.
#
# The option to pass Process.spawn is constructed by merging
# +opts+, the last hash element of the array and
# specification for the pipe between each commands.
#
# Example:
#
# Open3.pipeline_rw("tr -dc A-Za-z", "wc -c") {|i,o,ts|
# i.puts "All persons more than a mile high to leave the court."
# i.close
# p o.gets #=> "42\n"
# }
#
# Open3.pipeline_rw("sort", "cat -n") {|stdin, stdout, wait_thrs|
# stdin.puts "foo"
# stdin.puts "bar"
# stdin.puts "baz"
# stdin.close # send EOF to sort.
# p stdout.read #=> " 1\tbar\n 2\tbaz\n 3\tfoo\n"
# }
def pipeline_rw(*cmds, &block)
if Hash === cmds.last
opts = cmds.pop.dup
else
opts = {}
end

in_r, in_w = IO.pipe
opts[:in] = in_r
in_w.sync = true

out_r, out_w = IO.pipe
opts[:out] = out_w

pipeline_run(cmds, opts, [in_r, out_w], [in_w, out_r], &block)
end
module_function :pipeline_rw

# Open3.pipeline_r starts a list of commands as a pipeline with a pipe
# which connects stdout of the last command.
#
# Open3.pipeline_r(cmd1, cmd2, ... [, opts]) {|last_stdout, wait_threads|
# ...
# }
#
# last_stdout, wait_threads = Open3.pipeline_r(cmd1, cmd2, ... [, opts])
# ...
# last_stdout.close
#
# Each cmd is a string or an array.
# If it is an array, the elements are passed to Process.spawn.
#
# cmd:
# commandline command line string which is passed to a shell
# [env, commandline, opts] command line string which is passed to a shell
# [env, cmdname, arg1, ..., opts] command name and one or more arguments (no shell)
# [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)
#
# Note that env and opts are optional, as Process.spawn.
#
# Example:
#
# Open3.pipeline_r("zcat /var/log/apache2/access.log.*.gz",
# [{"LANG"=>"C"}, "grep", "GET /favicon.ico"],
# "logresolve") {|o, ts|
# o.each_line {|line|
# ...
# }
# }
#
# Open3.pipeline_r("yes", "head -10") {|o, ts|
# p o.read #=> "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"
# p ts[0].value #=> #<Process::Status: pid 24910 SIGPIPE (signal 13)>
# p ts[1].value #=> #<Process::Status: pid 24913 exit 0>
# }
#
def pipeline_r(*cmds, &block)
if Hash === cmds.last
opts = cmds.pop.dup
else
opts = {}
end

out_r, out_w = IO.pipe
opts[:out] = out_w

pipeline_run(cmds, opts, [out_w], [out_r], &block)
end
module_function :pipeline_r

# Open3.pipeline_w starts a list of commands as a pipeline with a pipe
# which connects stdin of the first command.
#
# Open3.pipeline_w(cmd1, cmd2, ... [, opts]) {|first_stdin, wait_threads|
# ...
# }
#
# first_stdin, wait_threads = Open3.pipeline_w(cmd1, cmd2, ... [, opts])
# ...
# first_stdin.close
#
# Each cmd is a string or an array.
# If it is an array, the elements are passed to Process.spawn.
#
# cmd:
# commandline command line string which is passed to a shell
# [env, commandline, opts] command line string which is passed to a shell
# [env, cmdname, arg1, ..., opts] command name and one or more arguments (no shell)
# [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)
#
# Note that env and opts are optional, as Process.spawn.
#
# Example:
#
# Open3.pipeline_w("bzip2 -c", :out=>"/tmp/hello.bz2") {|i, ts|
# i.puts "hello"
# }
#
def pipeline_w(*cmds, &block)
if Hash === cmds.last
opts = cmds.pop.dup
else
opts = {}
end

in_r, in_w = IO.pipe
opts[:in] = in_r
in_w.sync = true

pipeline_run(cmds, opts, [in_r], [in_w], &block)
end
module_function :pipeline_w

# Open3.pipeline_start starts a list of commands as a pipeline.
# No pipe made for stdin of the first command and
# stdout of the last command.
#
# Open3.pipeline_start(cmd1, cmd2, ... [, opts]) {|wait_threads|
# ...
# }
#
# wait_threads = Open3.pipeline_start(cmd1, cmd2, ... [, opts])
# ...
#
# Each cmd is a string or an array.
# If it is an array, the elements are passed to Process.spawn.
#
# cmd:
# commandline command line string which is passed to a shell
# [env, commandline, opts] command line string which is passed to a shell
# [env, cmdname, arg1, ..., opts] command name and one or more arguments (no shell)
# [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)
#
# Note that env and opts are optional, as Process.spawn.
#
# Example:
#
# # run xeyes in 10 seconds.
# Open3.pipeline_start("xeyes") {|ts|
# sleep 10
# t = ts[0]
# Process.kill("TERM", t.pid)
# p t.value #=> #<Process::Status: pid 911 SIGTERM (signal 15)>
# }
#
# # convert pdf to ps and send it to a printer.
# # collect error message of pdftops and lpr.
# pdf_file = "paper.pdf"
# printer = "printer-name"
# err_r, err_w = IO.pipe
# Open3.pipeline_start(["pdftops", pdf_file, "-"],
# ["lpr", "-P#{printer}"],
# :err=>err_w) {|ts|
# err_w.close
# p err_r.read # error messages of pdftops and lpr.
# }
#
def pipeline_start(*cmds, &block)
if Hash === cmds.last
opts = cmds.pop.dup
else
opts = {}
end

if block
pipeline_run(cmds, opts, [], [], &block)
else
ts, = pipeline_run(cmds, opts, [], [])
ts
end
end
module_function :pipeline_start

# Open3.pipeline starts a list of commands as a pipeline.
# It waits the finish of the commands.
# No pipe made for stdin of the first command and
# stdout of the last command.
#
# status_list = Open3.pipeline(cmd1, cmd2, ... [, opts])
#
# Each cmd is a string or an array.
# If it is an array, the elements are passed to Process.spawn.
#
# cmd:
# commandline command line string which is passed to a shell
# [env, commandline, opts] command line string which is passed to a shell
# [env, cmdname, arg1, ..., opts] command name and one or more arguments (no shell)
# [env, [cmdname, argv0], arg1, ..., opts] command name and arguments including argv[0] (no shell)
#
# Note that env and opts are optional, as Process.spawn.
#
# Example:
#
# fname = "/usr/share/man/man1/ruby.1.gz"
# p Open3.pipeline(["zcat", fname], "nroff -man", "less")
# #=> [#<Process::Status: pid 11817 exit 0>,
# # #<Process::Status: pid 11820 exit 0>,
# # #<Process::Status: pid 11828 exit 0>]
#
# fname = "/usr/share/man/man1/ls.1.gz"
# Open3.pipeline(["zcat", fname], "nroff -man", "colcrt")
#
# # convert PDF to PS and send to a printer by lpr
# pdf_file = "paper.pdf"
# printer = "printer-name"
# Open3.pipeline(["pdftops", pdf_file, "-"],
# ["lpr", "-P#{printer}"])
#
# # count lines
# Open3.pipeline("sort", "uniq -c", :in=>"names.txt", :out=>"count")
#
# # cyclic pipeline
# r,w = IO.pipe
# w.print "ibase=14\n10\n"
# Open3.pipeline("bc", "tee /dev/tty", :in=>r, :out=>w)
# #=> 14
# # 18
# # 22
# # 30
# # 42
# # 58
# # 78
# # 106
# # 202
#
def pipeline(*cmds)
if Hash === cmds.last
opts = cmds.pop.dup
else
opts = {}
end

pipeline_run(cmds, opts, [], []) {|ts|
ts.map {|t| t.value }
}
end
module_function :pipeline

def pipeline_run(cmds, pipeline_opts, child_io, parent_io) # :nodoc:
if cmds.empty?
raise ArgumentError, "no commands"
end

opts_base = pipeline_opts.dup
opts_base.delete :in
opts_base.delete :out

wait_thrs = []
r = nil
cmds.each_with_index {|cmd, i|
cmd_opts = opts_base.dup
if String === cmd
cmd = [cmd]
else
cmd_opts.update cmd.pop if Hash === cmd.last
end
if i == 0
if !cmd_opts.include?(:in)
if pipeline_opts.include?(:in)
cmd_opts[:in] = pipeline_opts[:in]
end
end
else
cmd_opts[:in] = r
end
if i != cmds.length - 1
r2, w2 = IO.pipe
cmd_opts[:out] = w2
else
if !cmd_opts.include?(:out)
if pipeline_opts.include?(:out)
cmd_opts[:out] = pipeline_opts[:out]
end
end
end
pid = spawn(*cmd, cmd_opts)
wait_thrs << Process.detach(pid)
r.close if r
w2.close if w2
r = r2
}
result = parent_io + [wait_thrs]
child_io.each {|io| io.close }
if defined? yield
begin
return yield(*result)
ensure
parent_io.each{|io| io.close unless io.closed?}
wait_thrs.each {|t| t.join }
end
end
result
end
module_function :pipeline_run
class << self
private :pipeline_run
end

end

if $0 == __FILE__
a = Open3.popen3("nroff -man")
Thread.start do
while line = gets
a[0].print line
end
a[0].close
end
while line = a[1].gets
print ":", line
end
end
14 changes: 6 additions & 8 deletions lib/ruby/stdlib/open3.rb
Original file line number Diff line number Diff line change
@@ -29,14 +29,7 @@
# - Open3.pipeline : run a pipeline and wait for its completion
#

# Because spawn does not yet work on Windows, we fall back on the older open3 there.
real_open3 = true
if respond_to?(:org) && org.jruby.platform.Platform::IS_WINDOWS
require 'jruby/open3_windows'
real_open3 = false
end

real_open3 && module Open3
module Open3

# Open stdin, stdout, and stderr streams and start external executable.
# In addition, a thread to wait for the started process is created.
@@ -674,3 +667,8 @@ class << self
end

end

# Because spawn does not yet work on Windows, we fall back on the older open3 there.
if RUBY_ENGINE == 'jruby' && org.jruby.platform.Platform::IS_WINDOWS
require 'jruby/open3_windows'
end

0 comments on commit f58ddad

Please sign in to comment.