Skip to content

Commit

Permalink
Speeding up accessing the resource partitioner and the topology info
Browse files Browse the repository at this point in the history
- this significantly reduces overhead in the scheduler
  • Loading branch information
hkaiser committed Nov 19, 2017
1 parent 42a588b commit d0e95d2
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 27 deletions.
3 changes: 3 additions & 0 deletions hpx/runtime/resource/detail/partitioner.hpp
Expand Up @@ -233,6 +233,9 @@ namespace hpx { namespace resource { namespace detail
// store policy flags determining the general behavior of the
// resource_partitioner
resource::partitioner_mode mode_;

// topology information
threads::topology& topo_;
};
}}}

Expand Down
5 changes: 3 additions & 2 deletions hpx/runtime/threads/policies/affinity_data.hpp
Expand Up @@ -51,7 +51,8 @@ namespace hpx { namespace threads { namespace policies { namespace detail
return num_threads_;
}

mask_cref_type get_pu_mask(std::size_t num_thread) const;
mask_cref_type get_pu_mask(threads::topology const& topo,
std::size_t num_thread) const;

mask_type get_used_pus_mask(std::size_t pu_num) const;
std::size_t get_thread_occupancy(std::size_t pid) const;
Expand Down Expand Up @@ -86,7 +87,7 @@ namespace hpx { namespace threads { namespace policies { namespace detail
std::vector<mask_type> affinity_masks_;
std::vector<std::size_t> pu_nums_;
mask_type no_affinity_; ///< mask of processing units which have no affinity
static std::atomic<int> instance_number_counter_; ///< counter for instance numbers
static std::atomic<int> instance_number_counter_; ///< counter for instance numbers
};
}}}}

Expand Down
18 changes: 9 additions & 9 deletions hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp
Expand Up @@ -126,7 +126,8 @@ namespace hpx { namespace threads { namespace policies
high_priority_queues_(init.num_high_priority_queues_),
low_priority_queue_(init.max_queue_thread_count_),
curr_queue_(0),
numa_sensitive_(init.numa_sensitive_)
numa_sensitive_(init.numa_sensitive_),
rp_(resource::get_partitioner())
{
victim_threads_.clear();
victim_threads_.resize(init.num_queues_);
Expand Down Expand Up @@ -468,8 +469,7 @@ namespace hpx { namespace threads { namespace policies
num_thread %= queue_size;

// Select a OS thread which hasn't been disabled
auto const& rp = resource::get_partitioner();
auto mask = rp.get_pu_mask(
auto mask = rp_.get_pu_mask(
num_thread + parent_pool_->get_thread_offset());
if(!threads::any(mask))
threads::set(mask, num_thread + parent_pool_->get_thread_offset());
Expand Down Expand Up @@ -937,8 +937,7 @@ namespace hpx { namespace threads { namespace policies

// Check if we have been disabled
{
auto const& rp = resource::get_partitioner();
auto mask = rp.get_pu_mask(
auto mask = rp_.get_pu_mask(
num_thread + parent_pool_->get_thread_offset());

if (!bit_and(mask, parent_pool_->get_used_processing_units()))
Expand Down Expand Up @@ -1043,15 +1042,14 @@ namespace hpx { namespace threads { namespace policies
queues_[num_thread]->on_start_thread(num_thread);

std::size_t num_threads = queues_.size();
auto const& rp = resource::get_partitioner();
auto const& topo = rp.get_topology();
auto const& topo = rp_.get_topology();

// get numa domain masks of all queues...
std::vector<mask_type> numa_masks(num_threads);
std::vector<mask_type> core_masks(num_threads);
for (std::size_t i = 0; i != num_threads; ++i)
{
std::size_t num_pu = rp.get_affinity_data().get_pu_num(i);
std::size_t num_pu = rp_.get_affinity_data().get_pu_num(i);
numa_masks[i] = topo.get_numa_node_affinity_mask(num_pu);
core_masks[i] = topo.get_core_affinity_mask(num_pu);
}
Expand All @@ -1062,7 +1060,7 @@ namespace hpx { namespace threads { namespace policies
static_cast<std::ptrdiff_t>((num_threads / 2.0) + 0.5);
victim_threads_[num_thread].reserve(num_threads);

std::size_t num_pu = rp.get_affinity_data().get_pu_num(num_thread);
std::size_t num_pu = rp_.get_affinity_data().get_pu_num(num_thread);
mask_cref_type pu_mask = topo.get_thread_affinity_mask(num_pu);
mask_cref_type numa_mask = numa_masks[num_thread];
mask_cref_type core_mask = core_masks[num_thread];
Expand Down Expand Up @@ -1176,6 +1174,8 @@ namespace hpx { namespace threads { namespace policies
std::size_t numa_sensitive_;

std::vector<std::vector<std::size_t> > victim_threads_;

resource::detail::partitioner& rp_;
};
}}}

Expand Down
11 changes: 6 additions & 5 deletions src/runtime/resource/detail/detail_partitioner.cpp
Expand Up @@ -213,9 +213,10 @@ namespace hpx { namespace resource { namespace detail

////////////////////////////////////////////////////////////////////////
partitioner::partitioner()
: first_core_(std::size_t(-1))
, cores_needed_(std::size_t(-1))
, mode_(mode_default)
: first_core_(std::size_t(-1))
, cores_needed_(std::size_t(-1))
, mode_(mode_default)
, topo_(threads::create_topology())
{
// allow only one partitioner instance
if (++instance_number_counter_ > 1)
Expand Down Expand Up @@ -771,7 +772,7 @@ namespace hpx { namespace resource { namespace detail

// Return the topology object cached in the partitioner at construction
// time. Returning the cached reference avoids calling
// threads::create_topology() on every access, which is exactly the
// overhead this change is meant to remove from the scheduler hot path.
// (The scraped diff left both the old and the new return statement in
// place; the pre-change `return threads::create_topology();` is dead
// code and has been dropped.)
threads::topology &partitioner::get_topology() const
{
    return topo_;
}

util::command_line_handling &
Expand Down Expand Up @@ -865,7 +866,7 @@ namespace hpx { namespace resource { namespace detail
// Return the affinity mask for the given global thread number.
// Forwards to affinity_data, passing the topology cached in this
// partitioner (topo_) so the callee does not have to re-acquire it
// through resource::get_partitioner() on every call.
// (The scraped diff left both the old single-argument call and the new
// two-argument call in place; the stale pre-change return has been
// dropped, keeping the post-commit behavior.)
threads::mask_cref_type partitioner::get_pu_mask(
    std::size_t global_thread_num) const
{
    return affinity_data_.get_pu_mask(topo_, global_thread_num);
}

bool partitioner::cmd_line_parsed() const
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/threads/detail/thread_pool_base.cpp
Expand Up @@ -47,7 +47,7 @@ namespace hpx { namespace threads { namespace detail
///////////////////////////////////////////////////////////////////////////
// Return (by const reference) the mask of processing units used by this
// thread pool.
mask_cref_type thread_pool_base::get_used_processing_units() const
{
std::lock_guard<pu_mutex_type> l(used_processing_units_mtx_);
// NOTE(review): this commit comments out the lock guard below to reduce
// overhead. The original guard was of limited value anyway — the returned
// reference outlives the lock, so readers could observe concurrent
// mutation regardless — but the unlocked read of used_processing_units_
// is now a potential data race if another thread mutates the mask
// concurrently. Confirm that all writers are quiescent while schedulers
// call this, or return the mask by value under the lock instead.
// std::lock_guard<pu_mutex_type> l(used_processing_units_mtx_);
return used_processing_units_;
}


Expand Down
20 changes: 10 additions & 10 deletions src/runtime/threads/policies/affinity_data.cpp
Expand Up @@ -5,9 +5,10 @@

#include <hpx/error_code.hpp>
#include <hpx/runtime/config_entry.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/cpu_mask.hpp>
#include <hpx/runtime/threads/policies/affinity_data.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/policies/topology.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/command_line_handling.hpp>
#include <hpx/util/format.hpp>
Expand Down Expand Up @@ -113,7 +114,6 @@ namespace hpx { namespace threads { namespace policies { namespace detail
{
num_threads_ = cfg_.num_threads_;
std::size_t num_system_pus = hardware_concurrency();
auto& topo = resource::get_partitioner().get_topology();

// initialize from command line
std::size_t pu_offset = get_pu_offset(cfg_);
Expand All @@ -139,6 +139,8 @@ namespace hpx { namespace threads { namespace policies { namespace detail

init_cached_pu_nums(num_system_pus);

auto const& topo = threads::create_topology();

#if defined(HPX_HAVE_HWLOC)
std::string affinity_desc;
hpx::detail::get_affinity_description(cfg_, affinity_desc);
Expand Down Expand Up @@ -206,11 +208,9 @@ namespace hpx { namespace threads { namespace policies { namespace detail
return (std::max)(num_unique_cores, max_cores);
}

mask_cref_type affinity_data::get_pu_mask(std::size_t global_thread_num) const
mask_cref_type affinity_data::get_pu_mask(threads::topology const& topo,
std::size_t global_thread_num) const
{
// get a topology instance
topology const& topology = resource::get_partitioner().get_topology();

// --hpx:bind=none disables all affinity
if (threads::test(no_affinity_, global_thread_num))
{
Expand All @@ -228,27 +228,27 @@ namespace hpx { namespace threads { namespace policies { namespace detail
{
// The affinity domain is 'processing unit', just convert the
// pu-number into a bit-mask.
return topology.get_thread_affinity_mask(pu_num);
return topo.get_thread_affinity_mask(pu_num);
}
if (0 == std::string("core").find(affinity_domain_))
{
// The affinity domain is 'core', return a bit mask corresponding
// to all processing units of the core containing the given
// pu_num.
return topology.get_core_affinity_mask(pu_num);
return topo.get_core_affinity_mask(pu_num);
}
if (0 == std::string("numa").find(affinity_domain_))
{
// The affinity domain is 'numa', return a bit mask corresponding
// to all processing units of the NUMA domain containing the
// given pu_num.
return topology.get_numa_node_affinity_mask(pu_num);
return topo.get_numa_node_affinity_mask(pu_num);
}

// The affinity domain is 'machine', return a bit mask corresponding
// to all processing units of the machine.
HPX_ASSERT(0 == std::string("machine").find(affinity_domain_));
return topology.get_machine_affinity_mask();
return topo.get_machine_affinity_mask();
}

mask_type affinity_data::get_used_pus_mask(std::size_t pu_num) const
Expand Down

0 comments on commit d0e95d2

Please sign in to comment.