Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: rubinius/rubinius
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3d2adf1b20f5
Choose a base ref
...
head repository: rubinius/rubinius
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b4292328f462
Choose a head ref
  • 2 commits
  • 4 files changed
  • 1 contributor

Commits on Jun 20, 2016

  1. Copy the full SHA
    5f22ac8 View commit details
  2. Fixed race starting Fiber.

    brixen committed Jun 20, 2016
    Copy the full SHA
    b429232 View commit details
Showing with 119 additions and 67 deletions.
  1. +85 −64 machine/builtin/fiber.cpp
  2. +19 −3 machine/builtin/fiber.hpp
  3. +8 −0 machine/vm.hpp
  4. +7 −0 spec/ruby/core/thread/fiber_list_spec.rb
149 changes: 85 additions & 64 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
@@ -78,49 +78,53 @@ namespace rubinius {
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

if(!vm()->suspended_p()) {
Exception::raise_fiber_error(state, "attempt to restart non-suspended fiber");
}

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

{
UnmanagedPhase unmanaged(state);

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
}
}
}

while(!vm()->running_p()) {
; // spin wait
}

{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());
state->vm()->thread()->current_fiber(state, this);
}
}

void Fiber::suspend(STATE) {
void Fiber::suspend_and_continue(STATE) {
{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
vm()->set_suspended();

vm()->set_suspended();
{
UnmanagedPhase unmanaged(state);
vm()->wait_condition().wait(lk);
}

while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
}
}
clear_wakeup();
vm()->set_resuming();
}

while(invoke_context()->running_p()) {
; // spin wait
}

{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

vm()->set_running();

while(invoke_context()->suspending_p()) {
while(!restart_context()->suspended_p()) {
; // spin wait
}

@@ -159,7 +163,7 @@ namespace rubinius {

NativeMethod::init_thread(state);

vm->fiber()->suspend(state);
vm->fiber()->suspend_and_continue(state);

Object* value = vm->fiber()->block()->send(state, G(sym_call),
as<Array>(vm->thread()->fiber_value()), vm->fiber()->block());
@@ -179,8 +183,12 @@ namespace rubinius {
vm->fiber()->invoke_context()->fiber()->restart(state);
}

vm->fiber()->status(eDead);
vm->fiber()->vm()->set_finished();
{
std::lock_guard<std::mutex> guard(vm->wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
}

vm->unmanaged_phase();

@@ -295,60 +303,69 @@ namespace rubinius {
}

Object* Fiber::resume(STATE, Arguments& args) {
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to resume fiber across threads");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

unpack_arguments(state, args);

if(status() == eCreated) {
start(state, args);
} else if(status() == eTransfer) {
Exception::raise_fiber_error(state, "attempt to resume transfered fiber");
} else if(status() == eRunning) {
Exception::raise_fiber_error(state, "attempt to resume running fiber");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to resume dead fiber");
} else if(root_p()) {
Exception::raise_fiber_error(state, "attempt to resume root fiber");
}
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to resume fiber across threads");
} else if(status() == eTransfer) {
Exception::raise_fiber_error(state, "attempt to resume transfered fiber");
} else if(status() == eRunning) {
Exception::raise_fiber_error(state, "attempt to resume running fiber");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to resume dead fiber");
} else if(root_p()) {
Exception::raise_fiber_error(state, "attempt to resume root fiber");
}

unpack_arguments(state, args);
invoke_context(state->vm());

status(eRunning);
invoke_context(state->vm());
if(status() == eCreated) {
start(state, args);
}

status(eRunning);
}

// Being cooperative...
restart(state);

// Through the worm hole...
state->vm()->fiber()->suspend(state);
state->vm()->fiber()->suspend_and_continue(state);

// We're back...
return return_value(state);
}

Object* Fiber::transfer(STATE, Arguments& args) {
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to transfer fiber across threads");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

unpack_arguments(state, args);
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to transfer fiber across threads");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to transfer to dead fiber");
} else if(state->vm()->fiber() == this) {
// This should arguably be a FiberError
return args.as_array(state);
}

if(status() == eCreated) {
start(state, args);
} else if(state->vm()->fiber() == this) {
return args.as_array(state);
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to transfer to dead fiber");
}
unpack_arguments(state, args);
invoke_context(state->vm());

if(status() == eCreated) {
start(state, args);
}

status(eTransfer);
invoke_context(state->vm());
status(eTransfer);
}

// Being cooperative...
restart(state);

// Through the worm hole...
state->vm()->fiber()->suspend(state);
state->vm()->fiber()->suspend_and_continue(state);

// We're back...
return return_value(state);
@@ -358,20 +375,24 @@ namespace rubinius {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);

if(fiber->root_p()) {
Exception::raise_fiber_error(state, "can't yield from root fiber");
} else if(fiber->status() == eTransfer) {
Exception::raise_fiber_error(state, "can't yield from transferred fiber");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

fiber->unpack_arguments(state, args);
fiber->status(eYielding);
if(fiber->root_p()) {
Exception::raise_fiber_error(state, "can't yield from root fiber");
} else if(fiber->status() == eTransfer) {
Exception::raise_fiber_error(state, "can't yield from transferred fiber");
}

fiber->unpack_arguments(state, args);
fiber->status(eYielding);
}

// Being cooperative...
fiber->invoke_context()->fiber()->restart(state);

// Through the worm hole...
fiber->suspend(state);
fiber->suspend_and_continue(state);

// We're back...
return fiber->return_value(state);
22 changes: 19 additions & 3 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
@@ -48,8 +48,10 @@ namespace rubinius {

attr_field(vm, VM*);
attr_field(invoke_context, VM*);
attr_field(restart_context, VM*);

std::atomic<Status> status_;
std::atomic<bool> wakeup_;

public:
static void bootstrap(STATE);
@@ -63,10 +65,12 @@ namespace rubinius {
obj->fiber_id(Fixnum::from(++Fiber::fiber_ids_));
obj->source(nil<String>());
obj->thread(state->vm()->thread());
obj->status(eCreated);
obj->start_time(get_current_time());
obj->vm(NULL);
obj->invoke_context(state->vm());
obj->restart_context(state->vm());
obj->status(eCreated);
obj->clear_wakeup();
}

static void finalize(STATE, Fiber* fib);
@@ -89,6 +93,8 @@ namespace rubinius {
// Rubinius.primitive :fiber_s_main
static Fiber* s_main(STATE);

bool root_p();

// Returns the fiber's current Status by reading status_, which is declared
// in this class as std::atomic<Status>.
Status status() {
return status_;
}
@@ -97,14 +103,24 @@ namespace rubinius {
status_ = status;
}

bool root_p();
// Sets the wakeup_ flag (a std::atomic<bool> member) to true.
// NOTE(review): the fiber.cpp side of this change waits on wakeup_p() inside
// suspend_and_continue(); this setter does not itself notify any condition
// variable, so callers are presumably expected to do that — confirm.
void wakeup() {
wakeup_ = true;
}

// Resets the wakeup_ flag to false.
void clear_wakeup() {
wakeup_ = false;
}

// Predicate: returns the current value of the wakeup_ flag.
bool wakeup_p() {
return wakeup_;
}

void unpack_arguments(STATE, Arguments& args);
Object* return_value(STATE);

void start(STATE, Arguments& args);
void restart(STATE);
void suspend(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
String* status(STATE);
8 changes: 8 additions & 0 deletions machine/vm.hpp
Original file line number Diff line number Diff line change
@@ -191,6 +191,10 @@ namespace rubinius {
return wait_condition_;
}

// Returns the raw transition_flag_ value as a FiberTransition, for callers
// that need the state itself rather than one of the *_p() predicates.
FiberTransition transition_flag() {
return transition_flag_;
}

// Predicate: true when transition_flag_ equals eSuspending.
bool suspending_p() const {
return transition_flag_ == eSuspending;
}
@@ -207,6 +211,10 @@ namespace rubinius {
return transition_flag_ == eRunning;
}

// Predicate: true when transition_flag_ equals eFinished.
bool finished_p() const {
return transition_flag_ == eFinished;
}

// Sets transition_flag_ to eSuspending.
// NOTE(review): no lock is taken here; whether transition_flag_ is atomic is
// not visible in this hunk — confirm before relying on cross-thread visibility.
void set_suspending() {
transition_flag_ = eSuspending;
}
7 changes: 7 additions & 0 deletions spec/ruby/core/thread/fiber_list_spec.rb
Original file line number Diff line number Diff line change
@@ -6,7 +6,11 @@
end

it "returns Fibers only for the Thread" do
start = false

t1 = Thread.new do
Thread.pass until start

f1 = Fiber.new {}
f2 = Fiber.new {}

@@ -17,9 +21,12 @@
end

t2 = Thread.new do
Thread.pass until start

t2.fiber_list.size.should == 0
end

start = true
t1.join
t2.join
end