Skip to content

Commit

Permalink
Speed up local action execution
Browse files Browse the repository at this point in the history
- This patch significantly speeds up local action execution (both
  direct and non-direct) by avoiding the construction of a (remote)
  promise; instead, the function bound to the action is scheduled
  directly as a local thread, using a simple (local-only) future.
  • Loading branch information
hkaiser committed Nov 20, 2017
1 parent a7f7eec commit 0ce15e2
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 12 deletions.
162 changes: 151 additions & 11 deletions hpx/lcos/detail/async_implementations.hpp
Expand Up @@ -37,6 +37,42 @@ namespace hpx { namespace detail
naming::id_type id_;
};

// Hands `f` back to the caller after optionally registering a completion
// callback on its shared state.
//
// f:  future to pass through; it is moved into the return value.
// id: id whose management type decides whether a callback is attached.
//
// For ids of management type naming::id_type::managed, a
// hpx::detail::keep_id_alive(id) object is registered via set_on_completed
// (presumably so a reference to `id` is held until the future completes).
// Any other management type leaves the shared state untouched.
template <typename T>
future<T> keep_alive(future<T> && f, id_type const& id)
{
    // Nothing to register unless the id is a managed one.
    if (id.get_management_type() != naming::id_type::managed)
        return std::move(f);

    auto&& state = traits::detail::get_shared_state(f);
    state->set_on_completed(hpx::detail::keep_id_alive(id));

    // `f` is an rvalue-reference parameter, so the move has to be explicit.
    return std::move(f);
}

// Callable that does nothing when invoked; it only owns a copy of an id and
// a pinned_ptr for as long as the callable object itself exists.
struct keep_id_and_ptr_alive
{
    naming::id_type id_;            // copy of the id handed to the constructor
    components::pinned_ptr p_;      // pin taken over (moved) from the caller

    // target_id is copied, pin is moved into this object.
    explicit keep_id_and_ptr_alive(
        naming::id_type const& target_id, components::pinned_ptr&& pin)
      : id_(target_id)
      , p_(std::move(pin))
    {
    }

    // Intentionally empty: holding the members is the whole purpose.
    void operator()() const
    {
    }
};

// Hands `f` back to the caller after optionally registering a completion
// callback that owns both `id` and the pinned_ptr `p`.
//
// f:  future to pass through; it is moved into the return value.
// id: id whose management type decides whether a callback is attached.
// p:  pinned_ptr that is moved into the callback when one is attached.
//
// NOTE(review): for ids that are not naming::id_type::managed, `p` is not
// stored anywhere by this function - confirm that callers do not rely on the
// pin outliving this call in that case.
template <typename T>
future<T> keep_alive(
    future<T>&& f, id_type const& id, components::pinned_ptr&& p)
{
    // Nothing to register unless the id is a managed one.
    if (id.get_management_type() != naming::id_type::managed)
        return std::move(f);

    auto&& state = traits::detail::get_shared_state(f);
    state->set_on_completed(
        hpx::detail::keep_id_and_ptr_alive(id, std::move(p)));

    // `f` is an rvalue-reference parameter, so the move has to be explicit.
    return std::move(f);
}

template <typename Result>
class handle_managed_target
{
Expand Down Expand Up @@ -96,16 +132,51 @@ namespace hpx { namespace detail
// Executes the action's function directly on the calling thread and returns
// the outcome as an already-ready future.
//
// id:   target id; passed to keep_alive(), which attaches a keep-alive
//       callback to the returned future when the id is managed.
// addr: resolved address; its address_ and type_ members are handed to
//       execute_function.
// vs:   action arguments, forwarded exactly once.
//
// Returns a ready future holding the converted result, or an exceptional
// future if anything inside the try block throws.
//
// The action is invoked only through execute_function. Invoking it a second
// time through a packaged_action (and forwarding `vs` again) would run the
// action twice and reuse already-forwarded arguments.
static lcos::future<Result>
call(naming::id_type const& id, naming::address && addr, Ts &&... vs)
{
    typedef typename hpx::traits::extract_action<Action>::type
        action_type;

    try
    {
        auto&& result = action_type::execute_function(
            addr.address_, addr.type_, std::forward<Ts>(vs)...);

        // Convert whatever execute_function produced into `Result`.
        typedef typename util::decay<decltype(result)>::type naked_type;
        typedef traits::get_remote_result<Result, naked_type>
            get_remote_result_type;

        future<Result> f = make_ready_future(
            get_remote_result_type::call(std::move(result)));

        return keep_alive(std::move(f), id);
    }
    catch (...)
    {
        // Report failures through the future instead of letting them escape.
        return make_exceptional_future<Result>(std::current_exception());
    }
}
};

// Specialization of sync_local_invoke for a `void` result: there is no value
// to convert, so a plain ready future<void> is produced.
template <typename Action>
struct sync_local_invoke<Action, void>
{
    // Executes the action's function directly on the calling thread.
    //
    // id:   target id; passed to keep_alive(), which attaches a keep-alive
    //       callback to the returned future when the id is managed.
    // addr: resolved address; its address_ and type_ members are handed to
    //       execute_function.
    // vs:   action arguments, forwarded exactly once.
    //
    // Returns a ready future<void>, or an exceptional future if
    // execute_function (or anything else in the try block) throws.
    template <typename ...Ts>
    static lcos::future<void>
    call(naming::id_type const& id, naming::address && addr, Ts &&... vs)
    {
        typedef typename hpx::traits::extract_action<Action>::type
            action_type;

        try
        {
            action_type::execute_function(
                addr.address_, addr.type_, std::forward<Ts>(vs)...);

            return keep_alive(make_ready_future(), id);
        }
        catch (...)
        {
            // Report failures through the future instead of letting them
            // escape.
            return make_exceptional_future<void>(std::current_exception());
        }
        // Both paths above return; no trailing return statement is needed
        // (and no local future exists in this function to return).
    }
};

Expand Down Expand Up @@ -240,6 +311,24 @@ namespace hpx { namespace detail
return f;
}

// Function object that runs an action's nested `invoker` and converts what it
// returns into the action's local result type.
template <typename Action>
struct action_invoker
{
    using remote_result_type = typename Action::remote_result_type;
    using result_type = typename Action::local_result_type;
    using get_remote_result_type =
        traits::get_remote_result<result_type, remote_result_type>;

    // lva:      local address value passed through to Action::invoker.
    // comptype: component type passed through to Action::invoker.
    // args:     action arguments, perfectly forwarded.
    //
    // Returns get_remote_result_type::call applied to the invoker's result.
    template <typename... Args>
    HPX_FORCEINLINE result_type operator()(
        naming::address::address_type lva,
        naming::address::component_type comptype, Args&&... args) const
    {
        using invoker_type = typename Action::invoker;

        return get_remote_result_type::call(
            invoker_type()(lva, comptype, std::forward<Args>(args)...));
    }
};

template <typename Action, typename ...Ts>
hpx::future<
typename hpx::traits::extract_action<Action>::type::local_result_type
Expand All @@ -248,18 +337,43 @@ namespace hpx { namespace detail
{
    // Asynchronous invocation with a local fast path.
    //
    // If the target address is found to be local, the action is scheduled
    // through hpx::async on an action_invoker and the resulting future is
    // returned via keep_alive(). Otherwise (or if the migration check below
    // does not allow the fast path) the call goes through a packaged_action.
    typedef typename hpx::traits::extract_action<Action>::type action_type;
    typedef typename action_type::local_result_type result_type;
    typedef typename action_type::component_type component_type;

    // Declared once; used by both the local fast path and the fallback.
    future<result_type> f;
    std::pair<bool, components::pinned_ptr> r;

    // Single lookup: the result both fills `addr` and selects the path.
    naming::address addr;
    if (agas::is_local_address_cached(id, addr))
    {
        if (traits::component_supports_migration<component_type>::call())
        {
            r = traits::action_was_object_migrated<Action>::call(
                id, addr.address_);

            // r.first false - presumably "object has not been migrated";
            // only then is the local fast path taken, and r.second (the
            // pinned_ptr) is handed to keep_alive().
            if (!r.first)
            {
                f = hpx::async(action_invoker<action_type>(),
                    addr.address_, addr.type_, std::forward<Ts>(vs)...);

                return keep_alive(std::move(f), id, std::move(r.second));
            }
        }
        else
        {
            f = hpx::async(action_invoker<action_type>(),
                addr.address_, addr.type_, std::forward<Ts>(vs)...);

            return keep_alive(std::move(f), id);
        }
    }

    // Fallback: `vs` has not been forwarded on any path that reaches here.
    {
        handle_managed_target<result_type> hmt(id, f);
        lcos::packaged_action<action_type, result_type> p;

        f = p.get_future();
        p.apply(std::move(addr), hmt.get_id(), std::forward<Ts>(vs)...);
    }

    return f;
}

Expand All @@ -271,11 +385,36 @@ namespace hpx { namespace detail
{
    // Deferred invocation with a local fast path.
    //
    // If the target address is found to be local, the action is wrapped with
    // hpx::async(launch::deferred, ...) on an action_invoker and the
    // resulting future is returned via keep_alive(). Otherwise (or if the
    // migration check below does not allow the fast path) the call goes
    // through packaged_action::apply_deferred.
    typedef typename hpx::traits::extract_action<Action>::type action_type;
    typedef typename action_type::local_result_type result_type;
    typedef typename action_type::component_type component_type;

    // Declared once; used by both the local fast path and the fallback.
    future<result_type> f;
    std::pair<bool, components::pinned_ptr> r;

    // Single lookup: the result both fills `addr` and selects the path.
    naming::address addr;
    if (agas::is_local_address_cached(id, addr))
    {
        if (traits::component_supports_migration<component_type>::call())
        {
            r = traits::action_was_object_migrated<Action>::call(
                id, addr.address_);

            // r.first false - presumably "object has not been migrated";
            // only then is the local fast path taken, and r.second (the
            // pinned_ptr) is handed to keep_alive().
            if (!r.first)
            {
                f = hpx::async(launch::deferred,
                    action_invoker<action_type>(), addr.address_,
                    addr.type_, std::forward<Ts>(vs)...);

                return keep_alive(std::move(f), id, std::move(r.second));
            }
        }
        else
        {
            f = hpx::async(launch::deferred, action_invoker<action_type>(),
                addr.address_, addr.type_, std::forward<Ts>(vs)...);

            return keep_alive(std::move(f), id);
        }
    }

    // Fallback: `vs` has not been forwarded on any path that reaches here.
    {
        handle_managed_target<result_type> hmt(id, f);
        lcos::packaged_action<action_type, result_type> p;
        // NOTE(review): the next line is a collapsed-diff marker from the
        // page capture; the statements it hides are not visible here.
Expand All @@ -284,6 +423,7 @@ namespace hpx { namespace detail
        p.apply_deferred(std::move(addr), hmt.get_id(),
            std::forward<Ts>(vs)...);
    }

    return f;
}

Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/basic_action.hpp
Expand Up @@ -185,7 +185,6 @@ namespace hpx { namespace actions
static R invoke(naming::address::address_type /*lva*/,
naming::address::component_type /*comptype*/, Ts&&... /*vs*/);

protected:
struct invoker
{
typedef
Expand Down Expand Up @@ -221,6 +220,7 @@ namespace hpx { namespace actions
}
};

protected:
/// The \a thread_function will be registered as the thread
/// function of a thread. It encapsulates the execution of the
/// original function (given by \a func).
Expand Down

0 comments on commit 0ce15e2

Please sign in to comment.