Skip to content

Commit

Permalink
Merge pull request #3179 from STEllAR-GROUP/support_flecsi
Browse files Browse the repository at this point in the history
Various minor changes to support FLeCSI
  • Loading branch information
hkaiser committed Feb 18, 2018
2 parents 2e3ed96 + b81a510 commit 8b0f81c
Show file tree
Hide file tree
Showing 21 changed files with 258 additions and 64 deletions.
3 changes: 2 additions & 1 deletion hpx/include/thread_executors.hpp
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
// Copyright (c) 2007-2018 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand All @@ -8,6 +8,7 @@

#include <hpx/runtime/threads/executors/current_executor.hpp>
#include <hpx/runtime/threads/executors/default_executor.hpp>
#include <hpx/runtime/threads/executors/pool_executor.hpp>
#include <hpx/runtime/threads/executors/service_executors.hpp>
#include <hpx/runtime/threads/executors/thread_pool_executors.hpp>
#include <hpx/runtime/threads/executors/thread_pool_os_executors.hpp>
Expand Down
1 change: 1 addition & 0 deletions hpx/parallel/executors.hpp
Expand Up @@ -22,6 +22,7 @@
#include <hpx/parallel/executors/default_executor.hpp>
#include <hpx/parallel/executors/distribution_policy_executor.hpp>
#include <hpx/parallel/executors/parallel_executor.hpp>
#include <hpx/parallel/executors/pool_executor.hpp>
#include <hpx/parallel/executors/sequenced_executor.hpp>
#include <hpx/parallel/executors/service_executors.hpp>
#include <hpx/parallel/executors/this_thread_executors.hpp>
Expand Down
24 changes: 24 additions & 0 deletions hpx/parallel/executors/pool_executor.hpp
@@ -0,0 +1,24 @@
// Copyright (c) 2007-2018 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file parallel/executors/pool_executor.hpp

#if !defined(HPX_PARALLEL_EXECUTORS_POOL_EXECUTOR_FEB_17_2018_0327PM)
#define HPX_PARALLEL_EXECUTORS_POOL_EXECUTOR_FEB_17_2018_0327PM

#include <hpx/config.hpp>
#include <hpx/parallel/executors/execution_parameters.hpp>
#include <hpx/parallel/executors/thread_execution.hpp>
#include <hpx/parallel/executors/thread_execution_information.hpp>
#include <hpx/parallel/executors/thread_timed_execution.hpp>
#include <hpx/runtime/threads/executors/pool_executor.hpp>

namespace hpx { namespace parallel { namespace execution
{
///////////////////////////////////////////////////////////////////////////
/// Makes threads::executors::pool_executor available under the name
/// hpx::parallel::execution::pool_executor. This is a plain type alias, not
/// a new executor type.
using pool_executor = threads::executors::pool_executor;
}}}

#endif
39 changes: 39 additions & 0 deletions hpx/runtime/resource/detail/create_partitioner.hpp
Expand Up @@ -9,7 +9,9 @@
#include <hpx/config.hpp>
#include <hpx/runtime/resource/partitioner_fwd.hpp>
#include <hpx/runtime/runtime_mode.hpp>
#include <hpx/util/bind_back.hpp>
#include <hpx/util/find_prefix.hpp>
#include <hpx/util/function.hpp>

#include <boost/program_options.hpp>

Expand All @@ -24,6 +26,13 @@ int hpx_main(boost::program_options::variables_map& vm);
typedef int (*hpx_main_type)(boost::program_options::variables_map&);
#endif

namespace hpx { namespace detail
{
// Exported helper that is only declared here; its definition lives outside
// of this header. The create_partitioner overloads accepting an
// int(int, char**) entry point pass that function as the trailing argument
// of init_helper by means of util::bind_back.
// NOTE(review): presumably init_helper calls the bound function with the
// program's argc/argv -- confirm against its definition.
HPX_EXPORT int init_helper(
boost::program_options::variables_map&,
util::function_nonser<int(int, char**)> const&);
}}

namespace hpx { namespace resource { namespace detail
{
// if the resource partitioner is accessed before the HPX runtime has started
Expand Down Expand Up @@ -56,6 +65,36 @@ namespace hpx { namespace resource { namespace detail
rpmode, mode, check);
}

/// Create the resource partitioner for an entry point that has the plain
/// `int(int, char**)` signature and no additional configuration entries.
///
/// \param f      entry point; bound as the trailing argument of
///               hpx::detail::init_helper before being handed on
/// \param argc   argument count, forwarded unchanged
/// \param argv   argument vector, forwarded unchanged
/// \param rpmode resource partitioner mode, forwarded unchanged
/// \param mode   runtime mode, forwarded unchanged
/// \param check  forwarded unchanged to the underlying overload
///
/// \returns whatever the underlying create_partitioner overload returns
inline partitioner& create_partitioner(
    util::function_nonser<int(int, char**)> const& f, int argc, char** argv,
    resource::partitioner_mode rpmode = resource::mode_default,
    hpx::runtime_mode mode = hpx::runtime_mode_default, bool check = true)
{
    // Default command line description: only the usage caption.
    std::string const usage_caption =
        std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]";
    boost::program_options::options_description cmdline_options(
        usage_caption);

    // The prefix is set before the partitioner is created.
    util::set_hpx_prefix(HPX_PREFIX);

    // This overload supplies an empty list of configuration entries.
    return create_partitioner(
        util::bind_back(hpx::detail::init_helper, f), cmdline_options,
        argc, argv, std::vector<std::string>(), rpmode, mode, check);
}

/// Create the resource partitioner for an entry point that has the plain
/// `int(int, char**)` signature, passing along caller-supplied
/// configuration entries.
///
/// \param f      entry point; bound as the trailing argument of
///               hpx::detail::init_helper before being handed on
/// \param argc   argument count, forwarded unchanged
/// \param argv   argument vector, forwarded unchanged
/// \param cfg    configuration entries, forwarded unchanged
/// \param rpmode resource partitioner mode, forwarded unchanged
/// \param mode   runtime mode, forwarded unchanged
/// \param check  forwarded unchanged to the underlying overload
///
/// \returns whatever the underlying create_partitioner overload returns
inline partitioner& create_partitioner(
    util::function_nonser<int(int, char**)> const& f, int argc, char** argv,
    std::vector<std::string> const& cfg,
    resource::partitioner_mode rpmode = resource::mode_default,
    hpx::runtime_mode mode = hpx::runtime_mode_default, bool check = true)
{
    // Default command line description: only the usage caption.
    std::string const usage_caption =
        std::string("Usage: ") + HPX_APPLICATION_STRING + " [options]";
    boost::program_options::options_description cmdline_options(
        usage_caption);

    // The prefix is set before the partitioner is created.
    util::set_hpx_prefix(HPX_PREFIX);

    return create_partitioner(
        util::bind_back(hpx::detail::init_helper, f), cmdline_options,
        argc, argv, cfg, rpmode, mode, check);
}

inline partitioner &create_partitioner(
int argc, char **argv, std::vector<std::string> ini_config,
resource::partitioner_mode rpmode = resource::mode_default,
Expand Down
16 changes: 16 additions & 0 deletions hpx/runtime/resource/partitioner.hpp
Expand Up @@ -141,6 +141,22 @@ namespace hpx { namespace resource
{}

#if !defined(HPX_EXPORTS)
/// Construct from an entry point with the plain `int(int, char**)`
/// signature. Sits inside the `#if !defined(HPX_EXPORTS)` section.
///
/// Initializes partitioner_ with the result of
/// detail::create_partitioner(f, argc, argv, rpmode, mode); the `check`
/// argument of that function is not passed and keeps its default.
///
/// \param f      entry point handed to detail::create_partitioner
/// \param argc   argument count, forwarded unchanged
/// \param argv   argument vector, forwarded unchanged
/// \param rpmode resource partitioner mode, forwarded unchanged
/// \param mode   runtime mode, forwarded unchanged
partitioner(util::function_nonser<int(int, char**)> const& f,
int argc, char** argv,
resource::partitioner_mode rpmode = resource::mode_default,
hpx::runtime_mode mode = hpx::runtime_mode_default)
: partitioner_(
detail::create_partitioner(f, argc, argv, rpmode, mode))
{}

/// Construct from an entry point with the plain `int(int, char**)`
/// signature plus a list of configuration entries.
///
/// Initializes partitioner_ with the result of
/// detail::create_partitioner(f, argc, argv, cfg, rpmode, mode); the
/// `check` argument of that function is not passed and keeps its default.
///
/// \param f      entry point handed to detail::create_partitioner
/// \param argc   argument count, forwarded unchanged
/// \param argv   argument vector, forwarded unchanged
/// \param cfg    configuration entries, forwarded unchanged
/// \param rpmode resource partitioner mode, forwarded unchanged
/// \param mode   runtime mode, forwarded unchanged
partitioner(util::function_nonser<int(int, char**)> const& f,
int argc, char** argv, std::vector<std::string> const& cfg,
resource::partitioner_mode rpmode = resource::mode_default,
hpx::runtime_mode mode = hpx::runtime_mode_default)
: partitioner_(
detail::create_partitioner(f, argc, argv, cfg, rpmode, mode))
{}

partitioner(int argc, char** argv,
resource::partitioner_mode rpmode = resource::mode_default,
runtime_mode mode = runtime_mode_default)
Expand Down
68 changes: 61 additions & 7 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
Expand Up @@ -325,7 +325,10 @@ namespace hpx { namespace threads { namespace detail
for (std::size_t virt_core = 0; virt_core != threads_.size();
++virt_core)
{
this->sched_->Scheduler::resume(virt_core);
if (mode_ & policies::enable_suspension)
{
this->sched_->Scheduler::resume(virt_core);
}
}

if (blocking)
Expand Down Expand Up @@ -353,6 +356,14 @@ namespace hpx { namespace threads { namespace detail
return hpx::make_ready_future();
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume",
"this scheduler does not support suspension");
return hpx::make_ready_future();
}

return hpx::async(
hpx::util::bind(&scheduled_thread_pool::resume_internal, this, true,
std::move(throws)));
Expand All @@ -362,6 +373,14 @@ namespace hpx { namespace threads { namespace detail
void scheduled_thread_pool<Scheduler>::resume_cb(
std::function<void(void)> callback, error_code& ec)
{
if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume_cb",
"this scheduler does not support suspension");
return;
}

std::function<void(void)> resume_internal_wrapper =
[this, HPX_CAPTURE_MOVE(callback)]()
{
Expand All @@ -385,11 +404,19 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"scheduled_thread_pool<Scheduler>::resume_direct",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::resume_direct",
"this scheduler does not support suspension");
return;
}

this->resume_internal(true, ec);
}

Expand Down Expand Up @@ -425,7 +452,7 @@ namespace hpx { namespace threads { namespace detail
"scheduled_thread_pool<Scheduler>::suspend",
"cannot call suspend from outside HPX, use suspend_cb or"
"suspend_direct instead");
return make_ready_future();
return hpx::make_ready_future();
}
else if (threads::get_self_ptr() &&
hpx::this_thread::get_pool() == this)
Expand All @@ -436,6 +463,14 @@ namespace hpx { namespace threads { namespace detail
"cannot suspend a pool from itself"));
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend",
"this scheduler does not support suspension");
return hpx::make_ready_future();
}

return hpx::async(
hpx::util::bind(&scheduled_thread_pool::suspend_internal, this,
std::move(throws)));
Expand All @@ -448,11 +483,19 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"scheduled_thread_pool<Scheduler>::suspend_cb",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend_cb",
"this scheduler does not support suspension");
return;
}

std::function<void(void)> suspend_internal_wrapper =
[this, HPX_CAPTURE_MOVE(callback)]()
{
Expand All @@ -476,8 +519,16 @@ namespace hpx { namespace threads { namespace detail
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"cannot suspend a pool from itself");
"scheduled_thread_pool<Scheduler>::suspend_direct",
"cannot suspend a pool from itself");
return;
}

if (!(mode_ & policies::enable_suspension))
{
HPX_THROW_EXCEPTION(invalid_status,
"scheduled_thread_pool<Scheduler>::suspend_direct",
"this scheduler does not support suspension");
return;
}

Expand Down Expand Up @@ -1775,7 +1826,10 @@ namespace hpx { namespace threads { namespace detail

util::yield_while([this, &state, virt_core]()
{
this->sched_->Scheduler::resume(virt_core);
if (mode_ & policies::enable_suspension)
{
this->sched_->Scheduler::resume(virt_core);
}
return state.load() == state_sleeping;
}, "scheduled_thread_pool::resume_processing_unit_internal",
hpx::threads::pending);
Expand Down
8 changes: 5 additions & 3 deletions hpx/runtime/threads/executors/pool_executor.hpp
Expand Up @@ -95,12 +95,14 @@ namespace hpx { namespace threads { namespace executors
///////////////////////////////////////////////////////////////////////
struct HPX_EXPORT pool_executor : public scheduled_executor
{
pool_executor(const std::string& pool_name);
pool_executor() = default;

pool_executor(const std::string& pool_name,
pool_executor(std::string const& pool_name);

pool_executor(std::string const& pool_name,
thread_stacksize stacksize);

pool_executor(const std::string& pool_name,
pool_executor(std::string const& pool_name,
thread_priority priority,
thread_stacksize stacksize = thread_stacksize_default);
};
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -207,13 +207,13 @@ namespace hpx { namespace threads { namespace policies
return "hierarchy_scheduler";
}

void suspend(std::size_t)
// Suspension is not supported by hierarchy_scheduler: this override ignores
// its argument and unconditionally fails the assertion below.
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "hierarchy_scheduler does not support"
" suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "hierarchy_scheduler does not support"
" resuming");
Expand Down
Expand Up @@ -75,13 +75,13 @@ namespace hpx { namespace threads { namespace policies
return "periodic_priority_queue_scheduler";
}

void suspend(std::size_t)
// Suspension is not supported by periodic_priority_queue_scheduler: this
// override ignores its argument and unconditionally fails the assertion
// below.
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "periodic_priority_queue_scheduler does not"
" support suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "periodic_priority_queue_scheduler does not"
" support resuming");
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -154,7 +154,7 @@ namespace hpx { namespace threads { namespace policies
#endif
}

void suspend(std::size_t num_thread)
virtual void suspend(std::size_t num_thread)
{
HPX_ASSERT(num_thread < suspend_conds_.size());

Expand All @@ -172,7 +172,7 @@ namespace hpx { namespace threads { namespace policies
expected == state_stopping || expected == state_terminating);
}

void resume(std::size_t num_thread)
virtual void resume(std::size_t num_thread)
{
if (num_thread == std::size_t(-1))
{
Expand Down
6 changes: 5 additions & 1 deletion hpx/runtime/threads/policies/scheduler_mode.hpp
Expand Up @@ -26,9 +26,13 @@ namespace hpx { namespace threads { namespace policies
///< to act as 'embedded' schedulers. In this case it needs to
///< periodically invoke a provided callback into the outer scheduler
///< more frequently than normal. This option enables this behavior.
enable_elasticity = 0x10 ///< This options allows for the
enable_elasticity = 0x10, ///< This option allows the
///< scheduler to dynamically increase and reduce the number of
///< processing units it runs on.
enable_suspension = 0x20 ///< This option allows the
///< scheduler to suspend/resume its operation. Setting this value
///< may not succeed for schedulers that do not support this
///< functionality.
};
}}}

Expand Down
Expand Up @@ -62,13 +62,13 @@ namespace hpx { namespace threads { namespace policies
return "static_priority_queue_scheduler";
}

void suspend(std::size_t)
// Suspension is not supported by static_priority_queue_scheduler: this
// override ignores its argument and unconditionally fails the assertion
// below.
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_priority_queue_scheduler does not"
" support suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_priority_queue_scheduler does not"
" support resuming");
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/static_queue_scheduler.hpp
Expand Up @@ -70,13 +70,13 @@ namespace hpx { namespace threads { namespace policies
return "static_queue_scheduler";
}

void suspend(std::size_t)
// Suspension is not supported by static_queue_scheduler: this override
// ignores its argument and unconditionally fails the assertion below.
void suspend(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_queue_scheduler does not support"
" suspending");
}

void resume(std::size_t)
void resume(std::size_t) override
{
HPX_ASSERT_MSG(false, "static_queue_scheduler does not support"
" resuming");
Expand Down
7 changes: 7 additions & 0 deletions hpx/runtime/threads/thread_pool_base.hpp
Expand Up @@ -436,6 +436,13 @@ namespace hpx { namespace threads
std::function<void(void)> callback, std::size_t virt_core,
error_code& ec = throws) = 0;

/// \cond NOINTERNAL
// Read-only accessor: returns a copy of this pool's mode_ member, typed as
// policies::scheduler_mode.
policies::scheduler_mode get_scheduler_mode() const
{
return mode_;
}
/// \endcond

protected:
/// \cond NOINTERNAL
void init_pool_time_scale();
Expand Down

0 comments on commit 8b0f81c

Please sign in to comment.