Skip to content

Commit

Permalink
Merge pull request #3138 from STEllAR-GROUP/fix_3126
Browse files Browse the repository at this point in the history
Partially reverting #3126
  • Loading branch information
hkaiser committed Feb 3, 2018
2 parents 0dde0aa + 66f0493 commit f910b98
Show file tree
Hide file tree
Showing 20 changed files with 136 additions and 78 deletions.
18 changes: 11 additions & 7 deletions hpx/runtime/actions/action_invoke_no_more_than.hpp
Expand Up @@ -15,7 +15,7 @@
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/traits/is_future.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/static.hpp>

#include <memory>
Expand Down Expand Up @@ -83,22 +83,24 @@ namespace hpx { namespace actions { namespace detail
// If the action returns something which is not a future, we inject
// a semaphore into the call graph.
static threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
typedef typename construct_semaphore_type::semaphore_type
semaphore_type;

signal_on_exit<semaphore_type> on_exit(
construct_semaphore_type::get_sem());
return f();
return f(state);
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::false_type)
{
return util::deferred_call(
&action_decorate_function::thread_function,
return util::bind(
util::one_shot(&action_decorate_function::thread_function),
util::placeholders::_1,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand All @@ -107,18 +109,20 @@ namespace hpx { namespace actions { namespace detail
// If the action returns a future we wait on the semaphore as well;
// however, it is signaled only once the future becomes ready.
static threads::thread_result_type thread_function_future(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
construct_semaphore_type::get_sem().wait();
return f();
return f(state);
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::true_type)
{
return util::deferred_call(
&action_decorate_function::thread_function_future,
return util::bind(
util::one_shot(&action_decorate_function::thread_function_future),
util::placeholders::_1,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/basic_action.hpp
Expand Up @@ -97,7 +97,7 @@ namespace hpx { namespace actions
{}

HPX_FORCEINLINE threads::thread_result_type
operator()()
operator()(threads::thread_state_ex_enum)
{
LTM_(debug)
<< "Executing " << Action::get_action_name(lva_)
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/executor_component.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace components
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f();
f(hpx::threads::wait_signaled);
}

/// This is the default hook implementation for schedule_thread which
Expand Down
19 changes: 12 additions & 7 deletions hpx/runtime/components/server/locking_hook.hpp
Expand Up @@ -11,8 +11,8 @@
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/threads/coroutines/coroutine.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/register_locks.hpp>
#include <hpx/util/unlock_guard.hpp>

Expand Down Expand Up @@ -55,15 +55,17 @@ namespace hpx { namespace components
static threads::thread_function_type
decorate_action(naming::address::address_type lva, F && f)
{
return util::deferred_call(&locking_hook::thread_function,
return util::bind(
util::one_shot(&locking_hook::thread_function),
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)));
}

protected:
typedef util::function_nonser<
void(threads::thread_result_type)
threads::thread_arg_type(threads::thread_result_type)
> yield_decorator_type;

struct decorate_wrapper
Expand All @@ -85,7 +87,7 @@ namespace hpx { namespace components
// Execute the wrapped action. This locks the mutex ensuring a thread
// safe action invocation.
threads::thread_result_type thread_function(
threads::thread_function_type f)
threads::thread_arg_type state, threads::thread_function_type f)
{
threads::thread_result_type result(threads::unknown,
threads::invalid_thread_id);
Expand All @@ -107,7 +109,7 @@ namespace hpx { namespace components
decorate_wrapper yield_decorator(
util::bind_front(&locking_hook::yield_function, this));

result = f();
result = f(state);

(void)yield_decorator; // silence gcc warnings
}
Expand All @@ -131,21 +133,24 @@ namespace hpx { namespace components

// The yield decorator unlocks the mutex and calls the system yield
// which gives up control back to the thread manager.
void yield_function(threads::thread_result_type state)
threads::thread_arg_type yield_function(threads::thread_result_type state)
{
// We un-decorate the yield function as the lock handling may
// suspend, which causes an infinite recursion otherwise.
undecorate_wrapper yield_decorator;
threads::thread_arg_type result = threads::wait_unknown;

{
util::unlock_guard<mutex_type> ul(mtx_);
threads::get_self().yield_impl(state);
result = threads::get_self().yield_impl(state);
}

// Re-enable ignoring the lock on the mutex above (this
// information is lost in the lock tracking tables once a mutex is
// unlocked).
util::ignore_lock(&mtx_);

return result;
}

private:
Expand Down
9 changes: 6 additions & 3 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -16,7 +16,7 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/bind.hpp>

#include <cstdint>
#include <mutex>
Expand Down Expand Up @@ -167,8 +167,10 @@ namespace hpx { namespace components
// Make sure we pin the component at construction of the bound object
// which will also unpin it once the thread runs to completion (the
// bound object goes out of scope).
return util::deferred_call(&migration_support::thread_function,
return util::bind(
util::one_shot(&migration_support::thread_function),
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)),
components::pinned_ptr::create<this_component_type>(lva));
Expand All @@ -191,10 +193,11 @@ namespace hpx { namespace components
// Execute the wrapped action. This function is bound in decorate_action
// above. The bound object performs the pinning/unpinning.
threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type && f,
components::pinned_ptr)
{
return f();
return f(state);
}

private:
Expand Down
7 changes: 5 additions & 2 deletions hpx/runtime/threads/coroutines/coroutine.hpp
Expand Up @@ -57,8 +57,9 @@ namespace hpx { namespace threads { namespace coroutines
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::unique_function_nonser<result_type()> functor_type;
typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine(functor_type&& f,
thread_id_type id,
Expand Down Expand Up @@ -107,10 +108,12 @@ namespace hpx { namespace threads { namespace coroutines
impl_.rebind(std::move(f), id);
}

HPX_FORCEINLINE result_type operator()()
HPX_FORCEINLINE result_type operator()(arg_type arg = arg_type())
{
HPX_ASSERT(impl_.is_ready());

impl_.bind_args(&arg);

impl_.invoke();

return impl_.result();
Expand Down
17 changes: 15 additions & 2 deletions hpx/runtime/threads/coroutines/detail/coroutine_impl.hpp
Expand Up @@ -63,13 +63,15 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
typedef context_base::thread_id_type thread_id_type;

typedef std::pair<thread_state_enum, thread_id_type> result_type;
typedef thread_state_ex_enum arg_type;

typedef util::unique_function_nonser<result_type()> functor_type;
typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine_impl(functor_type&& f, thread_id_type id,
std::ptrdiff_t stack_size)
: context_base(*this, stack_size, id)
, m_result(terminated, invalid_thread_id)
, m_result(unknown, invalid_thread_id)
, m_arg(nullptr)
, m_fun(std::move(f))
{}

Expand All @@ -89,6 +91,16 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
{
return m_result;
}
// Returns the argument pointer most recently stored through bind_args().
// HPX_ASSERT checks that a non-null pointer has been bound before it is
// handed out (m_arg starts out as nullptr).
arg_type * args()
{
    HPX_ASSERT(m_arg);
    return m_arg;
}

// Stores the given pointer in m_arg so that args() can return it later.
// Only the pointer is kept; the pointee is not copied.
// NOTE(review): callers presumably have to keep *arg alive for as long as
// it may be read back through args() - confirm against the call sites.
void bind_args(arg_type* arg)
{
m_arg = arg;
}

#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
std::size_t get_thread_phase() const
Expand All @@ -114,6 +126,7 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail

private:
result_type m_result;
arg_type* m_arg;

functor_type m_fun;
};
Expand Down
11 changes: 7 additions & 4 deletions hpx/runtime/threads/coroutines/detail/coroutine_self.hpp
Expand Up @@ -75,18 +75,19 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::function_nonser<void(result_type)>
typedef util::function_nonser<arg_type(result_type)>
yield_decorator_type;

void yield(result_type arg)
arg_type yield(result_type arg = result_type())
{
!yield_decorator_.empty() ?
return !yield_decorator_.empty() ?
yield_decorator_(std::move(arg)) :
yield_impl(std::move(arg));
}

void yield_impl(result_type arg)
arg_type yield_impl(result_type arg)
{
HPX_ASSERT(m_pimpl);

Expand All @@ -96,6 +97,8 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
reset_self_on_exit on_exit(this);
this->m_pimpl->yield();
}

return *m_pimpl->args();
}

template <typename F>
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -295,7 +295,7 @@ namespace hpx { namespace threads { namespace detail
thread_id_type background_thread;
background_running.reset(new bool(true));
thread_init_data background_init(
[&, background_running]() -> thread_result_type
[&, background_running](thread_state_ex_enum) -> thread_result_type
{
while(*background_running)
{
Expand Down
3 changes: 1 addition & 2 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -321,9 +321,8 @@ namespace hpx { namespace threads { namespace detail
// this waits for the thread to be reactivated when the timer fired
// if it returns signaled the timer has been canceled, otherwise
// the timer fired and the wake_timer_thread above has been executed
get_self().yield(thread_result_type(suspended, invalid_thread_id));
thread_state_ex_enum statex =
self_id->set_state_ex(thread_state_ex_enum::wait_signaled);
get_self().yield(thread_result_type(suspended, invalid_thread_id));

if (wait_timeout != statex) //-V601
{
Expand Down
5 changes: 3 additions & 2 deletions hpx/runtime/threads/thread_data.hpp
Expand Up @@ -241,7 +241,7 @@ namespace hpx { namespace threads
thread_state(new_state, state_ex, tag));
}

public:
private:
/// The set_state_ex function changes the extended state of this
/// thread instance.
///
Expand Down Expand Up @@ -269,6 +269,7 @@ namespace hpx { namespace threads
}
}

public:
/// Return the id of the component this thread is running in
naming::address_type get_component_id() const
{
Expand Down Expand Up @@ -516,7 +517,7 @@ namespace hpx { namespace threads
coroutine_type::result_type operator()()
{
HPX_ASSERT(this == coroutine_.get_thread_id().get());
return coroutine_();
return coroutine_(set_state_ex(wait_signaled));
}

thread_id_type get_thread_id() const
Expand Down
3 changes: 2 additions & 1 deletion hpx/runtime/threads/thread_data_fwd.hpp
Expand Up @@ -48,7 +48,8 @@ namespace hpx { namespace threads
typedef std::pair<thread_state_enum, thread_id_type> thread_result_type;
typedef thread_state_ex_enum thread_arg_type;

typedef util::unique_function_nonser<thread_result_type()> thread_function_type;
typedef thread_result_type thread_function_sig(thread_arg_type);
typedef util::unique_function_nonser<thread_function_sig> thread_function_type;
/// \endcond

///////////////////////////////////////////////////////////////////////
Expand Down
26 changes: 23 additions & 3 deletions hpx/runtime/threads/thread_helpers.hpp
Expand Up @@ -796,7 +796,27 @@ namespace hpx { namespace applier
{
F f;

inline threads::thread_result_type operator()()
inline threads::thread_result_type operator()(threads::thread_arg_type)
{
// execute the actual thread function
f(threads::wait_signaled);

// Verify that there are no more registered locks for this
// OS-thread. This will throw if there are still any locks
// held.
util::force_error_on_lock();

return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}
};

template <typename F>
struct thread_function_nullary
{
F f;

inline threads::thread_result_type operator()(threads::thread_arg_type)
{
// execute the actual thread function
f();
Expand Down Expand Up @@ -855,7 +875,7 @@ namespace hpx { namespace applier
error_code& ec = throws)
{
threads::thread_function_type thread_func(
detail::thread_function<typename std::decay<F>::type>{
detail::thread_function_nullary<typename std::decay<F>::type>{
std::forward<F>(func)});
return register_thread_plain(std::move(thread_func),
description, initial_state, run_now, priority, os_thread, stacksize,
Expand Down Expand Up @@ -988,7 +1008,7 @@ namespace hpx { namespace applier
error_code& ec = throws)
{
threads::thread_function_type thread_func(
detail::thread_function<typename std::decay<F>::type>{
detail::thread_function_nullary<typename std::decay<F>::type>{
std::forward<F>(func)});
return register_work_plain(std::move(thread_func),
description, 0, initial_state, priority, os_thread, stacksize,
Expand Down

0 comments on commit f910b98

Please sign in to comment.