Skip to content

Commit

Permalink
Fixing a race with timed suspension
Browse files Browse the repository at this point in the history
Timed suspension of a thread had several race conditions between the updates
of the participating threads' states; these should now be fixed.
The problem was observable with the wait_for_1751 regression test.
  • Loading branch information
Thomas Heller committed Feb 7, 2018
1 parent 3d86300 commit 3f00c42
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
61 changes: 39 additions & 22 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -16,6 +16,7 @@
#include <hpx/runtime/threads/thread_helpers.hpp>
#include <hpx/runtime_fwd.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/detail/yield_k.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/io_service_pool.hpp>
Expand Down Expand Up @@ -244,7 +245,8 @@ namespace hpx { namespace threads { namespace detail
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority,
thread_id_type const& timer_id,
std::shared_ptr<std::atomic<bool> > const& triggered)
std::shared_ptr<std::atomic<bool> > const& triggered,
thread_state_ex_enum my_statex)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -259,18 +261,15 @@ namespace hpx { namespace threads { namespace detail
return thread_result_type(terminated, invalid_thread_id);
}

bool oldvalue = false;
if (triggered->compare_exchange_strong(oldvalue, true)) //-V601
HPX_ASSERT(my_statex == wait_abort || my_statex == wait_timeout);

if (!triggered->load())
{
// timer has not been canceled yet, trigger the requested set_state
detail::set_thread_state(thrd, newstate, newstate_ex, priority);
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, my_statex,
thread_priority_boost, std::size_t(-1), ec);
}

// then re-activate the thread holding the deadline_timer
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, wait_timeout,
thread_priority_boost, std::size_t(-1), ec);

return thread_result_type(terminated, invalid_thread_id);
}

Expand All @@ -280,7 +279,8 @@ namespace hpx { namespace threads { namespace detail
thread_result_type at_timer(SchedulingPolicy& scheduler,
util::steady_clock::time_point& abs_time,
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority)
thread_state_ex_enum newstate_ex, thread_priority priority,
std::atomic<bool>& started)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -300,7 +300,8 @@ namespace hpx { namespace threads { namespace detail
thread_init_data data(
util::bind(&wake_timer_thread,
thrd, newstate, newstate_ex, priority,
self_id, triggered),
self_id, triggered,
hpx::util::placeholders::_1),
"wake_timer", 0, priority);

thread_id_type wake_id = invalid_thread_id;
Expand All @@ -314,27 +315,40 @@ namespace hpx { namespace threads { namespace detail
get_thread_pool("timer-pool")->get_io_service(), abs_time);

// let the timer invoke the set_state on the new (suspended) thread
t.async_wait(util::bind(&detail::set_thread_state,
wake_id, pending, wait_timeout, priority,
std::size_t(-1), std::ref(throws)));
t.async_wait(
[wake_id, priority](const boost::system::error_code& ec)
{
if (ec.value() == boost::system::errc::operation_canceled)
{
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), throws);
}
else
{
detail::set_thread_state(wake_id, pending, wait_timeout,
priority, std::size_t(-1), throws);
}
});

started.store(true);

// this waits for the thread to be reactivated when the timer fired
// if it returns signaled the timer has been canceled, otherwise
// the timer fired and the wake_timer_thread above has been executed
thread_state_ex_enum statex =
get_self().yield(thread_result_type(suspended, invalid_thread_id));

HPX_ASSERT(statex == wait_abort || statex == wait_timeout);

if (wait_timeout != statex) //-V601
{
triggered->store(true);

// wake_timer_thread has not been executed yet, cancel timer
t.cancel();

// cancel wake_timer_thread
error_code ec(lightweight); // do not throw
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), ec);
}
else
{
detail::set_thread_state(thrd, newstate, newstate_ex, priority);
}

return thread_result_type(terminated, invalid_thread_id);
Expand All @@ -357,14 +371,17 @@ namespace hpx { namespace threads { namespace detail

// this creates a new thread which creates the timer and handles the
// requested actions
std::atomic<bool> started(false);
thread_init_data data(
util::bind(&at_timer<SchedulingPolicy>,
std::ref(scheduler), abs_time.value(), thrd, newstate, newstate_ex,
priority),
priority, std::ref(started)),
"at_timer (expire at)", 0, priority, thread_num);

thread_id_type newid = invalid_thread_id;
create_thread(&scheduler, data, newid, pending, true, ec); //-V601
hpx::util::detail::yield_while(
[&started]() { return !started.load(); }, "set_thread_state_timed");
return newid;
}

Expand Down
1 change: 1 addition & 0 deletions src/runtime/threads/thread_helpers.cpp
Expand Up @@ -579,6 +579,7 @@ namespace hpx { namespace this_thread

if (statex != threads::wait_timeout)
{
HPX_ASSERT(statex == threads::wait_abort || statex == threads::wait_signaled);
error_code ec1(lightweight); // do not throw
threads::set_thread_state(timer_id,
threads::pending, threads::wait_abort,
Expand Down
2 changes: 1 addition & 1 deletion tests/regressions/lcos/wait_for_1751.cpp
Expand Up @@ -35,7 +35,7 @@ int hpx_main()
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> dif = now - start_time;

HPX_TEST_LTE(dif.count(), 1.0);
HPX_TEST_LTE(dif.count(), 1.01);
break;
}
else
Expand Down

0 comments on commit 3f00c42

Please sign in to comment.