Skip to content

Commit

Permalink
Fixing a race with timed suspension
Browse files Browse the repository at this point in the history
When doing a timed suspension on a thread, there were several race conditions
between setting the different participating thread states, which should now be
fixed. The problem was observable with the wait_for_1751 regression test.
  • Loading branch information
Thomas Heller committed Feb 8, 2018
1 parent c807a4e commit 81b2856
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 39 deletions.
3 changes: 2 additions & 1 deletion hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
Expand Up @@ -708,8 +708,9 @@ namespace hpx { namespace threads { namespace detail
thread_state_enum newstate, thread_state_ex_enum newstate_ex,
thread_priority priority, error_code& ec)
{
std::atomic<bool> timer_started(false);
return detail::set_thread_state_timed(*sched_, abs_time, id, newstate,
newstate_ex, priority, get_worker_thread_num(), ec);
newstate_ex, priority, get_worker_thread_num(), timer_started, ec);
}

///////////////////////////////////////////////////////////////////////////
Expand Down
73 changes: 44 additions & 29 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -17,6 +17,7 @@
#include <hpx/runtime_fwd.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/io_service_pool.hpp>
#include <hpx/util/logging.hpp>
Expand Down Expand Up @@ -244,7 +245,8 @@ namespace hpx { namespace threads { namespace detail
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority,
thread_id_type const& timer_id,
std::shared_ptr<std::atomic<bool> > const& triggered)
std::shared_ptr<std::atomic<bool> > const& triggered,
thread_state_ex_enum my_statex)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -259,18 +261,15 @@ namespace hpx { namespace threads { namespace detail
return thread_result_type(terminated, invalid_thread_id);
}

bool oldvalue = false;
if (triggered->compare_exchange_strong(oldvalue, true)) //-V601
HPX_ASSERT(my_statex == wait_abort || my_statex == wait_timeout);

if (!triggered->load())
{
// timer has not been canceled yet, trigger the requested set_state
detail::set_thread_state(thrd, newstate, newstate_ex, priority);
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, my_statex,
thread_priority_boost, std::size_t(-1), ec);
}

// then re-activate the thread holding the deadline_timer
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, wait_timeout,
thread_priority_boost, std::size_t(-1), ec);

return thread_result_type(terminated, invalid_thread_id);
}

Expand All @@ -280,7 +279,8 @@ namespace hpx { namespace threads { namespace detail
thread_result_type at_timer(SchedulingPolicy& scheduler,
util::steady_clock::time_point& abs_time,
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority)
thread_state_ex_enum newstate_ex, thread_priority priority,
std::atomic<bool>& started)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -298,7 +298,7 @@ namespace hpx { namespace threads { namespace detail
std::make_shared<std::atomic<bool> >(false));

thread_init_data data(
util::bind(&wake_timer_thread,
util::bind_front(&wake_timer_thread,
thrd, newstate, newstate_ex, priority,
self_id, triggered),
"wake_timer", 0, priority);
Expand All @@ -314,27 +314,40 @@ namespace hpx { namespace threads { namespace detail
get_thread_pool("timer-pool")->get_io_service(), abs_time);

// let the timer invoke the set_state on the new (suspended) thread
t.async_wait(util::bind(&detail::set_thread_state,
wake_id, pending, wait_timeout, priority,
std::size_t(-1), std::ref(throws)));
t.async_wait(
[wake_id, priority](const boost::system::error_code& ec)
{
if (ec.value() == boost::system::errc::operation_canceled)
{
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), throws);
}
else
{
detail::set_thread_state(wake_id, pending, wait_timeout,
priority, std::size_t(-1), throws);
}
});

started.store(true);

// this waits for the thread to be reactivated when the timer fired
// if it returns signaled the timer has been canceled, otherwise
// the timer fired and the wake_timer_thread above has been executed
thread_state_ex_enum statex =
get_self().yield(thread_result_type(suspended, invalid_thread_id));

HPX_ASSERT(statex == wait_abort || statex == wait_timeout);

if (wait_timeout != statex) //-V601
{
triggered->store(true);

// wake_timer_thread has not been executed yet, cancel timer
t.cancel();

// cancel wake_timer_thread
error_code ec(lightweight); // do not throw
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), ec);
}
else
{
detail::set_thread_state(thrd, newstate, newstate_ex, priority);
}

return thread_result_type(terminated, invalid_thread_id);
Expand All @@ -346,7 +359,8 @@ namespace hpx { namespace threads { namespace detail
thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
util::steady_time_point const& abs_time, thread_id_type const& thrd,
thread_state_enum newstate, thread_state_ex_enum newstate_ex,
thread_priority priority, std::size_t thread_num, error_code& ec)
thread_priority priority, std::size_t thread_num,
std::atomic<bool>& started, error_code& ec)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROWS_IF(ec, null_thread_id,
Expand All @@ -360,7 +374,7 @@ namespace hpx { namespace threads { namespace detail
thread_init_data data(
util::bind(&at_timer<SchedulingPolicy>,
std::ref(scheduler), abs_time.value(), thrd, newstate, newstate_ex,
priority),
priority, std::ref(started)),
"at_timer (expire at)", 0, priority, thread_num);

thread_id_type newid = invalid_thread_id;
Expand All @@ -371,10 +385,10 @@ namespace hpx { namespace threads { namespace detail
template <typename SchedulingPolicy>
thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
util::steady_time_point const& abs_time, thread_id_type const& id,
error_code& ec)
std::atomic<bool>& started, error_code& ec)
{
return set_thread_state_timed(scheduler, abs_time, id, pending,
wait_timeout, thread_priority_normal, std::size_t(-1), ec);
wait_timeout, thread_priority_normal, std::size_t(-1), started, ec);
}

/// Set a timer to set the state of the given \a thread to the given
Expand All @@ -383,19 +397,20 @@ namespace hpx { namespace threads { namespace detail
thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
util::steady_duration const& rel_time, thread_id_type const& thrd,
thread_state_enum newstate, thread_state_ex_enum newstate_ex,
thread_priority priority, std::size_t thread_num, error_code& ec)
thread_priority priority, std::size_t thread_num,
std::atomic<bool>& started, error_code& ec)
{
return set_thread_state_timed(scheduler, rel_time.from_now(), thrd,
newstate, newstate_ex, priority, thread_num, ec);
newstate, newstate_ex, priority, thread_num, started, ec);
}

template <typename SchedulingPolicy>
thread_id_type set_thread_state_timed(SchedulingPolicy& scheduler,
util::steady_duration const& rel_time, thread_id_type const& thrd,
error_code& ec)
std::atomic<bool>& started, error_code& ec)
{
return set_thread_state_timed(scheduler, rel_time.from_now(), thrd,
pending, wait_timeout, thread_priority_normal, std::size_t(-1), ec);
pending, wait_timeout, thread_priority_normal, std::size_t(-1), started, ec);
}
}}}

Expand Down
14 changes: 14 additions & 0 deletions hpx/runtime/threads/thread_helpers.hpp
Expand Up @@ -23,6 +23,7 @@
#include <hpx/util/steady_clock.hpp>
#include <hpx/util/thread_description.hpp>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -104,11 +105,24 @@ namespace hpx { namespace threads
/// of hpx#exception.
HPX_API_EXPORT thread_id_type set_thread_state(thread_id_type const& id,
util::steady_time_point const& abs_time,
std::atomic<bool>& started,
thread_state_enum state = pending,
thread_state_ex_enum stateex = wait_timeout,
thread_priority priority = thread_priority_normal,
error_code& ec = throws);

inline thread_id_type set_thread_state(thread_id_type const& id,
util::steady_time_point const& abs_time,
thread_state_enum state = pending,
thread_state_ex_enum stateex = wait_timeout,
thread_priority priority = thread_priority_normal,
error_code& ec = throws)
{
std::atomic<bool> timer_started(false);
return set_thread_state(id, abs_time, timer_started, state, stateex,
priority, throws);
}

///////////////////////////////////////////////////////////////////////////
/// \brief Set the thread state of the \a thread referenced by the
/// thread_id \a id.
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/threads/executors/current_executor.cpp
Expand Up @@ -90,8 +90,9 @@ namespace hpx { namespace threads { namespace executors { namespace detail
HPX_ASSERT(invalid_thread_id != id); // would throw otherwise

// now schedule new thread for execution
std::atomic<bool> timer_started(false);
threads::detail::set_thread_state_timed(
*scheduler_base_, abs_time, id, ec);
*scheduler_base_, abs_time, id, timer_started, ec);
if (ec) return;

if (&ec != &throws)
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/threads/executors/this_thread_executors.cpp
Expand Up @@ -196,7 +196,9 @@ namespace hpx { namespace threads { namespace executors { namespace detail
++tasks_scheduled_;

// now schedule new thread for execution
threads::detail::set_thread_state_timed(scheduler_, abs_time, id, ec);
std::atomic<bool> timer_started(false);
threads::detail::set_thread_state_timed(scheduler_, abs_time, id,
timer_started, ec);
if (ec) {
--tasks_scheduled_;
return;
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/threads/executors/thread_pool_executors.cpp
Expand Up @@ -208,7 +208,9 @@ namespace hpx { namespace threads { namespace executors { namespace detail
++tasks_scheduled_;

// now schedule new thread for execution
threads::detail::set_thread_state_timed(scheduler_, abs_time, id, ec);
std::atomic<bool> timer_started(false);
threads::detail::set_thread_state_timed(scheduler_, abs_time, id,
timer_started, ec);
if (ec) {
--tasks_scheduled_;
return;
Expand Down
18 changes: 14 additions & 4 deletions src/runtime/threads/thread_helpers.cpp
Expand Up @@ -16,6 +16,7 @@
#include <hpx/runtime/threads/executors/current_executor.hpp>
#include <hpx/runtime/threads/thread_data_fwd.hpp>
#include <hpx/runtime/threads/thread_enums.hpp>
#include <hpx/util/assert.hpp>
#ifdef HPX_HAVE_THREAD_BACKTRACE_ON_SUSPENSION
#include <hpx/util/backtrace.hpp>
#endif
Expand All @@ -25,6 +26,7 @@
#include <hpx/util/steady_clock.hpp>
#include <hpx/util/thread_description.hpp>
#include <hpx/util/thread_specific_ptr.hpp>
#include <hpx/util/yield_while.hpp>

#include <cstddef>
#include <limits>
Expand All @@ -48,11 +50,12 @@ namespace hpx { namespace threads

///////////////////////////////////////////////////////////////////////////
thread_id_type set_thread_state(thread_id_type const& id,
util::steady_time_point const& abs_time, thread_state_enum state,
thread_state_ex_enum stateex, thread_priority priority, error_code& ec)
util::steady_time_point const& abs_time, std::atomic<bool>& timer_started,
thread_state_enum state, thread_state_ex_enum stateex,
thread_priority priority, error_code& ec)
{
return detail::set_thread_state_timed(*id->get_scheduler_base(), abs_time, id,
state, stateex, priority, std::size_t(-1), ec);
state, stateex, priority, std::size_t(-1), timer_started, ec);
}

///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -556,8 +559,9 @@ namespace hpx { namespace this_thread
#ifdef HPX_HAVE_THREAD_BACKTRACE_ON_SUSPENSION
detail::reset_backtrace bt(id, ec);
#endif
std::atomic<bool> timer_started(false);
threads::thread_id_type timer_id = threads::set_thread_state(id,
abs_time, threads::pending, threads::wait_timeout,
abs_time, timer_started, threads::pending, threads::wait_timeout,
threads::thread_priority_boost, ec);
if (ec) return threads::wait_unknown;

Expand All @@ -579,7 +583,13 @@ namespace hpx { namespace this_thread

if (statex != threads::wait_timeout)
{
HPX_ASSERT(
statex == threads::wait_abort ||
statex == threads::wait_signaled);
error_code ec1(lightweight); // do not throw
hpx::util::yield_while(
[&timer_started]() { return !timer_started.load(); },
"set_thread_state_timed");
threads::set_thread_state(timer_id,
threads::pending, threads::wait_abort,
threads::thread_priority_boost, ec1);
Expand Down
2 changes: 1 addition & 1 deletion tests/regressions/lcos/wait_for_1751.cpp
Expand Up @@ -35,7 +35,7 @@ int hpx_main()
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> dif = now - start_time;

HPX_TEST_LTE(dif.count(), 1.0);
HPX_TEST_LTE(dif.count(), 1.01);
break;
}
else
Expand Down
1 change: 0 additions & 1 deletion tests/unit/component/CMakeLists.txt
Expand Up @@ -47,7 +47,6 @@ add_hpx_executable(launched_process_test
COMPONENT_DEPENDENCIES launch_process_test_server)

set(action_invoke_no_more_than_PARAMETERS
LOCALITIES 2
THREADS_PER_LOCALITY 4)
set(action_invoke_no_more_than_FLAGS
DEPENDENCIES iostreams_component)
Expand Down

0 comments on commit 81b2856

Please sign in to comment.