Skip to content

Commit

Permalink
Revert "Fixing a race with timed suspension"
Browse files Browse the repository at this point in the history
  • Loading branch information
msimberg committed Feb 8, 2018
1 parent 2719f8d commit 30135c6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 44 deletions.
61 changes: 22 additions & 39 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -16,9 +16,7 @@
#include <hpx/runtime/threads/thread_helpers.hpp>
#include <hpx/runtime_fwd.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/yield_while.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/io_service_pool.hpp>
#include <hpx/util/logging.hpp>
Expand Down Expand Up @@ -246,8 +244,7 @@ namespace hpx { namespace threads { namespace detail
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority,
thread_id_type const& timer_id,
std::shared_ptr<std::atomic<bool> > const& triggered,
thread_state_ex_enum my_statex)
std::shared_ptr<std::atomic<bool> > const& triggered)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -262,15 +259,18 @@ namespace hpx { namespace threads { namespace detail
return thread_result_type(terminated, invalid_thread_id);
}

HPX_ASSERT(my_statex == wait_abort || my_statex == wait_timeout);

if (!triggered->load())
bool oldvalue = false;
if (triggered->compare_exchange_strong(oldvalue, true)) //-V601
{
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, my_statex,
thread_priority_boost, std::size_t(-1), ec);
// timer has not been canceled yet, trigger the requested set_state
detail::set_thread_state(thrd, newstate, newstate_ex, priority);
}

// then re-activate the thread holding the deadline_timer
error_code ec(lightweight); // do not throw
detail::set_thread_state(timer_id, pending, wait_timeout,
thread_priority_boost, std::size_t(-1), ec);

return thread_result_type(terminated, invalid_thread_id);
}

Expand All @@ -280,8 +280,7 @@ namespace hpx { namespace threads { namespace detail
thread_result_type at_timer(SchedulingPolicy& scheduler,
util::steady_clock::time_point& abs_time,
thread_id_type const& thrd, thread_state_enum newstate,
thread_state_ex_enum newstate_ex, thread_priority priority,
std::atomic<bool>& started)
thread_state_ex_enum newstate_ex, thread_priority priority)
{
if (HPX_UNLIKELY(!thrd)) {
HPX_THROW_EXCEPTION(null_thread_id,
Expand All @@ -299,7 +298,7 @@ namespace hpx { namespace threads { namespace detail
std::make_shared<std::atomic<bool> >(false));

thread_init_data data(
util::bind_front(&wake_timer_thread,
util::bind(&wake_timer_thread,
thrd, newstate, newstate_ex, priority,
self_id, triggered),
"wake_timer", 0, priority);
Expand All @@ -315,40 +314,27 @@ namespace hpx { namespace threads { namespace detail
get_thread_pool("timer-pool")->get_io_service(), abs_time);

// let the timer invoke the set_state on the new (suspended) thread
t.async_wait(
[wake_id, priority](const boost::system::error_code& ec)
{
if (ec.value() == boost::system::errc::operation_canceled)
{
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), throws);
}
else
{
detail::set_thread_state(wake_id, pending, wait_timeout,
priority, std::size_t(-1), throws);
}
});

started.store(true);
t.async_wait(util::bind(&detail::set_thread_state,
wake_id, pending, wait_timeout, priority,
std::size_t(-1), std::ref(throws)));

// this waits for the thread to be reactivated when the timer fired
// if it returns signaled the timer has been canceled, otherwise
// the timer fired and the wake_timer_thread above has been executed
thread_state_ex_enum statex =
get_self().yield(thread_result_type(suspended, invalid_thread_id));

HPX_ASSERT(statex == wait_abort || statex == wait_timeout);

if (wait_timeout != statex) //-V601
{
triggered->store(true);

// wake_timer_thread has not been executed yet, cancel timer
t.cancel();
}
else
{
detail::set_thread_state(thrd, newstate, newstate_ex, priority);

// cancel wake_timer_thread
error_code ec(lightweight); // do not throw
detail::set_thread_state(wake_id, pending, wait_abort,
priority, std::size_t(-1), ec);
}

return thread_result_type(terminated, invalid_thread_id);
Expand All @@ -371,17 +357,14 @@ namespace hpx { namespace threads { namespace detail

// this creates a new thread which creates the timer and handles the
// requested actions
std::atomic<bool> started(false);
thread_init_data data(
util::bind(&at_timer<SchedulingPolicy>,
std::ref(scheduler), abs_time.value(), thrd, newstate, newstate_ex,
priority, std::ref(started)),
priority),
"at_timer (expire at)", 0, priority, thread_num);

thread_id_type newid = invalid_thread_id;
create_thread(&scheduler, data, newid, pending, true, ec); //-V601
hpx::util::yield_while(
[&started]() { return !started.load(); }, "set_thread_state_timed");
return newid;
}

Expand Down
4 changes: 0 additions & 4 deletions src/runtime/threads/thread_helpers.cpp
Expand Up @@ -16,7 +16,6 @@
#include <hpx/runtime/threads/executors/current_executor.hpp>
#include <hpx/runtime/threads/thread_data_fwd.hpp>
#include <hpx/runtime/threads/thread_enums.hpp>
#include <hpx/util/assert.hpp>
#ifdef HPX_HAVE_THREAD_BACKTRACE_ON_SUSPENSION
#include <hpx/util/backtrace.hpp>
#endif
Expand Down Expand Up @@ -580,9 +579,6 @@ namespace hpx { namespace this_thread

if (statex != threads::wait_timeout)
{
HPX_ASSERT(
statex == threads::wait_abort ||
statex == threads::wait_signaled);
error_code ec1(lightweight); // do not throw
threads::set_thread_state(timer_id,
threads::pending, threads::wait_abort,
Expand Down
2 changes: 1 addition & 1 deletion tests/regressions/lcos/wait_for_1751.cpp
Expand Up @@ -35,7 +35,7 @@ int hpx_main()
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> dif = now - start_time;

HPX_TEST_LTE(dif.count(), 1.01);
HPX_TEST_LTE(dif.count(), 1.0);
break;
}
else
Expand Down

0 comments on commit 30135c6

Please sign in to comment.