Merge pull request #2966 from STEllAR-GROUP/fixing_2959

Fixing a couple of held locks during exception handling

hkaiser committed Oct 20, 2017
2 parents bc724ee + adcd1df, commit a7c673b

Showing 6 changed files with 62 additions and 29 deletions.
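Every hunk below applies the same two-step pattern: std::lock_guard (which cannot be released early) becomes std::unique_lock, and helpers that can fail now receive the caller's lock so their error paths can drop it before raising the error. A minimal, self-contained sketch of that pattern (the names get_pool, pools, mtx, and pool_name_size are illustrative, not HPX identifiers):

#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

std::mutex mtx;
std::vector<std::string> pools;

// The helper takes the caller's lock by reference: the parameter documents
// that mtx must already be held, and the error path can release it so the
// exception never propagates while the mutex is still locked.
std::string const& get_pool(std::unique_lock<std::mutex>& l, std::size_t i)
{
    if (i >= pools.size())
    {
        l.unlock();    // drop the lock first ...
        throw std::invalid_argument(
            "no pool with index " + std::to_string(i));    // ... then throw
    }
    return pools[i];
}

std::size_t pool_name_size(std::size_t i)
{
    std::unique_lock<std::mutex> l(mtx);    // unique_lock, not lock_guard
    return get_pool(l, i).size();           // lock is still held on success
}

int main()
{
    pools = {"default"};
    return static_cast<int>(pool_name_size(0) - 7);    // "default" has 7 chars
}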
6 changes: 4 additions & 2 deletions hpx/runtime/resource/detail/partitioner.hpp
@@ -197,13 +197,15 @@ namespace hpx { namespace resource { namespace detail

        // helper functions
        detail::init_pool_data const& get_pool_data(
-            std::size_t pool_index) const;
+            std::unique_lock<mutex_type>& l, std::size_t pool_index) const;

        // has to be private because pointers become invalid after data member
        // thread_pools_ is resized; we don't want to allow the user to use it
        detail::init_pool_data const& get_pool_data(
+            std::unique_lock<mutex_type>& l,
            std::string const& pool_name) const;
-        detail::init_pool_data& get_pool_data(std::string const& pool_name);
+        detail::init_pool_data& get_pool_data(
+            std::unique_lock<mutex_type>& l, std::string const& pool_name);

        void set_scheduler(scheduling_policy sched, std::string const& pool_name);

8 changes: 5 additions & 3 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
@@ -292,7 +292,7 @@ namespace hpx { namespace threads { namespace detail
                    << " failed with: " << e.what();

                // trigger the barrier
-                pool_threads -= thread_num;
+                pool_threads -= (thread_num + 1);
                while (pool_threads-- != 0)
                    startup->wait();

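The first hunk corrects an off-by-one in the error path that runs when a pool thread fails during startup: the failing thread must call wait() once on behalf of every pool thread that will never reach the startup barrier, and the old count forgot to exclude the failing thread itself. A quick standalone check of the arithmetic, under the assumption that threads thread_num+1 through pool_threads-1 are the ones that never arrive:

#include <cstddef>
#include <iostream>

// Compare how many compensating wait() calls the failing thread makes
// before and after the fix (illustration only; not HPX code).
int main()
{
    std::size_t const pool_threads = 4;    // threads expected at the barrier
    for (std::size_t thread_num = 0; thread_num != pool_threads; ++thread_num)
    {
        std::size_t before = pool_threads - thread_num;         // old count
        std::size_t after  = pool_threads - (thread_num + 1);   // fixed count
        std::cout << "thread " << thread_num << " fails: " << before
                  << " wait()s before the fix, " << after << " after\n";
    }
    return 0;
}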
@@ -1344,14 +1344,15 @@ namespace hpx { namespace threads { namespace detail
        std::size_t virt_core, std::size_t thread_num,
        std::shared_ptr<compat::barrier> startup, error_code& ec)
    {
-        std::lock_guard<pu_mutex_type> l(used_processing_units_mtx_);
        resource::get_partitioner().assign_pu(id_.name(), virt_core);

+        std::unique_lock<pu_mutex_type> l(used_processing_units_mtx_);
        if (threads_.size() <= virt_core)
            threads_.resize(virt_core + 1);

        if (threads_[virt_core].joinable())
        {
+            l.unlock();
            HPX_THROWS_IF(ec, bad_parameter,
                "scheduled_thread_pool<Scheduler>::add_processing_unit",
                "the given virtual core has already been added to this "
@@ -1406,9 +1407,10 @@ namespace hpx { namespace threads { namespace detail
            oldstate == state_stopping || oldstate == state_stopped);

        {
-            std::lock_guard<pu_mutex_type> l(used_processing_units_mtx_);
+            std::unique_lock<pu_mutex_type> l(used_processing_units_mtx_);
            if (threads_.size() <= virt_core || !threads_[virt_core].joinable())
            {
+                l.unlock();
                HPX_THROWS_IF(ec, bad_parameter,
                    "scheduled_thread_pool<Scheduler>::remove_processing_unit",
                    "the given virtual core has already been stopped to run on "
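Both hunks above swap std::lock_guard for std::unique_lock because lock_guard has no unlock(): it can only release at scope exit, which is too late when the throw happens inside the critical section. A side-by-side sketch with plain standard types (not HPX's pu_mutex_type):

#include <mutex>
#include <stdexcept>

std::mutex m;

void with_lock_guard()
{
    std::lock_guard<std::mutex> g(m);
    // g.unlock();  // no such member: lock_guard only releases at scope exit,
    // so building and reporting the exception happens with m still held
    throw std::runtime_error("thrown while m is locked");
}

void with_unique_lock()
{
    std::unique_lock<std::mutex> l(m);
    l.unlock();    // release explicitly ...
    throw std::runtime_error("... then report the error with m free");
}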
51 changes: 27 additions & 24 deletions src/runtime/resource/detail/detail_partitioner.cpp
@@ -378,7 +378,7 @@ namespace hpx { namespace resource { namespace detail
        std::unique_lock<mutex_type> l(mtx_);

        // @TODO allow empty pools
-        if (get_pool_data(get_default_pool_name()).num_threads_ == 0)
+        if (get_pool_data(l, get_default_pool_name()).num_threads_ == 0)
        {
            l.unlock();
            throw_runtime_error("partitioner::setup_pools",
@@ -651,7 +651,7 @@ namespace hpx { namespace resource { namespace detail
            if (mode_ & mode_allow_oversubscription)
            {
                // increment occupancy counter
-                get_pool_data(pool_name).add_resource(
+                get_pool_data(l, pool_name).add_resource(
                    p.id_, exclusive, num_threads);
                ++p.thread_occupancy_count_;
                return;
@@ -660,7 +660,7 @@ namespace hpx { namespace resource { namespace detail
            // check occupancy counter and increment it
            if (p.thread_occupancy_count_ == 0)
            {
-                get_pool_data(pool_name).add_resource(
+                get_pool_data(l, pool_name).add_resource(
                    p.id_, exclusive, num_threads);
                ++p.thread_occupancy_count_;

@@ -737,8 +737,8 @@ namespace hpx { namespace resource { namespace detail
    void partitioner::set_scheduler(
        scheduling_policy sched, std::string const& pool_name)
    {
-        std::lock_guard<mutex_type> l(mtx_);
-        get_pool_data(pool_name).scheduling_policy_ = sched;
+        std::unique_lock<mutex_type> l(mtx_);
+        get_pool_data(l, pool_name).scheduling_policy_ = sched;
    }

    void partitioner::configure_pools()
@@ -758,7 +758,7 @@ namespace hpx { namespace resource { namespace detail

        // look up which scheduler is needed
        scheduling_policy sched_type =
-            get_pool_data(pool_name).scheduling_policy_;
+            get_pool_data(l, pool_name).scheduling_policy_;
        if (sched_type == unspecified)
        {
            l.unlock();
@@ -790,11 +790,11 @@ namespace hpx { namespace resource { namespace detail
        std::size_t num_threads = 0;

        {
-            std::lock_guard<mutex_type> l(mtx_);
+            std::unique_lock<mutex_type> l(mtx_);
            std::size_t num_thread_pools = initial_thread_pools_.size();
            for (size_t i = 0; i != num_thread_pools; ++i)
            {
-                num_threads += get_pool_data(i).num_threads_;
+                num_threads += get_pool_data(l, i).num_threads_;
            }
        }

@@ -815,22 +815,23 @@ namespace hpx { namespace resource { namespace detail
    std::size_t partitioner::get_num_threads(
        std::size_t pool_index) const
    {
-        std::lock_guard<mutex_type> l(mtx_);
-        return get_pool_data(pool_index).num_threads_;
+        std::unique_lock<mutex_type> l(mtx_);
+        return get_pool_data(l, pool_index).num_threads_;
    }

    std::size_t partitioner::get_num_threads(
        const std::string &pool_name) const
    {
-        std::lock_guard<mutex_type> l(mtx_);
-        return get_pool_data(pool_name).num_threads_;
+        std::unique_lock<mutex_type> l(mtx_);
+        return get_pool_data(l, pool_name).num_threads_;
    }

    detail::init_pool_data const& partitioner::get_pool_data(
-        std::size_t pool_index) const
+        std::unique_lock<mutex_type>& l, std::size_t pool_index) const
    {
        if (pool_index >= initial_thread_pools_.size())
        {
+            l.unlock();
            throw_invalid_argument(
                "partitioner::get_pool_data",
                "pool index " + std::to_string(pool_index) +
@@ -914,23 +915,23 @@ namespace hpx { namespace resource { namespace detail
            throw std::invalid_argument(
                "partitioner::get_pool_creator: pool requested out of bounds.");
        }
-        return get_pool_data(index).create_function_;
+        return get_pool_data(l, index).create_function_;
    }

    ///////////////////////////////////////////////////////////////////////////
    void partitioner::assign_pu(
        std::string const& pool_name, std::size_t virt_core)
    {
-        std::lock_guard<mutex_type> l(mtx_);
-        detail::init_pool_data& data = get_pool_data(pool_name);
+        std::unique_lock<mutex_type> l(mtx_);
+        detail::init_pool_data& data = get_pool_data(l, pool_name);
        data.assign_pu(virt_core);
    }

    void partitioner::unassign_pu(
        std::string const& pool_name, std::size_t virt_core)
    {
-        std::lock_guard<mutex_type> l(mtx_);
-        detail::init_pool_data& data = get_pool_data(pool_name);
+        std::unique_lock<mutex_type> l(mtx_);
+        detail::init_pool_data& data = get_pool_data(l, pool_name);
        data.unassign_pu(virt_core);
    }

@@ -956,8 +957,8 @@ namespace hpx { namespace resource { namespace detail
        bool has_non_exclusive_pus = false;

        {
-            std::lock_guard<mutex_type> l(mtx_);
-            detail::init_pool_data const& data = get_pool_data(pool_name);
+            std::unique_lock<mutex_type> l(mtx_);
+            detail::init_pool_data const& data = get_pool_data(l, pool_name);

            pu_nums_to_remove.reserve(data.num_threads_);
@@ -1012,8 +1013,8 @@ namespace hpx { namespace resource { namespace detail
        bool has_non_exclusive_pus = false;

        {
-            std::lock_guard<mutex_type> l(mtx_);
-            detail::init_pool_data const& data = get_pool_data(pool_name);
+            std::unique_lock<mutex_type> l(mtx_);
+            detail::init_pool_data const& data = get_pool_data(l, pool_name);

            pu_nums_to_add.reserve(data.num_threads_);
@@ -1076,7 +1077,7 @@ namespace hpx { namespace resource { namespace detail
    // has to be private because pointers become invalid after data member
    // thread_pools_ is resized; we don't want to allow the user to use it
    detail::init_pool_data const& partitioner::get_pool_data(
-        std::string const& pool_name) const
+        std::unique_lock<mutex_type>& l, std::string const& pool_name) const
    {
        auto pool = std::find_if(
            initial_thread_pools_.begin(), initial_thread_pools_.end(),
@@ -1090,14 +1091,15 @@ namespace hpx { namespace resource { namespace detail
            return *pool;
        }

+        l.unlock();
        throw_invalid_argument(
            "partitioner::get_pool_data",
            "the resource partitioner does not own a thread pool named '" +
                pool_name + "'");
    }

    detail::init_pool_data& partitioner::get_pool_data(
-        std::string const& pool_name)
+        std::unique_lock<mutex_type>& l, std::string const& pool_name)
    {
        auto pool = std::find_if(
            initial_thread_pools_.begin(), initial_thread_pools_.end(),
@@ -1111,6 +1113,7 @@ namespace hpx { namespace resource { namespace detail
            return *pool;
        }

+        l.unlock();
        throw_invalid_argument(
            "partitioner::get_pool_data",
            "the resource partitioner does not own a thread pool named '" +
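Why releasing before throwing matters at all: a throw expression is evaluated before any stack unwinding begins, so whatever it calls (message formatting, diagnostics, or, in HPX's case, the machinery behind throw_invalid_argument) runs with the lock still held and deadlocks if it touches the same mutex. A generic illustration of that trap, not the actual HPX code paths:

#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>

std::mutex mtx;

std::string describe_pools()    // takes mtx; also handy in error messages
{
    std::lock_guard<std::mutex> g(mtx);
    return "0 pools configured";
}

void setup_pools(bool misconfigured)
{
    std::unique_lock<std::mutex> l(mtx);
    if (misconfigured)
    {
        // The throw expression runs before any unwinding, so without this
        // unlock describe_pools() would re-lock mtx on the same thread:
        // undefined behavior, in practice usually a deadlock.
        l.unlock();
        throw std::runtime_error("setup failed: " + describe_pools());
    }
}

int main()
{
    try { setup_pools(true); }
    catch (std::exception const& e) { std::cerr << e.what() << '\n'; }
}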
1 change: 1 addition & 0 deletions src/runtime/threads/executors/thread_pool_os_executors.cpp
@@ -85,6 +85,7 @@ namespace hpx { namespace threads { namespace executors { namespace detail

        if (!pool_->run(lk, num_threads_))
        {
+            lk.unlock();
            HPX_THROW_EXCEPTION(invalid_status,
                "thread_pool_os_executor<Scheduler>::thread_pool_os_executor",
                "couldn't start thread_pool");
1 change: 1 addition & 0 deletions tests/regressions/parallel/executors/CMakeLists.txt
@@ -4,6 +4,7 @@
 # file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

 set(tests
+    handled_exception_2959
     is_executor_1691
 )

24 changes: 24 additions & 0 deletions tests/regressions/parallel/executors/handled_exception_2959.cpp
@@ -0,0 +1,24 @@
+//  Copyright (c) 2017 Christopher Hinx
+//
+//  Distributed under the Boost Software License, Version 1.0. (See accompanying
+//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#include <hpx/hpx_main.hpp>
+#include <hpx/include/thread_executors.hpp>
+#include <hpx/util/lightweight_test.hpp>
+
+int main()
+{
+    bool caught_exception = false;
+    try
+    {
+        hpx::threads::executors::local_priority_queue_os_executor exec(1);
+    }
+    catch (...)
+    {
+        caught_exception = true;
+    }
+    HPX_TEST(caught_exception);
+
+    return hpx::util::report_errors();
+}
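A note on what the regression exercises (inferred from the issue, not stated in the diff): constructing the one-thread os_executor fails inside pool_->run(), and before this commit the resulting HPX_THROW_EXCEPTION fired while lk was still held, so the error could hang or trip HPX's held-lock checks instead of reaching the catch block. With the unlock in place the exception propagates normally and HPX_TEST(caught_exception) passes.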
