Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: rubinius/rubinius
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9540f89923e8
Choose a base ref
...
head repository: rubinius/rubinius
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 23b9f18cc1e7
Choose a head ref
  • 2 commits
  • 16 files changed
  • 1 contributor

Commits on Jun 13, 2016

  1. A bit of Enumerator cleanup.

    brixen committed Jun 13, 2016
    Copy the full SHA
    1b8941f View commit details

Commits on Jun 14, 2016

  1. Copy the full SHA
    23b9f18 View commit details
27 changes: 7 additions & 20 deletions core/enumerator.rb
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ def initialize(receiver_or_size=undefined, method_name=:each, *method_args, &blo
size = receiver_or_size
end

receiver = Generator.new(&block)
receiver = EnumeratorObject.new(&block)
else
if undefined.equal? receiver_or_size
raise ArgumentError, "Enumerator#initialize requires a block when called without arguments"
@@ -98,19 +98,7 @@ def each_with_index
def next
return @lookahead.shift unless @lookahead.empty?

unless @generator
# Allow #to_generator to return nil, indicating it has none for
# this method.
if @object.respond_to? :to_generator
@generator = @object.to_generator(@iter)
end

if !@generator and gen = FiberGenerator
@generator = gen.new(self)
else
@generator = ThreadGenerator.new(self, @object, @iter, @args)
end
end
@generator ||= Iterator.new self

begin
return @generator.next if @generator.next?
@@ -196,8 +184,9 @@ def <<(*args)
end
end

class Generator
class EnumeratorObject
include Enumerable

def initialize(&block)
raise LocalJumpError, "Expected a block to be given" unless block_given?

@@ -447,7 +436,7 @@ def zip(*lists)
end
end

class FiberGenerator
class Iterator
STACK_SIZE = 1_048_576

attr_reader :result
@@ -478,11 +467,9 @@ def rewind

def reset
@done = false
@fiber = Rubinius::Fiber.new stack_size: STACK_SIZE do
@fiber = Fiber.new stack_size: STACK_SIZE do
obj = @object
@result = obj.each do |*val|
Rubinius::Fiber.yield *val
end
@result = obj.each { |*val| Fiber.yield *val }
@done = true
end
end
2 changes: 1 addition & 1 deletion machine/builtin/channel.cpp
Original file line number Diff line number Diff line change
@@ -168,7 +168,7 @@ namespace rubinius {
}

state->vm()->clear_waiter();
state->vm()->thread->sleep(state, cFalse);
state->vm()->thread()->sleep(state, cFalse);

self->unpin();
self->_waiters_--;
157 changes: 141 additions & 16 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
@@ -8,10 +8,18 @@
#include "builtin/exception.hpp"
#include "builtin/fiber.hpp"
#include "builtin/lookup_table.hpp"
#include "builtin/native_method.hpp"
#include "builtin/object.hpp"
#include "builtin/thread.hpp"

#include "memory/gc.hpp"

#include "dtrace/dtrace.h"

#include "logger.hpp"

#include "missing/gettid.h"

#include <ostream>
#include <regex>
#include <string>
@@ -25,23 +33,127 @@ namespace rubinius {
fiber_ids_.store(0);
}

double Fiber::run_time() {
return timer::time_elapsed_seconds(start_time());
Object* Fiber::start_fiber(STATE, Fiber* fiber, Arguments& args) {
fiber->arguments(state, args.as_array(state));

pthread_attr_t attrs;
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, fiber->stack_size()->to_native());
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);

pthread_create(&fiber->vm()->os_thread(), &attrs,
Fiber::run, (void*)fiber->vm());

pthread_attr_destroy(&attrs);

fiber->function(Fiber::continue_fiber);

return continue_fiber(state, fiber, args);
}

// FiberFunction that Fiber::start_fiber installs on the fiber after creating
// its OS thread, and then calls directly.
//
// Currently a placeholder: the intended hand-off protocol is outlined in the
// comment inside the body, but none of it is implemented yet.
//
// state, fiber, args: present to satisfy the FiberFunction signature; the
//   placeholder body does not use them.
// Returns cNil unconditionally.
Object* Fiber::continue_fiber(STATE, Fiber* fiber, Arguments& args) {
/* signal fiber
* wait till fiber is running
* lock
* set waiting
* wait
* retrieve value
* return value
*/
return cNil;
}

// FiberFunction that Fiber::run installs on a fiber once the fiber's block
// has returned. Raises a fiber error with the message "fiber is dead".
//
// state: passed through to Exception::raise_fiber_error.
// fiber, args: present to satisfy the FiberFunction signature; unused.
//
// NOTE(review): the body has no return statement although the function is
// declared to return Object*. That is only well-defined if
// Exception::raise_fiber_error never returns to its caller — confirm against
// its declaration.
Object* Fiber::dead_fiber(STATE, Fiber* fiber, Arguments& args) {
Exception::raise_fiber_error(state, "fiber is dead");
}

// Entry point of the OS thread that backs a fiber. Fiber::start_fiber passes
// this function to pthread_create together with the fiber's VM.
//
// ptr: the fiber's VM*, passed as void* through pthread_create.
// Returns 0 always.
//
// Steps performed, in order:
//   1. Set up the VM for this thread (stack bounds, current thread, start
//      time), fire the thread-start probe and record gettid() as the
//      fiber's pid.
//   2. Send sym_call to the fiber with its stored arguments and block.
//   3. Install Fiber::dead_fiber as the fiber's function and store the
//      result as the fiber's value when the send returned non-null.
//   4. Tear down: clear the call frame, report the profile, clean up native
//      method state, leave the managed phase, mark the VM as a zombie and
//      fire the thread-stop probe.
//
// Exception propagation and resuming the calling context are not implemented
// yet (see the TODO markers).
void* Fiber::run(void* ptr) {
  VM* vm = reinterpret_cast<VM*>(ptr);
  State state_obj(vm), *state = &state_obj;

  vm->set_stack_bounds(vm->fiber()->stack_size()->to_native());
  vm->set_current_thread();
  vm->set_start_time();

  RUBINIUS_THREAD_START(
      const_cast<RBX_DTRACE_CHAR_P>(
        vm->name().c_str()), vm->fiber()->fiber_id()->to_native(), 0);

  vm->fiber()->pid(state, Fixnum::from(gettid()));

  // Gate the start message on the fiber lifetime-logging option, the same
  // option that gates the "fiber: exit" message below. Previously this
  // checked the thread lifetime-logging option, so fiber start and exit
  // messages could be enabled independently of each other by accident.
  if(state->shared().config.machine_fiber_log_lifetime.value) {
    logger::write("fiber: run: %s, %d, %#x",
        vm->name().c_str(), vm->fiber()->pid()->to_native(),
        (intptr_t)pthread_self());
  }

  NativeMethod::init_thread(state);

  state->vm()->managed_phase();

  // Run the fiber's block with the arguments stored by start_fiber.
  Object* value = vm->fiber()->send(state, G(sym_call),
      vm->fiber()->arguments(), vm->fiber()->block());

  // The block has returned; further calls through the fiber's function
  // now raise "fiber is dead".
  vm->fiber()->function(Fiber::dead_fiber);

  // A null result is not stored as the fiber's value.
  if(value) vm->fiber()->value(state, value);

  // TODO: Fiber: propagate exception

  vm->set_call_frame(NULL);

  state->shared().report_profile(state);

  NativeMethod::cleanup_thread(state);

  // TODO: restart calling context

  if(state->shared().config.machine_fiber_log_lifetime.value) {
    logger::write("fiber: exit: %s %fs", vm->name().c_str(), vm->run_time());
  }

  vm->unmanaged_phase();

  /* TODO: Fiber
  if(vm->main_thread_p() || (!value && vm->thread_state()->raise_reason() == cExit)) {
    state->shared().signals()->system_exit(vm->thread_state()->raise_value());
  }
  */

  vm->set_zombie(state);

  RUBINIUS_THREAD_STOP(
      const_cast<RBX_DTRACE_CHAR_P>(
        vm->name().c_str()), vm->fiber()->fiber_id()->to_native(), 0);

  return 0;
}

Fiber* Fiber::current(STATE) {
return nil<Fiber>();
return state->vm()->fiber();
}

Fiber* Fiber::create(STATE, Object* self, Object* stack_size, Object* callable) {
Fiber* fib = state->memory()->new_object<Fiber>(state, as<Class>(self));
fib->starter(state, callable);
Fiber* Fiber::create(STATE, Object* self, Object* stack_size, Object* block) {
Fiber* fiber = state->memory()->new_object<Fiber>(state, as<Class>(self));
fiber->block(state, block);

if(Fixnum* size = try_as<Fixnum>(stack_size)) {
state->vm()->validate_stack_size(state, size->to_native());
fib->stack_size(state, size);
fiber->stack_size(state, size);
}

std::ostringstream name;
name << "fiber." << fiber->fiber_id()->to_native();

fiber->vm(state->vm()->thread_nexus()->new_vm(&state->shared(), name.str().c_str()));
fiber->vm()->set_thread(state->vm()->thread());
fiber->vm()->set_fiber(fiber);

fiber->function(Fiber::start_fiber);

state->memory()->native_finalizer(state, fiber,
(memory::FinalizerFunction)&Fiber::finalize);

if(state->shared().config.machine_fiber_log_lifetime.value) {
const std::regex& filter = state->shared().config.machine_fiber_log_filter();

@@ -52,33 +164,46 @@ namespace rubinius {
<< ":" << call_frame->line(state);

logger::write("fiber: new: %s, %d, %s",
fib->thread_name()->c_str(state),
fib->fiber_id()->to_native(), source.str().c_str());
fiber->thread_name()->c_str(state),
fiber->fiber_id()->to_native(), source.str().c_str());

fib->source(state, String::create(state, source.str().c_str()));
fiber->source(state, String::create(state, source.str().c_str()));
} else {
logger::write("fiber: new: %s, %d",
fib->thread_name()->c_str(state), fib->fiber_id()->to_native());
fiber->thread_name()->c_str(state), fiber->fiber_id()->to_native());
}
}

state->vm()->metrics().system.fibers_created++;

state->memory()->native_finalizer(state, fib,
(memory::FinalizerFunction)&Fiber::finalize);

return fib;
return fiber;
}

Object* Fiber::resume(STATE, Arguments& args) {
return cNil;
/* state-vm() is the calling context
* set run state on 'this'
* unstop
* set wait state on calling context
* wait
*/
return _function_(state, this, args);
}

// Placeholder for transferring control to this fiber. The planned steps are
// listed in the comment inside the body; none of them are implemented yet.
//
// state, args: unused by the placeholder body.
// Returns cNil unconditionally.
Object* Fiber::transfer(STATE, Arguments& args) {
/* state->vm() is calling context
* ensure not itself
* set transfer state on calling context
* suspend calling context
* continue 'this' fiber
*/

return cNil;
}

// Placeholder for yielding from the running fiber back to its calling
// context. The comment inside the body records the intended meaning of the
// inputs; nothing is implemented yet.
//
// state, args: unused by the placeholder body.
// Returns cNil unconditionally.
Object* Fiber::s_yield(STATE, Arguments& args) {
/* state->vm->managed_object is Fiber that is yielding
* args are objects to resume calling context
*/
return cNil;
}

30 changes: 22 additions & 8 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
@@ -30,12 +30,14 @@ namespace rubinius {
eDead
};

attr_accessor(starter, Object);
attr_accessor(value, Array);
attr_accessor(block, Object);
attr_accessor(arguments, Array);
attr_accessor(value, Object);
attr_accessor(prev, Fiber);
attr_accessor(exception, Exception);
attr_accessor(locals, LookupTable);
attr_accessor(dead, Object);
attr_accessor(pid, Fixnum);
attr_accessor(stack_size, Fixnum);
attr_accessor(thread_name, String);
attr_accessor(fiber_id, Fixnum);
@@ -46,6 +48,12 @@ namespace rubinius {
attr_field(root, bool);
attr_field(start_time, uint64_t);

typedef Object* (*FiberFunction)(STATE, Fiber* fiber, Arguments& args);

attr_field(function, FiberFunction);

attr_field(vm, VM*);

public:
bool root_p() const {
return root();
@@ -54,12 +62,14 @@ namespace rubinius {
public:
static void bootstrap(STATE);
static void initialize(STATE, Fiber* obj) {
obj->starter(nil<Object>());
obj->value(nil<Array>());
obj->block(nil<Object>());
obj->arguments(nil<Array>());
obj->value(nil<Object>());
obj->prev(nil<Fiber>());
obj->exception(nil<Exception>());
obj->locals(nil<LookupTable>());
obj->dead(nil<Object>());
obj->dead(cFalse);
obj->pid(Fixnum::from(0));
obj->stack_size(Fixnum::from(state->shared().config.machine_fiber_stack_size.value));
obj->thread_name(String::create(state, state->vm()->name().c_str()));
obj->fiber_id(Fixnum::from(++Fiber::fiber_ids_));
@@ -69,10 +79,14 @@ namespace rubinius {
obj->start_time(get_current_time());
}

// Rubinius.primitive :fiber_new
static Fiber* create(STATE, Object* self, Object* stack_size, Object* callable);
static void* run(void*);

static Object* start_fiber(STATE, Fiber* fiber, Arguments& args);
static Object* continue_fiber(STATE, Fiber* fiber, Arguments& args);
static Object* dead_fiber(STATE, Fiber* fiber, Arguments& args);

double run_time();
// Rubinius.primitive :fiber_new
static Fiber* create(STATE, Object* self, Object* stack_size, Object* block);

// Rubinius.primitive :fiber_s_current
static Fiber* current(STATE);
Loading