Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: rubinius/rubinius
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: fbb3f1e408ed
Choose a base ref
...
head repository: rubinius/rubinius
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0e2ace24f2f9
Choose a head ref
  • 4 commits
  • 12 files changed
  • 1 contributor

Commits on May 7, 2015

  1. Copy the full SHA
    5576d2c View commit details
  2. Reworked Console connection mechanism.

    One major problem with the previous mechanism was that a process cannot
    clean up after a SIGKILL, so the console files that each process
    pre-emptively created would inevitably be left behind, littering some
    directory.
    
    The new process timeline looks like this:
    
    1. When a Rubinius process starts, it opens (creating if necessary, and
    truncating) the system.console.path file. It watches that file for
    modifications. If the process is killed, that file will persist, but there
    is only one such file no matter how many processes there are. Opening the file advertises that
    the process is available for Console connections. The file descriptor has
    the close-on-exec flag set.
    
    2. When a change is detected on the system.console.path file, the process
    stats two files named ${system.console.path}-$pid-{request, response}. If
    these files exist and are regular files, a connection is initiated.
    
    3. Threads are created for the request, response, and worker functions.
    While the connection is maintained by the client, new connection attempts
    are ignored.
    
    4. When the connection is terminated by the client, a new connection may
    be initiated as described in 2.
    
    When the process with a connected client forks, the child process closes
    the Console request and response files.
    
    When a process without a connected client forks, the listener is recreated
    in the child.
    brixen committed May 7, 2015
    Copy the full SHA
    4d299a3 View commit details
  3. Fill in standard IO file descriptors on startup.

    If the process that exec's us had closed file descriptors 0, 1, or 2, we re-open
    them to tmp files so the descriptors are allocated.
    brixen committed May 7, 2015
    Copy the full SHA
    874d5be View commit details
  4. Copy the full SHA
    0e2ace2 View commit details
Showing with 253 additions and 169 deletions.
  1. +0 −4 kernel/delta/console.rb
  2. +3 −6 library/rubinius/configuration.rb
  3. +3 −0 vm/builtin/io.cpp
  4. +147 −60 vm/console.cpp
  5. +54 −20 vm/console.hpp
  6. +37 −66 vm/environment.cpp
  7. +2 −5 vm/environment.hpp
  8. +0 −2 vm/internal_threads.cpp
  9. +6 −2 vm/llvm/state.cpp
  10. +1 −1 vm/shared_state.cpp
  11. +0 −1 vm/shared_state.hpp
  12. +0 −2 vm/signal.cpp
4 changes: 0 additions & 4 deletions kernel/delta/console.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
module Rubinius
module Console
class Server
# The following constants are set internally:
# RequestPath - the path to the console request file for this process
# ResponsePath - the path to the console response file for this process

attr_reader :inbox
attr_reader :outbox
attr_reader :thread
9 changes: 3 additions & 6 deletions library/rubinius/configuration.rb
Original file line number Diff line number Diff line change
@@ -176,14 +176,11 @@
s.vm_variable "tmp", "$TMPDIR",
"Default temp/fallback directory for the process"

s.vm_variable "fsapi.path", "$TMPDIR/$PROGRAM_NAME-$USER-$PID",
"Base directory of the Rubinius File System API files"

s.vm_variable "fsapi.access", 0750,
"Permissions on the Rubinius File System API directory"
s.vm_variable "console.path", "$TMPDIR/$PROGRAM_NAME-$USER-console",
"Path for Rubinius Console connection file"

s.vm_variable "console.access", 0660,
"Permissions on the Rubinius Console files"
"Permissions on the Rubinius Console connection file"

s.vm_variable "log", "$TMPDIR/$PROGRAM_NAME-$USER.log",
"Logging facility to use: 'syslog', 'console', or path"
3 changes: 3 additions & 0 deletions vm/builtin/io.cpp
Original file line number Diff line number Diff line change
@@ -486,6 +486,9 @@ namespace rubinius {
// Invalid descriptor no matter what.
descriptor(state, Fixnum::from(-1));

// Don't close stdin, stdout, stderr descriptors.
if(desc < 3) return cNil;

// If there is a handle for this IO, and it's been promoted into
// a lowlevel RIO struct using fdopen, then we MUST use fclose
// to close it.
207 changes: 147 additions & 60 deletions vm/console.cpp
Original file line number Diff line number Diff line change
@@ -26,12 +26,15 @@
#include <sys/types.h>

#include <stdio.h>
#include <libgen.h>

// read
#include <unistd.h>
#include <sys/uio.h>
#include <sys/types.h>

#include <ostream>

namespace rubinius {
using namespace utilities;

@@ -53,37 +56,31 @@ namespace rubinius {
return fd;
}

Request::Request(STATE, Console* console)
Request::Request(STATE, Console* console, Response* response)
: InternalThread(state, "rbx.console.request")
, console_(console)
, response_(console->response())
, response_(response)
, enabled_(false)
, fd_(-1)
, fsevent_(state)
{
}

void Request::initialize(STATE) {
path_ = state->shared().fsapi_path + "/console-request";
console_->server_class(state)->set_const(state, state->symbol("RequestPath"),
String::create(state, path_.c_str()));
}

void Request::setup_request(STATE) {
if((fd_ = open_file(state, path_)) < 0) {
if((fd_ = open_file(state, console_->request_path())) < 0) {
logger::error("%s: console request: unable to open file", strerror(errno));
return;
}

FSEvent* fsevent = FSEvent::create(state);
fsevent->watch_file(state, fd_, path_.c_str());
fsevent->watch_file(state, fd_, console_->request_path().c_str());
fsevent_.set(fsevent);

enabled_ = true;
}

// Prepares the request file via setup_request() and launches the request
// thread. The thread is only started when setup_request() left the request
// enabled; otherwise this returns without starting anything.
void Request::start_thread(STATE) {
  setup_request(state);

  if(enabled_) {
    InternalThread::start_thread(state);
  }
}
@@ -96,7 +93,7 @@ namespace rubinius {
}
}

void Request::close_request(STATE) {
void Request::close_request() {
if(fd_ > 0) {
close(fd_);
fd_ = -1;
@@ -106,22 +103,11 @@ namespace rubinius {
void Request::stop_thread(STATE) {
InternalThread::stop_thread(state);

close_request(state);
unlink(path_.c_str());
}

void Request::before_exec(STATE) {
stop_thread(state);
}

void Request::after_exec(STATE) {
start(state);
close_request();
}

void Request::after_fork_child(STATE) {
close_request(state);

InternalThread::after_fork_child(state);
close_request();
}

char* Request::read_request(STATE) {
@@ -182,9 +168,15 @@ namespace rubinius {
// Constructs the console response thread object for `console`.
//
// The name "rbx.console.response" is handed to the InternalThread base. The
// response file descriptor starts at -1 and the request list starts as NULL.
Response::Response(STATE, Console* console)
: InternalThread(state, "rbx.console.response")
, console_(console)
, inbox_(state)
, outbox_(state)
, fd_(-1)
, request_list_(NULL)
{
// Look up the @inbox and @outbox instance variables on the Console's Ruby
// console object once and keep them in inbox_/outbox_. This reads
// console_->ruby_console(), so that object must already be set on the
// Console before a Response is constructed.
inbox_.set(as<Channel>(
console_->ruby_console()->get_ivar(state, state->symbol("@inbox"))));
outbox_.set(as<Channel>(
console_->ruby_console()->get_ivar(state, state->symbol("@outbox"))));
}

Response::~Response() {
@@ -193,15 +185,11 @@ namespace rubinius {
}

void Response::initialize(STATE) {
path_ = state->shared().fsapi_path + "/console-response";
console_->server_class(state)->set_const(state, state->symbol("ResponsePath"),
String::create(state, path_.c_str()));

Thread::create(state, vm());
}

void Response::start_thread(STATE) {
if((fd_ = open_file(state, path_)) < 0) {
if((fd_ = open_file(state, console_->response_path())) < 0) {
logger::error("%s: console response: unable to open file", strerror(errno));
return;
}
@@ -218,17 +206,20 @@ namespace rubinius {
// Wakes the response thread: runs the base-class wakeup, sends an empty
// string on the inbox channel, and signals the response condition variable.
void Response::wakeup(STATE) {
  InternalThread::wakeup(state);

  GCTokenImpl gct;
  String* empty_message = String::create(state, "");
  inbox_.get()->send(state, gct, empty_message, 0);

  response_cond_.signal();
}

void Response::close_response(STATE) {
void Response::close_response() {
if(fd_ > 0) {
close(fd_);
fd_ = -1;
}
}

void Response::clear_requests(STATE) {
void Response::clear_requests() {
if(request_list_) {
for(RequestList::const_iterator i = request_list_->begin();
i != request_list_->end();
@@ -244,25 +235,11 @@ namespace rubinius {
void Response::stop_thread(STATE) {
InternalThread::stop_thread(state);

clear_requests(state);

close_response(state);
unlink(path_.c_str());
}

void Response::before_exec(STATE) {
stop_thread(state);
}

void Response::after_exec(STATE) {
start(state);
close_response();
}

void Response::after_fork_child(STATE) {
close_response(state);
clear_requests(state);

InternalThread::after_fork_child(state);
close_response();
}

void Response::send_request(STATE, const char* request) {
@@ -301,10 +278,8 @@ namespace rubinius {
size_t pending_requests = 0;
char* request = NULL;

Channel* inbox = as<Channel>(
console_->ruby_console()->get_ivar(state, state->symbol("@inbox")));
Channel* outbox = as<Channel>(
console_->ruby_console()->get_ivar(state, state->symbol("@outbox")));
Channel* inbox = inbox_.get();
Channel* outbox = outbox_.get();

String* response = 0;
OnStack<3> os(state, inbox, outbox, response);
@@ -352,23 +327,135 @@ namespace rubinius {
}
}

// Constructs the console listener thread object for `console`.
//
// The name "rbx.console.listener" is handed to the InternalThread base. The
// descriptor for the connection file starts at -1; it is opened later in
// initialize().
Listener::Listener(STATE, Console* console)
: InternalThread(state, "rbx.console.listener")
, console_(console)
, fsevent_(state)
, fd_(-1)
{
}

// Releases the connection file descriptor, if one was ever opened.
//
// fd_ stays at -1 when initialize() was never run or its open() failed, so
// only close descriptors that are actually valid.
Listener::~Listener() {
  if(fd_ >= 0) {
    close(fd_);
    fd_ = -1;
  }
}

// Opens (creating and truncating) the Console connection file, applies the
// configured access mode to it, and registers the file with a new FSEvent so
// the listener can wait for modifications.
//
// Failures to open the file or to set its mode are logged, not raised.
void Listener::initialize(STATE) {
  const std::string& path = console_->console_path();

  fd_ = ::open(path.c_str(),
               O_CREAT | O_TRUNC | O_RDWR | O_CLOEXEC,
               state->shared().config.system_console_access.value);

  if(fd_ < 0) {
    utilities::logger::error("%s: unable to open Console connection file",
        strerror(errno));
  } else if(fchmod(fd_,
        state->shared().config.system_console_access.value) < 0) {
    // The umask setting will override our permissions for open(), so the mode
    // is set explicitly. This is done on the descriptor rather than the path
    // so it applies to the file we actually opened, and it is skipped
    // entirely when the open failed.
    utilities::logger::error("%s: unable to set mode for Console connection file",
        strerror(errno));
  }

  FSEvent* fsevent = FSEvent::create(state);
  fsevent->watch_file(state, fd_, path.c_str());
  fsevent_.set(fsevent);
}

// Starts the listener thread; simply forwards to the InternalThread
// implementation.
void Listener::start_thread(STATE) {
InternalThread::start_thread(state);
}

// Wakes the listener thread: runs the base-class wakeup and then writes a
// single NUL byte to the connection file descriptor. That descriptor is the
// one registered with the FSEvent in initialize(). A failed write is logged.
void Listener::wakeup(STATE) {
  InternalThread::wakeup(state);

  const char nul = '\0';
  const ssize_t written = write(fd_, &nul, 1);

  if(written < 0) {
    logger::error("%s: console: unable to wake listener thread", strerror(errno));
  }
}

// Reports whether a client has set up a connection: true only when both the
// request path and the response path can be stat()'d and both refer to
// regular files.
bool Listener::connection_initiated() {
  struct stat request_info;
  struct stat response_info;

  const bool request_ready =
    stat(console_->request_path().c_str(), &request_info) == 0
    && S_ISREG(request_info.st_mode);

  const bool response_ready =
    stat(console_->response_path().c_str(), &response_info) == 0
    && S_ISREG(response_info.st_mode);

  if(!request_ready) return false;

  return response_ready;
}

// Listener thread body. Until thread_exit_ is set, blocks waiting for an
// event on the watched connection file. After each event: a nil status is
// logged as a failed wait; otherwise, if no client is currently connected
// and the request/response files are present, the Console is asked to accept
// the connection.
void Listener::run(STATE) {
  for(;;) {
    if(thread_exit_) break;

    Object* event = fsevent_.get()->wait_for_event(state);

    // The wait may have been interrupted precisely because we are exiting.
    if(thread_exit_) break;

    if(event->nil_p()) {
      utilities::logger::error("%s: console: listener: wait for event failed",
          strerror(errno));
      continue;
    }

    // While a client holds the connection, new attempts are ignored.
    if(!console_->connected_p() && connection_initiated()) {
      console_->accept(state);
    }
  }
}

Console::Console(STATE)
: path_(std::string(state->shared().fsapi_path + "/console"))
, response_(new Response(state, this))
, request_(new Request(state, this))
: listener_(0)
, response_(0)
, request_(0)
, ruby_console_(state)
{
ruby_console_.set(server_class(state)->send(state, 0, state->symbol("new")));
console_path_ = state->shared().config.system_console_path.value;

std::ostringstream basename;
basename << state->shared().config.system_console_path.value << "-"
<< state->shared().pid;

request_path_ = basename.str() + "-request";
response_path_ = basename.str() + "-response";

listener_ = new Listener(state, this);
}

Console::~Console() {
delete request_;
delete response_;
if(listener_) delete listener_;
reset();
}

// True when a Request object exists and reports itself enabled; false when
// there is no Request at all or it is not enabled.
bool Console::connected_p() {
  if(!request_) return false;

  return request_->enabled_p();
}

void Console::start(STATE) {
request_->start(state);
response_->start(state);
listener_->start(state);
}

// Sets up the objects for a new client connection: a fresh Ruby-side console
// server instance, then a Response and a Request bound to this Console.
//
// connected_p() is false whenever a Request exists but is not enabled, so
// this can be called while request_/response_ still point at objects from an
// earlier call. Those are released first via reset() so they are not leaked
// when the pointers are overwritten.
// NOTE(review): reset() deletes the previous Request/Response directly;
// confirm their threads cannot still be running at this point.
void Console::accept(STATE) {
  reset();

  // The Response constructor reads ruby_console(), so this must be set before
  // the Response is created.
  ruby_console_.set(server_class(state)->send(state, 0, state->symbol("new")));

  response_ = new Response(state, this);
  request_ = new Request(state, this, response_);
}

// Tears down the per-connection state: destroys the Request and Response
// objects (if any), clears both pointers, and sets the Ruby console
// reference to nil.
void Console::reset() {
  // Deleting a null pointer is a no-op, so no explicit guards are needed.
  delete request_;
  request_ = 0;

  delete response_;
  response_ = 0;

  ruby_console_.set(cNil);
}

// Fork hook run in the child process: drops any per-connection state by
// calling reset(). The state argument is not used here.
void Console::after_fork_child(STATE) {
reset();
}

Class* Console::server_class(STATE) {
Loading