Skip to content

Commit

Permalink
Cleaning up coroutine implementation
Browse files Browse the repository at this point in the history
 - Removing dynamic memory allocation
 - Removing intrusive ptr
 - Removing unused functions
 - Simplifying thread functions
 - Using thread_data directly to get/set the state_ex value
  • Loading branch information
Thomas Heller committed Jan 30, 2018
1 parent 4ba45e7 commit 9e1648c
Show file tree
Hide file tree
Showing 29 changed files with 157 additions and 776 deletions.
18 changes: 7 additions & 11 deletions hpx/runtime/actions/action_invoke_no_more_than.hpp
Expand Up @@ -15,7 +15,7 @@
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/traits/is_future.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/static.hpp>

#include <memory>
Expand Down Expand Up @@ -83,24 +83,22 @@ namespace hpx { namespace actions { namespace detail
// If the action returns something which is not a future, we inject
// a semaphore into the call graph.
static threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
typedef typename construct_semaphore_type::semaphore_type
semaphore_type;

signal_on_exit<semaphore_type> on_exit(
construct_semaphore_type::get_sem());
return f(state);
return f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::false_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand All @@ -109,20 +107,18 @@ namespace hpx { namespace actions { namespace detail
// If the action returns a future we wait on the semaphore as well;
// however, it will be signaled only once the future becomes ready.
static threads::thread_result_type thread_function_future(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
construct_semaphore_type::get_sem().wait();
return f(state);
return f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::true_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function_future),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function_future,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/basic_action.hpp
Expand Up @@ -97,7 +97,7 @@ namespace hpx { namespace actions
{}

HPX_FORCEINLINE threads::thread_result_type
operator()(threads::thread_state_ex_enum)
operator()()
{
LTM_(debug)
<< "Executing " << Action::get_action_name(lva_)
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/executor_component.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace components
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f(hpx::threads::wait_signaled);
f();
}

/// This is the default hook implementation for schedule_thread which
Expand Down
19 changes: 7 additions & 12 deletions hpx/runtime/components/server/locking_hook.hpp
Expand Up @@ -11,8 +11,8 @@
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/threads/coroutines/coroutine.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/register_locks.hpp>
#include <hpx/util/unlock_guard.hpp>

Expand Down Expand Up @@ -55,17 +55,15 @@ namespace hpx { namespace components
static threads::thread_function_type
decorate_action(naming::address::address_type lva, F && f)
{
return util::bind(
util::one_shot(&locking_hook::thread_function),
return util::deferred_call(&locking_hook::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)));
}

protected:
typedef util::function_nonser<
threads::thread_arg_type(threads::thread_result_type)
void(threads::thread_result_type)
> yield_decorator_type;

struct decorate_wrapper
Expand All @@ -87,7 +85,7 @@ namespace hpx { namespace components
// Execute the wrapped action. This locks the mutex, ensuring a
// thread-safe action invocation.
threads::thread_result_type thread_function(
threads::thread_arg_type state, threads::thread_function_type f)
threads::thread_function_type f)
{
threads::thread_result_type result(threads::unknown,
threads::invalid_thread_id);
Expand All @@ -109,7 +107,7 @@ namespace hpx { namespace components
decorate_wrapper yield_decorator(
util::bind_front(&locking_hook::yield_function, this));

result = f(state);
result = f();

(void)yield_decorator; // silence gcc warnings
}
Expand All @@ -133,24 +131,21 @@ namespace hpx { namespace components

// The yield decorator unlocks the mutex and calls the system yield,
// which gives control back to the thread manager.
threads::thread_arg_type yield_function(threads::thread_result_type state)
void yield_function(threads::thread_result_type state)
{
// We un-decorate the yield function because the lock handling may
// suspend, which would otherwise cause an infinite recursion.
undecorate_wrapper yield_decorator;
threads::thread_arg_type result = threads::wait_unknown;

{
util::unlock_guard<mutex_type> ul(mtx_);
result = threads::get_self().yield_impl(state);
threads::get_self().yield_impl(state);
}

// Re-enable ignoring the lock on the mutex above (this
// information is lost in the lock tracking tables once a mutex is
// unlocked).
util::ignore_lock(&mtx_);

return result;
}

private:
Expand Down
9 changes: 3 additions & 6 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -16,7 +16,7 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>

#include <cstdint>
#include <mutex>
Expand Down Expand Up @@ -167,10 +167,8 @@ namespace hpx { namespace components
// Make sure we pin the component at construction of the bound object
// which will also unpin it once the thread runs to completion (the
// bound object goes out of scope).
return util::bind(
util::one_shot(&migration_support::thread_function),
return util::deferred_call(&migration_support::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)),
components::pinned_ptr::create<this_component_type>(lva));
Expand All @@ -193,11 +191,10 @@ namespace hpx { namespace components
// Execute the wrapped action. This function is bound in decorate_action
// above. The bound object performs the pinning/unpinning.
threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type && f,
components::pinned_ptr)
{
return f(state);
return f();
}

private:
Expand Down
126 changes: 21 additions & 105 deletions hpx/runtime/threads/coroutines/coroutine.hpp
Expand Up @@ -54,168 +54,84 @@ namespace hpx { namespace threads { namespace coroutines
friend struct detail::coroutine_accessor;

typedef detail::coroutine_impl impl_type;
typedef impl_type::pointer impl_ptr;
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine() : m_pimpl(nullptr) {}
typedef util::unique_function_nonser<result_type()> functor_type;

coroutine(functor_type&& f,
thread_id_type id,
std::ptrdiff_t stack_size = detail::default_stack_size)
: m_pimpl(impl_type::create(
std::move(f), id, stack_size))
{
HPX_ASSERT(m_pimpl->is_ready());
}

coroutine(coroutine && src)
: m_pimpl(std::move(src.m_pimpl))
: impl_(std::move(f), id, stack_size)
{
src.m_pimpl = nullptr;
HPX_ASSERT(impl_.is_ready());
}

coroutine& operator=(coroutine && src)
{
coroutine(std::move(src)).swap(*this);
return *this;
}

coroutine& swap(coroutine& rhs)
{
std::swap(m_pimpl, rhs.m_pimpl);
return *this;
}

friend void swap(coroutine& lhs, coroutine& rhs)
{
lhs.swap(rhs);
}
coroutine(coroutine const& src) = delete;
coroutine& operator=(coroutine const& src) = delete;
coroutine(coroutine && src) = delete;
coroutine& operator=(coroutine && src) = delete;

thread_id_type get_thread_id() const
{
return m_pimpl->get_thread_id();
return impl_.get_thread_id();
}

#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
std::size_t get_thread_phase() const
{
return m_pimpl->get_thread_phase();
return impl_.get_thread_phase();
}
#endif

std::size_t get_thread_data() const
{
return m_pimpl.get() ? m_pimpl->get_thread_data() : 0;
return impl_.get_thread_data();
}

std::size_t set_thread_data(std::size_t data)
{
return m_pimpl.get() ? m_pimpl->set_thread_data(data) : 0;
return impl_.set_thread_data(data);
}

#if defined(HPX_HAVE_APEX)
void** get_apex_data() const
{
return m_pimpl.get() ? m_pimpl->get_apex_data() : 0ull;
return impl_.get_apex_data();
}
#endif

void rebind(functor_type&& f, thread_id_type id)
{
HPX_ASSERT(exited());
impl_type::rebind(m_pimpl.get(), std::move(f), id);
impl_.rebind(std::move(f), id);
}

HPX_FORCEINLINE result_type operator()(arg_type arg = arg_type())
HPX_FORCEINLINE result_type operator()()
{
HPX_ASSERT(m_pimpl);
HPX_ASSERT(m_pimpl->is_ready());

result_type* ptr = nullptr;
m_pimpl->bind_args(&arg);
m_pimpl->bind_result_pointer(&ptr);

m_pimpl->invoke();
HPX_ASSERT(impl_.is_ready());

return std::move(*m_pimpl->result());
}
impl_.invoke();

explicit operator bool() const
{
return good();
}

bool operator==(const coroutine& rhs) const
{
return m_pimpl == rhs.m_pimpl;
}

void exit()
{
HPX_ASSERT(m_pimpl);
m_pimpl->exit();
}

bool waiting() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->waiting();
}

bool pending() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->pending() != 0;
}

bool exited() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->exited();
return impl_.result();
}

bool is_ready() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->is_ready();
}

bool empty() const
{
return m_pimpl == nullptr;
return impl_.is_ready();
}

std::ptrdiff_t get_available_stack_space()
{
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
return m_pimpl->get_available_stack_space();
return impl_.get_available_stack_space();
#else
return (std::numeric_limits<std::ptrdiff_t>::max)();
#endif
}

protected:
bool good() const
{
return !empty() && !exited() && !waiting();
}

impl_ptr m_pimpl;

std::uint64_t count() const
{
return m_pimpl->count();
}

impl_ptr get_impl()
{
return m_pimpl;
}
private:
impl_type impl_;
};
}}}

Expand Down

0 comments on commit 9e1648c

Please sign in to comment.