Skip to content

Commit

Permalink
Making sure future::wait_for et al. work properly for action results
Browse files Browse the repository at this point in the history
- this fixes #2796
  • Loading branch information
hkaiser committed Aug 2, 2017
1 parent 68bd40c commit 49ff4d8
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 32 deletions.
12 changes: 6 additions & 6 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -961,12 +961,6 @@ namespace detail
return started_;
}

// Locking front-end for started_test_and_set_locked(): acquires this->mtx_
// for the duration of the call and forwards the held lock to the _locked
// variant, returning its result unchanged.
// NOTE(review): presumably the result reports whether the task had already
// been marked as started -- confirm against started_test_and_set_locked().
bool started_test_and_set()
{
    std::lock_guard<mutex_type> guard(this->mtx_);
    return started_test_and_set_locked(guard);
}

template <typename Lock>
bool started_test_and_set_locked(Lock& l)
{
Expand All @@ -979,6 +973,12 @@ namespace detail
}

protected:
// Locking front-end for started_test_and_set_locked(): acquires this->mtx_
// for the duration of the call and forwards the held lock to the _locked
// variant, returning its result unchanged.
// NOTE(review): presumably the result reports whether the task had already
// been marked as started -- confirm against started_test_and_set_locked().
bool started_test_and_set()
{
    std::lock_guard<mutex_type> guard(this->mtx_);
    return started_test_and_set_locked(guard);
}

void check_started()
{
std::unique_lock<mutex_type> l(this->mtx_);
Expand Down
5 changes: 5 additions & 0 deletions hpx/lcos/detail/promise_base.hpp
Expand Up @@ -47,6 +47,11 @@ namespace lcos {
f_ = std::move(f);
}

// Flags this shared state as started by invoking the task_base<Result>
// implementation of started_test_and_set(). The boolean it returns is
// deliberately discarded: callers only need the side effect.
void mark_as_started()
{
    static_cast<void>(this->task_base<Result>::started_test_and_set());
}

private:
void do_run()
{
Expand Down
69 changes: 43 additions & 26 deletions hpx/lcos/packaged_action.hpp
Expand Up @@ -135,6 +135,8 @@ namespace lcos {
std::move(cont_id), std::move(addr_)),
id, priority, std::move(f), std::forward<Ts>(vs)...);
}

this->shared_state_->mark_as_started();
}

template <typename... Ts>
Expand All @@ -159,6 +161,8 @@ namespace lcos {
actions::typed_continuation<Result, remote_result_type>(
std::move(cont_id), std::move(addr_)),
id, priority, std::move(f), std::forward<Ts>(vs)...);

this->shared_state_->mark_as_started();
}

template <typename Callback, typename... Ts>
Expand Down Expand Up @@ -198,6 +202,8 @@ namespace lcos {
std::move(cont_id), std::move(addr_)),
id, priority, std::move(cb), std::forward<Ts>(vs)...);
}

this->shared_state_->mark_as_started();
}

template <typename Callback, typename... Ts>
Expand Down Expand Up @@ -226,6 +232,8 @@ namespace lcos {
actions::typed_continuation<Result, remote_result_type>(
std::move(cont_id), std::move(addr_)),
id, priority, std::move(f), std::forward<Ts>(vs)...);

this->shared_state_->mark_as_started();
}

public:
Expand Down Expand Up @@ -391,17 +399,20 @@ namespace lcos {
if (!r.first)
{
// local, direct execution
this->shared_state_->set_data(
action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));
return;
}
}
else
{
// local, direct execution
this->shared_state_->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));
return;
}
}
Expand All @@ -424,25 +435,27 @@ namespace lcos {
traits::component_type_is_compatible<component_type>::call(
addr));

if (traits::component_supports_migration<
component_type>::call())
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
this->shared_state_->set_data(
action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));
return;
}
}
else
{
// local, direct execution
this->shared_state_->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));
return;
}
}
Expand All @@ -466,17 +479,17 @@ namespace lcos {
traits::component_type_is_compatible<component_type>::call(
addr));

if (traits::component_supports_migration<
component_type>::call())
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
this->shared_state_->set_data(
action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
Expand All @@ -486,8 +499,10 @@ namespace lcos {
else
{
// local, direct execution
this->shared_state_->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
Expand All @@ -513,17 +528,17 @@ namespace lcos {
traits::component_type_is_compatible<component_type>::call(
addr));

if (traits::component_supports_migration<
component_type>::call())
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
this->shared_state_->set_data(
action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
Expand All @@ -533,8 +548,10 @@ namespace lcos {
else
{
// local, direct execution
this->shared_state_->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
auto && result = action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...);
this->shared_state_->mark_as_started();
this->shared_state_->set_data(std::move(result));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
Expand Down
2 changes: 2 additions & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -48,6 +48,7 @@ set(tests
split_future_2246
wait_all_hang_1946
wait_for_1751
wait_for_action_2796
when_all_vectors_1623
)

Expand Down Expand Up @@ -75,6 +76,7 @@ set(sliding_semaphore_2338_PARAMETERS THREADS_PER_LOCALITY 2)
set(wait_for_1751_PARAMETERS THREADS_PER_LOCALITY 4)
set(wait_all_hang_1946_PARAMETERS THREADS_PER_LOCALITY 8)
set(wait_all_hang_1946_FLAGS DEPENDENCIES iostreams_component)
set(wait_for_action_2796_PARAMETERS LOCALITIES 2)

# Create test cases
foreach(test ${tests})
Expand Down
44 changes: 44 additions & 0 deletions tests/regressions/lcos/wait_for_action_2796.cpp
@@ -0,0 +1,44 @@
// Copyright (c) 2017 KADichev
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_main.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/lcos.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <boost/atomic.hpp>

#include <chrono>

// Flag raised by f(). main() resets it before each test case and afterwards
// checks it to verify that the action body really ran.
boost::atomic<bool> called(false);

// Body of both test actions: records that it has been executed by raising
// the global `called` flag.
void f()
{
    called = true;
}

// Expose f() as two HPX action types so main() can launch it via hpx::async:
// f_action is a plain action, f_direct_action a plain *direct* action.
// NOTE(review): a direct action is presumably run inline by the invoker
// instead of being scheduled as a separate HPX thread -- confirm in HPX docs.
HPX_PLAIN_ACTION(f, f_action);
HPX_PLAIN_DIRECT_ACTION(f, f_direct_action);

// Regression test for #2796: future::wait_for() on the result of an action
// must not report future_status::deferred, and the action body must have run
// by the time the checks are made.
//
// Returns the number of failed HPX_TEST checks (0 on success), so that a
// failing check is reflected in the process exit code.
int main()
{
    // Case 1: plain action launched on the local locality.
    called.store(false);
    {
        auto fut = hpx::async<f_action>(hpx::find_here());
        auto status = fut.wait_for(std::chrono::seconds(3));
        HPX_TEST(status != hpx::lcos::future_status::deferred);
        HPX_TEST(called.load());
    }

    // Case 2: same check for the direct-action flavour of f().
    called.store(false);
    {
        auto fut = hpx::async<f_direct_action>(hpx::find_here());
        auto status = fut.wait_for(std::chrono::seconds(3));
        HPX_TEST(status != hpx::lcos::future_status::deferred);
        HPX_TEST(called.load());
    }

    // HPX_TEST only records failures; report_errors() turns them into a
    // non-zero exit code. A hard-coded `return 0` would let a failing test
    // pass unnoticed.
    return hpx::util::report_errors();
}

0 comments on commit 49ff4d8

Please sign in to comment.