Skip to content

Commit

Permalink
Fixing return type calculation for bulk_then_execute.
Browse files Browse the repository at this point in the history
- flyby: added util::functional::unwrap[_n|_all]
  • Loading branch information
hkaiser committed Feb 23, 2018
1 parent 6fbd8e1 commit a83c81b
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 20 deletions.
34 changes: 26 additions & 8 deletions hpx/parallel/executors/execution.hpp
Expand Up @@ -12,10 +12,12 @@
#include <hpx/config.hpp>
#include <hpx/parallel/executors/execution_fwd.hpp>

#include <hpx/dataflow.hpp>
#include <hpx/exception_list.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/lcos/wait_all.hpp>
#include <hpx/traits/detail/wrap_int.hpp>
#include <hpx/traits/executor_traits.hpp>
#include <hpx/traits/future_access.hpp>
#include <hpx/traits/future_then_result.hpp>
#include <hpx/traits/future_traits.hpp>
Expand Down Expand Up @@ -1133,6 +1135,7 @@ namespace hpx { namespace parallel { namespace execution
typedef typename bulk_then_execute_result<
F, Shape, Future, Ts...
>::type result_type;

typedef typename hpx::traits::detail::shared_state_ptr<
result_type
>::type shared_state_type;
Expand Down Expand Up @@ -1304,15 +1307,17 @@ namespace hpx { namespace parallel { namespace execution
Future&& predecessor, Ts &&... ts)
-> typename hpx::traits::executor_future<
Executor,
typename then_bulk_function_result<
typename bulk_then_execute_result<
F, Shape, Future, Ts...
>::type
>::type
{
// result_of_t<F(Shape::value_type, Future)>
typedef typename then_bulk_function_result<
F, Shape, Future, Ts...
>::type func_result_type;

// std::vector<future<func_result_type>>
typedef std::vector<typename hpx::traits::executor_future<
Executor, func_result_type, Ts...
>::type> result_type;
Expand All @@ -1321,18 +1326,31 @@ namespace hpx { namespace parallel { namespace execution
exec, std::forward<F>(f), shape,
hpx::util::make_tuple(std::forward<Ts>(ts)...));

// void or std::vector<func_result_type>
typedef typename bulk_then_execute_result<
F, Shape, Future, Ts...
>::type vector_result_type;

// future<vector_result_type>
typedef typename hpx::traits::executor_future<
Executor, vector_result_type
>::type result_future_type;

typedef typename hpx::traits::detail::shared_state_ptr<
result_type
result_future_type
>::type shared_state_type;

shared_state_type p =
lcos::detail::make_continuation_exec<result_type>(
lcos::detail::make_continuation_exec<result_future_type>(
std::forward<Future>(predecessor),
std::forward<BulkExecutor>(exec), std::move(func));

typedef typename hpx::traits::executor_future<
Executor, result_type
>::type result_future_type;
std::forward<BulkExecutor>(exec),
[HPX_CAPTURE_MOVE(func)](Future&& predecessor) mutable
-> result_future_type
{
return hpx::dataflow(
hpx::util::functional::unwrap{},
func(std::forward<Future>(predecessor)));
});

return hpx::traits::future_access<result_future_type>::create(
std::move(p));
Expand Down
22 changes: 18 additions & 4 deletions hpx/parallel/executors/thread_execution.hpp
Expand Up @@ -9,6 +9,7 @@
#define HPX_PARALLEL_EXECUTORS_THREAD_EXECUTION_JAN_03_2017_1145AM

#include <hpx/config.hpp>
#include <hpx/dataflow.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/lcos/local/futures_factory.hpp>
#include <hpx/runtime/threads/thread_executor.hpp>
Expand Down Expand Up @@ -192,22 +193,35 @@ namespace hpx { namespace threads
>::type func_result_type;

typedef std::vector<hpx::lcos::future<func_result_type> > result_type;
typedef hpx::lcos::future<result_type> result_future_type;

auto func =
parallel::execution::detail::make_fused_bulk_async_execute_helper<
result_type
>(exec, std::forward<F>(f), shape,
hpx::util::make_tuple(std::forward<Ts>(ts)...));

// void or std::vector<func_result_type>
typedef typename parallel::execution::detail::bulk_then_execute_result<
F, Shape, Future, Ts...
>::type vector_result_type;

typedef hpx::future<vector_result_type> result_future_type;

typedef typename hpx::traits::detail::shared_state_ptr<
result_type
result_future_type
>::type shared_state_type;

shared_state_type p =
lcos::detail::make_continuation_exec<result_type>(
lcos::detail::make_continuation_exec<result_future_type>(
std::forward<Future>(predecessor),
std::forward<Executor>(exec), std::move(func));
std::forward<Executor>(exec),
[HPX_CAPTURE_MOVE(func)](Future&& predecessor) mutable
-> result_future_type
{
return hpx::dataflow(
hpx::util::functional::unwrap{},
func(std::forward<Future>(predecessor)));
});

return hpx::traits::future_access<result_future_type>::create(
std::move(p));
Expand Down
32 changes: 30 additions & 2 deletions hpx/traits/future_access.hpp
Expand Up @@ -127,13 +127,26 @@ namespace hpx { namespace traits
return lcos::future<R>(shared_state);
}

template <typename SharedState>
static lcos::future<R>
create(boost::intrusive_ptr<SharedState> && shared_state)
create(typename detail::shared_state_ptr_for<
lcos::future<lcos::future<R>>>::type const& shared_state)
{
return lcos::future<lcos::future<R>>(shared_state);
}

// Creates an lcos::future<R> from an rvalue intrusive pointer to a shared
// state. The pointer is moved into the future's constructor; SharedState is
// a template parameter, so any boost::intrusive_ptr instantiation is
// accepted by this overload.
template <typename SharedState>
static lcos::future<R> create(
boost::intrusive_ptr<SharedState>&& shared_state)
{
return lcos::future<R>(std::move(shared_state));
}

// Creates an lcos::future<R> from an rvalue shared-state pointer whose type
// is the one associated with a nested lcos::future<lcos::future<R>>.
// The pointer is moved into a temporary nested future, which is then
// converted to the declared return type at the return statement.
// NOTE(review): presumably this conversion is future's unwrapping
// constructor -- confirm against lcos::future.
static lcos::future<R> create(typename detail::shared_state_ptr_for<
lcos::future<lcos::future<R>>>::type&& shared_state)
{
return lcos::future<lcos::future<R>>(std::move(shared_state));
}

template <typename SharedState>
static lcos::future<R>
create(SharedState* shared_state)
Expand Down Expand Up @@ -169,13 +182,28 @@ namespace hpx { namespace traits
return lcos::shared_future<R>(shared_state);
}

// Creates an lcos::shared_future<R> from a (const reference to a)
// shared-state pointer whose type is the one associated with
// lcos::shared_future<lcos::future<R>>. A temporary of that nested type is
// constructed from the pointer and converted to the declared return type at
// the return statement.
// NOTE(review): presumably this conversion unwraps the inner future --
// confirm against lcos::shared_future.
static lcos::shared_future<R> create(
typename detail::shared_state_ptr_for<
lcos::shared_future<lcos::future<R>>>::type const& shared_state)
{
return lcos::shared_future<lcos::future<R>>(shared_state);
}

// Creates an lcos::shared_future<R> from an rvalue intrusive pointer to a
// shared state. The pointer is moved into the shared_future's constructor;
// SharedState is a template parameter, so any boost::intrusive_ptr
// instantiation is accepted by this overload.
template <typename SharedState>
static lcos::shared_future<R>
create(boost::intrusive_ptr<SharedState> && shared_state)
{
return lcos::shared_future<R>(std::move(shared_state));
}

// Rvalue counterpart of the nested-state overload: takes the shared-state
// pointer type associated with lcos::shared_future<lcos::future<R>> by
// rvalue reference and moves it into a temporary of that nested type. The
// temporary is converted to the declared return type at the return
// statement.
// NOTE(review): presumably this conversion unwraps the inner future --
// confirm against lcos::shared_future.
static lcos::shared_future<R> create(
typename detail::shared_state_ptr_for<
lcos::shared_future<lcos::future<R>>>::type&& shared_state)
{
return lcos::shared_future<lcos::future<R>>(
std::move(shared_state));
}

template <typename SharedState>
static lcos::shared_future<R>
create(SharedState* shared_state)
Expand Down
54 changes: 54 additions & 0 deletions hpx/util/unwrap.hpp
Expand Up @@ -68,6 +68,24 @@ namespace util {
return detail::unwrap_depth_impl<1U>(std::forward<Args>(args)...);
}

namespace functional
{
    /// Function object counterpart of `hpx::util::unwrap`.
    ///
    /// Calling an instance forwards every argument to `hpx::util::unwrap`
    /// and returns whatever that call returns. Please refer to the
    /// documentation of `hpx::util::unwrap` for more information.
    struct unwrap
    {
        /// \cond NOINTERNAL
        template <typename... Ts>
        auto operator()(Ts&&... vs)
            -> decltype(util::unwrap(std::forward<Ts>(vs)...))
        {
            return util::unwrap(std::forward<Ts>(vs)...);
        }
        /// \endcond
    };
}

/// An alternative version of hpx::util::unwrap(), which unwraps the given
/// arguments to a certain depth of hpx::lcos::future like objects.
///
Expand All @@ -84,6 +102,24 @@ namespace util {
return detail::unwrap_depth_impl<Depth>(std::forward<Args>(args)...);
}

namespace functional
{
/// A helper function object for functionally invoking
/// `hpx::util::unwrap_n`. For more information please refer to its
/// documentation.
struct unwrap_n
{
/// \cond NOINTERNAL
template <typename... Args>
auto operator()(Args&&... args)
-> decltype(util::unwrap_n(std::forward<Args>(args)...))
{
return util::unwrap_n(std::forward<Args>(args)...);
}
/// \endcond
};
}

/// An alternative version of hpx::util::unwrap(), which unwraps the given
/// arguments recursively so that all contained hpx::lcos::future like
/// objects are replaced by their actual value.
Expand All @@ -97,6 +133,24 @@ namespace util {
return detail::unwrap_depth_impl<0U>(std::forward<Args>(args)...);
}

namespace functional
{
    /// Function object counterpart of `hpx::util::unwrap_all`.
    ///
    /// Calling an instance forwards every argument to
    /// `hpx::util::unwrap_all` and returns whatever that call returns.
    /// Please refer to the documentation of `hpx::util::unwrap_all` for
    /// more information.
    struct unwrap_all
    {
        /// \cond NOINTERNAL
        template <typename... Ts>
        auto operator()(Ts&&... vs)
            -> decltype(util::unwrap_all(std::forward<Ts>(vs)...))
        {
            return util::unwrap_all(std::forward<Ts>(vs)...);
        }
        /// \endcond
    };
}

/// Returns a callable object which unwraps its arguments upon
/// invocation using the hpx::util::unwrap() function and then passes
/// the result to the given callable object.
Expand Down
5 changes: 0 additions & 5 deletions src/runtime/agas/detail/hosted_locality_namespace.cpp
Expand Up @@ -22,11 +22,6 @@
#include <string>
#include <vector>

HPX_REGISTER_BASE_LCO_WITH_VALUE_ID(
hpx::parcelset::endpoints_type, parcelset_endpoints_type,
hpx::actions::base_lco_with_value_parcelset_endpoints_get,
hpx::actions::base_lco_with_value_parcelset_endpoints_set)

namespace hpx { namespace agas { namespace detail
{
hosted_locality_namespace::hosted_locality_namespace(naming::address addr)
Expand Down
5 changes: 5 additions & 0 deletions src/runtime/agas/locality_namespace.cpp
Expand Up @@ -24,6 +24,11 @@ HPX_DEFINE_COMPONENT_NAME(locality_namespace,
HPX_DEFINE_GET_COMPONENT_TYPE_STATIC(
locality_namespace, component_agas_locality_namespace)

HPX_REGISTER_BASE_LCO_WITH_VALUE_ID(
hpx::parcelset::endpoints_type, parcelset_endpoints_type,
hpx::actions::base_lco_with_value_parcelset_endpoints_get,
hpx::actions::base_lco_with_value_parcelset_endpoints_set)

HPX_REGISTER_ACTION_ID(
locality_namespace::allocate_action,
locality_namespace_allocate_action,
Expand Down
3 changes: 2 additions & 1 deletion tests/regressions/parallel/executors/CMakeLists.txt
@@ -1,9 +1,10 @@
# Copyright (c) 2014-2015 Hartmut Kaiser
# Copyright (c) 2014-2018 Hartmut Kaiser
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

set(tests
bulk_then_execute_3182
handled_exception_2959
is_executor_1691
)
Expand Down
89 changes: 89 additions & 0 deletions tests/regressions/parallel/executors/bulk_then_execute_3182.cpp
@@ -0,0 +1,89 @@
// Copyright (c) 2018 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// #3182: bulk_then_execute has unexpected return type/does not compile

#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <hpx/include/parallel_executors.hpp>
#include <hpx/include/parallel_algorithm.hpp>

#include <algorithm>
#include <atomic>
#include <numeric>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
// Counts the invocations of fun1.
std::atomic<int> void_count{0};

// Bulk function without a result: checks that the predecessor future is
// ready and holds 42, then records the invocation in void_count. The first
// (shape element) argument is ignored.
void fun1(int, hpx::shared_future<int>& f)
{
    HPX_TEST(f.is_ready());
    HPX_TEST_EQ(f.get(), 42);

    void_count.fetch_add(1);
}

// Counts the invocations of fun2.
std::atomic<int> int_count{0};

// Bulk function with a result: checks that the predecessor future is ready
// and holds 42, records the invocation in int_count, and hands back the
// shape element it was called with.
int fun2(int i, hpx::shared_future<int>& f)
{
    HPX_TEST(f.is_ready());
    HPX_TEST_EQ(f.get(), 42);

    int_count.fetch_add(1);
    return i;
}

// Runs bulk_then_execute on the given executor twice, once with a function
// returning void (fun1) and once with a function returning int (fun2), and
// checks the type and contents of the returned future (regression test for
// #3182).
//
// The invocation counters are compared against absolute values, so
// void_count and int_count are expected to be zero on entry.
template <typename Executor>
void test_bulk_then_execute(Executor && exec)
{
    // Single source of truth for the shape size and the expected number of
    // invocations, so the two cannot drift apart.
    int const num_elements = 100;

    hpx::shared_future<int> f = hpx::make_ready_future(42);

    // Shape: 0, 1, ..., num_elements - 1 (std::iota is declared in <numeric>).
    std::vector<int> v(num_elements);
    std::iota(v.begin(), v.end(), 0);

    // A void bulk function must yield a future<void>.
    {
        hpx::future<void> fut =
            hpx::parallel::execution::bulk_then_execute(exec, &fun1, v, f);
        fut.get();

        HPX_TEST_EQ(void_count.load(), num_elements);
    }

    // An int bulk function must yield a future<std::vector<int>>; fun2
    // returns its shape element, so the result is compared against the shape.
    {
        hpx::future<std::vector<int>> fut =
            hpx::parallel::execution::bulk_then_execute(exec, &fun2, v, f);
        auto result = fut.get();

        HPX_TEST_EQ(int_count.load(), num_elements);
        HPX_TEST(result == v);
    }
}

int hpx_main(int argc, char* argv[])
{
{
void_count.store(0);
int_count.store(0);

hpx::parallel::execution::parallel_executor exec;
test_bulk_then_execute(exec);
}

{
void_count.store(0);
int_count.store(0);

hpx::parallel::execution::pool_executor exec{"default"};
test_bulk_then_execute(exec);
}

return hpx::finalize();
}

// Program entry point: passes the command line to hpx::init and returns its
// result as the process exit code.
int main(int argc, char* argv[])
{
    int const exit_code = hpx::init(argc, argv);
    return exit_code;
}

0 comments on commit a83c81b

Please sign in to comment.