Skip to content

Commit

Permalink
Showing 5 changed files with 154 additions and 330 deletions.
191 changes: 41 additions & 150 deletions core/thread.rb
Original file line number Diff line number Diff line change
@@ -6,30 +6,54 @@
class Thread
attr_reader :recursive_objects
attr_reader :pid
attr_reader :exception

def self.start(*args)
raise ArgumentError.new("no block passed to Thread.start") unless block_given?
def self.new(*args, &block)
thr = Rubinius.invoke_primitive :thread_s_new, args, block, self

thr = Rubinius.invoke_primitive :thread_allocate, self

Rubinius.asm(args, thr) do |args, obj|
run obj
dup
Rubinius::VariableScope.of_sender.locked!

run args
push_block
send_with_splat :__thread_initialize__, 0, true
# no pop here, as .asm blocks imply a pop as they're not
# allowed to leak a stack value
unless thr.send :initialized?
raise ThreadError, "Thread#initialize not called"
end

return thr
end

def self.start(*args, &block)
raise ArgumentError.new("no block passed to Thread.start") unless block

Rubinius.invoke_primitive :thread_s_start, args, block, self
end

class << self
alias_method :fork, :start
end

def initialize(*args, &block)
unless block
Kernel.raise ThreadError, "no block passed to Thread#initialize"
end

if @initialized
Kernel.raise ThreadError, "already initialized thread"
end

@args = args
@block = block
@initialized = true

Thread.current.group.add self
end

alias_method :__thread_initialize__, :initialize

def initialized?
@initialized
end

private :initialized?

def self.current
Rubinius.primitive :thread_current
Kernel.raise PrimitiveFailure, "Thread.current primitive failed"
@@ -54,21 +78,6 @@ def self.stop
nil
end

def fork
Rubinius.primitive :thread_fork
Kernel.raise ThreadError, "Thread#fork failed, thread already started or dead"
end

def raise_prim(exc)
Rubinius.primitive :thread_raise
Kernel.raise PrimitiveFailure, "Thread#raise primitive failed"
end

def kill_prim
Rubinius.primitive :thread_kill
Kernel.raise PrimitiveFailure, "Thread#kill primitive failed"
end

def wakeup
Rubinius.primitive :thread_wakeup
Kernel.raise ThreadError, "Thread#wakeup primitive failed, thread may be dead"
@@ -95,21 +104,6 @@ def mri_backtrace
Kernel.raise PrimitiveFailure, "Thread#mri_backtrace primitive failed"
end

def unlock_locks
Rubinius.primitive :thread_unlock_locks
Kernel.raise PrimitiveFailure, "Thread#unlock_locks primitive failed"
end

def current_exception
Rubinius.primitive :thread_current_exception
Kernel.raise PrimitiveFailure, "Thread#current_exception primitive failed"
end

def set_exception(exception)
Rubinius.primitive :thread_set_exception
Kernel.raise PrimitiveFailure, "Thread#set_exception primitive failed"
end

@abort_on_exception = false

def self.abort_on_exception
@@ -138,50 +132,6 @@ def inspect

alias_method :to_s, :inspect

def self.new(*args)
thr = Rubinius.invoke_primitive :thread_allocate, self

Rubinius::VariableScope.of_sender.locked!

Rubinius.asm(args, thr) do |args, obj|
run obj
dup

run args
push_block
send_with_splat :initialize, 0, true
# no pop here, as .asm blocks imply a pop as they're not
# allowed to leak a stack value
end

unless thr.thread_is_setup?
raise ThreadError, "Thread#initialize not called"
end

return thr
end

def initialize(*args, &block)
unless block
Kernel.raise ThreadError, "no block passed to Thread#initialize"
end

@args = args
@block = block

th_group = Thread.current.group

th_group.add self

fork
end

alias_method :__thread_initialize__, :initialize

def thread_is_setup?
@block != nil
end

def alive?
Rubinius.synchronize(self) do
@alive
@@ -233,14 +183,9 @@ def group
end

def raise(exc=undefined, msg=nil, trace=nil)
Rubinius.lock(self)

unless @alive
Rubinius.unlock(self)
return self
end
Rubinius.synchronize(self) do
return self unless @alive

begin
if undefined.equal? exc
no_argument = true
exc = active_exception
@@ -265,13 +210,10 @@ def raise(exc=undefined, msg=nil, trace=nil)
if self == Thread.current
Kernel.raise exc
else
raise_prim exc
Rubinius.invoke_primitive :thread_raise, self, exc
end
ensure
Rubinius.unlock(self)
end
end
private :raise_prim

def [](key)
locals_aref(Rubinius::Type.coerce_to_symbol(key))
@@ -355,61 +297,10 @@ def self.kill(thread)

alias_method :run, :wakeup

# Called by Thread#fork in the new thread
#
def __run__
begin
begin
Rubinius.unlock(self)
@result = @block.call(*@args)
ensure
begin
# We must lock self in a careful way.
#
# At this point, it's possible that an other thread does Thread#raise
# and then our execution is interrupted AT ANY GIVEN TIME. We
# absolutely must make sure to lock self as soon as possible to lock
# out interrupts from other threads.
#
# Rubinius.uninterrupted_lock(self) just does that.
#
# Notice that this can't be moved to other methods and there should be
# no preceding code before it in the enclosing ensure clause.
# These are to prevent any interrupted lock failures.
Rubinius.uninterrupted_lock(self)

# Now, we locked self. No other thread can interrupt this thread
# anymore.
# If there is any not-triggered interrupt, check and process it. In
# either case, we jump to the following ensure clause.
Rubinius.check_interrupts
ensure
unlock_locks
end
end
rescue Exception => e
set_exception e

STDERR.puts "Exception in thread: #{e.message} (#{e.class})" if $DEBUG

if abort_on_exception or Thread.abort_on_exception
Thread.main.raise e
end
ensure
Rubinius::Mirror.reflect(@group).remove self

if Rubinius.thread_state[0] == :thread_kill
@killed = true
end

Rubinius.unlock(self)
end
end

def kill
@sleep = false
Rubinius.synchronize(self) do
kill_prim
Rubinius.invoke_primitive :thread_kill, self
end
self
end
@@ -419,7 +310,7 @@ def kill

def value
join
@killed ? nil : @result
@value
end

def active_exception
14 changes: 14 additions & 0 deletions core/thread_mirror.rb
Original file line number Diff line number Diff line change
@@ -4,6 +4,20 @@ class Thread < Mirror
def group=(group)
Rubinius.invoke_primitive :object_set_ivar, @object, :@group, group
end

def finish
Rubinius::Mirror.reflect(@object.group).remove @object

if exception = @object.exception
if $DEBUG
STDERR.puts "Exception in thread: #{exception.message} (#{exception.class})"
end

if @object.abort_on_exception or Thread.abort_on_exception
Thread.main.raise exception
end
end
end
end
end
end
147 changes: 65 additions & 82 deletions machine/builtin/thread.cpp
Original file line number Diff line number Diff line change
@@ -110,42 +110,73 @@ namespace rubinius {
}
}

Object* send_run(STATE) {
return state->vm()->thread.get()->send(state, state->symbol("__run__"));
Object* run_instance(STATE) {
// These are all referenced, so OnStack is not necessary.
Thread* thread = state->vm()->thread.get();
Array* args = thread->args();
Object* block = thread->block();

if(thread->initialized()->false_p() || args->nil_p() || block->nil_p()) {
return cNil;
}

Object* value = block->send(state, G(sym_call), args, block);

thread->exception(state, state->vm()->thread_state()->current_exception());

if(state->vm()->thread_state()->raise_reason() == cThreadKill) {
thread->value(state, cNil);
} else if(value) {
thread->value(state, value);
}

Object* mirror = G(mirror)->send(state, state->symbol("reflect"),
Array::from_tuple(state, Tuple::from(state, 1, thread)));
mirror->send(state, state->symbol("finish"));

return value;
}

Thread* Thread::allocate(STATE, Object* self) {
Thread* thread = Thread::create(state, self, send_run);
Thread* Thread::s_new(STATE, Object* self, Array* args, Object* block) {
Thread* thread = Thread::create(state, self, run_instance);

CallFrame* call_frame = state->vm()->get_ruby_frame(1);

utilities::logger::write("create thread: %s, %s:%d",
utilities::logger::write("new thread: %s, %s:%d",
thread->vm()->name().c_str(),
call_frame->file(state)->cpp_str(state).c_str(),
call_frame->line(state));

if(!thread->send(state, state->symbol("initialize"), args, block, true)) {
return NULL;
}

thread->fork(state);

return thread;
}

Thread* Thread::current(STATE) {
return state->vm()->thread.get();
}
Thread* Thread::s_start(STATE, Object* self, Array* args, Object* block) {
Thread* thread = Thread::create(state, self, run_instance);

Object* Thread::unlock_locks(STATE) {
Thread* self = this;
OnStack<1> os(state, self);
CallFrame* call_frame = state->vm()->get_ruby_frame(1);

memory::LockedObjects& los = self->vm_->locked_objects();
for(memory::LockedObjects::iterator i = los.begin();
i != los.end();
++i) {
ObjectHeader* locked = *i;
if(locked != self) {
locked->unlock_for_terminate(state);
}
utilities::logger::write("start thread: %s, %s:%d",
thread->vm()->name().c_str(),
call_frame->file(state)->cpp_str(state).c_str(),
call_frame->line(state));

if(!thread->send(state, state->symbol("__thread_initialize__"), args, block, true)) {
return NULL;
}
los.clear();
return cNil;

thread->fork(state);

return thread;
}

Thread* Thread::current(STATE) {
return state->vm()->thread.get();
}

void Thread::unlock_after_fork(STATE) {
@@ -255,40 +286,20 @@ namespace rubinius {
Thread* self = this;
OnStack<1> os(state, self);

self->init_lock_.lock();

pthread_attr_t attrs;
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, THREAD_STACK_SIZE);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);

int error;

error = pthread_create(&self->vm_->os_thread(), &attrs,
int status = pthread_create(&self->vm_->os_thread(), &attrs,
function, (void*)self->vm_);

pthread_attr_destroy(&attrs);

if(error) {
return error;
} else {
// We can't return from here until the new thread completes a minimal
// initialization. After the initialization, it unlocks init_lock_.
// So, wait here until we can lock init_lock_ after that.
self->init_lock_.lock();

// We locked init_lock_. And we are sure that the new thread completed
// the initialization.
// Locking init_lock_ isn't needed anymore, so unlock it.
self->init_lock_.unlock();

return 0;
}
return status;
}

Object* Thread::main_thread(STATE) {
state->vm()->thread->hard_unlock(state);

std::string& runtime = state->shared().env()->runtime_path();

G(rubinius)->set_const(state, "Signature",
@@ -301,7 +312,6 @@ namespace rubinius {

state->shared().env()->load_core(state, runtime);

state->vm()->thread->alive(state, cTrue);
state->vm()->thread_state()->clear();

state->shared().start_console(state);
@@ -325,11 +335,11 @@ namespace rubinius {
// Enable the JIT after the core library has loaded
G(jit)->enable(state);

Object* exit = instance->send(state, state->symbol("main"));
Object* value = instance->send(state, state->symbol("main"));

state->shared().signals()->system_exit(state->vm()->thread_state()->raise_value());

return exit;
return value;
}

void* Thread::run(void* ptr) {
@@ -353,30 +363,23 @@ namespace rubinius {

NativeMethod::init_thread(state);

// Lock the thread object and unlock it at __run__ in the ruby land.
vm->thread->alive(state, cTrue);
vm->thread->init_lock_.unlock();

// TODO: remove use of init_lock
// Become GC-dependent after unlocking init_lock_ to avoid deadlocks.
// gc_dependent may lock when it detects GC is happening. Also the parent
// thread is locked until init_lock_ is unlocked by this child thread.
state->vm()->become_managed();
vm->thread->hard_lock(state, 0);

vm->shared.tool_broker()->thread_start(state);
Object* ret = vm->thread->function_(state);
Object* value = vm->thread->function_(state);
vm->shared.tool_broker()->thread_stop(state);

vm->thread->join_lock_.lock();
vm->thread->stopped();

memory::LockedObjects& los = state->vm()->locked_objects();
for(memory::LockedObjects::iterator i = los.begin();
i != los.end();
++i) {
memory::LockedObjects& locked_objects = state->vm()->locked_objects();
for(memory::LockedObjects::iterator i = locked_objects.begin();
i != locked_objects.end();
++i)
{
(*i)->unlock_for_terminate(state);
}
locked_objects.clear();

vm->thread->join_cond_.broadcast();
vm->thread->join_lock_.unlock();
@@ -387,7 +390,7 @@ namespace rubinius {

vm->become_unmanaged();

if(vm->main_thread_p() || (!ret && vm->thread_state()->raise_reason() == cExit)) {
if(vm->main_thread_p() || (!value && vm->thread_state()->raise_reason() == cExit)) {
state->shared().signals()->system_exit(vm->thread_state()->raise_value());
}

@@ -399,19 +402,12 @@ namespace rubinius {
return 0;
}

Object* Thread::fork(STATE) {
// If the thread is already alive or already ran, we can't use it anymore.
if(CBOOL(alive()) || !vm_ || vm_->zombie_p()) {
return Primitives::failure();
}

void Thread::fork(STATE) {
if(int error = start_thread(state, Thread::run)) {
char buf[RBX_STRERROR_BUFSIZE];
char* err = RBX_STRERROR(error, buf, RBX_STRERROR_BUFSIZE);
Exception::raise_thread_error(state, err);
}

return cNil;
}

Object* Thread::pass(STATE) {
@@ -443,19 +439,6 @@ namespace rubinius {
return exc;
}

Object* Thread::set_exception(STATE, Exception* exc) {
exception(state, exc);
return exc;
}

Object* Thread::current_exception(STATE) {
utilities::thread::SpinLock::LockGuard guard(init_lock_);

if(!vm_) return cNil;

return vm_->thread_state()->current_exception();
}

Object* Thread::kill(STATE) {
utilities::thread::SpinLock::LockGuard guard(init_lock_);

81 changes: 20 additions & 61 deletions machine/builtin/thread.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ namespace rubinius {
* Thread execution.
*/
class Thread : public Object {
Array* args_; // slot
Object* block_; // slot
Object* alive_; // slot
Object* sleep_; // slot
Channel* control_channel_; // slot
@@ -30,12 +32,12 @@ namespace rubinius {
Randomizer* randomizer_; // slot
LookupTable* locals_; // slot
Object* group_; // slot
Object* result_; // slot
Object* value_; // slot
Exception* exception_; // slot
Object* critical_; // slot
Object* killed_; // slot
Fixnum* priority_; // slot
Fixnum* pid_; // slot
Object* initialized_; // slot

utilities::thread::SpinLock init_lock_;
utilities::thread::Mutex join_lock_;
@@ -53,28 +55,32 @@ namespace rubinius {

static void bootstrap(STATE);
static void initialize(STATE, Thread* obj) {
obj->alive_ = nil<Object>();
obj->args_ = nil<Array>();
obj->block_ = cNil;
obj->alive_ = cTrue;
obj->sleep_ = cFalse;
obj->control_channel_ = nil<Channel>();
obj->recursive_objects(state, LookupTable::create(state));
obj->debugger_thread_ = nil<Thread>();
obj->thread_id_ = nil<Fixnum>();
obj->randomizer_ = nil<Randomizer>();
obj->locals(state, LookupTable::create(state));
obj->group_ = nil<Object>();
obj->result_ = cFalse;
obj->group_ = cNil;
obj->value_ = cNil;
obj->exception_ = nil<Exception>();
obj->critical_ = cFalse;
obj->killed_ = cFalse;
obj->priority_ = Fixnum::from(0);
obj->pid_ = Fixnum::from(0);
obj->initialized_ = cFalse;
obj->init_lock_.init();
obj->join_lock_.init();
obj->join_cond_.init();
obj->vm_ = 0;
}

public:
attr_accessor(args, Array);
attr_accessor(block, Object);
attr_accessor(alive, Object);
attr_accessor(sleep, Object);
attr_accessor(control_channel, Channel);
@@ -84,39 +90,24 @@ namespace rubinius {
attr_accessor(randomizer, Randomizer);
attr_accessor(locals, LookupTable);
attr_accessor(group, Object);
attr_accessor(result, Object);
attr_accessor(value, Object);
attr_accessor(exception, Exception);
attr_accessor(critical, Object);
attr_accessor(killed, Object);
attr_accessor(priority, Fixnum);
attr_accessor(pid, Fixnum);
attr_accessor(initialized, Object);

VM* vm() const {
return vm_;
}

public:

/**
* Allocate a Thread object.
*
* Object is in a valid but not running state.
* It still assumes that #initialize will be
* called to fully set it up. The object is
* not yet associated with an actual native
* thread.
*
* This method also creates a new VM object
* to represent its state.
*
* @see Thread::fork()
* @see Thread::create()
*
* @see machine/vm.hpp
* @see kernel/thread.rb
*/
// Rubinius.primitive :thread_allocate
static Thread* allocate(STATE, Object* self);
// Rubinius.primitive :thread_s_new
static Thread* s_new(STATE, Object* self, Array* args, Object* block);

// Rubinius.primitive :thread_s_start
static Thread* s_start(STATE, Object* self, Array* args, Object* block);

/**
* Returns the Thread object for the state.
@@ -140,27 +131,7 @@ namespace rubinius {

public: /* Instance primitives */

/**
* Execute the Thread.
*
* Actually creates the native thread and starts it.
* The native thread will start executing this Thread's
* #__run__ method.
*
* @see Thread::allocate()
*
* @see kernel/thread.rb
*/
// Rubinius.primitive :thread_fork
Object* fork(STATE);

/**
* Execute the Thread.
*
* This leaves the thread in an attached state, so that
* a pthread_join() later on will work.
*/
int fork_attached(STATE);
void fork(STATE);

/**
* Retrieve the priority set for this Thread.
@@ -178,15 +149,6 @@ namespace rubinius {
// Rubinius.primitive :thread_raise
Object* raise(STATE, Exception* exc);

// Rubinius.primitive :thread_set_exception
Object* set_exception(STATE, Exception* exc);

/**
* Returns current exception
*/
// Rubinius.primitive :thread_current_exception
Object* current_exception(STATE);

/**
* Kill this Thread.
*/
@@ -222,9 +184,6 @@ namespace rubinius {
// Rubinius.primitive :thread_join
Thread* join(STATE, Object* timeout);

// Rubinius.primitive :thread_unlock_locks
Object* unlock_locks(STATE);

// This method must only be called after fork() with only one active
// thread.
void unlock_after_fork(STATE);
51 changes: 14 additions & 37 deletions machine/capi/thread.cpp
Original file line number Diff line number Diff line change
@@ -239,13 +239,14 @@ extern "C" {
Object* run_function(STATE) {
NativeMethodEnvironment* env = NativeMethodEnvironment::get();

Thread* self = state->vm()->thread.get();
Thread* thread = state->vm()->thread.get();

NativeMethod* nm = capi::c_as<NativeMethod>(self->locals_aref(state, state->symbol("function")));
Pointer* ptr = capi::c_as<Pointer>(self->locals_aref(state, state->symbol("argument")));
NativeMethod* nm = capi::c_as<NativeMethod>(
thread->locals_aref(state, state->symbol("function")));
Pointer* ptr = capi::c_as<Pointer>(thread->locals_aref(state, state->symbol("argument")));

self->locals_remove(state, state->symbol("function"));
self->locals_remove(state, state->symbol("argument"));
thread->locals_remove(state, state->symbol("function"));
thread->locals_remove(state, state->symbol("argument"));

NativeMethodFrame nmf(env, 0, nm);
CallFrame* previous_frame = 0;
@@ -267,19 +268,14 @@ extern "C" {
state->vm()->push_call_frame(&cf, previous_frame);

nmf.setup(
env->get_handle(self),
env->get_handle(thread),
env->get_handle(cNil),
env->get_handle(nm),
env->get_handle(nm->module()));

{
OnStack<3> os(state, self, nm, ptr);
self->hard_unlock(state);
}

ENTER_CAPI(state);

Object* ret = NULL;
Object* value = NULL;

ExceptionPoint ep(env);

@@ -288,24 +284,11 @@ extern "C" {
if(unlikely(ep.jumped_to())) {
LEAVE_CAPI(state);

state->vm()->pop_call_frame(previous_frame);

// Setup exception in thread so it's raised when joining
// Reload self because it might have been moved
self = state->vm()->thread.get();

{
OnStack<1> os(state, self);
self->hard_lock(state, false);
Exception* exc = capi::c_as<Exception>(self->current_exception(state));
self->exception(state, exc);
self->alive(state, cFalse);
self->hard_unlock(state);
}

return NULL;
// Set exception in thread so it's raised when joining.
state->vm()->thread.get()->exception(state,
capi::c_as<Exception>(state->vm()->thread_state()->current_exception()));
} else {
ret = env->get_object(nm->func()(ptr->pointer));
value = env->get_object(nm->func()(ptr->pointer));
}

LEAVE_CAPI(state);
@@ -316,15 +299,9 @@ extern "C" {
env->set_current_native_frame(nmf.previous());
ep.pop(env);

self = state->vm()->thread.get();

OnStack<1> os(state, self);
state->vm()->thread.get()->alive(state, cFalse);

self->hard_lock(state, false);
self->alive(state, cFalse);
self->hard_unlock(state);

return ret;
return value;
}

VALUE capi_thread_create(VALUE (*func)(ANYARGS), void* arg, const char* name, const char* file) {

0 comments on commit 9524ae1

Please sign in to comment.