Skip to content

Commit

Permalink
Refactoring thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Aug 2, 2017
1 parent 8e59ff9 commit 6c32863
Show file tree
Hide file tree
Showing 4 changed files with 1,450 additions and 1,377 deletions.
155 changes: 50 additions & 105 deletions hpx/runtime/threads/detail/thread_pool.hpp
Expand Up @@ -73,9 +73,8 @@ namespace hpx { namespace threads { namespace detail
class thread_pool
{
public:
thread_pool(threads::policies::callback_notifier& notifier,
std::size_t index, char const* pool_name,
policies::scheduler_mode m = policies::nothing_special);
thread_pool(std::size_t index, char const* pool_name,
policies::scheduler_mode m, std::size_t thread_offset);

virtual ~thread_pool() = default;

Expand All @@ -93,13 +92,11 @@ namespace hpx { namespace threads { namespace detail
compat::barrier& startup, std::size_t num_threads) = 0;
virtual bool run(
std::unique_lock<compat::mutex>& l, std::size_t num_threads) = 0;
void stop(std::unique_lock<compat::mutex>& l, bool blocking = true);

virtual void stop_locked(std::unique_lock<lcos::local::no_mutex>& l,
bool blocking = true) = 0;
virtual void stop_locked(
virtual void stop(
std::unique_lock<compat::mutex>& l, bool blocking = true) = 0;

public:
std::size_t get_worker_thread_num() const;
virtual std::size_t get_os_thread_count() const = 0;

Expand Down Expand Up @@ -131,54 +128,46 @@ namespace hpx { namespace threads { namespace detail

// performance counters
#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
std::int64_t get_executed_threads(std::size_t num, bool reset);
std::int64_t get_executed_thread_phases(std::size_t num, bool reset);
virtual std::int64_t get_executed_threads(std::size_t, bool) = 0;
virtual std::int64_t get_executed_thread_phases(std::size_t, bool) = 0;
#if defined(HPX_HAVE_THREAD_IDLE_RATES)
std::int64_t get_thread_phase_duration(std::size_t num, bool reset);
std::int64_t get_thread_duration(std::size_t num, bool reset);
std::int64_t get_thread_phase_overhead(std::size_t num, bool reset);
std::int64_t get_thread_overhead(std::size_t num, bool reset);
std::int64_t get_cumulative_thread_duration(std::size_t num, bool reset);
std::int64_t get_cumulative_thread_overhead(std::size_t num, bool reset);
virtual std::int64_t get_thread_phase_duration(std::size_t, bool) = 0;
virtual std::int64_t get_thread_duration(std::size_t, bool) = 0;
virtual std::int64_t get_thread_phase_overhead(std::size_t, bool) = 0;
virtual std::int64_t get_thread_overhead(std::size_t, bool) = 0;
virtual std::int64_t get_cumulative_thread_duration(std::size_t, bool) = 0;
virtual std::int64_t get_cumulative_thread_overhead(std::size_t, bool) = 0;
#endif
#endif

std::int64_t get_cumulative_duration(std::size_t num, bool reset);
virtual std::int64_t get_cumulative_duration(std::size_t num,
bool reset) = 0;

#if defined(HPX_HAVE_THREAD_IDLE_RATES)
std::int64_t avg_idle_rate_all(bool reset);
std::int64_t avg_idle_rate(std::size_t num_thread, bool reset);
virtual std::int64_t avg_idle_rate_all(bool reset) = 0;
virtual std::int64_t avg_idle_rate(std::size_t, bool) = 0;

#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
virtual std::int64_t avg_creation_idle_rate(std::size_t, bool reset) = 0;
virtual std::int64_t avg_cleanup_idle_rate(std::size_t, bool reset) = 0;
virtual std::int64_t avg_creation_idle_rate(std::size_t, bool) = 0;
virtual std::int64_t avg_cleanup_idle_rate(std::size_t, bool) = 0;
#endif
#endif

virtual std::int64_t get_queue_length(
std::size_t num_thread, bool reset) = 0;
virtual std::int64_t get_queue_length(std::size_t, bool) = 0;

#if defined(HPX_HAVE_THREAD_QUEUE_WAITTIME)
virtual std::int64_t get_average_thread_wait_time(
std::size_t num_thread, bool reset) = 0;
virtual std::int64_t get_average_task_wait_time(
std::size_t num_thread, bool reset) = 0;
virtual std::int64_t get_average_thread_wait_time(std::size_t, bool) = 0;
virtual std::int64_t get_average_task_wait_time(std::size_t, bool) = 0;
#endif

#if defined(HPX_HAVE_THREAD_STEALING_COUNTS)
virtual std::int64_t get_num_pending_misses(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_pending_accesses(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_pending_misses(std::size_t, bool) = 0;
virtual std::int64_t get_num_pending_accesses(std::size_t, bool) = 0;

virtual std::int64_t get_num_stolen_from_pending(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_stolen_to_pending(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_stolen_from_staged(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_stolen_to_staged(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_num_stolen_from_pending(std::size_t, bool) = 0;
virtual std::int64_t get_num_stolen_to_pending(std::size_t, bool) = 0;
virtual std::int64_t get_num_stolen_from_staged(std::size_t, bool) = 0;
virtual std::int64_t get_num_stolen_to_staged(std::size_t, bool) = 0;
#endif

virtual std::int64_t get_thread_count(thread_state_enum state,
Expand Down Expand Up @@ -220,10 +209,12 @@ namespace hpx { namespace threads { namespace detail
staged, thread_priority_default, num_thread, reset);
}

std::int64_t get_scheduler_utilization() const;
virtual std::int64_t get_scheduler_utilization() const = 0;

std::int64_t get_idle_loop_count(std::size_t num, bool reset);
std::int64_t get_busy_loop_count(std::size_t num, bool reset);
virtual std::int64_t get_idle_loop_count(
std::size_t num, bool reset) = 0;
virtual std::int64_t get_busy_loop_count(
std::size_t num, bool reset) = 0;

///////////////////////////////////////////////////////////////////////
virtual bool enumerate_threads(
Expand All @@ -246,82 +237,36 @@ namespace hpx { namespace threads { namespace detail

virtual void do_some_work(std::size_t num_thread) = 0;

virtual std::size_t get_thread_offset() const = 0;

virtual void report_error(
std::size_t num, std::exception_ptr const& e) = 0;

virtual void thread_func(std::size_t num_thread,
topology const& topology, compat::barrier& startup) = 0;
public:
void init_pool_time_scale();

protected:
double timestamp_scale_; // scale timestamps to nanoseconds
std::size_t get_thread_offset() const
{
return thread_offset_;
}

//! FIXME should I leave them here or move them to thread_pool_impl?
// private:
threads::policies::callback_notifier& notifier_;
protected:
pool_id_type id_;

// count number of executed HPX-threads and thread phases (invocations)
std::vector<std::int64_t> executed_threads_;
std::vector<std::int64_t> executed_thread_phases_;
boost::atomic<long> thread_count_;

#if defined(HPX_HAVE_THREAD_CUMULATIVE_COUNTS)
// timestamps/values of last reset operation for various performance
// counters
std::vector<std::int64_t> reset_executed_threads_;
std::vector<std::int64_t> reset_executed_thread_phases_;

#if defined(HPX_HAVE_THREAD_IDLE_RATES)
std::vector<std::int64_t> reset_thread_duration_;
std::vector<std::uint64_t> reset_thread_duration_times_;

std::vector<std::int64_t> reset_thread_overhead_;
std::vector<std::uint64_t> reset_thread_overhead_times_;
std::vector<std::uint64_t> reset_thread_overhead_times_total_;

std::vector<std::int64_t> reset_thread_phase_duration_;
std::vector<std::uint64_t> reset_thread_phase_duration_times_;

std::vector<std::int64_t> reset_thread_phase_overhead_;
std::vector<std::uint64_t> reset_thread_phase_overhead_times_;
std::vector<std::uint64_t> reset_thread_phase_overhead_times_total_;

std::vector<std::uint64_t> reset_cumulative_thread_duration_;

std::vector<std::uint64_t> reset_cumulative_thread_overhead_;
std::vector<std::uint64_t> reset_cumulative_thread_overhead_total_;
#endif
#endif

#if defined(HPX_HAVE_THREAD_IDLE_RATES)
std::vector<std::uint64_t> reset_idle_rate_time_;
std::vector<std::uint64_t> reset_idle_rate_time_total_;

#if defined(HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES)
std::vector<std::uint64_t> reset_creation_idle_rate_time_;
std::vector<std::uint64_t> reset_creation_idle_rate_time_total_;

std::vector<std::uint64_t> reset_cleanup_idle_rate_time_;
std::vector<std::uint64_t> reset_cleanup_idle_rate_time_total_;
#endif
#endif

// tfunc_impl timers
std::vector<std::uint64_t> exec_times_, tfunc_times_;
std::vector<std::uint64_t> reset_tfunc_times_;

std::vector<std::int64_t> idle_loop_counts_, busy_loop_counts_;

std::vector<std::uint8_t> tasks_active_;

// Stores the mask identifying all processing units used by this
// thread manager.
// thread pool.
threads::mask_type used_processing_units_;

// Mode of operation of the pool
policies::scheduler_mode mode_;

// The thread_offset is equal to the accumulated number of threads
// in all pools that precede this pool in the global thread
// numbering. This means that, in order to obtain the global index
// of a thread it owns, the pool has to compute:
//     global index = thread_offset_ + local index.
std::size_t thread_offset_;

// scale timestamps to nanoseconds
double timestamp_scale_;
};
}}}

Expand Down

0 comments on commit 6c32863

Please sign in to comment.