Adding basic test for resource_partitioner
- fix --hpx:threads=all command line option to properly interact with hpx.os_threads=N configuration setting
- adding more convenience functions for resource_partitioner
- adding some convenience headers
hkaiser committed Aug 8, 2017
1 parent 5a05907 commit 466cc16
Showing 17 changed files with 200 additions and 60 deletions.
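
Editor's note: the convenience functions mentioned in the commit message live in the hpx::resource namespace (declared in hpx/runtime/thread_pool_helpers.hpp below). A minimal sketch of calling them once the runtime is up; the hpx_main/main skeleton is the usual HPX one and an illustrative assumption, only the hpx::resource calls come from this commit:

// Sketch only: exercises the new hpx::resource convenience functions.
#include <hpx/hpx_init.hpp>
#include <hpx/runtime/thread_pool_helpers.hpp>

#include <cstddef>
#include <iostream>

int hpx_main(int, char**)
{
    // new convenience queries added in this commit
    std::size_t const pools = hpx::resource::get_num_thread_pools();
    std::size_t const threads = hpx::resource::get_num_threads();

    std::cout << pools << " pool(s), " << threads << " worker thread(s)\n";
    return hpx::finalize();
}

int main(int argc, char* argv[])
{
    // --hpx:threads=all now respects an hpx.os_threads=N configuration entry
    // (first bullet of the commit message)
    return hpx::init(argc, argv);
}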
13 changes: 13 additions & 0 deletions hpx/include/resource_partitioner.hpp
@@ -0,0 +1,13 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_RESOURCE_PARTITIONER_AUG_08_2017_1103AM)
#define HPX_RESOURCE_PARTITIONER_AUG_08_2017_1103AM

#include <hpx/config.hpp>
#include <hpx/runtime/resource_partitioner.hpp>

#endif

1 change: 1 addition & 0 deletions hpx/include/threads.hpp
@@ -14,6 +14,7 @@
#include <hpx/runtime/threads/topology.hpp>
#include <hpx/runtime/threads/run_as_os_thread.hpp>
#include <hpx/runtime/threads/run_as_hpx_thread.hpp>
#include <hpx/runtime/threads/thread_pools.hpp>

#endif

17 changes: 7 additions & 10 deletions hpx/runtime/resource_partitioner.hpp
@@ -285,12 +285,12 @@ namespace resource {
void print_init_pool_data(std::ostream&) const;

// create a thread_pool
void create_thread_pool(const std::string &name,
void create_thread_pool(std::string const& name,
scheduling_policy sched = scheduling_policy::unspecified);

// create a thread_pool with a callback function for creating a custom
// scheduler
void create_thread_pool(const std::string &name,
void create_thread_pool(std::string const& name,
scheduler_function scheduler_creation);

// Functions to add processing units to thread pools via
@@ -319,31 +319,28 @@ namespace resource {
{
return affinity_data_;
}
threads::policies::detail::affinity_data& get_affinity_data()
{
return affinity_data_;
}

// Does initialization of all resources and internal data of the
// resource partitioner called in hpx_init
void configure_pools();

////////////////////////////////////////////////////////////////////////
scheduling_policy which_scheduler(const std::string &pool_name);
scheduling_policy which_scheduler(std::string const& pool_name);
threads::topology &get_topology() const;
util::command_line_handling &get_command_line_switches();

std::size_t get_num_distinct_pus() const;
std::size_t get_num_threads() const;

std::size_t get_num_pools() const;

std::size_t get_num_threads() const;
std::size_t get_num_threads(std::string const& pool_name) const;
std::size_t get_num_threads(std::size_t pool_index) const;

std::string const& get_pool_name(std::size_t index) const;
std::size_t get_pool_index(std::string const& pool_name) const;

size_t get_pu_num(std::size_t global_thread_num);
std::size_t get_pu_num(std::size_t global_thread_num);
threads::mask_cref_type get_pu_mask(std::size_t global_thread_num) const;

bool cmd_line_parsed() const;
@@ -358,7 +355,7 @@ namespace resource {

scheduler_function get_pool_creator(size_t index) const;

std::vector<numa_domain> &numa_domains()
std::vector<numa_domain> const& numa_domains() const
{
return numa_domains_;
}
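
Editor's note: the two create_thread_pool() overloads above are meant to be called on the partitioner before the runtime starts. A rough sketch under stated assumptions: the partitioner is reached through get_resource_partitioner() (as the assign_pu()/unassign_pu() calls later in this commit suggest), the pool name is made up, and a real program would also assign processing units to the new pool (omitted here):

// Rough sketch, not from this commit: creating an extra pool before hpx::init.
// The default scheduling_policy::unspecified argument lets the runtime choose.
#include <hpx/hpx_init.hpp>
#include <hpx/include/resource_partitioner.hpp>

int main(int argc, char* argv[])
{
    // assumed accessor for the partitioner singleton
    auto& rp = hpx::get_resource_partitioner();

    // hypothetical pool name; uses the std::string const& overload above
    rp.create_thread_pool("my_pool");

    // NOTE: assigning PUs/cores to "my_pool" is required in practice, omitted here.

    return hpx::init(argc, argv);
}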
11 changes: 9 additions & 2 deletions hpx/runtime/thread_pool_helpers.hpp
@@ -25,6 +25,14 @@ namespace hpx { namespace resource
/// \a resource_partitioner
HPX_API_EXPORT std::size_t get_num_thread_pools();

/// Return the number of threads in all thread pools currently
/// managed by the \a resource_partitioner
HPX_API_EXPORT std::size_t get_num_threads();

/// Return the number of threads in the given thread pool currently
/// managed by the \a resource_partitioner
HPX_API_EXPORT std::size_t get_num_threads(std::string const& pool_name);

/// Return the number of threads in the given thread pool currently
/// managed by the \a resource_partitioner
HPX_API_EXPORT std::size_t get_num_threads(std::size_t pool_index);
@@ -42,7 +50,6 @@ namespace hpx { namespace resource
/// Return the thread pool given its internal index
HPX_API_EXPORT threads::detail::thread_pool_base& get_thread_pool(
std::size_t pool_index);
}
}
}}

#endif /*HPX_RUNTIME_GET_OS_THREAD_COUNT_HPP*/
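
Editor's note: combined with the existing get_thread_pool() accessor, the new get_num_threads() overloads make a per-pool summary straightforward. A sketch; it assumes HPX is already running and pulls the pool name from the pool object itself:

// Sketch: per-pool thread counts via the declarations above.
#include <hpx/runtime/thread_pool_helpers.hpp>
#include <hpx/runtime/threads/thread_pools.hpp>   // thread_pool_base, header added in this commit

#include <cstddef>
#include <iostream>

void print_pool_summary()
{
    std::size_t const num_pools = hpx::resource::get_num_thread_pools();
    for (std::size_t i = 0; i != num_pools; ++i)
    {
        auto& pool = hpx::resource::get_thread_pool(i);
        std::cout << "pool '" << pool.get_pool_name() << "': "
                  << hpx::resource::get_num_threads(i) << " of "
                  << hpx::resource::get_num_threads() << " threads\n";
    }
}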
42 changes: 21 additions & 21 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
@@ -115,7 +115,7 @@ namespace hpx { namespace threads { namespace detail
template <typename Scheduler>
void scheduled_thread_pool<Scheduler>::print_pool(std::ostream& os)
{
os << "[pool \"" << id_.name_ << "\", #" << id_.index_
os << "[pool \"" << id_.name() << "\", #" << id_.index()
<< "] with scheduler " << sched_->Scheduler::get_scheduler_name()
<< "\n"
<< "is running on PUs : \n";
@@ -159,7 +159,7 @@ namespace hpx { namespace threads { namespace detail
template <typename Lock>
void scheduled_thread_pool<Scheduler>::stop_locked(Lock& l, bool blocking)
{
LTM_(info) << "stop: " << id_.name_ << " blocking(" << std::boolalpha
LTM_(info) << "stop: " << id_.name() << " blocking(" << std::boolalpha
<< blocking << ")";

if (!threads_.empty())
@@ -179,11 +179,11 @@
continue;

// make sure no OS thread is waiting
LTM_(info) << "stop: " << id_.name_ << " notify_all";
LTM_(info) << "stop: " << id_.name() << " notify_all";

sched_->Scheduler::do_some_work(std::size_t(-1));

LTM_(info) << "stop: " << id_.name_ << " join:" << i;
LTM_(info) << "stop: " << id_.name() << " join:" << i;

// unlock the lock while joining
util::unlock_guard<Lock> ul(l);
@@ -209,11 +209,11 @@
HPX_ASSERT(l.owns_lock());

LTM_(info) //-V128
<< "run: " << id_.name_
<< "run: " << id_.name()
<< " number of processing units available: " //-V128
<< threads::hardware_concurrency();
LTM_(info) //-V128
<< "run: " << id_.name_ << " creating " << pool_threads
<< "run: " << id_.name() << " creating " << pool_threads
<< " OS thread(s)"; //-V128

if (0 == pool_threads)
@@ -231,7 +231,7 @@
init_perf_counter_data(pool_threads);
this->init_pool_time_scale();

LTM_(info) << "run: " << id_.name_
LTM_(info) << "run: " << id_.name()
<< " timestamp_scale: " << timestamp_scale_; //-V128

// run threads and wait for initialization to complete
@@ -254,7 +254,7 @@
// in affinity_data::affinity_masks_
// which is in order of occupied PU
LTM_(info) //-V128
<< "run: " << id_.name_ << " create OS thread "
<< "run: " << id_.name() << " create OS thread "
<< global_thread_num //-V128
<< ": will run on processing units within this mask: "
<< std::hex << HPX_CPU_MASK_PREFIX << mask;
@@ -266,7 +266,7 @@
if (!any(mask))
{
LTM_(debug) //-V128
<< "run: " << id_.name_
<< "run: " << id_.name()
<< " setting thread affinity on OS thread " //-V128
<< global_thread_num << " was explicitly disabled.";
}
@@ -279,7 +279,7 @@
}
catch (std::exception const& e)
{
LTM_(always) << "run: " << id_.name_
LTM_(always) << "run: " << id_.name()
<< " failed with: " << e.what();

// trigger the barrier
@@ -293,7 +293,7 @@
return false;
}

LTM_(info) << "run: " << id_.name_ << " running";
LTM_(info) << "run: " << id_.name() << " running";
return true;
}

@@ -318,7 +318,7 @@
if (ec)
{
LTM_(warning) //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " setting thread affinity on OS thread " //-V128
<< global_thread_num
<< " failed with: " << ec.get_message();
@@ -327,7 +327,7 @@
else
{
LTM_(debug) //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " setting thread affinity on OS thread " //-V128
<< global_thread_num << " was explicitly disabled.";
}
@@ -343,7 +343,7 @@
if (ec)
{
LTM_(warning) //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " reducing thread priority on OS thread " //-V128
<< global_thread_num
<< " failed with: " << ec.get_message();
@@ -359,7 +359,7 @@
startup->wait();

LTM_(info) //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " starting OS thread: " << thread_num; //-V128

// set state to running
@@ -415,7 +415,7 @@ namespace hpx { namespace threads { namespace detail
catch (hpx::exception const& e)
{
LFATAL_ //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " thread_num:" << global_thread_num //-V128
<< " : caught hpx::exception: " << e.what()
<< ", aborted thread execution";
@@ -426,7 +426,7 @@
catch (boost::system::system_error const& e)
{
LFATAL_ //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " thread_num:" << global_thread_num //-V128
<< " : caught boost::system::system_error: " << e.what()
<< ", aborted thread execution";
@@ -444,7 +444,7 @@
catch (...)
{
LFATAL_ //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " thread_num:" << global_thread_num //-V128
<< " : caught unexpected " //-V128
"exception, aborted thread execution";
@@ -454,7 +454,7 @@
}

LTM_(info) //-V128
<< "thread_func: " << id_.name_
<< "thread_func: " << id_.name()
<< " thread_num: " << global_thread_num
<< " : ending OS thread, " //-V128
"executed "
@@ -1332,7 +1332,7 @@ namespace hpx { namespace threads { namespace detail
std::size_t virt_core, std::size_t thread_num,
std::shared_ptr<compat::barrier> startup, error_code& ec)
{
get_resource_partitioner().assign_pu(id_.name_, virt_core);
get_resource_partitioner().assign_pu(id_.name(), virt_core);

if (threads_.size() <= virt_core)
threads_.resize(virt_core + 1);
@@ -1388,7 +1388,7 @@

threads_[virt_core].join();

get_resource_partitioner().unassign_pu(id_.name_, virt_core);
get_resource_partitioner().unassign_pu(id_.name(), virt_core);
}
}}}

28 changes: 17 additions & 11 deletions hpx/runtime/threads/detail/thread_pool_base.hpp
@@ -45,9 +45,12 @@ namespace hpx { namespace threads { namespace detail
: index_(index), name_(name)
{}

std::size_t index_;
std::string name_;
//! could get an hpx::naming::id_type in the future
std::size_t index() const { return index_; };
std::string const& name() const { return name_; }

private:
std::size_t const index_;
std::string const name_;
};

///////////////////////////////////////////////////////////////////////////
@@ -97,9 +100,17 @@
thread_state_ex_enum newstate_ex, thread_priority priority,
error_code& ec) = 0;

const std::string &get_pool_name() const
std::size_t get_pool_index() const
{
return id_.index();
}
std::string const& get_pool_name() const
{
return id_.name_;
return id_.name();
}
std::size_t get_thread_offset() const
{
return thread_offset_;
}

virtual policies::scheduler_base* get_scheduler() const
@@ -267,14 +278,9 @@
// return the description string of the underlying scheduler
char const* get_description() const;

public:
protected:
void init_pool_time_scale();

std::size_t get_thread_offset() const
{
return thread_offset_;
}

protected:
pool_id_type id_;

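
Editor's note: with pool_id's members now private, code outside the pool reaches this information through the accessors on thread_pool_base. A tiny sketch (function name and output are made up, the accessors match the header above):

// Sketch: reading pool metadata through the accessors instead of id_.name_ etc.
#include <hpx/runtime/threads/thread_pools.hpp>

#include <iostream>

void describe_pool(hpx::threads::detail::thread_pool_base const& pool)
{
    std::cout << "pool '" << pool.get_pool_name() << "' (index "
              << pool.get_pool_index() << ", thread offset "
              << pool.get_thread_offset() << ")\n";
}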
12 changes: 12 additions & 0 deletions hpx/runtime/threads/thread_pools.hpp
@@ -0,0 +1,12 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_THREADS_THREAD_POOLS_AUG_08_2017_1110AM)
#define HPX_THREADS_THREAD_POOLS_AUG_08_2017_1110AM

#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/detail/scheduled_thread_pool.hpp>

#endif
10 changes: 10 additions & 0 deletions src/runtime/resource_partitioner.cpp
@@ -84,6 +84,16 @@ namespace resource {
return get_resource_partitioner().get_num_pools();
}

std::size_t get_num_threads()
{
return get_resource_partitioner().get_num_threads();
}

std::size_t get_num_threads(std::string const& pool_name)
{
return get_resource_partitioner().get_num_threads(pool_name);
}

std::size_t get_num_threads(std::size_t pool_index)
{
return get_resource_partitioner().get_num_threads(pool_index);
2 changes: 1 addition & 1 deletion src/runtime/threads/detail/thread_pool_base.cpp
@@ -59,7 +59,7 @@ namespace hpx { namespace threads { namespace detail
// detail::manage_executor interface implementation
char const* thread_pool_base::get_description() const
{
return id_.name_.c_str();
return id_.name().c_str();
}

///////////////////////////////////////////////////////////////////////////
12 changes: 6 additions & 6 deletions src/runtime/threads/executors/this_thread_executors.cpp
@@ -311,12 +311,12 @@ namespace hpx { namespace threads { namespace executors { namespace detail
hpx::state expected = state_starting;
if (state.compare_exchange_strong(expected, state_stopping))
{
{
std::unique_lock<mutex_type> l(mtx_);
get_resource_partitioner().get_affinity_data().add_punit(
0, thread_num_);
scheduler_.on_start_thread(0);
}
// {
// std::unique_lock<mutex_type> l(mtx_);
// get_resource_partitioner().get_affinity_data().add_punit(
// 0, thread_num_);
// scheduler_.on_start_thread(0);
// }

self_ = threads::get_self_ptr();

