Skip to content

Commit

Permalink
Renaming thread pool classes
Browse files Browse the repository at this point in the history
- fixing linker errors
  • Loading branch information
hkaiser committed Aug 3, 2017
1 parent 8a19601 commit 406be02
Show file tree
Hide file tree
Showing 17 changed files with 330 additions and 200 deletions.
119 changes: 112 additions & 7 deletions examples/resource_partitioner/shared_priority_scheduler.hpp
Expand Up @@ -122,11 +122,11 @@ namespace threads {
thread_queue_type;

shared_priority_scheduler(std::size_t num_worker_queues,
std::size_t num_high_priority_queues_,
bool numa_sensitive,
bool numa_pinned,
char const* description,
int max_tasks = max_thread_count)
std::size_t num_high_priority_queues_,
bool numa_sensitive,
bool numa_pinned,
char const* description,
int max_tasks = max_thread_count)
: scheduler_base(num_worker_queues, description)
, max_queue_thread_count_(max_tasks)
, queues_(num_worker_queues)
Expand Down Expand Up @@ -177,7 +177,8 @@ namespace threads {
{
std::uint64_t time = 0;

time += high_priority_queue_.get_creation_time(reset);
for (std::size_t i = 0; i != queues_.size(); ++i)
time += high_priority_queues_[i]->get_creation_time(reset);

time += low_priority_queue_.get_creation_time(reset);

Expand All @@ -191,7 +192,8 @@ namespace threads {
{
std::uint64_t time = 0;

time += high_priority_queue_.get_cleanup_time(reset);
for (std::size_t i = 0; i != queues_.size(); ++i)
time += high_priority_queues_[i]->get_cleanup_time(reset);

time += low_priority_queue_.get_cleanup_time(reset);

Expand All @@ -211,13 +213,17 @@ namespace threads {
{
for (std::size_t i = 0; i != high_priority_queues_.size();
++i)
{
num_pending_misses +=
high_priority_queues_[i]->get_num_pending_misses(
reset);
}

for (std::size_t i = 0; i != queues_.size(); ++i)
{
num_pending_misses +=
queues_[i]->get_num_pending_misses(reset);
}

num_pending_misses +=
low_priority_queue_.get_num_pending_misses(reset);
Expand Down Expand Up @@ -405,6 +411,105 @@ namespace threads {
}
#endif

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
///////////////////////////////////////////////////////////////////////
// Queries the current average thread wait time of the queues.
std::int64_t get_average_thread_wait_time(
std::size_t num_thread = std::size_t(-1)) const
{
// Return average thread wait time of one specific queue.
std::uint64_t wait_time = 0;
std::uint64_t count = 0;
if (std::size_t(-1) != num_thread)
{
HPX_ASSERT(num_thread < queues_.size());

if (num_thread < high_priority_queues_.size())
{
wait_time = high_priority_queues_[num_thread]->
get_average_thread_wait_time();
++count;
}

if (queues_.size()-1 == num_thread)
{
wait_time += low_priority_queue_.
get_average_thread_wait_time();
++count;
}

wait_time += queues_[num_thread]->get_average_thread_wait_time();
return wait_time / (count + 1);
}

// Return the cumulative average thread wait time for all queues.
for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
{
wait_time += high_priority_queues_[i]->get_average_thread_wait_time();
++count;
}

wait_time += low_priority_queue_.get_average_thread_wait_time();

for (std::size_t i = 0; i != queues_.size(); ++i)
{
wait_time += queues_[i]->get_average_thread_wait_time();
++count;
}

return wait_time / (count + 1);
}

///////////////////////////////////////////////////////////////////////
// Queries the current average task wait time of the queues.
std::int64_t get_average_task_wait_time(
std::size_t num_thread = std::size_t(-1)) const
{
// Return average task wait time of one specific queue.
std::uint64_t wait_time = 0;
std::uint64_t count = 0;
if (std::size_t(-1) != num_thread)
{
HPX_ASSERT(num_thread < queues_.size());

if (num_thread < high_priority_queues_.size())
{
wait_time = high_priority_queues_[num_thread]->
get_average_task_wait_time();
++count;
}

if (queues_.size()-1 == num_thread)
{
wait_time += low_priority_queue_.
get_average_task_wait_time();
++count;
}

wait_time += queues_[num_thread]->get_average_task_wait_time();
return wait_time / (count + 1);
}

// Return the cumulative average task wait time for all queues.
for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
{
wait_time += high_priority_queues_[i]->
get_average_task_wait_time();
++count;
}

wait_time += low_priority_queue_.get_average_task_wait_time();

for (std::size_t i = 0; i != queues_.size(); ++i)
{
wait_time += queues_[i]->get_average_task_wait_time();
++count;
}

return wait_time / (count + 1);
}
#endif

///////////////////////////////////////////////////////////////////////
void abort_all_suspended_threads()
{
Expand Down
31 changes: 17 additions & 14 deletions examples/resource_partitioner/simple_resource_partitioner.cpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/runtime/resource_partitioner.hpp>
#include <hpx/runtime/threads/cpu_mask.hpp>
#include <hpx/runtime/threads/executors/customized_pool_executors.hpp>
#include <hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp>
//
#include <hpx/include/iostreams.hpp>
#include <hpx/include/runtime.hpp>
Expand All @@ -26,17 +27,16 @@
#include "shared_priority_scheduler.hpp"
#include "system_characteristics.hpp"

namespace resource {
namespace pools {
namespace resource { namespace pools
{
enum ids
{
DEFAULT = 0,
MPI = 1,
GPU = 2,
MATRIX = 3,
};
}
}
}}

static bool use_pools = false;
static bool use_scheduler = false;
Expand All @@ -49,7 +49,7 @@ using namespace hpx::threads::policies;
// Force an instantiation of the pool type templated on our custom scheduler
// we need this to ensure that the pool has the generated member functions needed
// by the linker for this pool type
template class hpx::threads::detail::thread_pool_impl<high_priority_sched>;
// template class hpx::threads::detail::scheduled_thread_pool<high_priority_sched>;

// dummy function we will call using async
void do_stuff(std::size_t n, bool printout)
Expand Down Expand Up @@ -212,7 +212,9 @@ int hpx_main(boost::program_options::variables_map& vm)
hpx::parallel::execution::par
.with(fixed /*, high_priority_async_policy*/)
.on(mpi_executor),
0, loop_count, 1, [&](std::size_t i) {
0, loop_count, 1,
[&](std::size_t i)
{
std::lock_guard<hpx::lcos::local::mutex> lock(m);
if (thread_set.insert(std::this_thread::get_id()).second)
{
Expand Down Expand Up @@ -273,7 +275,8 @@ int main(int argc, char* argv[])
[](hpx::threads::policies::callback_notifier& notifier,
std::size_t num_threads, std::size_t thread_offset,
std::size_t pool_index, char const* pool_name)
-> std::unique_ptr<hpx::threads::detail::thread_pool> {
-> std::unique_ptr<hpx::threads::detail::thread_pool_base>
{
std::cout << "User defined scheduler creation callback "
<< std::endl;

Expand All @@ -284,10 +287,10 @@ int main(int argc, char* argv[])
auto mode = scheduler_mode(scheduler_mode::do_background_work |
scheduler_mode::delay_exit);

std::unique_ptr<hpx::threads::detail::thread_pool> pool(
new hpx::threads::detail::thread_pool_impl<high_priority_sched>(
std::move(scheduler), notifier, pool_index, pool_name, mode,
thread_offset));
std::unique_ptr<hpx::threads::detail::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<
high_priority_sched>(std::move(scheduler), notifier,
pool_index, pool_name, mode, thread_offset));
return pool;
});

Expand All @@ -304,7 +307,7 @@ int main(int argc, char* argv[])
[](hpx::threads::policies::callback_notifier& notifier,
std::size_t num_threads, std::size_t thread_offset,
std::size_t pool_index, char const* pool_name)
-> std::unique_ptr<hpx::threads::detail::thread_pool> {
-> std::unique_ptr<hpx::threads::detail::thread_pool_base> {
std::cout << "User defined scheduler creation callback "
<< std::endl;
std::unique_ptr<high_priority_sched> scheduler(
Expand All @@ -313,8 +316,8 @@ int main(int argc, char* argv[])

auto mode = scheduler_mode(scheduler_mode::delay_exit);

std::unique_ptr<hpx::threads::detail::thread_pool> pool(
new hpx::threads::detail::thread_pool_impl<
std::unique_ptr<hpx::threads::detail::thread_pool_base> pool(
new hpx::threads::detail::scheduled_thread_pool<
high_priority_sched>(std::move(scheduler), notifier,
pool_index, pool_name, mode, thread_offset));
return pool;
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/resource_partitioner.hpp
Expand Up @@ -17,7 +17,7 @@
#include <hpx/util/command_line_handling.hpp>
#include <hpx/util/thread_specific_ptr.hpp>
//
#include <hpx/runtime/threads/detail/thread_pool.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
#include <hpx/runtime/threads/policies/scheduler_mode.hpp>

Expand Down Expand Up @@ -144,7 +144,7 @@ namespace resource {

using scheduler_function =
util::function_nonser<
std::unique_ptr<hpx::threads::detail::thread_pool>(
std::unique_ptr<hpx::threads::detail::thread_pool_base>(
hpx::threads::policies::callback_notifier&,
std::size_t, std::size_t, std::size_t, char const *
)>;
Expand Down
6 changes: 3 additions & 3 deletions hpx/runtime/thread_pool_helpers.hpp
Expand Up @@ -15,7 +15,7 @@

namespace hpx { namespace threads { namespace detail
{
class HPX_EXPORT thread_pool;
class HPX_EXPORT thread_pool_base;
}}}

namespace hpx { namespace resource
Expand All @@ -36,11 +36,11 @@ namespace hpx { namespace resource
HPX_API_EXPORT std::string const& get_pool_name(std::size_t pool_index);

/// Return the name of the pool given its name
HPX_API_EXPORT threads::detail::thread_pool& get_thread_pool(
HPX_API_EXPORT threads::detail::thread_pool_base& get_thread_pool(
std::string const& pool_name);

/// Return the thread pool given its internal index
HPX_API_EXPORT threads::detail::thread_pool& get_thread_pool(
HPX_API_EXPORT threads::detail::thread_pool_base& get_thread_pool(
std::size_t pool_index);
}
}
Expand Down
Expand Up @@ -3,14 +3,14 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#if !defined(HPX_THREAD_POOL_HPP)
#define HPX_THREAD_POOL_HPP
#if !defined(HPX_SCHEDULED_THREAD_POOL_HPP)
#define HPX_SCHEDULED_THREAD_POOL_HPP

#include <hpx/config.hpp>
#include <hpx/compat/barrier.hpp>
#include <hpx/compat/mutex.hpp>
#include <hpx/compat/thread.hpp>
#include <hpx/runtime/threads/detail/thread_pool.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
#include <hpx/runtime/threads/policies/scheduler_base.hpp>

Expand All @@ -22,6 +22,8 @@
#include <utility>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

namespace hpx { namespace threads { namespace detail
{
///////////////////////////////////////////////////////////////////////////
Expand All @@ -30,16 +32,16 @@ namespace hpx { namespace threads { namespace detail

///////////////////////////////////////////////////////////////////////////
template <typename Scheduler>
class HPX_EXPORT thread_pool_impl : public thread_pool
class scheduled_thread_pool : public thread_pool_base
{
public:
///////////////////////////////////////////////////////////////////
thread_pool_impl(std::unique_ptr<Scheduler> sched,
scheduled_thread_pool(std::unique_ptr<Scheduler> sched,
threads::policies::callback_notifier& notifier, std::size_t index,
char const* pool_name, policies::scheduler_mode m =
policies::scheduler_mode::nothing_special,
std::size_t thread_offset = 0);
virtual ~thread_pool_impl();
virtual ~scheduled_thread_pool();

void init (std::size_t pool_threads, std::size_t threads_offset);

Expand Down Expand Up @@ -291,4 +293,6 @@ namespace hpx { namespace threads { namespace detail
};
}}} // namespace hpx::threads::detail

#include <hpx/config/warnings_suffix.hpp>

#endif

0 comments on commit 406be02

Please sign in to comment.