Skip to content

Commit

Permalink
Enable over-subscription of pus in resource_partitioner
Browse files Browse the repository at this point in the history
- streamlined resource partitioner creation API
- pu.thread_occupancy_count_ now counts up, not down
  • Loading branch information
hkaiser committed Aug 6, 2017
1 parent fe41eeb commit cc295bf
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 95 deletions.
5 changes: 3 additions & 2 deletions examples/resource_partitioner/simple_resource_partitioner.cpp
Expand Up @@ -266,7 +266,8 @@ int main(int argc, char* argv[])

pool_threads = vm["pool-threads"].as<int>();

auto& rp = hpx::get_resource_partitioner(desc_cmdline, argc, argv);
auto& rp = hpx::get_resource_partitioner(desc_cmdline, argc, argv,
hpx::resource::mode_allow_oversubscription);

// auto &topo = rp.get_topology();
std::cout << "[main] obtained reference to the resource_partitioner\n";
Expand Down Expand Up @@ -297,7 +298,7 @@ int main(int argc, char* argv[])
return pool;
});

// rp.add_resource(rp.numa_domains(), "default");
rp.add_resource(rp.numa_domains(), "default");

if (use_pools)
{
Expand Down
85 changes: 43 additions & 42 deletions hpx/runtime/resource_partitioner.hpp
Expand Up @@ -42,8 +42,16 @@ int hpx_main(boost::program_options::variables_map& vm);

namespace hpx {

namespace resource {
namespace resource
{
class resource_partitioner;

// resource_partitioner mode
enum resource_partitioner_mode
{
mode_default = 0,
mode_allow_oversubscription = 1,
};
}

// if the resource partitioner is accessed before the HPX runtime has started
Expand All @@ -54,78 +62,63 @@ HPX_EXPORT resource::resource_partitioner& get_resource_partitioner(
util::function_nonser<
int(boost::program_options::variables_map& vm)
> const& f,
boost::program_options::options_description const& desc_cmdline, int argc,
char** argv, std::vector<std::string> ini_config, runtime_mode mode,
boost::program_options::options_description const& desc_cmdline,
int argc, char** argv, std::vector<std::string> ini_config,
resource::resource_partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default,
bool check = true);

#if !defined(HPX_EXPORTS)
typedef int (*hpx_main_type)(boost::program_options::variables_map&);

inline resource::resource_partitioner& get_resource_partitioner(
int argc, char** argv)
int argc, char** argv,
resource::resource_partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default, bool check = true)
{
boost::program_options::options_description desc_cmdline(
std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]");

return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::vector<std::string>(),
runtime_mode_default);
rpmode, mode, check);
}

inline resource::resource_partitioner &get_resource_partitioner(
boost::program_options::options_description const& desc_cmdline, int argc,
char **argv, bool check = true)
{
return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::vector<std::string>(),
runtime_mode_default, check);
}

inline resource::resource_partitioner &get_resource_partitioner(
int argc, char **argv, std::vector<std::string> ini_config)
int argc, char **argv, std::vector<std::string> ini_config,
resource::resource_partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default, bool check = true)
{
boost::program_options::options_description desc_cmdline(
std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]");

return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::move(ini_config), runtime_mode_default);
}

inline resource::resource_partitioner &get_resource_partitioner(
int argc, char **argv, runtime_mode mode)
{
boost::program_options::options_description desc_cmdline(
std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]");

return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::vector<std::string>(0), mode);
desc_cmdline, argc, argv, std::move(ini_config),
rpmode, mode, check);
}

///////////////////////////////////////////////////////////////////////////////
inline resource::resource_partitioner &get_resource_partitioner(
boost::program_options::options_description const& desc_cmdline, int argc,
char **argv, std::vector<std::string> ini_config)
boost::program_options::options_description const& desc_cmdline,
int argc, char **argv,
resource::resource_partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default, bool check = true)
{
return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::move(ini_config), runtime_mode_default);
desc_cmdline, argc, argv, std::vector<std::string>(),
rpmode, mode, check);
}

inline resource::resource_partitioner &get_resource_partitioner(
boost::program_options::options_description const& desc_cmdline, int argc,
char **argv, runtime_mode mode)
boost::program_options::options_description const& desc_cmdline,
int argc, char **argv, std::vector<std::string> ini_config,
resource::resource_partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default, bool check = true)
{
return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::vector<std::string>(), mode);
desc_cmdline, argc, argv, ini_config, rpmode, mode, check);
}

inline resource::resource_partitioner& get_resource_partitioner(int argc,
char** argv, std::vector<std::string> ini_config, runtime_mode mode)
{
boost::program_options::options_description desc_cmdline(
std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]");

return get_resource_partitioner(static_cast<hpx_main_type>(::hpx_main),
desc_cmdline, argc, argv, std::move(ini_config), mode);
}
#endif

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -257,7 +250,8 @@ namespace resource {
std::string pool_name_;
scheduling_policy scheduling_policy_;
// PUs this pool is allowed to run on
std::vector<threads::mask_type> assigned_pus_;
std::vector<threads::mask_type> assigned_pus_; // mask
std::vector<std::size_t> assigned_pu_nums_; // pu index
// counter for number of threads bound to this pool
std::size_t num_threads_;
scheduler_function create_function_;
Expand Down Expand Up @@ -317,7 +311,9 @@ namespace resource {
threads::topology &get_topology() const;
util::command_line_handling &get_command_line_switches();

std::size_t get_num_distinct_pus() const;
std::size_t get_num_threads() const;

std::size_t get_num_pools() const;
std::size_t get_num_threads(std::string const& pool_name) const;
std::size_t get_num_threads(std::size_t pool_index) const;
Expand All @@ -337,6 +333,7 @@ namespace resource {
> const& f,
boost::program_options::options_description desc_cmdline,
int argc, char **argv, std::vector<std::string> ini_config,
resource::resource_partitioner_mode rpmode,
runtime_mode mode, bool fill_internal_topology = true);

scheduler_function const& get_pool_creator(size_t index) const;
Expand Down Expand Up @@ -401,6 +398,10 @@ namespace resource {
// contains the internal topology back-end used to add resources to
// initial_thread_pools
std::vector<numa_domain> numa_domains_;

// store policy flags determining the general behavior of the
// resource_partitioner
resource_partitioner_mode mode_;
};

} // namespace resource
Expand Down
13 changes: 13 additions & 0 deletions hpx/runtime/threads/policies/affinity_data.hpp
Expand Up @@ -38,6 +38,11 @@ namespace hpx { namespace threads { namespace policies { namespace detail
{
affinity_masks_ = affinity_masks;
}
void set_affinity_masks(
std::vector<threads::mask_type> && affinity_masks)
{
affinity_masks_ = std::move(affinity_masks);
}

std::size_t get_num_threads() const
{
Expand All @@ -54,6 +59,14 @@ namespace hpx { namespace threads { namespace policies { namespace detail
HPX_ASSERT(num_thread < pu_nums_.size());
return pu_nums_[num_thread];
}
void set_pu_nums(std::vector<std::size_t> const& pu_nums)
{
pu_nums_ = pu_nums;
}
void set_pu_nums(std::vector<std::size_t> && pu_nums)
{
pu_nums_ = std::move(pu_nums);
}

void add_punit(std::size_t virt_core, std::size_t thread_num);

Expand Down
28 changes: 15 additions & 13 deletions hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -219,25 +219,25 @@ namespace hpx { namespace threads { namespace policies
inline std::size_t domain_from_local_thread_index(std::size_t n)
{
auto &rp = get_resource_partitioner();
auto const& topo = hpx::threads::get_topology();
auto const& topo = rp.get_topology();
std::size_t global_id = local_to_global_thread_index(n);
std::size_t pu_num = rp.get_pu_num(global_id);
//

return topo.get_numa_node_number(pu_num);
}

template <typename queue_type>
std::size_t num_domains(const std::vector<queue_type*> &queues)
template <typename Queue>
std::size_t num_domains(const std::vector<Queue*> &queues)
{
auto &rp = get_resource_partitioner();
auto const& topo = hpx::threads::get_topology();
auto const& topo = rp.get_topology();
std::size_t num_queues = queues.size();
//

std::set<std::size_t> domains;
for (std::size_t local_id=0; local_id<num_queues; ++local_id) {
for (std::size_t local_id = 0; local_id != num_queues; ++local_id)
{
std::size_t global_id = local_to_global_thread_index(local_id);
std::size_t pu_num = rp.get_pu_num(global_id);
//
std::size_t dom = topo.get_numa_node_number(pu_num);
domains.insert(dom);
}
Expand All @@ -252,14 +252,16 @@ namespace hpx { namespace threads { namespace policies
{
std::vector<std::size_t> result;
auto &rp = get_resource_partitioner();
auto const& topo = hpx::threads::get_topology();
auto const& topo = rp.get_topology();
std::size_t global_id = local_to_global_thread_index(local_id);
std::size_t pu_num = rp.get_pu_num(global_id);
std::size_t numa = topo.get_numa_node_number(pu_num);
for (auto local_id : ts) {
std::size_t global_id = local_to_global_thread_index(local_id);
std::size_t pu_num = rp.get_pu_num(global_id);
if (pred(numa,topo.get_numa_node_number(pu_num))) {
for (auto local_id : ts)
{
global_id = local_to_global_thread_index(local_id);
pu_num = rp.get_pu_num(global_id);
if (pred(numa, topo.get_numa_node_number(pu_num)))
{
result.push_back(local_id);
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/hpx_init.cpp
Expand Up @@ -615,7 +615,8 @@ namespace hpx
// and get a handle to it
// (if the command-line parsing has not yet been done, do it now)
auto& rp = hpx::get_resource_partitioner(f, desc_cmdline, argc,
argv, std::move(ini_config), mode, false);
argv, std::move(ini_config), resource::mode_default,
mode, false);

// check whether HPX should be exited at this point
// (if the program options contain --hpx:help or --hpx:version)
Expand Down

0 comments on commit cc295bf

Please sign in to comment.