
Commit

Merge pull request #2900 from STEllAR-GROUP/numa_balanced
Add numa-balanced mode to hpx::bind, spread cores over numa domains
sithhell committed Sep 20, 2017
2 parents 0bc6e4e + 958f5dc commit 157d3a0
Showing 2 changed files with 86 additions and 5 deletions.
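Not part of the commit, but for orientation: once the parser change below lands, the new mode can be requested through the usual --hpx:bind command-line option. A minimal, hypothetical usage sketch (the application boilerplate is illustrative, only the "numa-balanced" keyword comes from this commit):

    // Hypothetical usage: start this application with
    //   ./my_app --hpx:threads=16 --hpx:bind=numa-balanced
    // so that worker threads are spread proportionally over the NUMA domains.
    #include <hpx/hpx_init.hpp>

    int hpx_main(int argc, char* argv[])
    {
        // ... application work runs on the numa-balanced thread pool ...
        return hpx::finalize();
    }

    int main(int argc, char* argv[])
    {
        // the binding mode is picked up from --hpx:bind during startup
        return hpx::init(argc, argv);
    }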
7 changes: 4 additions & 3 deletions hpx/runtime/threads/policies/parse_affinity_options.hpp
@@ -31,9 +31,10 @@ namespace hpx { namespace threads { namespace detail

enum distribution_type
{
-   compact = 0x01,
-   scatter = 0x02,
-   balanced = 0x04
+   compact = 0x01,
+   scatter = 0x02,
+   balanced = 0x04,
+   numa_balanced = 0x08
};

struct spec_type
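A side note on the enum above: the distribution types are distinct powers of two, which leaves room for flag-style tests or combinations, and is presumably why numa_balanced takes the next free bit, 0x08. A small sketch under that assumption (the enum is copied from the header; the bitwise test is illustrative, the decode switch further down uses plain equality):

    // Each distribution_type value occupies its own bit, so a parsed mode
    // can be checked with a bitwise AND.
    #include <cstdio>

    enum distribution_type
    {
        compact       = 0x01,
        scatter       = 0x02,
        balanced      = 0x04,
        numa_balanced = 0x08
    };

    int main()
    {
        distribution_type d = numa_balanced;
        if (d & numa_balanced)          // flag-style membership test
            std::puts("numa-balanced mode selected");
    }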
84 changes: 82 additions & 2 deletions src/runtime/threads/policies/parse_affinity_options.cpp
@@ -118,6 +118,7 @@ namespace hpx { namespace threads { namespace detail
partlit("compact") >> qi::attr(compact)
| partlit("scatter") >> qi::attr(scatter)
| partlit("balanced") >> qi::attr(balanced)
| partlit("numa-balanced") >> qi::attr(numa_balanced)
;

thread_spec =
@@ -819,9 +820,9 @@
// Iterate over the cores and the PUs assigned to each core. This additional
// loop is needed so that we have consecutive worker thread numbers.
std::size_t num_thread = 0;
-for(std::size_t num_core = 0; num_core != num_cores; ++num_core)
+for (std::size_t num_core = 0; num_core != num_cores; ++num_core)
 {
-    for(std::size_t num_pu = 0; num_pu != num_pus_cores[num_core]; ++num_pu)
+    for (std::size_t num_pu = 0; num_pu != num_pus_cores[num_core]; ++num_pu)
{
if (any(affinities[num_thread]))
{
@@ -841,6 +842,80 @@
}
}

+///////////////////////////////////////////////////////////////////////////
+void decode_numabalanced_distribution(hwloc_topology_info& t,
+    std::vector<mask_type>& affinities,
+    std::size_t used_cores, std::size_t max_cores,
+    std::vector<std::size_t>& num_pus, error_code& ec)
+{
+    std::size_t num_threads = affinities.size();
+    num_pus.resize(num_threads);
+
+    // count the cores available on each numa node
+    std::size_t num_numas =
+        (std::max)(std::size_t(1), t.get_number_of_numa_nodes());
+    std::vector<std::size_t> num_cores_numa(num_numas, 0);
+    std::vector<std::size_t> num_threads_numa(num_numas, 0);
+    std::size_t cores_t = 0;
+    for (std::size_t n = 0; n != num_numas; ++n)
+    {
+        num_cores_numa[n] = t.get_number_of_numa_node_cores(n);
+        cores_t += num_cores_numa[n];
+    }
+
+    // decide how many threads should go on each domain, proportional to
+    // the fraction of all cores it contributes
+    std::size_t cores_t2 = 0;
+    for (std::size_t n = 0; n != num_numas; ++n)
+    {
+        std::size_t temp = static_cast<std::size_t>(
+            std::floor(0.5 + double(num_threads) * num_cores_numa[n] / cores_t));
+
+        // the rounding might overshoot, make sure we do not assign more
+        // than num_threads overall
+        if ((cores_t2 + temp) > num_threads)
+            temp = num_threads - cores_t2;
+        cores_t2 += temp;
+        num_threads_numa[n] = temp;
+    }
+
+    // assign threads to cores on each numa domain
+    std::size_t num_thread = 0;
+    std::size_t offset = 0;
+    for (std::size_t n = 0; n != num_numas; ++n)
+    {
+        std::vector<std::size_t> num_pus_cores(num_cores_numa[n], 0);
+
+        // iterate once and count the PUs used on each core
+        for (std::size_t thrd = 0; thrd != num_threads_numa[n]; /**/)
+        {
+            for (std::size_t c = 0; c != num_cores_numa[n]; ++c)
+            {
+                num_pus_cores[c]++;
+                if (++thrd == num_threads_numa[n])
+                    break;
+            }
+        }
+
+        // Iterate over the cores and the PUs assigned to each core. This
+        // additional loop is needed so that we have consecutive worker
+        // thread numbers.
+        for (std::size_t num_core = 0; num_core != num_cores_numa[n]; ++num_core)
+        {
+            for (std::size_t num_pu = 0; num_pu != num_pus_cores[num_core]; ++num_pu)
+            {
+                if (any(affinities[num_thread]))
+                {
+                    HPX_THROWS_IF(ec, bad_parameter,
+                        "decode_numabalanced_distribution",
+                        boost::str(boost::format(
+                            "affinity mask for thread %1% has "
+                            "already been set"
+                        ) % num_thread));
+                    return;
+                }
+                num_pus[num_thread] = t.get_pu_number(num_core + used_cores, num_pu);
+                affinities[num_thread] = t.init_thread_affinity_mask(
+                    num_core + used_cores + offset, num_pu);
+                ++num_thread;
+            }
+        }
+        offset += num_cores_numa[n];
+    }
+}

///////////////////////////////////////////////////////////////////////////
void decode_distribution(distribution_type d, hwloc_topology_info& t,
std::vector<mask_type>& affinities,
@@ -864,6 +939,11 @@
num_pus, ec);
break;

+case numa_balanced:
+    decode_numabalanced_distribution(t, affinities, used_cores, max_cores,
+        num_pus, ec);
+    break;

default:
HPX_ASSERT(false);
}
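The heart of the new mode is the proportional split in decode_numabalanced_distribution above: each NUMA domain n receives round(num_threads * cores_on_domain_n / total_cores) threads, clamped so the running total never exceeds num_threads. A self-contained sketch of just that computation (split_threads_over_numa is a hypothetical helper mirroring the committed loop, not HPX API):

    // Standalone sketch of the per-NUMA-domain thread split performed by
    // decode_numabalanced_distribution (hypothetical helper, not HPX API).
    #include <cmath>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    std::vector<std::size_t> split_threads_over_numa(
        std::size_t num_threads, std::vector<std::size_t> const& cores_per_numa)
    {
        std::size_t total_cores = 0;
        for (std::size_t c : cores_per_numa)
            total_cores += c;

        std::vector<std::size_t> threads_per_numa(cores_per_numa.size(), 0);
        std::size_t assigned = 0;
        for (std::size_t n = 0; n != cores_per_numa.size(); ++n)
        {
            // round to nearest: floor(0.5 + num_threads * share_of_cores)
            std::size_t t = static_cast<std::size_t>(std::floor(
                0.5 + double(num_threads) * cores_per_numa[n] / total_cores));

            // rounding can overshoot; clamp so the sum never exceeds num_threads
            if (assigned + t > num_threads)
                t = num_threads - assigned;

            assigned += t;
            threads_per_numa[n] = t;
        }
        return threads_per_numa;
    }

    int main()
    {
        // 12 threads over two domains with 10 and 6 cores:
        // round(12*10/16) = 8 and round(12*6/16) = 5, clamped to 4
        for (std::size_t t : split_threads_over_numa(12, {10, 6}))
            std::cout << t << ' ';    // prints: 8 4
        std::cout << '\n';
    }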
