Skip to content

Commit

Permalink
Making sure resume/suspend is not called for schedulers that don't su…
Browse files Browse the repository at this point in the history
…pport it

- this introduces a new scheduler mode flag: enable_suspension that is set by default
  for the schedulers that support this operation
- this flag is also used to reject resume/suspend requests if this flag is not set
  for a given scheduler
- flyby: made scheduler_base::suspend()/resume() virtual

This fixes #3180
  • Loading branch information
hkaiser committed Feb 18, 2018
1 parent 17a0ba8 commit d0282df
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 31 deletions.
68 changes: 61 additions & 7 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
Expand Up @@ -325,7 +325,10 @@ namespace hpx { namespace threads { namespace detail
for (std::size_t virt_core = 0; virt_core != threads_.size();
++virt_core)
{
this->sched_->Scheduler::resume(virt_core);
if (mode_ & policies::enable_suspension)
{
this->sched_->Scheduler::resume(virt_core);
}
}

if (blocking)
Expand Down Expand Up @@ -353,6 +356,14 @@ namespace hpx { namespace threads { namespace detail
return hpx::make_ready_future();
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume",
"this scheduler does not support suspension");
return hpx::make_ready_future();
}

return hpx::async(
hpx::util::bind(&scheduled_thread_pool::resume_internal, this, true,
std::move(throws)));
Expand All @@ -362,6 +373,14 @@ namespace hpx { namespace threads { namespace detail
void scheduled_thread_pool<Scheduler>::resume_cb(
std::function<void(void)> callback, error_code& ec)
{
if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume_cb",
"this scheduler does not support suspension");
return;
}

std::function<void(void)> resume_internal_wrapper =
[this, HPX_CAPTURE_MOVE(callback)]()
{
Expand All @@ -385,11 +404,19 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"scheduled_thread_pool<Scheduler>::resume_direct",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume_direct",
"this scheduler does not support suspension");
return;
}

this->resume_internal(true, ec);
}

Expand Down Expand Up @@ -425,7 +452,7 @@ namespace hpx { namespace threads { namespace detail
"scheduled_thread_pool<Scheduler>::suspend",
"cannot call suspend from outside HPX, use suspend_cb or"
"suspend_direct instead");
return make_ready_future();
return hpx::make_ready_future();
}
else if (threads::get_self_ptr() &&
hpx::this_thread::get_pool() == this)
Expand All @@ -436,6 +463,14 @@ namespace hpx { namespace threads { namespace detail
"cannot suspend a pool from itself"));
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend",
"this scheduler does not support suspension");
return hpx::make_ready_future();
}

return hpx::async(
hpx::util::bind(&scheduled_thread_pool::suspend_internal, this,
std::move(throws)));
Expand All @@ -448,11 +483,19 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"scheduled_thread_pool<Scheduler>::suspend_cb",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend_cb",
"this scheduler does not support suspension");
return;
}

std::function<void(void)> suspend_internal_wrapper =
[this, HPX_CAPTURE_MOVE(callback)]()
{
Expand All @@ -476,8 +519,16 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"cannot suspend a pool from itself");
"scheduled_thread_pool<Scheduler>::suspend_direct",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend_direct",
"this scheduler does not support suspension");
return;
}

Expand Down Expand Up @@ -1775,7 +1826,10 @@ namespace hpx { namespace threads { namespace detail

util::yield_while([this, &state, virt_core]()
{
this->sched_->Scheduler::resume(virt_core);
if (mode_ & policies::enable_suspension)
{
this->sched_->Scheduler::resume(virt_core);
}
return state.load() == state_sleeping;
}, "scheduled_thread_pool::resume_processing_unit_internal",
hpx::threads::pending);
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -207,13 +207,13 @@ namespace hpx { namespace threads { namespace policies
return "hierarchy_scheduler";
}

void suspend(std::size_t)
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "hierarchy_scheduler does not support"
" suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "hierarchy_scheduler does not support"
" resuming");
Expand Down
Expand Up @@ -75,13 +75,13 @@ namespace hpx { namespace threads { namespace policies
return "periodic_priority_queue_scheduler";
}

void suspend(std::size_t)
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "periodic_priority_queue_scheduler does not"
" support suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "periodic_priority_queue_scheduler does not"
" support resuming");
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -154,7 +154,7 @@ namespace hpx { namespace threads { namespace policies
#endif
}

void suspend(std::size_t num_thread)
virtual void suspend(std::size_t num_thread)
{
HPX_ASSERT(num_thread < suspend_conds_.size());

Expand All @@ -172,7 +172,7 @@ namespace hpx { namespace threads { namespace policies
expected == state_stopping || expected == state_terminating);
}

void resume(std::size_t num_thread)
virtual void resume(std::size_t num_thread)
{
if (num_thread == std::size_t(-1))
{
Expand Down
6 changes: 5 additions & 1 deletion hpx/runtime/threads/policies/scheduler_mode.hpp
Expand Up @@ -26,9 +26,13 @@ namespace hpx { namespace threads { namespace policies
///< to act as 'embedded' schedulers. In this case it needs to
///< periodically invoke a provided callback into the outer scheduler
///< more frequently than normal. This option enables this behavior.
enable_elasticity = 0x10 ///< This options allows for the
enable_elasticity = 0x10, ///< This options allows for the
///< scheduler to dynamically increase and reduce the number of
///< processing units it runs on.
enable_suspension = 0x20 ///< This options allows for the
///< scheduler to suspend/resume its operation. Setting this value
///< may not succeed for schedulers that do not support this
///< functionality.
};
}}}

Expand Down
Expand Up @@ -62,13 +62,13 @@ namespace hpx { namespace threads { namespace policies
return "static_priority_queue_scheduler";
}

void suspend(std::size_t)
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_priority_queue_scheduler does not"
" support suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_priority_queue_scheduler does not"
" support resuming");
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/static_queue_scheduler.hpp
Expand Up @@ -70,13 +70,13 @@ namespace hpx { namespace threads { namespace policies
return "static_queue_scheduler";
}

void suspend(std::size_t)
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_queue_scheduler does not support"
" suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_queue_scheduler does not support"
" resuming");
Expand Down
7 changes: 7 additions & 0 deletions hpx/runtime/threads/thread_pool_base.hpp
Expand Up @@ -436,6 +436,13 @@ namespace hpx { namespace threads
std::function<void(void)> callback, std::size_t virt_core,
error_code& ec = throws) = 0;

/// \cond NOINTERNAL
policies::scheduler_mode get_scheduler_mode() const
{
return mode_;
}
/// \endcond

protected:
/// \cond NOINTERNAL
void init_pool_time_scale();
Expand Down
37 changes: 28 additions & 9 deletions src/runtime/threads/threadmanager.cpp
Expand Up @@ -410,7 +410,8 @@ namespace hpx { namespace threads
notifier_, i, name.c_str(),
policies::scheduler_mode(policies::do_background_work |
policies::reduce_thread_priority |
policies::delay_exit),
policies::delay_exit |
policies::enable_suspension),
thread_offset));
pools_.push_back(std::move(pool));

Expand Down Expand Up @@ -453,7 +454,8 @@ namespace hpx { namespace threads
notifier_, i, name.c_str(),
policies::scheduler_mode(policies::do_background_work |
policies::reduce_thread_priority |
policies::delay_exit),
policies::delay_exit |
policies::enable_suspension),
thread_offset));
pools_.push_back(std::move(pool));

Expand Down Expand Up @@ -490,7 +492,8 @@ namespace hpx { namespace threads
notifier_, i, name.c_str(),
policies::scheduler_mode(policies::do_background_work |
policies::reduce_thread_priority |
policies::delay_exit),
policies::delay_exit |
policies::enable_suspension),
thread_offset));
pools_.push_back(std::move(pool));

Expand Down Expand Up @@ -614,7 +617,8 @@ namespace hpx { namespace threads
notifier_, i, name.c_str(),
policies::scheduler_mode(policies::do_background_work |
policies::reduce_thread_priority |
policies::delay_exit),
policies::delay_exit |
policies::enable_suspension),
thread_offset));
pools_.push_back(std::move(pool));
#else
Expand Down Expand Up @@ -1923,7 +1927,11 @@ namespace hpx { namespace threads

for (auto& pool_iter : pools_)
{
fs.push_back(pool_iter->suspend());
if (pool_iter->get_scheduler_mode() &
policies::enable_suspension)
{
fs.push_back(pool_iter->suspend());
}
}

hpx::wait_all(fs);
Expand All @@ -1932,7 +1940,11 @@ namespace hpx { namespace threads
{
for (auto& pool_iter : pools_)
{
pool_iter->suspend_direct();
if (pool_iter->get_scheduler_mode() &
policies::enable_suspension)
{
pool_iter->suspend_direct();
}
}
}
}
Expand All @@ -1945,16 +1957,23 @@ namespace hpx { namespace threads

for (auto& pool_iter : pools_)
{
fs.push_back(pool_iter->resume());
if (pool_iter->get_scheduler_mode() &
policies::enable_suspension)
{
fs.push_back(pool_iter->resume());
}
}

hpx::wait_all(fs);
}
else
{
for (auto& pool_iter : pools_)
{
pool_iter->resume_direct();
if (pool_iter->get_scheduler_mode() &
policies::enable_suspension)
{
pool_iter->resume_direct();
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/resource/shutdown_suspended_pus.cpp
Expand Up @@ -69,7 +69,8 @@ void test_scheduler(int argc, char* argv[])
hpx::threads::policies::do_background_work |
hpx::threads::policies::reduce_thread_priority |
hpx::threads::policies::delay_exit |
hpx::threads::policies::enable_elasticity);
hpx::threads::policies::enable_elasticity |
hpx::threads::policies::enable_suspension);

std::unique_ptr<hpx::threads::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<Scheduler>(
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/resource/suspend_pool.cpp
Expand Up @@ -160,7 +160,8 @@ void test_scheduler(int argc, char* argv[])
hpx::threads::policies::do_background_work |
hpx::threads::policies::reduce_thread_priority |
hpx::threads::policies::delay_exit |
hpx::threads::policies::enable_elasticity);
hpx::threads::policies::enable_elasticity |
hpx::threads::policies::enable_suspension);

std::unique_ptr<hpx::threads::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<Scheduler>(
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/resource/throttle.cpp
Expand Up @@ -213,7 +213,8 @@ void test_scheduler(int argc, char* argv[])
hpx::threads::policies::do_background_work |
hpx::threads::policies::reduce_thread_priority |
hpx::threads::policies::delay_exit |
hpx::threads::policies::enable_elasticity);
hpx::threads::policies::enable_elasticity |
hpx::threads::policies::enable_suspension);

std::unique_ptr<hpx::threads::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<Scheduler>(
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/resource/throttle_timed.cpp
Expand Up @@ -118,7 +118,8 @@ void test_scheduler(int argc, char* argv[])
hpx::threads::policies::do_background_work |
hpx::threads::policies::reduce_thread_priority |
hpx::threads::policies::delay_exit |
hpx::threads::policies::enable_elasticity);
hpx::threads::policies::enable_elasticity |
hpx::threads::policies::enable_suspension);

std::unique_ptr<hpx::threads::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<Scheduler>(
Expand Down

0 comments on commit d0282df

Please sign in to comment.