Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Reworked signal handling. Closes #3418.
Browse files Browse the repository at this point in the history
brixen committed Jun 11, 2015
1 parent 0bc749b commit 5a97aec
Showing 9 changed files with 206 additions and 196 deletions.
55 changes: 29 additions & 26 deletions kernel/loader.rb
Original file line number Diff line number Diff line change
@@ -790,6 +790,34 @@ def done
Process.exit! @exit_code
end

# Converts an exception that escaped the running program (or was raised
# while dispatching a signal) into a process exit status, reports it, and
# then runs the epilogue.
#
# e - the exception object to handle.
#
# Sets @exit_code as a side effect. Any exception raised while handling +e+
# is itself logged and forces an exit status of 1. The epilogue always runs,
# via the ensure clause.
def handle_exception(e)
  case e
  when SystemExit
    @exit_code = e.status
  when SyntaxError
    @exit_code = 1

    show_syntax_error(e)

    STDERR.puts "\nBacktrace:"
    STDERR.puts
    STDERR.puts e.awesome_backtrace.show
  when Interrupt
    # Interrupt is a SignalException subclass, so it has to be matched
    # before the generic SignalException branch below.
    @exit_code = 1

    Rubinius::Logger.log_exception "An exception occurred #{@stage}:", e
  when SignalException
    # Restore the default disposition for the signal and re-send it to
    # this process.
    Signal.trap(e.signo, "SIG_DFL")
    Process.kill e.signo, Process.pid
  else
    # Every other exception: report it and exit with a failure status.
    # Without this branch such exceptions were dropped silently and
    # @exit_code was left untouched.
    @exit_code = 1

    Rubinius::Logger.log_exception "An exception occurred #{@stage}:", e
  end
rescue Object => e
  @exit_code = 1

  Rubinius::Logger.log_exception "An exception occurred #{@stage}:", e
ensure
  epilogue
end

# Orchestrate everything.
def main
preamble
@@ -810,33 +838,8 @@ def main
script
repl

rescue SystemExit => e
@exit_code = e.status

epilogue
rescue SyntaxError => e
@exit_code = 1

show_syntax_error(e)

STDERR.puts "\nBacktrace:"
STDERR.puts
STDERR.puts e.awesome_backtrace.show
epilogue
rescue Interrupt => e
@exit_code = 1

Rubinius::Logger.log_exception "An exception occurred #{@stage}:", e
epilogue
rescue SignalException => e
Signal.trap(e.signo, "SIG_DFL")
Process.kill e.signo, Process.pid
epilogue
rescue Object => e
@exit_code = 1

Rubinius::Logger.log_exception "An exception occurred #{@stage}:", e
epilogue
handle_exception e
else
# Run epilogue here for the no-exception path. When an exception is raised,
# handle_exception runs it from inside the rescue instead, so that at_exit{}
# hooks can read $!.
8 changes: 5 additions & 3 deletions vm/builtin/system.cpp
Original file line number Diff line number Diff line change
@@ -1019,13 +1019,15 @@ namespace rubinius {
}

Object* System::vm_watch_signal(STATE, Fixnum* sig, Object* ignored) {
SignalThread* st = state->shared().signal_handler();
SignalThread* st = state->shared().signals();

if(st) {
native_int i = sig->to_native();
if(i < 0) {
st->add_signal(state, -i, SignalThread::eDefault);
st->add_signal_handler(state, -i, SignalThread::eDefault);
} else if(i > 0) {
st->add_signal(state, i, CBOOL(ignored) ? SignalThread::eIgnore : SignalThread::eCustom);
st->add_signal_handler(state, i,
CBOOL(ignored) ? SignalThread::eIgnore : SignalThread::eCustom);
}

return cTrue;
35 changes: 15 additions & 20 deletions vm/environment.cpp
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ namespace rubinius {
, signature_(0)
, signal_thread_(NULL)
, finalizer_thread_(NULL)
, loader_(NULL)
{
#ifdef ENABLE_LLVM
#if RBX_LLVM_API_VER < 305
@@ -107,6 +108,8 @@ namespace rubinius {
root_vm->metrics().init(metrics::eRubyMetrics);
state = new State(root_vm);

loader_ = new TypedRoot<Object*>(state);

NativeMethod::init_thread(state);

start_logging(state);
@@ -198,12 +201,6 @@ namespace rubinius {
#endif
}

void Environment::start_signals(STATE) {
state->vm()->set_run_signals(true);
signal_thread_ = new SignalThread(state, config);
signal_thread_->start(state);
}

// Stops the signal thread owned by the shared state, if one was started.
//
// Environment::signal_thread_ is initialised to NULL and is not assigned in
// the code visible here once signal-thread startup lives in SharedState, so
// dereferencing it would crash. Go through SharedState instead and tolerate
// the thread not having been started.
void Environment::stop_signals(STATE) {
  SignalThread* st = state->shared().signals();

  if(st) {
    st->stop(state);
  }
}
@@ -597,7 +594,7 @@ namespace rubinius {
}

stop_jit(state);
stop_signals(state);
state->shared().signals()->stop(state);

root_vm->set_call_frame(0);

@@ -846,7 +843,7 @@ namespace rubinius {

load_argv(argc_, argv_);

start_signals(state);
state->shared().start_signals(state);
state->vm()->initialize_config();

load_tool();
@@ -870,23 +867,21 @@ namespace rubinius {
state->shared().start_console(state);
state->shared().start_metrics(state);

Object* loader = G(rubinius)->get_const(state, state->symbol("Loader"));
if(loader->nil_p()) {
rubinius::bug("Unable to find loader");
Object* klass = G(rubinius)->get_const(state, state->symbol("Loader"));
if(klass->nil_p()) {
rubinius::bug("unable to find class Rubinius::Loader");
}

OnStack<1> os(state, loader);
Object* instance = klass->send(state, 0, state->symbol("new"));
if(instance) {
loader_->set(instance);
} else {
rubinius::bug("unable to instantiate Rubinius::Loader");
}

// Enable the JIT after the core library has loaded
G(jit)->enable(state);

Object* inst = loader->send(state, 0, state->symbol("new"));
if(inst) {
OnStack<1> os2(state, inst);

inst->send(state, 0, state->symbol("main"));
} else {
rubinius::bug("Unable to instantiate loader");
}
loader_->get()->send(state, 0, state->symbol("main"));
}
}
8 changes: 8 additions & 0 deletions vm/environment.hpp
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@
#include "config_parser.hpp"
#include "configuration.hpp"

#include "gc/root.hpp"

namespace rubinius {

class ConfigParser;
@@ -61,6 +63,8 @@ namespace rubinius {

utilities::thread::Mutex halt_lock_;

TypedRoot<Object*>* loader_;

public:
SharedState* shared;
VM* root_vm;
@@ -81,6 +85,10 @@ namespace rubinius {
return argv_;
}

// Returns the object currently held by the loader_ root.
// NOTE(review): loader_ is dereferenced unconditionally; presumably it is
// always allocated before this accessor is used -- confirm against callers.
Object* loader() {
return loader_->get();
}

void set_root_vm(VM* vm) {
root_vm = vm;
state->set_vm(vm);
12 changes: 11 additions & 1 deletion vm/shared_state.cpp
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@

#include "console.hpp"
#include "metrics.hpp"
#include "signal.hpp"
#include "world_state.hpp"
#include "builtin/randomizer.hpp"
#include "builtin/array.hpp"
@@ -31,7 +32,7 @@ namespace rubinius {

SharedState::SharedState(Environment* env, Configuration& config, ConfigParser& cp)
: internal_threads_(0)
, signal_thread_(0)
, signals_(0)
, finalizer_thread_(0)
, console_(0)
, metrics_(0)
@@ -142,6 +143,15 @@ namespace rubinius {
return threads;
}

// Creates the signal-handling thread, records it in signals_, starts it and
// returns it. Runs under SYNC(state).
SignalThread* SharedState::start_signals(STATE) {
  SYNC(state);

  SignalThread* const thread = new SignalThread(state);

  signals_ = thread;
  thread->start(state);

  return thread;
}

console::Console* SharedState::start_console(STATE) {
SYNC(state);

16 changes: 7 additions & 9 deletions vm/shared_state.hpp
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ namespace rubinius {
class SharedState : public Lockable {
private:
InternalThreads* internal_threads_;
SignalThread* signal_thread_;
SignalThread* signals_;
FinalizerThread* finalizer_thread_;
console::Console* console_;
metrics::Metrics* metrics_;
@@ -160,14 +160,6 @@ namespace rubinius {
return internal_threads_;
}

SignalThread* signal_handler() const {
return signal_thread_;
}

void set_signal_handler(SignalThread* thr) {
signal_thread_ = thr;
}

FinalizerThread* finalizer_handler() const {
return finalizer_thread_;
}
@@ -215,6 +207,12 @@ namespace rubinius {
return primitive_hits_[primitive];
}

// Returns the stored SignalThread pointer (signals_). The pointer is
// whatever start_signals() last stored there.
SignalThread* signals() const {
return signals_;
}

SignalThread* start_signals(STATE);

console::Console* console() const {
return console_;
}
227 changes: 114 additions & 113 deletions vm/signal.cpp
Original file line number Diff line number Diff line change
@@ -3,35 +3,31 @@
#include "vm.hpp"
#include "call_frame.hpp"
#include "environment.hpp"
#include "on_stack.hpp"
#include "signal.hpp"
#include "configuration.hpp"

#include "builtin/module.hpp"
#include "builtin/array.hpp"

#include "builtin/array.hpp"
#include "builtin/class.hpp"
#include "builtin/constant_scope.hpp"
#include "builtin/jit.hpp"
#include "builtin/module.hpp"
#include "builtin/module.hpp"
#include "builtin/native_method.hpp"
#include "builtin/string.hpp"
#include "builtin/thread.hpp"

#include <string>
#include <iostream>
#include <fcntl.h>

#include "util/logger.hpp"

#include "dtrace/dtrace.h"

#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>

#include <iostream>
#include <string>

#ifdef USE_EXECINFO
#include <execinfo.h>
#endif
@@ -53,33 +49,62 @@ namespace rubinius {
// crashing inside the crash handler.
static struct utsname machine_info;

SignalThread::SignalThread(STATE, Configuration& config)
: InternalThread(state, "rbx.signal", InternalThread::eSmall)
SignalThread::SignalThread(STATE)
: InternalThread(state, "rbx.signals", InternalThread::eSmall)
, shared_(state->shared())
, target_(state->vm())
, queued_signals_(0)
, queue_index_(0)
, process_index_(0)
{
signal_thread_ = this;
shared_.set_signal_handler(this);
install_default_handlers();
}

setup_default_handlers();
// Static trampoline used as the sigaction handler for custom-handled
// signals: forwards the signal number to the SignalThread instance stored in
// the static signal_thread_.
void SignalThread::signal_handler(int signal) {
signal_thread_->queue_signal(signal);
}

// Records a received signal in the pending_signals_ ring buffer and wakes
// the thread waiting on condition_.
//
// signal - the signal number to queue.
//
// Does nothing once thread_exit_ is set.
//
// NOTE(review): this is called from signal_handler(), i.e. in signal-handler
// context. Locking a mutex and signalling a condition variable are not on
// the POSIX async-signal-safe list -- confirm this is acceptable here.
// NOTE(review): the slot at queue_index_ is overwritten unconditionally, so
// it looks like a burst of more than pending_signal_size_ unprocessed
// signals would overwrite older entries -- confirm.
void SignalThread::queue_signal(int signal) {
if(thread_exit_) return;

metrics().system_metrics.os_signals_received++;

{
thread::Mutex::LockGuard guard(lock_);

// Store the signal, then advance the write index with wrap-around.
pending_signals_[queue_index_] = signal;
/* GCC 4.8.2 can't tell that this code is equivalent.
* queue_index_ = ++queue_index_ % pending_signal_size_;
*/
++queue_index_;
queue_index_ %= pending_signal_size_;

// Wake run(), which waits on condition_ when it finds nothing to process.
condition_.signal();
}
}

void SignalThread::initialize(STATE) {
InternalThread::initialize(state);

for(int i = 0; i < NSIG; i++) {
Thread::create(state, vm());

for(int i = 0; i < pending_signal_size_; i++) {
pending_signals_[i] = 0;
}

worker_lock_.init();
worker_cond_.init();
queue_index_ = process_index_ = 0;

watch_lock_.init();
lock_.init();
condition_.init();
}

void SignalThread::wakeup(STATE) {
InternalThread::wakeup(state);

worker_cond_.signal();
{
thread::Mutex::LockGuard guard(lock_);
condition_.signal();
}
}

void SignalThread::stop(STATE) {
@@ -92,11 +117,73 @@ namespace rubinius {
InternalThread::stop(state);
}

// Body of the signal thread: drains the pending_signals_ ring buffer and
// dispatches each queued signal to Ruby by sending `received_signal` to the
// Rubinius module. Loops until thread_exit_ is set, or until a dispatched
// handler raises an exception / exit, in which case the loader's
// handle_exception is invoked and the process is halted.
void SignalThread::run(STATE) {
GCTokenImpl gct;
// Presumably marks this thread as not holding the GC lock while it idles;
// GCDependent below re-acquires it around the Ruby call -- TODO confirm.
state->gc_independent(gct, 0);

#ifndef RBX_WINDOWS
// Block every signal on this thread, so handlers are not run here.
sigset_t set;
sigfillset(&set);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

metrics().init(metrics::eRubyMetrics);

while(!thread_exit_) {
// Take whatever is in the current read slot and clear it.
// NOTE(review): this read/clear is not done under lock_, while
// queue_signal() writes the slot under lock_; it looks like a signal stored
// between the read and the clear could be dropped -- confirm.
int signal = pending_signals_[process_index_];
pending_signals_[process_index_] = 0;

// The read index advances (with wrap-around) whether or not the slot held
// a signal.
/* GCC 4.8.2 can't tell that this code is equivalent.
* process_index_ = ++process_index_ % pending_signal_size_;
*/
++process_index_;
process_index_ %= pending_signal_size_;

if(signal > 0) {
GCDependent guard(state, 0);

metrics().system_metrics.os_signals_processed++;

// args is registered with OnStack before it is allocated.
// Presumably this keeps the pointer valid across the calls below.
Array* args = 0;
OnStack<1> os(state, args);

args = Array::create(state, 1);
args->set(state, 0, Fixnum::from(signal));

// A false/NULL result from send() is treated as "the handler raised".
if(!G(rubinius)->send(state, 0, state->symbol("received_signal"), args, cNil)) {
if(state->thread_state()->raise_reason() == cException ||
state->thread_state()->raise_reason() == cExit) {
// These deliberately shadow the outer args/os for the second call.
Array* args = 0;
Exception* exc = 0;
OnStack<2> os(state, args, exc);

// Capture the exception, then clear the raise state before calling
// back into Ruby.
exc = state->thread_state()->current_exception();
state->thread_state()->clear_raise();

args = Array::create(state, 1);
args->set(state, 0, exc);

// Let the loader object report the exception, then shut down.
state->shared().env()->loader()->send(
state, 0, state->symbol("handle_exception"), args, cNil);

state->shared().env()->halt_and_exit(state);

break;
}
}
} else {
thread::Mutex::LockGuard guard(lock_);

// Empty slot: if the read index has not caught up with the write index,
// keep scanning; otherwise sleep until queue_signal() or wakeup() signals.
// NOTE(review): because the read index was already advanced above, after an
// empty slot the loop appears to walk the whole ring before the indices
// match again and it waits -- confirm this is intended.
if(queue_index_ != process_index_) continue;
condition_.wait(lock_);
}
}
}

// Reports the node name and version fields of the cached machine_info
// through the supplied printf-style callback.
void SignalThread::print_machine_info(PrintFunction function) {
  const char* const node = machine_info.nodename;
  const char* const version = machine_info.version;

  function("node info: %s %s", node, version);
}


#define RBX_PROCESS_INFO_LEN 256

void SignalThread::print_process_info(PrintFunction function) {
@@ -126,116 +213,30 @@ namespace rubinius {
function("process info: %s", process_info);
}

void SignalThread::run(STATE) {
#ifndef RBX_WINDOWS
sigset_t set;
sigfillset(&set);
pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

GCTokenImpl gct;

metrics().init(metrics::eRubyMetrics);

while(!thread_exit_) {
{
utilities::thread::Mutex::LockGuard lg(worker_lock_);
if(thread_exit_) break;
state->gc_independent(gct, 0);
worker_cond_.wait(worker_lock_);
// If we should exit now, don't try to become
// dependent first but break and exit the thread
if(thread_exit_) break;
}
state->gc_dependent(gct, 0);
{
utilities::thread::Mutex::LockGuard lg(worker_lock_);
if(thread_exit_) break;
}

target_->wakeup(state, gct, 0);
}
}

void SignalThread::signal_handler(int sig) {
signal_thread_->handle_signal(sig);
}

void SignalThread::handle_signal(int sig) {
if(thread_exit_) return;

target_->metrics().system_metrics.os_signals_received++;

queued_signals_ = 1;
pending_signals_[sig] = 1;

target_->set_check_local_interrupts();

if(target_->should_interrupt_with_signal()) {
if(!pthread_equal(pthread_self(), target_->os_thread())) {
#ifdef RBX_WINDOWS
// TODO: Windows
#else
pthread_kill(target_->os_thread(), SIGVTALRM);
#endif
}
}

worker_cond_.signal();
}

void SignalThread::add_signal(STATE, int sig, HandlerType type) {
SYNC(state);
utilities::thread::Mutex::LockGuard lg(worker_lock_);
void SignalThread::add_signal_handler(STATE, int signal, HandlerType type) {
thread::SpinLock::LockGuard guard(watch_lock_);

#ifndef RBX_WINDOWS
struct sigaction action;

if(type == eDefault) {
action.sa_handler = SIG_DFL;
watched_signals_.remove(sig);
watched_signals_.remove(signal);
} else if(type == eIgnore) {
action.sa_handler = SIG_IGN;
watched_signals_.push_back(sig);
watched_signals_.push_back(signal);
} else {
action.sa_handler = signal_handler;
watched_signals_.push_back(sig);
watched_signals_.push_back(signal);
}

action.sa_flags = 0;
sigfillset(&action.sa_mask);

sigaction(sig, &action, NULL);
sigaction(signal, &action, NULL);
#endif
}

bool SignalThread::deliver_signals(STATE, CallFrame* call_frame) {
queued_signals_ = 0;

for(int i = 0; i < NSIG; i++) {
if(pending_signals_[i] > 0) {
pending_signals_[i] = 0;

target_->metrics().system_metrics.os_signals_processed++;

Array* args = Array::create(state, 1);
args->set(state, 0, Fixnum::from(i));

// Check whether the send raised an exception and
// stop running the handlers if that happens
if(!G(rubinius)->send(state, call_frame,
state->symbol("received_signal"), args, cNil)) {
if(state->thread_state()->raise_reason() == cException ||
state->thread_state()->raise_reason() == cExit) {
return false;
}
}
}
}

return true;
}

void SignalThread::print_backtraces() {
STATE = shared_.env()->state;
ThreadList* threads = shared_.threads();
@@ -398,7 +399,7 @@ namespace rubinius {
}
#endif

void SignalThread::setup_default_handlers() {
void SignalThread::install_default_handlers() {
#ifndef RBX_WINDOWS
// Get the machine info.
uname(&machine_info);
35 changes: 17 additions & 18 deletions vm/signal.hpp
Original file line number Diff line number Diff line change
@@ -16,16 +16,18 @@ namespace rubinius {
struct CallFrame;

class SignalThread : public InternalThread, public Lockable {
const static int pending_signal_size_ = 256;
SharedState& shared_;
VM* target_;

int pending_signals_[NSIG];
int queued_signals_;
int pending_signals_[pending_signal_size_];
int queue_index_;
int process_index_;

std::list<int> watched_signals_;
utilities::thread::SpinLock watch_lock_;

utilities::thread::Condition worker_cond_;
utilities::thread::Mutex worker_lock_;
utilities::thread::Condition condition_;
utilities::thread::Mutex lock_;

public:
enum HandlerType {
@@ -34,28 +36,25 @@ namespace rubinius {
eCustom
};

SignalThread(STATE, Configuration& config);
SignalThread(STATE);

SharedState& shared() {
return shared_;
}

void initialize(STATE);
void setup_default_handlers();

void add_signal(State*, int sig, HandlerType type = eCustom);
void handle_signal(int sig);
static void signal_handler(int sig);

bool deliver_signals(STATE, CallFrame* call_frame);
static void signal_handler(int signal);

void print_backtraces();

void open_pipes();
void install_default_handlers();

void initialize(STATE);
void run(STATE);
void stop(STATE);
void wakeup(STATE);
void stop(STATE);

void queue_signal(int signal);
void add_signal_handler(State*, int signal, HandlerType type = eCustom);

void print_backtraces();

typedef void (*PrintFunction)(const char* message, ...);

6 changes: 0 additions & 6 deletions vm/state.cpp
Original file line number Diff line number Diff line change
@@ -22,12 +22,6 @@ namespace rubinius {
set_call_frame(call_frame);
vm_->clear_check_local_interrupts();

if(vm_->run_signals_) {
if(!vm_->shared.signal_handler()->deliver_signals(this, call_frame)) {
return false;
}
}

Exception* exc = vm_->interrupted_exception_.get();
if(!exc->nil_p()) {
vm_->interrupted_exception_.set(nil<Exception>());

0 comments on commit 5a97aec

Please sign in to comment.