Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into codedb-ffi-io
Browse files Browse the repository at this point in the history
brixen committed Jun 27, 2016
2 parents 3cbca7e + 44d5f80 commit 9a0726c
Showing 26 changed files with 573 additions and 182 deletions.
10 changes: 10 additions & 0 deletions core/fiber.rb
Original file line number Diff line number Diff line change
@@ -34,6 +34,11 @@ def self.main
raise PrimitiveFailure, "Fiber.main primitive failed"
end

# Returns the fiber count computed by the VM's :fiber_s_count primitive
# (Fiber::s_count, which counts fiber VMs whose status is not dead).
# The raise is the fallback path; presumably it only runs when the
# primitive itself does not produce a result.
def self.count
Rubinius.primitive :fiber_s_count
raise PrimitiveFailure, "Fiber.count primitive failed"
end

def status
Rubinius.primitive :fiber_status
raise PrimitiveFailure, "Fiber#status primitive failed"
@@ -49,6 +54,11 @@ def transfer(*args)
raise PrimitiveFailure, "Fiber#transfer primitive failed"
end

# Disposes of this fiber through the VM's :fiber_dispose primitive
# (Fiber::dispose, which cancels the fiber and returns it).
# The raise is the fallback path; presumably it only runs when the
# primitive itself does not produce a result.
def dispose
Rubinius.primitive :fiber_dispose
raise PrimitiveFailure, "Fiber#dispose primitive failed"
end

# True for every fiber status except "dead".
def alive?
  !(status == "dead")
end
5 changes: 5 additions & 0 deletions core/thread.rb
Original file line number Diff line number Diff line change
@@ -79,6 +79,11 @@ def self.list
Kernel.raise PrimitiveFailure, "Thread.list primitive failed"
end

# Returns the thread count computed by the VM's :thread_count primitive
# (Thread::count, which counts thread VMs whose Thread object is non-nil
# and alive). Kernel.raise is the fallback path; presumably it only runs
# when the primitive itself does not produce a result.
def self.count
Rubinius.primitive :thread_count
Kernel.raise PrimitiveFailure, "Thread.count primitive failed"
end

def self.stop
sleep
nil
88 changes: 74 additions & 14 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
@@ -87,7 +87,8 @@ namespace rubinius {

if(!vm()->suspended_p()) {
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
msg << "attempt to restart non-suspended ("
<< vm()->fiber_transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

@@ -97,8 +98,8 @@ namespace rubinius {
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

@@ -107,15 +108,44 @@ namespace rubinius {
}
}

// Cancels this Fiber on behalf of the caller.
//
// Two VMs are involved: vm() is this Fiber's own VM, state->vm() is the VM
// of the caller requesting the cancel. The caller flags the Fiber for
// cancellation, wakes it, steps aside while the Fiber unwinds, and resumes
// running once the Fiber's VM has become a zombie.
void Fiber::cancel(STATE) {
{
// Held for the whole wake-up hand-off in this scope.
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

// Mark the Fiber's thread state as being raised for cancellation
// (presumably the cFiberCancel raise reason tested elsewhere).
vm()->thread_state()->raise_fiber_cancel();

// The caller announces it is about to suspend itself.
state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

// Keep signalling the Fiber's wait condition until it is no longer
// suspended. This inner guard shadows the outer one and locks the
// Fiber's own wait mutex for each notification.
while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

// Wait until the Fiber reports that it is running.
// NOTE(review): limited_wait_for looks like a bounded wait -- confirm
// what happens when the limit is reached.
vm()->limited_wait_for([this]{ return vm()->running_p(); });

// Release the canceled Fiber.
state->vm()->set_suspended();

// Wait until the Fiber's VM has been marked as a zombie.
vm()->limited_wait_for([this]{ return vm()->zombie_p(); });

vm()->set_canceled();

// The caller takes over as the running context again.
state->vm()->set_running();
}

void Fiber::suspend_and_continue(STATE) {
UnmanagedPhase unmanaged(state);

{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
std::unique_lock<std::mutex> lk(vm()->fiber_wait_mutex());

vm()->set_suspended();
while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
vm()->fiber_wait_condition().wait(lk);
}
clear_wakeup();
vm()->set_resuming();
@@ -137,6 +167,8 @@ namespace rubinius {
Object* Fiber::return_value(STATE) {
if(vm()->thread_state()->raise_reason() == cNone) {
return state->vm()->thread()->fiber_value();
} else if(vm()->thread_state()->raise_reason() == cFiberCancel) {
return NULL;
} else {
invoke_context()->thread_state()->set_state(vm()->thread_state());
return NULL;
@@ -177,16 +209,18 @@ namespace rubinius {
vm->thread()->fiber_value(state, cNil);
}

if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
if(vm->thread_state()->raise_reason() != cFiberCancel) {
if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
}
}

{
std::lock_guard<std::mutex> guard(vm->wait_mutex());
std::lock_guard<std::mutex> guard(vm->fiber_wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
@@ -377,6 +411,12 @@ namespace rubinius {
return return_value(state);
}

// Primitive behind Fiber#dispose: cancels this Fiber and returns it.
//
// The cancel is skipped when the Fiber has no VM or when its VM is already
// a zombie. This mirrors the zombie check that Fiber::finalize and
// NativeFinalizer::dispose perform before calling cancel(), so disposing of
// a Fiber that has already completed is a harmless no-op instead of trying
// to wake and wait on a finished VM.
Object* Fiber::dispose(STATE) {
  if(vm() && !vm()->zombie_p()) {
    cancel(state);
  }

  return this;
}

Object* Fiber::s_yield(STATE, Arguments& args) {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);
@@ -412,10 +452,30 @@ namespace rubinius {
return state->vm()->thread()->fiber();
}

void Fiber::finalize(STATE, Fiber* fib) {
// Primitive behind Fiber.count: forwards to the shared state's fiber
// counter and returns its result unchanged.
Fixnum* Fiber::s_count(STATE) {
  Fixnum* total = state->shared().vm_fibers_count(state);
  return total;
}

void Fiber::finalize(STATE, Fiber* fiber) {
if(state->shared().config.machine_fiber_log_finalizer.value) {
logger::write("fiber: finalizer: %s, %d",
fib->thread_name()->c_str(state), fib->fiber_id()->to_native());
fiber->thread_name()->c_str(state), fiber->fiber_id()->to_native());
}

if(fiber->vm()) {
if(!state->shared().halting_p()) {
if(!fiber->vm()->zombie_p()) {
fiber->cancel(state);
}
}

if(fiber->vm()->zombie_p()) {
VM::discard(state, fiber->vm());
fiber->vm(NULL);
} else {
logger::write("fiber: finalizer: fiber not completed: %s, %d",
fiber->thread_name()->c_str(state), fiber->fiber_id()->to_native());
}
}
}
}
7 changes: 7 additions & 0 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
@@ -93,6 +93,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_s_main
static Fiber* s_main(STATE);

// Rubinius.primitive :fiber_s_count
static Fixnum* s_count(STATE);

bool root_p();

Status status() {
@@ -120,6 +123,7 @@ namespace rubinius {

void start(STATE, Arguments& args);
void restart(STATE);
void cancel(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
@@ -131,6 +135,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_transfer
Object* transfer(STATE, Arguments& args);

// Rubinius.primitive :fiber_dispose
Object* dispose(STATE);

public: /* TypeInfo */

class Info : public TypeInfo {
42 changes: 28 additions & 14 deletions machine/builtin/system.cpp
Original file line number Diff line number Diff line change
@@ -206,7 +206,14 @@ namespace rubinius {
args[idx] = 0;

// If we added anything, then exec, otherwise fall through and fail.
if(idx > 0) execvp(args[0], args);
if(idx > 0) {
for(int i = 0; i < 5; i++) {
if(::execvp(args[0], args) < 0) {
if(errno != EAGAIN) break;
}
}
}

// If we failed, clean up the args.
delete[] args;
}
@@ -385,21 +392,19 @@ namespace rubinius {

static int fork_exec(STATE, int errors_fd) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork_exec(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

// If execvp() succeeds, we'll read EOF and know.
fcntl(errors_fd, F_SETFD, FD_CLOEXEC);

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid == 0) {
// We're in the child...
@@ -465,7 +470,11 @@ namespace rubinius {
}

if(exe.argc()) {
(void)::execvp(exe.command(), exe.argv());
for(int i = 0; i < 5; i++) {
if(::execvp(exe.command(), exe.argv()) < 0) {
if(errno != EAGAIN) break;
}
}
} else {
exec_sh_fallback(state, exe.command(), exe.command_size());
}
@@ -710,7 +719,7 @@ namespace rubinius {

Object* System::vm_exec(STATE, String* path, Array* args) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->exec_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

/* Setting up the command and arguments may raise an exception so do it
* before everything else.
@@ -755,7 +764,11 @@ namespace rubinius {
}

if(exe.argc()) {
(void)::execvp(exe.command(), exe.argv());
for(int i = 0; i < 5; i++) {
if(::execvp(exe.command(), exe.argv()) < 0) {
if(errno != EAGAIN) break;
}
}
} else {
exec_sh_fallback(state, exe.command(), exe.command_size());
}
@@ -864,18 +877,16 @@ namespace rubinius {
return force_as<Fixnum>(Primitives::failure());
#else
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid > 0) {
// We're in the parent...
@@ -1731,6 +1742,9 @@ namespace rubinius {
case cThreadKill:
reason = state->symbol("thread_kill");
break;
case cFiberCancel:
reason = state->symbol("fiber_cancel");
break;
}

tuple->put(state, 0, reason);
16 changes: 8 additions & 8 deletions machine/builtin/thread.cpp
Original file line number Diff line number Diff line change
@@ -112,14 +112,10 @@ namespace rubinius {
logger::write("thread: finalizer: %s", thread->vm()->name().c_str());
}

thread->finalize_instance(state);
}

void Thread::finalize_instance(STATE) {
if(vm() && vm()->zombie_p()) {
fiber_mutex_.std::mutex::~mutex();
VM::discard(state, vm());
vm(NULL);
if(thread->vm() && thread->vm()->zombie_p()) {
thread->fiber_mutex().std::mutex::~mutex();
VM::discard(state, thread->vm());
thread->vm(NULL);
}
}

@@ -432,6 +428,10 @@ namespace rubinius {
return state->shared().vm_threads(state);
}

// Primitive behind Thread.count: forwards to the shared state's thread
// counter and returns its result unchanged.
Fixnum* Thread::count(STATE) {
  Fixnum* total = state->shared().vm_threads_count(state);
  return total;
}

Object* Thread::set_priority(STATE, Fixnum* new_priority) {
priority(state, new_priority);
return new_priority;
4 changes: 3 additions & 1 deletion machine/builtin/thread.hpp
Original file line number Diff line number Diff line change
@@ -131,6 +131,9 @@ namespace rubinius {
// Rubinius.primitive :thread_list
static Array* list(STATE);

// Rubinius.primitive :thread_count
static Fixnum* count(STATE);

public: /* Instance primitives */

void fork(STATE);
@@ -237,7 +240,6 @@ namespace rubinius {
static Thread* create(STATE, Class* klass, VM* vm);

static void finalize(STATE, Thread* thread);
void finalize_instance(STATE);

int start_thread(STATE, void* (*function)(void*));
static void* run(void*);
6 changes: 6 additions & 0 deletions machine/environment.cpp
Original file line number Diff line number Diff line change
@@ -546,6 +546,8 @@ namespace rubinius {
void Environment::halt(STATE, int exit_code) {
utilities::thread::Mutex::LockGuard guard(halt_lock_);

state->shared().set_halting();

if(state->shared().config.system_log_lifetime.value) {
logger::write("process: exit: %s %d %fs",
shared->pid.c_str(), exit_code, shared->run_time());
@@ -562,6 +564,8 @@ namespace rubinius {
shared->machine_threads()->shutdown(state);
}

shared->finalizer()->dispose(state);

shared->thread_nexus()->lock(state->vm());

shared->finalizer()->finish(state);
@@ -743,5 +747,7 @@ namespace rubinius {

State main_state(vm);
state->shared().start_signals(&main_state);

state->shared().set_running();
}
}
2 changes: 2 additions & 0 deletions machine/instructions.cpp
Original file line number Diff line number Diff line change
@@ -195,6 +195,8 @@ Object* MachineCode::interpreter(STATE, MachineCode* const mcode) {
case cExit:
call_frame->scope->flush_to_heap(state);
return NULL;
case cFiberCancel:
return NULL;
default:
break;
} // switch
39 changes: 31 additions & 8 deletions machine/logger.cpp
Original file line number Diff line number Diff line change
@@ -47,6 +47,26 @@ namespace rubinius {
loglevel_ = level;
}

// Acquires the active logger's lock. Does nothing when no logger is open.
void lock() {
  if(!logger_) return;

  logger_->lock();
}

// Attempts to acquire the active logger's lock without blocking.
//
// Returns true when the lock was acquired, and also when no logger is open:
// lock() and unlock() are no-ops in that case, so there is nothing to
// contend for and the attempt succeeds vacuously. Reporting failure here
// would make callers that retry until success (such as
// ThreadNexus::fork_lock) spin until their deadlock detection aborts the
// process.
bool try_lock() {
  if(!logger_) return true;

  return logger_->try_lock();
}

// Releases the active logger's lock. Does nothing when no logger is open.
void unlock() {
  if(!logger_) return;

  logger_->unlock();
}

// Destroys the active logger.
//
// The pointer is cleared after deletion so the `if(logger_)` guards used by
// the other logger functions see that no logger is open, instead of
// dereferencing a dangling pointer after close().
void close() {
  delete logger_;
  logger_ = NULL;
}
@@ -124,7 +144,7 @@ namespace rubinius {

void write(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

char buf[LOGGER_MSG_SIZE];

@@ -136,7 +156,7 @@ namespace rubinius {

void fatal(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eFatal) return;

@@ -150,7 +170,7 @@ namespace rubinius {

void error(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eError) return;

@@ -164,7 +184,7 @@ namespace rubinius {

void warn(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eWarn) return;

@@ -178,7 +198,7 @@ namespace rubinius {

void info(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eInfo) return;

@@ -192,7 +212,7 @@ namespace rubinius {

void debug(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eDebug) return;

@@ -215,10 +235,13 @@ namespace rubinius {

char* Logger::timestamp() {
time_t clock;
struct tm lt;

time(&clock);
strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S",
localtime(&clock));
localtime_r(&clock, &lt);

strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S", &lt);

return formatted_time_;
}

17 changes: 16 additions & 1 deletion machine/logger.hpp
Original file line number Diff line number Diff line change
@@ -26,6 +26,9 @@ namespace rubinius {


void open(logger_type type, const char* identifier, logger_level level=eWarn, ...);
void lock();
bool try_lock();
void unlock();
void close();

void write(const char* message, ...);
@@ -70,9 +73,21 @@ namespace rubinius {

char* timestamp();

rubinius::locks::spinlock_mutex& lock() {
rubinius::locks::spinlock_mutex& spinlock() {
return lock_;
}

// Acquires this logger's spinlock_mutex (lock_).
void lock() {
lock_.lock();
}

// Forwards to lock_.try_lock() and returns its result.
bool try_lock() {
return lock_.try_lock();
}

// Releases this logger's spinlock_mutex (lock_).
void unlock() {
lock_.unlock();
}
};

class Syslog : public Logger {
49 changes: 45 additions & 4 deletions machine/memory/finalizer.cpp
Original file line number Diff line number Diff line change
@@ -25,6 +25,13 @@

namespace rubinius {
namespace memory {
// Gives the finalized object a chance to shut down before finalization.
//
// Only Fiber objects need this today: a Fiber whose VM has not yet become a
// zombie is canceled. The VM pointer is checked first because a Fiber's VM
// can be NULL (Fiber::finalize clears it after discarding the VM), and
// dereferencing it unguarded would crash.
void NativeFinalizer::dispose(STATE) {
  // TODO: consider building this on the TypeInfo structure.
  Fiber* fiber = try_as<Fiber>(object());
  if(!fiber) return;

  if(fiber->vm() && !fiber->vm()->zombie_p()) {
    fiber->cancel(state);
  }
}

void NativeFinalizer::finalize(STATE) {
(*finalizer_)(state, object());
}
@@ -35,6 +42,9 @@ namespace rubinius {
}
}

// Extension finalizers have nothing to do at dispose time; the empty body
// only satisfies the dispose() interface.
void ExtensionFinalizer::dispose(STATE) {
}

void ExtensionFinalizer::finalize(STATE) {
ManagedPhase managed(state);

@@ -84,6 +94,9 @@ namespace rubinius {
}
}

// Managed finalizers have nothing to do at dispose time; the empty body
// only satisfies the dispose() interface.
void ManagedFinalizer::dispose(STATE) {
}

void ManagedFinalizer::finalize(STATE) {
ManagedPhase managed(state);

@@ -168,14 +181,17 @@ namespace rubinius {
}

// Sets up the finalizer thread: creates a Thread for this thread's VM and
// allocates the Synchronization object stored in synchronization_.
// NOTE(review): the Thread::create result is not kept here; presumably the
// VM records its Thread -- confirm.
void FinalizerThread::initialize(STATE) {
Thread::create(state, vm());
synchronization_ = new Synchronization();
}

void FinalizerThread::wakeup(STATE) {
MachineThread::wakeup(state);

while(thread_running_) {
LockUnmanaged<std::mutex> guard(state, list_mutex());
UnmanagedPhase unmanaged(state);
std::lock_guard<std::mutex> guard(list_mutex());

list_condition().notify_one();
}
}
@@ -210,12 +226,35 @@ namespace rubinius {
vm()->metrics().gc.objects_finalized++;
}
}

state->vm()->thread()->vm()->set_zombie(state);
}

// Calls dispose() on every registered finalizer object, first those in
// process_list_ and then those in live_list_, while holding the list mutex.
// Sets finishing_ up front, which makes the finalizer-registration entry
// points return early.
void FinalizerThread::dispose(STATE) {
  finishing_ = true;

  std::lock_guard<std::mutex> guard(list_mutex());

  for(FinalizerObject* pending : process_list_) {
    pending->dispose(state);
  }

  for(FinalizerObject* live : live_list_) {
    live->dispose(state);
  }
}

void FinalizerThread::finish(STATE) {
finishing_ = true;

// TODO: cleanup
while(!process_list_.empty()) {
FinalizerObject* fo = process_list_.back();
process_list_.pop_back();
@@ -240,15 +279,17 @@ namespace rubinius {
void FinalizerThread::native_finalizer(STATE, Object* obj, FinalizerFunction func) {
if(finishing_) return;

LockUnmanaged<std::mutex> guard(state, list_mutex());
UnmanagedPhase unmanaged(state);
std::lock_guard<std::mutex> guard(list_mutex());

add_finalizer(state, new NativeFinalizer(state, obj, func));
}

void FinalizerThread::extension_finalizer(STATE, Object* obj, FinalizerFunction func) {
if(finishing_) return;

LockUnmanaged<std::mutex> guard(state, list_mutex());
UnmanagedPhase unmanaged(state);
std::lock_guard<std::mutex> guard(list_mutex());

add_finalizer(state, new ExtensionFinalizer(state, obj, func));
}
5 changes: 5 additions & 0 deletions machine/memory/finalizer.hpp
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ namespace rubinius {
object_ = obj;
}

virtual void dispose(STATE) = 0;
virtual void finalize(STATE) = 0;
virtual void mark(ImmixGC* gc) = 0;
virtual bool match_p(STATE, Object* object, Object* finalizer) = 0;
@@ -53,6 +54,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -67,6 +69,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -81,6 +84,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer);
@@ -128,6 +132,7 @@ namespace rubinius {
}

void finish(STATE);
void dispose(STATE);

void native_finalizer(STATE, Object* obj, FinalizerFunction func);
void extension_finalizer(STATE, Object* obj, FinalizerFunction func);
2 changes: 2 additions & 0 deletions machine/memory/immix_marker.cpp
Original file line number Diff line number Diff line change
@@ -108,6 +108,8 @@ namespace memory {
}

state->memory()->clear_mature_mark_in_progress();

state->vm()->thread()->vm()->set_zombie(state);
}
}
}
1 change: 1 addition & 0 deletions machine/raise_reason.hpp
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ namespace rubinius {
cExit,
cCatchThrow,
cThreadKill,
cFiberCancel,
};
}

44 changes: 44 additions & 0 deletions machine/shared_state.cpp
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
#include "signal.hpp"
#include "builtin/randomizer.hpp"
#include "builtin/array.hpp"
#include "builtin/fixnum.hpp"
#include "builtin/thread.hpp"
#include "builtin/native_method.hpp"
#include "builtin/system.hpp"
@@ -57,6 +58,7 @@ namespace rubinius {
, type_info_lock_()
, code_resource_lock_()
, use_capi_lock_(false)
, phase_(eBooting)
, om(NULL)
, global_cache(new GlobalCache)
, config(config)
@@ -128,6 +130,27 @@ namespace rubinius {
return threads;
}

// Counts the registered VMs of kind eThread whose Thread object is non-nil
// and alive, holding the thread nexus' threads mutex for the whole scan.
Fixnum* SharedState::vm_threads_count(STATE) {
  std::lock_guard<std::mutex> guard(thread_nexus_->threads_mutex());

  ThreadList* registered = thread_nexus_->threads();
  native_int live = 0;

  for(ThreadList::iterator it = registered->begin();
      it != registered->end();
      ++it)
  {
    VM* vm = (*it)->as_vm();
    if(!vm) continue;

    // Only VMs that back a Thread are of interest.
    if(vm->kind() != memory::ManagedThread::eThread) continue;

    Thread* thread = vm->thread();
    if(thread->nil_p()) continue;

    if(CBOOL(thread->alive())) {
      ++live;
    }
  }

  return Fixnum::from(live);
}

Array* SharedState::vm_fibers(STATE) {
std::lock_guard<std::mutex> guard(thread_nexus_->threads_mutex());

@@ -149,6 +172,27 @@ namespace rubinius {
return fibers;
}

// Counts the registered VMs of kind eFiber whose Fiber object is non-nil
// and not in the eDead state, holding the thread nexus' threads mutex for
// the whole scan.
Fixnum* SharedState::vm_fibers_count(STATE) {
  std::lock_guard<std::mutex> guard(thread_nexus_->threads_mutex());

  ThreadList* registered = thread_nexus_->threads();
  native_int live = 0;

  for(ThreadList::iterator it = registered->begin();
      it != registered->end();
      ++it)
  {
    VM* vm = (*it)->as_vm();
    if(!vm) continue;

    // Only VMs that back a Fiber are of interest.
    if(vm->kind() != memory::ManagedThread::eFiber) continue;
    if(vm->fiber()->nil_p()) continue;

    if(vm->fiber()->status() != Fiber::eDead) {
      ++live;
    }
  }

  return Fixnum::from(live);
}

Array* SharedState::vm_thread_fibers(STATE, Thread* thread) {
std::lock_guard<std::mutex> guard(thread_nexus_->threads_mutex());

37 changes: 37 additions & 0 deletions machine/shared_state.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include "capi/capi_constants.h"

#include <unistd.h>
#include <atomic>
#include <string>
#include <vector>

@@ -51,6 +52,7 @@ namespace rubinius {
class ManagedThread;
}

class Fixnum;
class SignalThread;
class Memory;
class GlobalCache;
@@ -81,6 +83,13 @@ namespace rubinius {
*/

class SharedState {
public:
// Process-wide lifecycle phase stored in phase_. The constructor starts in
// eBooting; the other values are entered via set_running()/set_halting().
enum Phase {
eBooting,
eRunning,
eHalting,
};

private:
ThreadNexus* thread_nexus_;
MachineThreads* machine_threads_;
@@ -125,6 +134,8 @@ namespace rubinius {
bool use_capi_lock_;
int primitive_hits_[Primitives::cTotalPrimitives];

std::atomic<Phase> phase_;

public:
Globals globals;
Memory* om;
@@ -140,6 +151,30 @@ namespace rubinius {
SharedState(Environment* env, Configuration& config, ConfigParser& cp);
~SharedState();

// Phase accessors: each *_p() predicate compares the atomic phase_ against
// one Phase value, and each set_*() stores that value into phase_.

// True while phase_ is eBooting.
bool booting_p() {
return phase_ == eBooting;
}

// Stores eBooting into phase_.
void set_booting() {
phase_ = eBooting;
}

// True while phase_ is eRunning.
bool running_p() {
return phase_ == eRunning;
}

// Stores eRunning into phase_.
void set_running() {
phase_ = eRunning;
}

// True while phase_ is eHalting.
bool halting_p() {
return phase_ == eHalting;
}

// Stores eHalting into phase_.
void set_halting() {
phase_ = eHalting;
}

int size();

void set_initialized() {
@@ -166,7 +201,9 @@ namespace rubinius {
}

Array* vm_threads(STATE);
Fixnum* vm_threads_count(STATE);
Array* vm_fibers(STATE);
Fixnum* vm_fibers_count(STATE);
Array* vm_thread_fibers(STATE, Thread* thread);

int global_serial() const {
189 changes: 151 additions & 38 deletions machine/thread_nexus.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "logger.hpp"
#include "shared_state.hpp"
#include "thread_nexus.hpp"
#include "vm.hpp"
@@ -14,13 +15,11 @@

namespace rubinius {
void ThreadNexus::blocking_phase(VM* vm) {
bool held = waiting_lock(vm);
vm->set_thread_phase(eBlocking);
if(!held) unlock();
spinning_lock(vm, [vm]{ vm->set_thread_phase(eBlocking); });
}

void ThreadNexus::managed_phase(VM* vm) {
if(!waiting_lock(vm)) unlock();
spinning_lock(vm, [vm]{ vm->set_thread_phase(eManaged); });
}

void ThreadNexus::unmanaged_phase(VM* vm) {
@@ -31,23 +30,6 @@ namespace rubinius {
vm->set_thread_phase(eWaiting);
}

void ThreadNexus::restore_phase(VM* vm, Phase phase) {
switch(phase) {
case eManaged:
managed_phase(vm);
break;
case eBlocking:
blocking_phase(vm);
break;
case eUnmanaged:
unmanaged_phase(vm);
break;
case eWaiting:
waiting_phase(vm);
break;
}
}

bool ThreadNexus::blocking_p(VM* vm) {
atomic::memory_barrier();
return (vm->thread_phase() & eBlocking) == eBlocking;
@@ -87,28 +69,47 @@ namespace rubinius {
void ThreadNexus::after_fork_child(STATE) {
VM* current = state->vm();

for(ThreadList::iterator i = threads_.begin();
i != threads_.end();
++i) {
if(VM* vm = (*i)->as_vm()) {
if(vm == current) continue;
while(!threads_.empty()) {
VM* vm = threads_.back()->as_vm();
threads_.pop_back();

if(vm->kind() == memory::ManagedThread::eThread) {
if(!vm) continue;

switch(vm->kind()) {
case memory::ManagedThread::eThread: {
if(Thread* thread = vm->thread()) {
if(!thread->nil_p()) {
if(vm == current) {
thread->current_fiber(state, thread->fiber());
continue;
}

thread->unlock_after_fork(state);
thread->stopped();
}
}

vm->reset_parked();
vm->set_zombie();

break;
}
case memory::ManagedThread::eFiber: {
if(Fiber* fiber = vm->fiber()) {
fiber->status(Fiber::eDead);
vm->set_canceled();
vm->set_zombie();
}

vm->reset_parked();
break;
}
case memory::ManagedThread::eSystem:
VM::discard(state, vm);
break;
}
}

threads_.clear();
threads_.push_back(current);

state->shared().set_root_vm(current);
}

@@ -150,6 +151,16 @@ namespace rubinius {
}
}

// Treats a wait longer than `limit` nanoseconds as a probable deadlock:
// logs a fatal message, lists the threads, and aborts. Returns without
// doing anything while `nanoseconds` has not exceeded `limit`.
void ThreadNexus::detect_deadlock(uint64_t nanoseconds, uint64_t limit) {
  if(nanoseconds <= limit) return;

  logger::fatal("thread nexus: unable to lock, possible deadlock");
  list_threads();
  rubinius::abort();
}

uint64_t ThreadNexus::delay() {
static int i = 0;
static int delay[] = {
@@ -167,6 +178,77 @@ namespace rubinius {
return ns;
}

// Serializes the system ahead of a fork() call.
//
// In order: takes the phase_flag_ "lock" for this VM, checkpoints every
// other thread, then acquires waiting_mutex_ and the logger lock so that
// neither is held by another thread across the fork. Each retry loop adds
// the value returned by delay() to a running total and passes it to
// detect_deadlock(), which aborts once the total exceeds cLockLimit.
//
// Returns to_lock_status(held), where held is true when phase_flag_ already
// carried this VM's thread id on entry. Pair every call with fork_unlock().
ThreadNexus::LockStatus ThreadNexus::fork_lock(VM* vm) {
waiting_phase(vm);

/* Preserve the state of the phase_flag_ in situations where we have the
* entire system serialized.
*/
uint32_t id = 0;
bool held = false;
uint64_t ns = 0;

// Try to swap phase_flag_ from 0 to our thread id. On failure `id` holds
// the current owner, so it is reset to 0 before the next attempt.
while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
*/
vm->set_thread_phase(eManaged);
held = true;
break;
}

ns += delay();

detect_deadlock(ns, cLockLimit);

id = 0;
}

// Checkpoint all the other threads.
set_stop();

ns = 0;
while(!try_checkpoint(vm)) {
ns += delay();

detect_deadlock(ns, cLockLimit);
}

/* Lock and hold the waiting_mutex to prevent any other thread from
* holding it across a fork() call.
*/
ns = 0;
while(!waiting_mutex_.try_lock()) {
ns += delay();

detect_deadlock(ns, cLockLimit);
}

/* Hold the logger lock so that the multi-process semaphore that the
* logger depends on is not held across fork() calls.
*/
ns = 0;
while(!logger::try_lock()) {
ns += delay();

detect_deadlock(ns, cLockLimit);
}

return to_lock_status(held);
}

// Releases what fork_lock() acquired: the logger lock, the phase_flag_
// "lock" (only when `status` is eLocked, i.e. fork_lock() took it rather
// than finding it already held), and waiting_mutex_.
void ThreadNexus::fork_unlock(LockStatus status) {
  logger::unlock();

  // Clear phase_flag_ before notifying, while waiting_mutex_ is still held.
  // Waiters sleep on waiting_condition_ with the predicate phase_flag_ == 0
  // and re-check it as soon as they reacquire the mutex. If the flag were
  // cleared only after the mutex is released, a waiter could observe the
  // stale value and go back to sleep with no further notification pending.
  if(status == eLocked) {
    phase_flag_ = 0;
  }

  waiting_condition_.notify_all();
  waiting_mutex_.unlock();
}

bool ThreadNexus::try_checkpoint(VM* vm) {
timer::StopWatch<timer::nanoseconds> timer(
vm->metrics().lock.stop_the_world_ns);
@@ -191,7 +273,7 @@ namespace rubinius {

ns += delay();

detect_deadlock(ns, lock_limit, other_vm);
detect_deadlock(ns, cLockLimit, other_vm);
}
}
}
@@ -221,7 +303,7 @@ namespace rubinius {

ns += delay();

detect_deadlock(ns, lock_limit, other_vm);
detect_deadlock(ns, cLockLimit, other_vm);
}
}
}
@@ -232,9 +314,7 @@ namespace rubinius {

vm->set_thread_phase(eWaiting);

while(!phase_flag_.compare_exchange_strong(id, vm->thread_id(),
std::memory_order_acq_rel))
{
while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
@@ -244,9 +324,9 @@ namespace rubinius {
}

{
std::unique_lock<std::mutex> lk(wait_mutex_);
wait_condition_.wait(lk,
[this]{ return phase_flag_.load(std::memory_order_acquire) == 0; });
std::unique_lock<std::mutex> lk(waiting_mutex_);
waiting_condition_.wait(lk,
[this]{ return phase_flag_ == 0; });
}

id = 0;
@@ -255,4 +335,37 @@ namespace rubinius {
vm->set_thread_phase(eManaged);
return false;
}

// Runs `f` while this VM holds the phase_flag_ "lock".
//
// The VM is put into the eWaiting phase, then the flag is acquired by
// repeatedly trying to swap it from 0 to the VM's thread id. The first
// cSpinLimit failed attempts retry immediately; after that each failure
// also adds delay() to a running total that detect_deadlock() checks
// against cLockLimit. The flag is released afterwards unless this VM
// already held it on entry.
// NOTE(review): the release stores 0 without notifying waiting_condition_,
// unlike unlock() -- confirm that waiters do not depend on a notification
// from this path.
void ThreadNexus::spinning_lock(VM* vm, std::function<void ()> f) {
uint32_t id = 0;
int spin = 0;
bool held = false;
uint64_t ns = 0;

vm->set_thread_phase(eWaiting);

// On failure compare_exchange_strong writes the current owner into `id`,
// so it is reset to 0 before the next attempt.
while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
*/
held = true;
break;
}

if(++spin > cSpinLimit) {
ns += delay();

detect_deadlock(ns, cLockLimit);
}

id = 0;
}

// Call the provided function while holding the lock.
f();

// Release the lock unless we already held it.
if(!held) phase_flag_ = 0;
}
}
62 changes: 23 additions & 39 deletions machine/thread_nexus.hpp
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@

#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>

@@ -27,18 +28,19 @@ namespace rubinius {

class ThreadNexus {
std::atomic<bool> stop_;
std::mutex fork_mutex_;
std::mutex exec_mutex_;
std::mutex process_mutex_;
std::mutex threads_mutex_;
std::mutex wait_mutex_;
std::condition_variable wait_condition_;
std::mutex waiting_mutex_;
std::condition_variable waiting_condition_;

std::atomic<uint32_t> phase_flag_;

ThreadList threads_;
uint32_t thread_ids_;

const static uint64_t lock_limit = 5000000000;
const static uint64_t cLockLimit = 5000000000;
const static int cSpinLimit = 10000;


public:
enum Phase {
@@ -58,11 +60,10 @@ namespace rubinius {

ThreadNexus()
: stop_(false)
, fork_mutex_()
, exec_mutex_()
, process_mutex_()
, threads_mutex_()
, wait_mutex_()
, wait_condition_()
, waiting_mutex_()
, waiting_condition_()
, phase_flag_(0)
, threads_()
, thread_ids_(0)
@@ -78,12 +79,8 @@ namespace rubinius {
}

public:
std::mutex& fork_mutex() {
return fork_mutex_;
}

std::mutex& exec_mutex() {
return exec_mutex_;
// Returns the mutex member process_mutex_ by reference.
std::mutex& process_mutex() {
return process_mutex_;
}

ThreadList* threads() {
@@ -119,8 +116,8 @@ namespace rubinius {
waiting_phase(vm);

{
std::unique_lock<std::mutex> lk(wait_mutex_);
wait_condition_.wait(lk,
std::unique_lock<std::mutex> lk(waiting_mutex_);
waiting_condition_.wait(lk,
[this]{ return !stop_.load(std::memory_order_acquire); });
}

@@ -129,26 +126,15 @@ namespace rubinius {
}

bool waiting_lock(VM* vm);
void spinning_lock(VM* vm, std::function<void ()> f);

LockStatus try_lock(VM* vm) {
while(stop_p()) {
bool held = waiting_lock(vm);

// Assumption about stop_ may change while we progress.
if(stop_p()) {
if(try_checkpoint(vm)) {
if(stop_p()) {
unset_stop();
return to_lock_status(held);
}
}
}
LockStatus fork_lock(VM* vm);
void fork_unlock(LockStatus status);

// Either we're not stop_'ing or something blocked us from serializing.
if(!held) unlock();
void check_stop(VM* vm, std::function<void ()> f) {
while(stop_p()) {
spinning_lock(vm, [&, this]{ f(); unset_stop(); });
}

return eNotLocked;
}

LockStatus lock(VM* vm) {
@@ -160,25 +146,23 @@ namespace rubinius {
}

void unlock() {
std::lock_guard<std::mutex> guard(wait_mutex_);
phase_flag_.store(0, std::memory_order_release);
wait_condition_.notify_all();
waiting_condition_.notify_all();
phase_flag_ = 0;
}

bool try_checkpoint(VM* vm);
void checkpoint(VM* vm);

uint64_t delay();
void detect_deadlock(uint64_t nanoseconds, uint64_t limit, VM* vm);
void detect_deadlock(uint64_t nanoseconds, uint64_t limit);

void list_threads();

VM* new_vm(SharedState* shared, const char* name = NULL);
void delete_vm(VM* vm);

void after_fork_child(STATE);

void restore_phase(VM* vm, Phase phase);
};
}

26 changes: 1 addition & 25 deletions machine/thread_phase.hpp
Original file line number Diff line number Diff line change
@@ -76,29 +76,6 @@ namespace rubinius {
}
};

template <typename T>
class LockUnmanaged {
T& lock_;
State* state_;
ThreadNexus::Phase phase_;

public:
LockUnmanaged(STATE, T& in_lock)
: lock_(in_lock)
, state_(state)
, phase_(state->vm()->thread_phase())
{
state_->vm()->thread_nexus()->waiting_phase(state_->vm());

lock_.lock();
}

~LockUnmanaged() {
lock_.unlock();
state_->vm()->thread_nexus()->restore_phase(state_->vm(), phase_);
}
};

template <typename T>
class LockWaiting {
T& lock_;
@@ -107,12 +84,11 @@ namespace rubinius {
LockWaiting(STATE, T& in_lock)
: lock_(in_lock)
{
ThreadNexus::Phase phase = state->vm()->thread_phase();
state->vm()->thread_nexus()->waiting_phase(state->vm());

lock_.lock();

state->vm()->thread_nexus()->restore_phase(state->vm(), phase);
state->vm()->thread_nexus()->managed_phase(state->vm());
}

~LockWaiting() {
10 changes: 7 additions & 3 deletions machine/vm.cpp
Original file line number Diff line number Diff line change
@@ -70,9 +70,9 @@ namespace rubinius {
, interrupt_by_kill_(false)
, check_local_interrupts_(false)
, thread_step_(false)
, wait_mutex_()
, wait_condition_()
, transition_flag_(eSuspending)
, fiber_wait_mutex_()
, fiber_wait_condition_()
, fiber_transition_flag_(eSuspending)
, interrupt_lock_()
, method_missing_reason_(eNone)
, constant_missing_reason_(vFound)
@@ -387,6 +387,10 @@ namespace rubinius {

// Removes this VM from the thread nexus reached through the shared state,
// then runs the argument-less set_zombie() overload.
void VM::set_zombie(STATE) {
  auto nexus = state->shared().thread_nexus();

  nexus->delete_vm(this);
  set_zombie();
}

void VM::set_zombie() {
set_thread(nil<Thread>());
set_fiber(nil<Fiber>());
zombie_ = true;
75 changes: 48 additions & 27 deletions machine/vm.hpp
Original file line number Diff line number Diff line change
@@ -23,10 +23,12 @@
#include "sodium/randombytes.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <regex>
#include <string>
#include <thread>
#include <vector>
#include <setjmp.h>
#include <stdint.h>
@@ -89,6 +91,8 @@ namespace rubinius {
friend class State;

private:
static const int cWaitLimit = 100;

UnwindInfoSet unwinds_;

CallFrame* call_frame_;
@@ -104,18 +108,19 @@ namespace rubinius {
bool check_local_interrupts_;
bool thread_step_;

std::mutex wait_mutex_;
std::condition_variable wait_condition_;
std::mutex fiber_wait_mutex_;
std::condition_variable fiber_wait_condition_;

// Values held by the VM's atomic fiber transition flag. Enumerators are
// numbered consecutively starting at zero.
enum FiberTransition {
  eSuspending = 0,
  eSuspended  = 1,
  eResuming   = 2,
  eRunning    = 3,
  eCanceled   = 4,
  eFinished   = 5
};

std::atomic<FiberTransition> transition_flag_;
std::atomic<FiberTransition> fiber_transition_flag_;

utilities::thread::SpinLock interrupt_lock_;

@@ -183,56 +188,64 @@ namespace rubinius {
return interrupt_lock_;
}

std::mutex& wait_mutex() {
return wait_mutex_;
std::mutex& fiber_wait_mutex() {
return fiber_wait_mutex_;
}

std::condition_variable& wait_condition() {
return wait_condition_;
std::condition_variable& fiber_wait_condition() {
return fiber_wait_condition_;
}

FiberTransition transition_flag() {
return transition_flag_;
FiberTransition fiber_transition_flag() {
return fiber_transition_flag_;
}

bool suspending_p() const {
return transition_flag_ == eSuspending;
return fiber_transition_flag_ == eSuspending;
}

bool suspended_p() const {
return transition_flag_ == eSuspended;
return fiber_transition_flag_ == eSuspended;
}

bool resuming_p() const {
return transition_flag_ == eResuming;
return fiber_transition_flag_ == eResuming;
}

bool running_p() const {
return transition_flag_ == eRunning;
return fiber_transition_flag_ == eRunning;
}

// Reports whether the fiber transition flag currently reads eCanceled.
bool canceled_p() const {
  return fiber_transition_flag_.load() == eCanceled;
}

bool finished_p() const {
return transition_flag_ == eFinished;
return fiber_transition_flag_ == eFinished;
}

void set_suspending() {
transition_flag_ = eSuspending;
fiber_transition_flag_ = eSuspending;
}

void set_suspended() {
transition_flag_ = eSuspended;
fiber_transition_flag_ = eSuspended;
}

void set_resuming() {
transition_flag_ = eResuming;
fiber_transition_flag_ = eResuming;
}

void set_running() {
transition_flag_ = eRunning;
fiber_transition_flag_ = eRunning;
}

// Stores eCanceled into the fiber transition flag.
void set_canceled() {
  fiber_transition_flag_.store(eCanceled);
}

void set_finished() {
transition_flag_ = eFinished;
fiber_transition_flag_ = eFinished;
}

void set_thread(Thread* thread);
@@ -247,6 +260,7 @@ namespace rubinius {
}

void set_zombie(STATE);
void set_zombie();

bool zombie_p() {
return zombie_;
@@ -268,6 +282,17 @@ namespace rubinius {
return shared.memory();
}

// Polls the predicate `f` up to cWaitLimit times, sleeping 10 microseconds
// after every unsuccessful attempt. Returns true as soon as `f` returns
// true, or false once all attempts are used up.
bool limited_wait_for(std::function<bool ()> f) {
  // TODO: randomize wait interval
  for(int attempt = 0; attempt < cWaitLimit; attempt++) {
    if(f()) return true;

    std::this_thread::sleep_for(std::chrono::microseconds(10));
  }

  return false;
}

void set_start_time();
double run_time();

@@ -467,14 +492,10 @@ namespace rubinius {
void checkpoint(STATE) {
metrics().machine.checkpoints++;

ThreadNexus::LockStatus status = thread_nexus_->try_lock(this);
if(status != ThreadNexus::eNotLocked) {
metrics().machine.stops++;

collect_maybe(state);

if(status == ThreadNexus::eLocked) thread_nexus_->unlock();
}
thread_nexus_->check_stop(this, [this, state]{
metrics().machine.stops++;
collect_maybe(state);
});

if(profile_counter_++ >= profile_interval_) {
update_profile(state);
4 changes: 4 additions & 0 deletions machine/vm_thread_state.cpp
Original file line number Diff line number Diff line change
@@ -103,4 +103,8 @@ namespace rubinius {
void VMThreadState::raise_thread_kill() {
raise_reason_ = cThreadKill;
}

void VMThreadState::raise_fiber_cancel() {
raise_reason_ = cFiberCancel;
}
}
1 change: 1 addition & 0 deletions machine/vm_thread_state.hpp
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ namespace rubinius {
void raise_exit(Object* code);
void raise_throw(Object* dest, Object* value);
void raise_thread_kill();
void raise_fiber_cancel();
};
};

7 changes: 7 additions & 0 deletions spec/ruby/core/fiber/count_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require File.expand_path('../../../spec_helper', __FILE__)

# Fiber.count is expected to match the size of the collection from Fiber.list.
describe "Fiber.count" do
  it "returns the number of Fibers that would be returned by .list" do
    actual = Fiber.count
    expected = Fiber.list.count

    actual.should == expected
  end
end
7 changes: 7 additions & 0 deletions spec/ruby/core/thread/count_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require File.expand_path('../../../spec_helper', __FILE__)

# Thread.count is expected to match the size of the collection from Thread.list.
describe "Thread.count" do
  it "returns the number of Threads that would be returned by .list" do
    actual = Thread.count
    expected = Thread.list.count

    actual.should == expected
  end
end

0 comments on commit 9a0726c

Please sign in to comment.