Skip to content

Commit

Permalink
Improve .then interface
Browse files Browse the repository at this point in the history
  • Loading branch information
K-ballo committed Oct 31, 2017
1 parent 40c4f02 commit dcb3559
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 165 deletions.
261 changes: 145 additions & 116 deletions hpx/lcos/future.hpp
Expand Up @@ -34,7 +34,6 @@
#include <hpx/util/function.hpp>
#include <hpx/util/identity.hpp>
#include <hpx/util/invoke.hpp>
#include <hpx/util/lazy_conditional.hpp>
#include <hpx/util/lazy_enable_if.hpp>
#include <hpx/util/result_of.hpp>
#include <hpx/util/steady_clock.hpp>
Expand Down Expand Up @@ -428,11 +427,132 @@ namespace hpx { namespace lcos { namespace detail
inline typename traits::detail::shared_state_ptr<ContResult>::type
make_continuation_exec(Future const& future, Executor const& exec, F && f);

template <typename Executor, typename Future, typename F, typename ... Ts>
template <typename Executor, typename Future, typename F>
inline typename hpx::traits::future_then_executor_result<
Executor, typename std::decay<Future>::type, F, Ts...
Executor, typename std::decay<Future>::type, F
>::type
then_execute_helper(Executor const&, F &&, Future &&, Ts &&...);
then_execute_helper(Executor const&, F &&, Future &&);

///////////////////////////////////////////////////////////////////////////
// Primary template of the `.then` dispatcher. It is only declared here; the
// behavior comes from the partial specializations that follow, each of which
// is enabled through the defaulted `Enable` parameter based on traits of FD.
//   Future - the future type the continuation is attached to
//   FD     - the type used to pick a specialization (a launch policy, an
//            executor, or a plain callable)
template <typename Future, typename FD, typename Enable = void>
struct future_then_dispatch;

// Dispatcher specialization chosen when `Policy` is a launch policy
// (traits::is_launch_policy<Policy>::value is true).
template <typename Future, typename Policy>
struct future_then_dispatch<Future, Policy,
    typename std::enable_if<
        traits::is_launch_policy<Policy>::value
    >::type>
{
    // Builds the continuation's shared state through detail::make_continuation
    // using the given launch policy, then wraps that state in a future.
    //   fut    - the future the continuation is attached to (moved from)
    //   policy - the launch policy, perfectly forwarded
    //   f      - the continuation callable, perfectly forwarded
    template <typename Policy_, typename F>
    HPX_FORCEINLINE
    static typename hpx::traits::future_then_result<Future, F>::type
    call(Future && fut, Policy_ && policy, F && f)
    {
        using then_traits = hpx::traits::future_then_result<Future, F>;
        using value_type = typename then_traits::result_type;
        // What invoking `f` with the future yields.
        using callable_result_type =
            typename hpx::util::invoke_result<F, Future>::type;
        using state_ptr_type =
            typename hpx::traits::detail::shared_state_ptr<value_type>::type;

        state_ptr_type state =
            detail::make_continuation<callable_result_type>(
                std::move(fut),
                std::forward<Policy_>(policy),
                std::forward<F>(f));

        return hpx::traits::future_access<future<value_type> >::create(
            std::move(state));
    }
};

// Dispatcher specialization chosen when `Executor` is a threads executor
// (traits::is_threads_executor<Executor>::value is true).
template <typename Future, typename Executor>
struct future_then_dispatch<Future, Executor,
    typename std::enable_if<
        traits::is_threads_executor<Executor>::value
    >::type>
{
    // Builds the continuation's shared state through detail::make_continuation
    // using the given scheduler, then wraps that state in a future.
    //   fut   - the future the continuation is attached to (moved from)
    //   sched - the executor, taken by lvalue reference
    //   f     - the continuation callable, perfectly forwarded
    template <typename Executor_, typename F>
    HPX_FORCEINLINE
    static typename hpx::traits::future_then_result<Future, F>::type
    call(Future && fut, Executor_& sched, F && f)
    {
        using then_traits = hpx::traits::future_then_result<Future, F>;
        using value_type = typename then_traits::result_type;
        // What invoking `f` with the future yields.
        using callable_result_type =
            typename hpx::util::invoke_result<F, Future>::type;
        using state_ptr_type =
            typename hpx::traits::detail::shared_state_ptr<value_type>::type;

        // make_continuation is handed the scheduler viewed as a
        // threads::executor&, whatever the concrete Executor_ type is.
        threads::executor& as_threads_executor =
            static_cast<threads::executor&>(sched);

        state_ptr_type state =
            detail::make_continuation<callable_result_type>(
                std::move(fut),
                as_threads_executor,
                std::forward<F>(f));

        return hpx::traits::future_access<future<value_type> >::create(
            std::move(state));
    }
};

#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
// Dispatcher specialization chosen when `Executor` is a v1 parallel executor
// (traits::is_executor<Executor>::value is true). Only compiled when
// HPX_HAVE_EXECUTOR_COMPATIBILITY is defined.
template <typename Future, typename Executor>
struct future_then_dispatch<Future, Executor,
    typename std::enable_if<
        traits::is_executor<Executor>::value
    >::type>
{
    // Builds the continuation's shared state through
    // detail::make_continuation_exec_v1, then wraps that state in a future.
    //   fut  - the future the continuation is attached to (moved from)
    //   exec - the executor, perfectly forwarded
    //   f    - the continuation callable, perfectly forwarded
    template <typename Executor_, typename F>
    HPX_FORCEINLINE
    static typename hpx::traits::future_then_result<Future, F>::type
    call(Future && fut, Executor_ && exec, F && f)
    {
        using then_traits = hpx::traits::future_then_result<Future, F>;
        using value_type = typename then_traits::result_type;
        // What invoking `f` with the future yields.
        using callable_result_type =
            typename hpx::util::invoke_result<F, Future>::type;
        using state_ptr_type =
            typename hpx::traits::detail::shared_state_ptr<value_type>::type;

        state_ptr_type state =
            detail::make_continuation_exec_v1<callable_result_type>(
                std::move(fut),
                std::forward<Executor_>(exec),
                std::forward<F>(f));

        return hpx::traits::future_access<future<value_type> >::create(
            std::move(state));
    }
};
#endif

// Dispatcher specialization chosen when `Executor` is a v2 parallel executor,
// i.e. traits::is_one_way_executor or traits::is_two_way_executor holds.
template <typename Future, typename Executor>
struct future_then_dispatch<Future, Executor,
    typename std::enable_if<
        traits::is_one_way_executor<Executor>::value ||
        traits::is_two_way_executor<Executor>::value
    >::type>
{
    // Hands the whole job to detail::then_execute_helper; no shared state is
    // created here.
    //   fut  - the predecessor future (moved from)
    //   exec - the executor; passed on as an lvalue, since the helper is
    //          declared to take it as `Executor const&`
    //   f    - the continuation callable, perfectly forwarded
    // NOTE(review): the return type is spelled with the deduced `Executor_`,
    // which is a reference type when an lvalue executor is passed - confirm
    // that future_then_executor_result accepts reference-qualified executors.
    template <typename Executor_, typename F>
    HPX_FORCEINLINE
    static typename hpx::traits::future_then_executor_result<
        Executor_, Future, F
    >::type
    call(Future && fut, Executor_ && exec, F && f)
    {
        return detail::then_execute_helper(
            exec,
            std::forward<F>(f),
            std::move(fut));
    }
};

// plain function, or function object
template <typename Future, typename FD>
struct future_then_dispatch<Future, FD, typename std::enable_if<
!traits::is_launch_policy<FD>::value &&
!traits::is_threads_executor<FD>::value &&
#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
!traits::is_executor<FD>::value &&
#endif
!(
traits::is_one_way_executor<FD>::value ||
traits::is_two_way_executor<FD>::value)
>::type>
{
template <typename F>
HPX_FORCEINLINE static auto
call(Future && fut, F && f)
-> decltype(future_then_dispatch<Future, launch>::call(
std::move(fut), launch::all, std::forward<F>(f)))
{
return future_then_dispatch<Future, launch>::call(
std::move(fut), launch::all, std::forward<F>(f));
}
};

///////////////////////////////////////////////////////////////////////////
template <typename Future>
Expand All @@ -455,6 +575,12 @@ namespace hpx { namespace lcos { namespace detail
typename traits::detail::shared_state_ptr_result<R>::type
> shared_state_type;

private:
// Class-local shorthand for detail::future_then_dispatch with the future type
// fixed to `Derived`, so code inside this class only has to name the
// dispatch type `F`. Adds no members of its own; everything is inherited.
template <typename F>
struct future_then_dispatch
: detail::future_then_dispatch<Derived, F>
{};

public:
future_base() noexcept
: shared_state_()
Expand Down Expand Up @@ -588,38 +714,12 @@ namespace hpx { namespace lcos { namespace detail
// - valid() == false on original future object immediately after it
// returns.
template <typename F>
static typename util::lazy_enable_if<
!hpx::traits::is_launch_policy<
typename std::decay<F>::type>::value &&
!hpx::traits::is_threads_executor<
typename std::decay<F>::type>::value &&
#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
!hpx::traits::is_executor<
typename std::decay<F>::type>::value &&
#endif
!hpx::traits::is_one_way_executor<
typename std::decay<F>::type>::value &&
!hpx::traits::is_two_way_executor<
typename std::decay<F>::type>::value
, hpx::traits::future_then_result<Derived, F>
>::type
then(Derived && fut, F && f, error_code& ec = throws)
static auto then(Derived && fut, F && f, error_code& ec = throws)
-> decltype(future_then_dispatch<typename std::decay<F>::type>::call(
std::move(fut), std::forward<F>(f)))
{
return then(std::move(fut), launch::all, std::forward<F>(f), ec);
}

template <typename Policy, typename F>
static typename util::lazy_enable_if<
hpx::traits::is_launch_policy<
typename std::decay<Policy>::type
>::value,
hpx::traits::future_then_result<Derived, F>
>::type
then(Derived && fut, Policy && policy, F && f, error_code& ec = throws)
{
typedef
typename hpx::traits::future_then_result<Derived, F>::result_type
result_type;
using result_type =
typename hpx::traits::future_then_result<Derived, F>::result_type;

if (!fut.shared_state_)
{
Expand All @@ -629,27 +729,17 @@ namespace hpx { namespace lcos { namespace detail
return future<result_type>();
}

typedef
typename hpx::util::invoke_result<F, Derived>::type
continuation_result_type;
typedef
typename hpx::traits::detail::shared_state_ptr<result_type>::type
shared_state_ptr;

shared_state_ptr p =
detail::make_continuation<continuation_result_type>(
std::move(fut), std::forward<Policy>(policy), std::forward<F>(f));
return hpx::traits::future_access<future<result_type> >::create(
std::move(p));
return future_then_dispatch<typename std::decay<F>::type>::call(
std::move(fut), std::forward<F>(f));
}

template <typename F>
static typename hpx::traits::future_then_result<Derived, F>::type
then(Derived && fut, threads::executor& sched, F && f, error_code& ec = throws)
template <typename F, typename T0>
static auto then(Derived && fut, T0 && t0, F && f, error_code& ec = throws)
-> decltype(future_then_dispatch<typename std::decay<T0>::type>::call(
std::move(fut), std::forward<T0>(t0), std::forward<F>(f)))
{
typedef
typename hpx::traits::future_then_result<Derived, F>::result_type
result_type;
using result_type =
typename hpx::traits::future_then_result<Derived, F>::result_type;

if (!fut.shared_state_)
{
Expand All @@ -659,69 +749,8 @@ namespace hpx { namespace lcos { namespace detail
return future<result_type>();
}

typedef
typename hpx::util::invoke_result<F, Derived>::type
continuation_result_type;
typedef
typename hpx::traits::detail::shared_state_ptr<result_type>::type
shared_state_ptr;

shared_state_ptr p =
detail::make_continuation<continuation_result_type>(
std::move(fut), sched, std::forward<F>(f));
return hpx::traits::future_access<future<result_type> >::create(
std::move(p));
}

#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
template <typename Executor, typename F>
HPX_DEPRECATED(HPX_DEPRECATED_MSG)
static typename util::lazy_enable_if<
hpx::traits::is_executor<Executor>::value
, hpx::traits::future_then_result<Derived, F>
>::type
then(Derived && fut, Executor& exec, F && f, error_code& ec = throws)
{
typedef
typename hpx::traits::future_then_result<Derived, F>::result_type
result_type;

if (!fut.shared_state_)
{
HPX_THROWS_IF(ec, no_state,
"future_base<R>::then",
"this future has no valid shared state");
return future<result_type>();
}

typedef
typename hpx::util::invoke_result<F, Derived>::type
continuation_result_type;
typedef
typename hpx::traits::detail::shared_state_ptr<result_type>::type
shared_state_ptr;

shared_state_ptr p =
detail::make_continuation_exec_v1<continuation_result_type>(
std::move(fut), exec, std::forward<F>(f));
return hpx::traits::future_access<future<result_type> >::
create(std::move(p));
}
#endif

template <typename Executor, typename F>
static typename util::lazy_enable_if<
hpx::traits::is_one_way_executor<
typename std::decay<Executor>::type>::value ||
hpx::traits::is_two_way_executor<
typename std::decay<Executor>::type>::value
, hpx::traits::future_then_executor_result<Executor, Derived, F>
>::type
then(Derived && fut, Executor && exec, F && f, error_code& ec = throws)
{
// simply forward this to executor
return detail::then_execute_helper(exec, std::forward<F>(f),
std::move(fut));
return future_then_dispatch<typename std::decay<T0>::type>::call(
std::move(fut), std::forward<T0>(t0), std::forward<F>(f));
}

// Effects: blocks until the shared state is ready.
Expand Down
9 changes: 4 additions & 5 deletions hpx/lcos/local/packaged_continuation.hpp
Expand Up @@ -718,16 +718,15 @@ namespace hpx { namespace lcos { namespace detail
return p;
}

template <typename Executor, typename Future, typename F, typename ... Ts>
template <typename Executor, typename Future, typename F>
inline typename hpx::traits::future_then_executor_result<
Executor, typename std::decay<Future>::type, F, Ts...
Executor, typename std::decay<Future>::type, F
>::type
then_execute_helper(Executor const& exec, F && f, Future && predecessor,
Ts &&... ts)
then_execute_helper(Executor const& exec, F && f, Future && predecessor)
{
// simply forward this to executor
return parallel::execution::then_execute(exec, std::forward<F>(f),
predecessor, std::forward<Ts>(ts)...);
predecessor);
}
}}}

Expand Down

0 comments on commit dcb3559

Please sign in to comment.