Fixing termination detection of scheduling loop
 - Partially reverting to old detection mechanism
 - Only cleaning up currently running thread
 - Letting background thread phase out correctly
Thomas Heller committed Oct 9, 2017
1 parent bd35a2f commit 21b9946
Showing 6 changed files with 149 additions and 125 deletions.
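
The gist of the change: a worker no longer flips straight to `state_stopped`. It first exchanges its state to `state_stopping`, clears the shared `background_running` flag so the pooled background thread can finish its loop on its own, and only reports itself stopped once per-thread cleanup of terminated threads succeeds. Below is a minimal, self-contained sketch of that phase-out ordering; the `worker_state` enum, the flag, and the plain `std::thread` are illustrative stand-ins, not the HPX scheduler API.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Illustrative stand-ins only -- these names are not the HPX scheduler API.
enum class worker_state { running, stopping, stopped };

int main()
{
    std::atomic<worker_state> this_state{worker_state::running};

    // Shared flag captured by the background work; clearing it asks the
    // background work to phase out instead of being torn down mid-iteration.
    auto background_running = std::make_shared<std::atomic<bool>>(true);

    std::thread background([background_running] {
        while (background_running->load())
        {
            // ... one round of background work (e.g. polling) ...
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });

    // Shutdown ordering restored by the commit: signal "stopping" first,
    // let the background work run to completion, and only then report
    // the worker as stopped.
    this_state.store(worker_state::stopping);
    *background_running = false;
    background.join();  // background work has phased out
    this_state.store(worker_state::stopped);
}
```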
10 changes: 6 additions & 4 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
@@ -192,9 +192,11 @@ namespace hpx { namespace threads { namespace detail

LTM_(info) << "stop: " << id_.name() << " join:" << i;

// unlock the lock while joining
util::unlock_guard<Lock> ul(l);
remove_processing_unit(i, hpx::throws);
{
// unlock the lock while joining
util::unlock_guard<Lock> ul(l);
remove_processing_unit(i, hpx::throws);
}
}
threads_.clear();
}
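
The extra braces in the hunk above exist so the `unlock_guard` goes out of scope, and the pool lock is re-acquired, before `threads_.clear()` runs. A minimal sketch of that scoping pattern, assuming a hand-rolled `unlock_guard` in place of `hpx::util::unlock_guard`:

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Hand-rolled stand-in for hpx::util::unlock_guard: unlocks on construction,
// re-locks on destruction.
template <typename Lock>
struct unlock_guard
{
    explicit unlock_guard(Lock& l) : lock_(l) { lock_.unlock(); }
    ~unlock_guard() { lock_.lock(); }
    Lock& lock_;
};

std::mutex mtx;
std::vector<std::thread> threads_;

void stop_all()
{
    std::unique_lock<std::mutex> l(mtx);
    for (std::thread& t : threads_)
    {
        {
            // Drop the lock only around the blocking join; the enclosing
            // braces guarantee it is re-acquired before the container below
            // is modified -- the same scoping the hunk above introduces.
            unlock_guard<std::unique_lock<std::mutex>> ul(l);
            if (t.joinable())
                t.join();
        }
    }
    threads_.clear();  // reached with the lock held again
}

int main()
{
    threads_.emplace_back([] {});
    threads_.emplace_back([] {});
    stop_all();
}
```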
@@ -1408,7 +1410,7 @@ namespace hpx { namespace threads { namespace detail
// inform the scheduler to stop the virtual core
std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);
hpx::state oldstate = state.exchange(state_stopped);
hpx::state oldstate = state.exchange(state_stopping);

HPX_ASSERT(oldstate == state_starting ||
oldstate == state_running || oldstate == state_suspended ||
23 changes: 19 additions & 4 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
@@ -297,6 +297,7 @@ namespace hpx { namespace threads { namespace detail
thread_init_data background_init(
[&, background_running](thread_state_ex_enum) -> thread_result_type
{

while(*background_running)
{
if (callbacks.background_())
@@ -671,6 +672,7 @@ namespace hpx { namespace threads { namespace detail
// if this is an inner scheduler, exit immediately
if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
{
*background_running = false;
this_state.store(state_stopped);
break;
}
@@ -695,6 +697,7 @@ namespace hpx { namespace threads { namespace detail
// possible. No need to reschedule, as another LCO will
// set it to pending and schedule it back eventually
HPX_ASSERT(background_thread);
HPX_ASSERT(background_running);
*background_running = false;
// Create a new one which will replace the current one so that
// we avoid deadlock situations, if all background threads are
@@ -724,6 +727,7 @@ namespace hpx { namespace threads { namespace detail
// possible. No need to reschedule, as another LCO will
// set it to pending and schedule it back eventually
HPX_ASSERT(background_thread);
HPX_ASSERT(background_running);
*background_running = false;
// Create a new one which will replace the current one so that
// we avoid deadlock situations, if all background threads are
@@ -746,18 +750,29 @@ namespace hpx { namespace threads { namespace detail
// break if we were idling after 'may_exit'
if (may_exit)
{
if (scheduler.SchedulingPolicy::cleanup_terminated(num_thread, true))
if (background_thread.get() != nullptr)
{
HPX_ASSERT(background_running);
*background_running = false;
scheduler.SchedulingPolicy::schedule_thread(
background_thread.get(), num_thread);
background_thread.reset();
background_running.reset();
}
else
{
this_state.store(state_stopped);
break;
if (scheduler.SchedulingPolicy::cleanup_terminated(num_thread, true))
{
this_state.store(state_stopped);
break;
}
}
may_exit = false;
}
else
{
scheduler.SchedulingPolicy::cleanup_terminated(std::size_t(-1), true);
}

}
}
}
33 changes: 16 additions & 17 deletions hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp
@@ -429,8 +429,8 @@ namespace hpx { namespace threads { namespace policies
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
{
bool empty = true;
// if (num_thread == std::size_t(-1))
// {
if (num_thread == std::size_t(-1))
{
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
if (!delete_all)
@@ -441,21 +441,20 @@ namespace hpx { namespace threads { namespace policies
cleanup_terminated(delete_all) && empty;

empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
return empty;
// }
// else
// {
// empty = queues_[num_thread]->cleanup_terminated(delete_all);
// if (delete_all)
// return true;
//
// if (num_thread < high_priority_queues_.size())
// empty = high_priority_queues_[num_thread]->
// cleanup_terminated(delete_all) && empty;
//
// low_priority_queue_.cleanup_terminated(delete_all);
// return true;
// }
}
else
{
empty = queues_[num_thread]->cleanup_terminated(delete_all);
if (delete_all)
return true;

if (num_thread < high_priority_queues_.size())
empty = high_priority_queues_[num_thread]->
cleanup_terminated(delete_all) && empty;

empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
}
return empty;
}

///////////////////////////////////////////////////////////////////////
12 changes: 10 additions & 2 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
@@ -281,8 +281,16 @@ namespace hpx { namespace threads { namespace policies
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
{
bool empty = true;
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
if (num_thread == std::size_t(-1))
{
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
}
else
{
empty = queues_[num_thread]->cleanup_terminated(delete_all);
}
return empty;
}
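
The scheduler changes above give `cleanup_terminated` two modes selected by a sentinel index: `std::size_t(-1)` sweeps every queue, while a concrete `num_thread` cleans only the calling worker's own queue (the "only cleaning up currently running thread" part of the commit message). A minimal sketch of that dispatch, with `thread_queue` and `scheduler` as hypothetical stand-ins for the HPX classes:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in: reports whether the queue is empty after cleanup.
struct thread_queue
{
    bool cleanup_terminated(bool delete_all) { return true; }
};

struct scheduler
{
    std::vector<thread_queue> queues_;

    // num_thread == std::size_t(-1) acts as a sentinel: sweep every queue.
    // Otherwise only the calling worker's own queue is cleaned.
    bool cleanup_terminated(std::size_t num_thread, bool delete_all)
    {
        bool empty = true;
        if (num_thread == std::size_t(-1))
        {
            for (std::size_t i = 0; i != queues_.size(); ++i)
                empty = queues_[i].cleanup_terminated(delete_all) && empty;
        }
        else
        {
            empty = queues_[num_thread].cleanup_terminated(delete_all);
        }
        return empty;
    }
};

int main()
{
    scheduler s{std::vector<thread_queue>(4)};
    bool all_empty = s.cleanup_terminated(std::size_t(-1), true); // global sweep
    bool own_empty = s.cleanup_terminated(0, true);               // this worker only
    (void)all_empty;
    (void)own_empty;
}
```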

4 changes: 4 additions & 0 deletions src/runtime/components/server/runtime_support_server.cpp
@@ -591,6 +591,7 @@ namespace hpx { namespace components { namespace server
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
tm.cleanup_terminated(true);
// avoid timed suspension, don't boost priority
util::detail::yield_k(k % 32,
"runtime_support::dijkstra_termination",
@@ -632,6 +633,7 @@ namespace hpx { namespace components { namespace server
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
tm.cleanup_terminated(true);
// avoid timed suspension, don't boost priority
util::detail::yield_k(k % 32,
"runtime_support::dijkstra_termination_detection",
@@ -695,6 +697,7 @@ namespace hpx { namespace components { namespace server
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
tm.cleanup_terminated(true);
// avoid timed suspension, don't boost priority
util::detail::yield_k(k % 32,
"runtime_support::send_dijkstra_termination_token",
@@ -774,6 +777,7 @@ namespace hpx { namespace components { namespace server
tm.get_thread_count() > std::int64_t(1 + hpx::get_os_thread_count());
++k)
{
tm.cleanup_terminated(true);
// avoid timed suspension, don't boost priority
util::detail::yield_k(k % 32,
"runtime_support::dijkstra_termination_detection",
192 changes: 94 additions & 98 deletions tests/unit/resource/throttle.cpp
@@ -24,104 +24,100 @@ int hpx_main(int argc, char* argv[])

HPX_TEST_EQ(hpx::threads::count(tp.get_used_processing_units()), std::size_t(4));

// // Check number of used resources
// tp.remove_processing_unit(0);
// HPX_TEST_EQ(std::size_t(3), hpx::threads::count(tp.get_used_processing_units()));
// tp.remove_processing_unit(1);
// HPX_TEST_EQ(std::size_t(2), hpx::threads::count(tp.get_used_processing_units()));
// tp.remove_processing_unit(2);
// HPX_TEST_EQ(std::size_t(1), hpx::threads::count(tp.get_used_processing_units()));
//
// tp.add_processing_unit(0, 0 + tp.get_thread_offset());
// HPX_TEST_EQ(std::size_t(2), hpx::threads::count(tp.get_used_processing_units()));
// tp.add_processing_unit(1, 1 + tp.get_thread_offset());
// HPX_TEST_EQ(std::size_t(3), hpx::threads::count(tp.get_used_processing_units()));
// tp.add_processing_unit(2, 2 + tp.get_thread_offset());
// HPX_TEST_EQ(std::size_t(4), hpx::threads::count(tp.get_used_processing_units()));
//
// // Check when removing all but one, we end up on the same thread
// std::size_t num_thread = 0;
// auto test_function = [&num_thread, &tp]()
// {
// HPX_TEST_EQ(num_thread + tp.get_thread_offset(), hpx::get_worker_thread_num());
// };
//
// num_thread = 0;
// tp.remove_processing_unit(1);
// tp.remove_processing_unit(2);
// tp.remove_processing_unit(3);
// hpx::async(test_function).get();
// tp.add_processing_unit(1, 1 + tp.get_thread_offset());
// tp.add_processing_unit(2, 2 + tp.get_thread_offset());
// tp.add_processing_unit(3, 3 + tp.get_thread_offset());
// std::cout << "1\n";
//
// num_thread = 1;
// tp.remove_processing_unit(0);
// tp.remove_processing_unit(2);
// tp.remove_processing_unit(3);
// hpx::async(test_function).get();
// tp.add_processing_unit(0, 0 + tp.get_thread_offset());
// tp.add_processing_unit(2, 2 + tp.get_thread_offset());
// tp.add_processing_unit(3, 3 + tp.get_thread_offset());
// std::cout << "2\n";
//
// num_thread = 2;
// tp.remove_processing_unit(0);
// tp.remove_processing_unit(1);
// tp.remove_processing_unit(3);
// hpx::async(test_function).get();
// tp.add_processing_unit(0, 0 + tp.get_thread_offset());
// tp.add_processing_unit(1, 1 + tp.get_thread_offset());
// tp.add_processing_unit(3, 3 + tp.get_thread_offset());
// std::cout << "3\n";
//
// num_thread = 3;
// tp.remove_processing_unit(0);
// tp.remove_processing_unit(1);
// tp.remove_processing_unit(2);
// hpx::async(test_function).get();
// tp.add_processing_unit(0, 0 + tp.get_thread_offset());
// tp.add_processing_unit(1, 1 + tp.get_thread_offset());
// tp.add_processing_unit(2, 2 + tp.get_thread_offset());
// std::cout << "4\n";
//
// // Check random scheduling with reducing resources.
// num_thread = 0;
// bool up = true;
// hpx::util::high_resolution_timer t;
// std::vector<hpx::future<void>> fs;
// while (t.elapsed() < 5)
// {
// for (std::size_t i = 0; i < hpx::resource::get_num_threads("default") * 10; ++i)
// {
// fs.push_back(hpx::async([](){}));
// }
// if (up)
// {
// if (num_thread != hpx::resource::get_num_threads("default") -1)
// tp.remove_processing_unit(num_thread);
//
// ++num_thread;
// if (num_thread == hpx::resource::get_num_threads("default"))
// {
// up = false;
// --num_thread;
// }
// }
// else
// {
// tp.add_processing_unit(
// num_thread - 1, num_thread + tp.get_thread_offset() - 1);
// --num_thread;
// if (num_thread == 0)
// {
// up = true;
// }
// }
// }
//
// hpx::when_all(fs).get();
// Check number of used resources
tp.remove_processing_unit(0);
HPX_TEST_EQ(std::size_t(3), hpx::threads::count(tp.get_used_processing_units()));
tp.remove_processing_unit(1);
HPX_TEST_EQ(std::size_t(2), hpx::threads::count(tp.get_used_processing_units()));
tp.remove_processing_unit(2);
HPX_TEST_EQ(std::size_t(1), hpx::threads::count(tp.get_used_processing_units()));

tp.add_processing_unit(0, 0 + tp.get_thread_offset());
HPX_TEST_EQ(std::size_t(2), hpx::threads::count(tp.get_used_processing_units()));
tp.add_processing_unit(1, 1 + tp.get_thread_offset());
HPX_TEST_EQ(std::size_t(3), hpx::threads::count(tp.get_used_processing_units()));
tp.add_processing_unit(2, 2 + tp.get_thread_offset());
HPX_TEST_EQ(std::size_t(4), hpx::threads::count(tp.get_used_processing_units()));

// Check when removing all but one, we end up on the same thread
std::size_t num_thread = 0;
auto test_function = [&num_thread, &tp]()
{
HPX_TEST_EQ(num_thread + tp.get_thread_offset(), hpx::get_worker_thread_num());
};

num_thread = 0;
tp.remove_processing_unit(1);
tp.remove_processing_unit(2);
tp.remove_processing_unit(3);
hpx::async(test_function).get();
tp.add_processing_unit(1, 1 + tp.get_thread_offset());
tp.add_processing_unit(2, 2 + tp.get_thread_offset());
tp.add_processing_unit(3, 3 + tp.get_thread_offset());

num_thread = 1;
tp.remove_processing_unit(0);
tp.remove_processing_unit(2);
tp.remove_processing_unit(3);
hpx::async(test_function).get();
tp.add_processing_unit(0, 0 + tp.get_thread_offset());
tp.add_processing_unit(2, 2 + tp.get_thread_offset());
tp.add_processing_unit(3, 3 + tp.get_thread_offset());

num_thread = 2;
tp.remove_processing_unit(0);
tp.remove_processing_unit(1);
tp.remove_processing_unit(3);
hpx::async(test_function).get();
tp.add_processing_unit(0, 0 + tp.get_thread_offset());
tp.add_processing_unit(1, 1 + tp.get_thread_offset());
tp.add_processing_unit(3, 3 + tp.get_thread_offset());

num_thread = 3;
tp.remove_processing_unit(0);
tp.remove_processing_unit(1);
tp.remove_processing_unit(2);
hpx::async(test_function).get();
tp.add_processing_unit(0, 0 + tp.get_thread_offset());
tp.add_processing_unit(1, 1 + tp.get_thread_offset());
tp.add_processing_unit(2, 2 + tp.get_thread_offset());

// Check random scheduling with reducing resources.
num_thread = 0;
bool up = true;
hpx::util::high_resolution_timer t;
std::vector<hpx::future<void>> fs;
while (t.elapsed() < 2)
{
for (std::size_t i = 0; i < hpx::resource::get_num_threads("default") * 10; ++i)
{
fs.push_back(hpx::async([](){}));
}
if (up)
{
if (num_thread != hpx::resource::get_num_threads("default") -1)
tp.remove_processing_unit(num_thread);

++num_thread;
if (num_thread == hpx::resource::get_num_threads("default"))
{
up = false;
--num_thread;
}
}
else
{
tp.add_processing_unit(
num_thread - 1, num_thread + tp.get_thread_offset() - 1);
--num_thread;
if (num_thread == 0)
{
up = true;
}
}
}

hpx::when_all(fs).get();

return hpx::finalize();
}
