
Commit 2d7d69c

Showing 8 changed files with 184 additions and 73 deletions.
13 changes: 7 additions & 6 deletions machine/builtin/fiber.cpp
@@ -87,7 +87,8 @@ namespace rubinius {

if(!vm()->suspended_p()) {
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
msg << "attempt to restart non-suspended ("
<< vm()->fiber_transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

@@ -97,8 +98,8 @@ namespace rubinius {
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

@@ -111,11 +112,11 @@ namespace rubinius {
UnmanagedPhase unmanaged(state);

{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
std::unique_lock<std::mutex> lk(vm()->fiber_wait_mutex());

vm()->set_suspended();
while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
vm()->fiber_wait_condition().wait(lk);
}
clear_wakeup();
vm()->set_resuming();
@@ -186,7 +187,7 @@ namespace rubinius {
}

{
std::lock_guard<std::mutex> guard(vm->wait_mutex());
std::lock_guard<std::mutex> guard(vm->fiber_wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
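
The fiber.cpp hunks above rename the VM's generic wait primitives to fiber-specific ones (fiber_wait_mutex(), fiber_wait_condition(), fiber_transition_flag()). The underlying pattern is a standard condition-variable handshake: the suspended side waits on a predicate, and the restarting side flips the predicate and notifies while holding the same mutex. A minimal self-contained sketch in that shape, with illustrative names rather than the Rubinius API:

// Minimal sketch of the suspend/wakeup handshake, with illustrative names
// (not the Rubinius API). The suspended side waits on a predicate; the
// restarting side sets the predicate and notifies under the same mutex,
// mirroring fiber_wait_mutex() / fiber_wait_condition().
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main() {
  std::mutex fiber_wait_mutex;
  std::condition_variable fiber_wait_condition;
  bool wakeup = false;

  std::thread fiber([&] {
    std::unique_lock<std::mutex> lk(fiber_wait_mutex);
    // "Suspend" until another thread requests a restart.
    fiber_wait_condition.wait(lk, [&] { return wakeup; });
    std::cout << "fiber resumed\n";
  });

  {
    // "Restart": set the wakeup flag and notify while holding the mutex so
    // the waiting side cannot miss the signal.
    std::lock_guard<std::mutex> guard(fiber_wait_mutex);
    wakeup = true;
    fiber_wait_condition.notify_one();
  }

  fiber.join();
  return 0;
}

Notifying while holding the mutex avoids a lost wakeup between the waiter's predicate check and its wait, which is why both restart() and suspend() above take the fiber wait mutex.
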
18 changes: 7 additions & 11 deletions machine/builtin/system.cpp
@@ -382,21 +382,19 @@ namespace rubinius {

static int fork_exec(STATE, int errors_fd) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork_exec(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

// If execvp() succeeds, we'll read EOF and know.
fcntl(errors_fd, F_SETFD, FD_CLOEXEC);

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid == 0) {
// We're in the child...
@@ -707,7 +705,7 @@ namespace rubinius {

Object* System::vm_exec(STATE, String* path, Array* args) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->exec_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

/* Setting up the command and arguments may raise an exception so do it
* before everything else.
@@ -861,18 +859,16 @@ namespace rubinius {
return force_as<Fixnum>(Primitives::failure());
#else
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid > 0) {
// We're in the parent...
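
Both fork paths above now funnel through fork_lock()/fork_unlock() and share process_mutex_. fork_exec() also relies on the close-on-exec error pipe noted in the comment "If execvp() succeeds, we'll read EOF and know." A standalone sketch of that technique (illustrative, not the Rubinius implementation):

// Close-on-exec error pipe: mark the write end FD_CLOEXEC so that a
// successful execvp() closes it (parent reads EOF), while a failed execvp()
// lets the child report errno through it. Illustrative only.
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

int main() {
  int fds[2];
  if (pipe(fds) != 0) return 1;

  fcntl(fds[1], F_SETFD, FD_CLOEXEC);

  pid_t pid = fork();
  if (pid == 0) {
    // Child: if execvp() succeeds this process image is replaced and the
    // pipe's write end closes automatically; otherwise report errno.
    close(fds[0]);
    char* const argv[] = {const_cast<char*>("true"), nullptr};
    execvp("true", argv);
    int err = errno;
    if (write(fds[1], &err, sizeof(err)) < 0) { /* nothing more to do */ }
    _exit(127);
  }

  // Parent: close the write end, then read. EOF means the exec succeeded.
  close(fds[1]);
  int err = 0;
  if (read(fds[0], &err, sizeof(err)) == 0) {
    std::printf("exec succeeded (EOF on error pipe)\n");
  } else {
    std::printf("exec failed: %s\n", std::strerror(err));
  }
  close(fds[0]);
  waitpid(pid, nullptr, 0);
  return 0;
}
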
39 changes: 31 additions & 8 deletions machine/logger.cpp
@@ -47,6 +47,26 @@ namespace rubinius {
loglevel_ = level;
}

void lock() {
if(logger_) {
logger_->lock();
}
}

bool try_lock() {
if(logger_) {
return logger_->try_lock();
} else {
return false;
}
}

void unlock() {
if(logger_) {
logger_->unlock();
}
}

void close() {
delete logger_;
}
@@ -124,7 +144,7 @@ namespace rubinius {

void write(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

char buf[LOGGER_MSG_SIZE];

@@ -136,7 +156,7 @@ namespace rubinius {

void fatal(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eFatal) return;

@@ -150,7 +170,7 @@ namespace rubinius {

void error(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eError) return;

@@ -164,7 +184,7 @@ namespace rubinius {

void warn(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eWarn) return;

@@ -178,7 +198,7 @@ namespace rubinius {

void info(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eInfo) return;

@@ -192,7 +212,7 @@ namespace rubinius {

void debug(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eDebug) return;

@@ -215,10 +235,13 @@ namespace rubinius {

char* Logger::timestamp() {
time_t clock;
struct tm lt;

time(&clock);
strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S",
localtime(&clock));
localtime_r(&clock, &lt);

strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S", &lt);

return formatted_time_;
}
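
The timestamp() hunk swaps localtime() for POSIX localtime_r(): localtime() returns a pointer to static storage shared by every thread, so concurrent log writes could race on the broken-down time, while localtime_r() fills a caller-supplied struct tm. A standalone sketch of the new shape:

// Thread-safe timestamp formatting in the shape of the new Logger::timestamp():
// localtime_r() writes into a local struct tm instead of a shared static buffer.
#include <cstddef>
#include <cstdio>
#include <ctime>

static const char* format_timestamp(char* buf, std::size_t size) {
  time_t clock;
  struct tm lt;

  time(&clock);
  localtime_r(&clock, &lt);                       // no shared static buffer
  strftime(buf, size, "%b %e %H:%M:%S", &lt);
  return buf;
}

int main() {
  char buf[64];
  std::printf("%s\n", format_timestamp(buf, sizeof(buf)));
  return 0;
}
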

17 changes: 16 additions & 1 deletion machine/logger.hpp
@@ -26,6 +26,9 @@ namespace rubinius {


void open(logger_type type, const char* identifier, logger_level level=eWarn, ...);
void lock();
bool try_lock();
void unlock();
void close();

void write(const char* message, ...);
@@ -70,9 +73,21 @@ namespace rubinius {

char* timestamp();

rubinius::locks::spinlock_mutex& lock() {
rubinius::locks::spinlock_mutex& spinlock() {
return lock_;
}

void lock() {
lock_.lock();
}

bool try_lock() {
return lock_.try_lock();
}

void unlock() {
lock_.unlock();
}
};

class Syslog : public Logger {
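
Logger now exposes lock(), try_lock(), and unlock() forwarding to its internal spinlock (the old lock() accessor becomes spinlock()), which is what lets ThreadNexus::fork_lock() hold the logger lock across fork(). A minimal lock type with that interface, built on std::atomic<bool> as a stand-in for locks::spinlock_mutex (an assumption for illustration, not the Rubinius type):

// Minimal spinlock with the lock()/try_lock()/unlock() interface Logger now
// exposes. std::atomic<bool> is a stand-in here; the real locks::spinlock_mutex
// may differ.
#include <atomic>
#include <mutex>

class spinlock_mutex {
  std::atomic<bool> locked_{false};

public:
  void lock() {
    while (locked_.exchange(true, std::memory_order_acquire)) {
      // spin until the current holder calls unlock()
    }
  }

  bool try_lock() {
    return !locked_.exchange(true, std::memory_order_acquire);
  }

  void unlock() {
    locked_.store(false, std::memory_order_release);
  }
};

int main() {
  spinlock_mutex lock;
  {
    // Usable with std::lock_guard, as in the logger's write()/fatal() paths.
    std::lock_guard<spinlock_mutex> guard(lock);
  }
  // Or acquired and released explicitly, like the new lock()/unlock() wrappers.
  if (lock.try_lock()) {
    lock.unlock();
  }
  return 0;
}
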
91 changes: 85 additions & 6 deletions machine/thread_nexus.cpp
@@ -1,3 +1,4 @@
#include "logger.hpp"
#include "shared_state.hpp"
#include "thread_nexus.hpp"
#include "vm.hpp"
@@ -150,6 +151,16 @@ namespace rubinius {
}
}

void ThreadNexus::detect_deadlock(uint64_t nanoseconds, uint64_t limit) {
if(nanoseconds > limit) {
logger::fatal("thread nexus: unable to lock, possible deadlock");

list_threads();

rubinius::abort();
}
}

uint64_t ThreadNexus::delay() {
static int i = 0;
static int delay[] = {
@@ -167,6 +178,76 @@ namespace rubinius {
return ns;
}

ThreadNexus::LockStatus ThreadNexus::fork_lock(VM* vm) {
waiting_phase(vm);

/* Preserve the state of the phase_flag_ in situations where we have the
* entire system serialized.
*/
uint32_t id = 0;
bool held = false;
uint64_t ns = 0;

while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
*/
vm->set_thread_phase(eManaged);
held = true;
break;
}

ns += delay();

detect_deadlock(ns, lock_limit);

id = 0;
}

/* Lock and hold the waiting_mutex to prevent any other thread from
* holding it across a fork() call.
*/
ns = 0;

while(!waiting_mutex_.try_lock()) {
ns += delay();

detect_deadlock(ns, lock_limit);
}

// Checkpoint all the other threads.
ns = 0;

while(!try_checkpoint(vm)) {
ns += delay();

detect_deadlock(ns, lock_limit);
}

/* Hold the logger lock so that the multi-process semaphore that the
* logger depends on is not held across fork() calls.
*/
ns = 0;

while(!logger::try_lock()) {
ns += delay();

detect_deadlock(ns, lock_limit);
}

return to_lock_status(held);
}

void ThreadNexus::fork_unlock(LockStatus status) {
if(status == eLocked) {
phase_flag_ = 0;
}

logger::unlock();
waiting_mutex_.unlock();
}

bool ThreadNexus::try_checkpoint(VM* vm) {
timer::StopWatch<timer::nanoseconds> timer(
vm->metrics().lock.stop_the_world_ns);
@@ -232,9 +313,7 @@ namespace rubinius {

vm->set_thread_phase(eWaiting);

while(!phase_flag_.compare_exchange_strong(id, vm->thread_id(),
std::memory_order_acq_rel))
{
while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
@@ -244,9 +323,9 @@ namespace rubinius {
}

{
std::unique_lock<std::mutex> lk(wait_mutex_);
wait_condition_.wait(lk,
[this]{ return phase_flag_.load(std::memory_order_acquire) == 0; });
std::unique_lock<std::mutex> lk(waiting_mutex_);
waiting_condition_.wait(lk,
[this]{ return phase_flag_ == 0; });
}

id = 0;
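
fork_lock() takes every process-wide lock before ::fork(): the phase_flag_ "lock" via the CAS spin, the waiting mutex, a checkpoint of the other threads, and the logger lock; fork_unlock() releases them afterwards in both the parent and the child. The hazard being avoided is that any lock held by some other thread at fork() time stays locked forever in the child, where that thread no longer exists. A generic, runnable illustration of the pattern (stand-in names, not the Rubinius types):

// Generic illustration of the fork_lock()/fork_unlock() discipline: take the
// process-wide lock in the forking thread, fork, then release it on both
// sides. Because the forking thread is the only thread that survives in the
// child, the lock is never left held by a thread that no longer exists.
#include <cstdio>
#include <mutex>
#include <sys/wait.h>
#include <unistd.h>

std::mutex process_lock;   // stands in for the locks fork_lock() serializes

int main() {
  process_lock.lock();      // acquire before forking, as fork_lock() does

  pid_t pid = fork();

  process_lock.unlock();    // release in both processes, as fork_unlock() does

  if (pid == 0) {
    // Child: the lock is free again and can be retaken safely.
    process_lock.lock();
    std::printf("child reacquired the lock\n");
    process_lock.unlock();
    _exit(0);
  }

  waitpid(pid, nullptr, 0);
  return 0;
}
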
35 changes: 16 additions & 19 deletions machine/thread_nexus.hpp
@@ -27,11 +27,10 @@ namespace rubinius {

class ThreadNexus {
std::atomic<bool> stop_;
std::mutex fork_mutex_;
std::mutex exec_mutex_;
std::mutex process_mutex_;
std::mutex threads_mutex_;
std::mutex wait_mutex_;
std::condition_variable wait_condition_;
std::mutex waiting_mutex_;
std::condition_variable waiting_condition_;

std::atomic<uint32_t> phase_flag_;

@@ -58,11 +57,10 @@ namespace rubinius {

ThreadNexus()
: stop_(false)
, fork_mutex_()
, exec_mutex_()
, process_mutex_()
, threads_mutex_()
, wait_mutex_()
, wait_condition_()
, waiting_mutex_()
, waiting_condition_()
, phase_flag_(0)
, threads_()
, thread_ids_(0)
@@ -78,12 +76,8 @@ namespace rubinius {
}

public:
std::mutex& fork_mutex() {
return fork_mutex_;
}

std::mutex& exec_mutex() {
return exec_mutex_;
std::mutex& process_mutex() {
return process_mutex_;
}

ThreadList* threads() {
@@ -119,8 +113,8 @@ namespace rubinius {
waiting_phase(vm);

{
std::unique_lock<std::mutex> lk(wait_mutex_);
wait_condition_.wait(lk,
std::unique_lock<std::mutex> lk(waiting_mutex_);
waiting_condition_.wait(lk,
[this]{ return !stop_.load(std::memory_order_acquire); });
}

@@ -129,6 +123,8 @@ namespace rubinius {
}

bool waiting_lock(VM* vm);
LockStatus fork_lock(VM* vm);
void fork_unlock(LockStatus status);

LockStatus try_lock(VM* vm) {
while(stop_p()) {
@@ -160,16 +156,17 @@ namespace rubinius {
}

void unlock() {
std::lock_guard<std::mutex> guard(wait_mutex_);
phase_flag_.store(0, std::memory_order_release);
wait_condition_.notify_all();
std::lock_guard<std::mutex> guard(waiting_mutex_);
phase_flag_ = 0;
waiting_condition_.notify_all();
}

bool try_checkpoint(VM* vm);
void checkpoint(VM* vm);

uint64_t delay();
void detect_deadlock(uint64_t nanoseconds, uint64_t limit, VM* vm);
void detect_deadlock(uint64_t nanoseconds, uint64_t limit);

void list_threads();

6 changes: 3 additions & 3 deletions machine/vm.cpp
@@ -70,9 +70,9 @@ namespace rubinius {
, interrupt_by_kill_(false)
, check_local_interrupts_(false)
, thread_step_(false)
, wait_mutex_()
, wait_condition_()
, transition_flag_(eSuspending)
, fiber_wait_mutex_()
, fiber_wait_condition_()
, fiber_transition_flag_(eSuspending)
, interrupt_lock_()
, method_missing_reason_(eNone)
, constant_missing_reason_(vFound)
38 changes: 19 additions & 19 deletions machine/vm.hpp
@@ -104,8 +104,8 @@ namespace rubinius {
bool check_local_interrupts_;
bool thread_step_;

std::mutex wait_mutex_;
std::condition_variable wait_condition_;
std::mutex fiber_wait_mutex_;
std::condition_variable fiber_wait_condition_;

enum FiberTransition {
eSuspending,
@@ -115,7 +115,7 @@ namespace rubinius {
eFinished
};

std::atomic<FiberTransition> transition_flag_;
std::atomic<FiberTransition> fiber_transition_flag_;

utilities::thread::SpinLock interrupt_lock_;

@@ -183,56 +183,56 @@ namespace rubinius {
return interrupt_lock_;
}

std::mutex& wait_mutex() {
return wait_mutex_;
std::mutex& fiber_wait_mutex() {
return fiber_wait_mutex_;
}

std::condition_variable& wait_condition() {
return wait_condition_;
std::condition_variable& fiber_wait_condition() {
return fiber_wait_condition_;
}

FiberTransition transition_flag() {
return transition_flag_;
FiberTransition fiber_transition_flag() {
return fiber_transition_flag_;
}

bool suspending_p() const {
return transition_flag_ == eSuspending;
return fiber_transition_flag_ == eSuspending;
}

bool suspended_p() const {
return transition_flag_ == eSuspended;
return fiber_transition_flag_ == eSuspended;
}

bool resuming_p() const {
return transition_flag_ == eResuming;
return fiber_transition_flag_ == eResuming;
}

bool running_p() const {
return transition_flag_ == eRunning;
return fiber_transition_flag_ == eRunning;
}

bool finished_p() const {
return transition_flag_ == eFinished;
return fiber_transition_flag_ == eFinished;
}

void set_suspending() {
transition_flag_ = eSuspending;
fiber_transition_flag_ = eSuspending;
}

void set_suspended() {
transition_flag_ = eSuspended;
fiber_transition_flag_ = eSuspended;
}

void set_resuming() {
transition_flag_ = eResuming;
fiber_transition_flag_ = eResuming;
}

void set_running() {
transition_flag_ = eRunning;
fiber_transition_flag_ = eRunning;
}

void set_finished() {
transition_flag_ = eFinished;
fiber_transition_flag_ = eFinished;
}

void set_thread(Thread* thread);
