Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added Fiber#dispose and auto-cleanup of Fiber.
Browse files Browse the repository at this point in the history
A Fiber is composed of both managed (i.e., object memory) resources and
unmanaged (i.e., pthread) resources. When a Fiber completes normally, the
unmanaged resources can be reclaimed simply.

When a Fiber that has not completed normally goes out of 'scope' (i.e., is no
longer reachable from any reachable object), the garbage collector cannot
simply reclaim the managed resources allocated to the Fiber, because the Fiber
also holds native resources (i.e., the pthread instance). In this case, the
Fiber's pthread invoke function must be forced to exit so that the pthread
instance can be reclaimed.
brixen committed Jun 26, 2016
1 parent faf044a commit 93465e4
Showing 15 changed files with 181 additions and 7 deletions.
5 changes: 5 additions & 0 deletions core/fiber.rb
Original file line number Diff line number Diff line change
@@ -54,6 +54,11 @@ def transfer(*args)
raise PrimitiveFailure, "Fiber#transfer primitive failed"
end

# Disposes of this Fiber by invoking the :fiber_dispose primitive.
#
# Raises PrimitiveFailure if execution reaches the raise below, i.e. when
# the primitive did not handle the call.
def dispose
Rubinius.primitive :fiber_dispose
raise PrimitiveFailure, "Fiber#dispose primitive failed"
end

# Returns true unless #status is the string "dead".
def alive?
status != "dead"
end
57 changes: 51 additions & 6 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
@@ -108,6 +108,35 @@ namespace rubinius {
}
}

// Cancels this Fiber: sets the cFiberCancel raise reason on its VM, resumes
// it, and then polls (with a bounded wait) until that VM reports zombie_p().
//
// NOTE(review): vm() appears to be this Fiber's own VM and state->vm() the VM
// of the context performing the cancel -- inferred from usage; confirm.
void Fiber::cancel(STATE) {
{
// Held for the whole hand-off below; released when this scope closes.
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

// Record cFiberCancel as the raise reason on the Fiber being canceled.
vm()->thread_state()->raise_fiber_cancel();

// Flag the calling VM as suspending before control is handed over.
state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

// Keep notifying the wait condition for as long as the target still reports
// suspended_p(). This inner guard shadows the outer one of the same name and
// is acquired and released on every iteration.
while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

// Bounded poll for the target to report running_p(). The boolean result is
// ignored, so a timeout is not distinguished from success.
vm()->limited_wait_for([this]{ return vm()->running_p(); });

// Release the canceled Fiber.
state->vm()->set_suspended();

// Bounded poll for the target to report zombie_p(); result ignored as above.
vm()->limited_wait_for([this]{ return vm()->zombie_p(); });

vm()->set_canceled();

// Mark the calling VM as running again.
state->vm()->set_running();
}

void Fiber::suspend_and_continue(STATE) {
UnmanagedPhase unmanaged(state);

@@ -138,6 +167,8 @@ namespace rubinius {
Object* Fiber::return_value(STATE) {
if(vm()->thread_state()->raise_reason() == cNone) {
return state->vm()->thread()->fiber_value();
} else if(vm()->thread_state()->raise_reason() == cFiberCancel) {
return NULL;
} else {
invoke_context()->thread_state()->set_state(vm()->thread_state());
return NULL;
@@ -178,12 +209,14 @@ namespace rubinius {
vm->thread()->fiber_value(state, cNil);
}

if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
if(vm->thread_state()->raise_reason() != cFiberCancel) {
if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
}
}

{
@@ -378,6 +411,12 @@ namespace rubinius {
return return_value(state);
}

// Rubinius.primitive :fiber_dispose
//
// Cancels this Fiber so that the native resources behind it can be reclaimed,
// and returns the receiver.
//
// The cancel is skipped when the Fiber has no VM or when its VM already
// reports zombie_p(). The other cancel() call sites (the Fiber finalizer and
// NativeFinalizer::dispose) apply the same zombie check before canceling.
Object* Fiber::dispose(STATE) {
  if(vm() && !vm()->zombie_p()) {
    cancel(state);
  }

  return this;
}

Object* Fiber::s_yield(STATE, Arguments& args) {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);
@@ -424,6 +463,12 @@ namespace rubinius {
}

if(fiber->vm()) {
if(!state->shared().halting_p()) {
if(!fiber->vm()->zombie_p()) {
fiber->cancel(state);
}
}

if(fiber->vm()->zombie_p()) {
VM::discard(state, fiber->vm());
fiber->vm(NULL);
4 changes: 4 additions & 0 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
@@ -123,6 +123,7 @@ namespace rubinius {

void start(STATE, Arguments& args);
void restart(STATE);
void cancel(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
@@ -134,6 +135,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_transfer
Object* transfer(STATE, Arguments& args);

// Rubinius.primitive :fiber_dispose
Object* dispose(STATE);

public: /* TypeInfo */

class Info : public TypeInfo {
3 changes: 3 additions & 0 deletions machine/builtin/system.cpp
Original file line number Diff line number Diff line change
@@ -1739,6 +1739,9 @@ namespace rubinius {
case cThreadKill:
reason = state->symbol("thread_kill");
break;
case cFiberCancel:
reason = state->symbol("fiber_cancel");
break;
}

tuple->put(state, 0, reason);
6 changes: 6 additions & 0 deletions machine/environment.cpp
Original file line number Diff line number Diff line change
@@ -546,6 +546,8 @@ namespace rubinius {
void Environment::halt(STATE, int exit_code) {
utilities::thread::Mutex::LockGuard guard(halt_lock_);

state->shared().set_halting();

if(state->shared().config.system_log_lifetime.value) {
logger::write("process: exit: %s %d %fs",
shared->pid.c_str(), exit_code, shared->run_time());
@@ -562,6 +564,8 @@ namespace rubinius {
shared->machine_threads()->shutdown(state);
}

shared->finalizer()->dispose(state);

shared->thread_nexus()->lock(state->vm());

shared->finalizer()->finish(state);
@@ -743,5 +747,7 @@ namespace rubinius {

State main_state(vm);
state->shared().start_signals(&main_state);

state->shared().set_running();
}
}
2 changes: 2 additions & 0 deletions machine/instructions.cpp
Original file line number Diff line number Diff line change
@@ -195,6 +195,8 @@ Object* MachineCode::interpreter(STATE, MachineCode* const mcode) {
case cExit:
call_frame->scope->flush_to_heap(state);
return NULL;
case cFiberCancel:
return NULL;
default:
break;
} // switch
39 changes: 38 additions & 1 deletion machine/memory/finalizer.cpp
Original file line number Diff line number Diff line change
@@ -25,6 +25,13 @@

namespace rubinius {
namespace memory {
// Gives the finalized object a chance to release native resources before the
// finalizers themselves run. Only Fiber objects are handled: a Fiber whose VM
// has not yet become a zombie is canceled.
void NativeFinalizer::dispose(STATE) {
  // TODO: consider building this on the TypeInfo structure.
  Fiber* fiber = try_as<Fiber>(object());
  if(!fiber) return;

  // The Fiber finalizer resets the VM pointer to NULL after discarding the VM,
  // so the pointer has to be checked before it is dereferenced.
  if(fiber->vm() && !fiber->vm()->zombie_p()) {
    fiber->cancel(state);
  }
}

// Invokes the stored finalizer function with the current state and the
// object tracked by this finalizer entry.
void NativeFinalizer::finalize(STATE) {
(*finalizer_)(state, object());
}
@@ -35,6 +42,9 @@ namespace rubinius {
}
}

// Intentionally empty: extension finalizers have nothing to do on dispose.
void ExtensionFinalizer::dispose(STATE) {
}

void ExtensionFinalizer::finalize(STATE) {
ManagedPhase managed(state);

@@ -84,6 +94,9 @@ namespace rubinius {
}
}

// Intentionally empty: managed finalizers have nothing to do on dispose.
void ManagedFinalizer::dispose(STATE) {
}

void ManagedFinalizer::finalize(STATE) {
ManagedPhase managed(state);

@@ -168,6 +181,7 @@ namespace rubinius {
}

// Calls Thread::create for this thread's VM and then allocates the
// Synchronization object stored in synchronization_.
// NOTE(review): where synchronization_ is released is not visible here.
void FinalizerThread::initialize(STATE) {
Thread::create(state, vm());
synchronization_ = new Synchronization();
}

@@ -212,12 +226,35 @@ namespace rubinius {
vm()->metrics().gc.objects_finalized++;
}
}

state->vm()->thread()->vm()->set_zombie(state);
}

// Calls dispose() on every registered finalizer object, first those in the
// process list and then those in the live list. finishing_ is set before the
// list mutex is taken; the mutex is held for both passes.
void FinalizerThread::dispose(STATE) {
  finishing_ = true;

  std::lock_guard<std::mutex> guard(list_mutex());

  for(FinalizerObject* pending : process_list_) {
    pending->dispose(state);
  }

  for(FinalizerObject* live : live_list_) {
    live->dispose(state);
  }
}

void FinalizerThread::finish(STATE) {
finishing_ = true;

// TODO: cleanup
while(!process_list_.empty()) {
FinalizerObject* fo = process_list_.back();
process_list_.pop_back();
5 changes: 5 additions & 0 deletions machine/memory/finalizer.hpp
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ namespace rubinius {
object_ = obj;
}

virtual void dispose(STATE) = 0;
virtual void finalize(STATE) = 0;
virtual void mark(ImmixGC* gc) = 0;
virtual bool match_p(STATE, Object* object, Object* finalizer) = 0;
@@ -53,6 +54,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -67,6 +69,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
@@ -81,6 +84,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer);
@@ -128,6 +132,7 @@ namespace rubinius {
}

void finish(STATE);
void dispose(STATE);

void native_finalizer(STATE, Object* obj, FinalizerFunction func);
void extension_finalizer(STATE, Object* obj, FinalizerFunction func);
2 changes: 2 additions & 0 deletions machine/memory/immix_marker.cpp
Original file line number Diff line number Diff line change
@@ -108,6 +108,8 @@ namespace memory {
}

state->memory()->clear_mature_mark_in_progress();

state->vm()->thread()->vm()->set_zombie(state);
}
}
}
1 change: 1 addition & 0 deletions machine/raise_reason.hpp
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ namespace rubinius {
cExit,
cCatchThrow,
cThreadKill,
cFiberCancel,
};
}

1 change: 1 addition & 0 deletions machine/shared_state.cpp
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ namespace rubinius {
, type_info_lock_()
, code_resource_lock_()
, use_capi_lock_(false)
, phase_(eBooting)
, om(NULL)
, global_cache(new GlobalCache)
, config(config)
34 changes: 34 additions & 0 deletions machine/shared_state.hpp
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
#include "capi/capi_constants.h"

#include <unistd.h>
#include <atomic>
#include <string>
#include <vector>

@@ -82,6 +83,13 @@ namespace rubinius {
*/

class SharedState {
public:
// Lifecycle phase of the shared state, stored in phase_.
enum Phase {
// Initial value given to phase_ by the SharedState constructor.
eBooting,
// Selected by set_running().
eRunning,
// Selected by set_halting().
eHalting,
};

private:
ThreadNexus* thread_nexus_;
MachineThreads* machine_threads_;
@@ -126,6 +134,8 @@ namespace rubinius {
bool use_capi_lock_;
int primitive_hits_[Primitives::cTotalPrimitives];

std::atomic<Phase> phase_;

public:
Globals globals;
Memory* om;
@@ -141,6 +151,30 @@ namespace rubinius {
SharedState(Environment* env, Configuration& config, ConfigParser& cp);
~SharedState();

// True while the shared phase is eBooting.
bool booting_p() {
  return phase_.load() == eBooting;
}

// Switches the shared phase to eBooting.
void set_booting() {
  phase_.store(eBooting);
}

// True while the shared phase is eRunning.
bool running_p() {
  return phase_.load() == eRunning;
}

// Switches the shared phase to eRunning.
void set_running() {
  phase_.store(eRunning);
}

// True once the shared phase is eHalting.
bool halting_p() {
  return phase_.load() == eHalting;
}

// Switches the shared phase to eHalting.
void set_halting() {
  phase_.store(eHalting);
}

int size();

void set_initialized() {
24 changes: 24 additions & 0 deletions machine/vm.hpp
Original file line number Diff line number Diff line change
@@ -23,10 +23,12 @@
#include "sodium/randombytes.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <regex>
#include <string>
#include <thread>
#include <vector>
#include <setjmp.h>
#include <stdint.h>
@@ -89,6 +91,8 @@ namespace rubinius {
friend class State;

private:
// Maximum number of polling attempts made by limited_wait_for().
static const int cWaitLimit = 100;

UnwindInfoSet unwinds_;

CallFrame* call_frame_;
@@ -112,6 +116,7 @@ namespace rubinius {
eSuspended,
eResuming,
eRunning,
eCanceled,
eFinished
};

@@ -211,6 +216,10 @@ namespace rubinius {
return fiber_transition_flag_ == eRunning;
}

// True when the fiber transition flag is eCanceled.
bool canceled_p() const {
return fiber_transition_flag_ == eCanceled;
}

// True when the fiber transition flag is eFinished.
bool finished_p() const {
return fiber_transition_flag_ == eFinished;
}
@@ -231,6 +240,10 @@ namespace rubinius {
fiber_transition_flag_ = eRunning;
}

// Sets the fiber transition flag to eCanceled.
void set_canceled() {
fiber_transition_flag_ = eCanceled;
}

// Sets the fiber transition flag to eFinished.
void set_finished() {
fiber_transition_flag_ = eFinished;
}
@@ -268,6 +281,17 @@ namespace rubinius {
return shared.memory();
}

// Polls the predicate f up to cWaitLimit times, sleeping 10 microseconds
// after each unsuccessful check. Returns true as soon as f() returns true,
// and false if every attempt fails (or if cWaitLimit allows no attempts).
bool limited_wait_for(std::function<bool ()> f) {
  // TODO: randomize wait interval
  int attempts = 0;

  while(attempts < cWaitLimit) {
    if(f()) return true;

    std::this_thread::sleep_for(std::chrono::microseconds(10));
    ++attempts;
  }

  return false;
}

void set_start_time();
double run_time();

4 changes: 4 additions & 0 deletions machine/vm_thread_state.cpp
Original file line number Diff line number Diff line change
@@ -103,4 +103,8 @@ namespace rubinius {
// Sets the raise reason to cThreadKill.
void VMThreadState::raise_thread_kill() {
raise_reason_ = cThreadKill;
}

// Sets the raise reason to cFiberCancel.
void VMThreadState::raise_fiber_cancel() {
raise_reason_ = cFiberCancel;
}
}
1 change: 1 addition & 0 deletions machine/vm_thread_state.hpp
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ namespace rubinius {
void raise_exit(Object* code);
void raise_throw(Object* dest, Object* value);
void raise_thread_kill();
void raise_fiber_cancel();
};
};

0 comments on commit 93465e4

Please sign in to comment.