Skip to content

Commit

Permalink
Improving destruction of threads
Browse files Browse the repository at this point in the history
Instead of looping over all available queues to find the one that owns a thread,
this patch stores a pointer to the queue in which the thread was allocated.
In addition, we completely avoid going through an additional memory pool, because
the queues themselves cache and reuse the allocated threads.
  • Loading branch information
Thomas Heller committed Jan 31, 2018
1 parent ff49251 commit 0eb4090
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 110 deletions.
24 changes: 3 additions & 21 deletions examples/resource_partitioner/shared_priority_scheduler.hpp
Expand Up @@ -859,34 +859,16 @@ namespace threads {
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(
void destroy_thread(
threads::thread_data* thrd, std::int64_t& busy_count)
{
LOG_CUSTOM_MSG("destroy thread " << THREAD_DESC(thrd)
<< " busy_count "
<< decnumber(busy_count));
HPX_ASSERT(thrd->get_scheduler_base() == this);

for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
{
if (high_priority_queues_[i]->destroy_thread(
thrd, busy_count))
return true;
}

for (std::size_t i = 0; i != queues_.size(); ++i)
{
if (queues_[i]->destroy_thread(thrd, busy_count))
return true;
}

if (low_priority_queue_.destroy_thread(thrd, busy_count))
return true;

// the thread has to belong to one of the queues, always
HPX_ASSERT(false);

return false;
HPX_ASSERT(thrd->get_scheduler_base() == this);
thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count);
}

///////////////////////////////////////////////////////////////////////
Expand Down
13 changes: 3 additions & 10 deletions hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -710,17 +710,10 @@ namespace hpx { namespace threads { namespace policies
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
void destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
{
for(size_type i = 0; i < tree.size(); ++i)
{
for(size_type j = 0; j < tree[i].size(); ++j)
{
if(tree[i][j]->destroy_thread(thrd, busy_count))
return true;
}
}
return false;
HPX_ASSERT(thrd->get_scheduler_base() == this);
thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count);
}

void transfer_tasks(
Expand Down
23 changes: 3 additions & 20 deletions hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp
Expand Up @@ -649,27 +649,10 @@ namespace hpx { namespace threads { namespace policies
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
void destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
{
for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
{
if (high_priority_queues_[i]->destroy_thread(thrd, busy_count))
return true;
}

for (std::size_t i = 0; i != queues_.size(); ++i)
{
if (queues_[i]->destroy_thread(thrd, busy_count))
return true;
}

if (low_priority_queue_.destroy_thread(thrd, busy_count))
return true;

// the thread has to belong to one of the queues, always
HPX_ASSERT(false);

return false;
HPX_ASSERT(thrd->get_scheduler_base() == this);
thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count);
}

///////////////////////////////////////////////////////////////////////
Expand Down
14 changes: 2 additions & 12 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
Expand Up @@ -480,20 +480,10 @@ namespace hpx { namespace threads { namespace policies
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
void destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
{
HPX_ASSERT(thrd->get_scheduler_base() == this);

for (std::size_t i = 0; i != queues_.size(); ++i)
{
if (queues_[i]->destroy_thread(thrd, busy_count))
return true;
}

// the thread has to belong to one of the queues, always
HPX_ASSERT(false);

return false;
thrd->get_queue<thread_queue_type>().destroy_thread(thrd, busy_count);
}

///////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -387,7 +387,7 @@ namespace hpx { namespace threads { namespace policies
std::size_t num_thread,
thread_priority priority = thread_priority_normal) = 0;

virtual bool destroy_thread(threads::thread_data* thrd,
virtual void destroy_thread(threads::thread_data* thrd,
std::int64_t& busy_count) = 0;

virtual bool wait_or_add_new(std::size_t num_thread, bool running,
Expand Down
28 changes: 10 additions & 18 deletions hpx/runtime/threads/policies/thread_queue.hpp
Expand Up @@ -268,7 +268,7 @@ namespace hpx { namespace threads { namespace policies

// Allocate a new thread object.
thrd = thread_id_type(
threads::thread_data::create(data, memory_pool_, state));
new threads::thread_data(data, this, state));
}
}

Expand Down Expand Up @@ -331,7 +331,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd) != thread_map_.end());
HPX_ASSERT(thrd->get_pool() == &memory_pool_);
HPX_ASSERT(&thrd->get_queue<thread_queue>() == this);
}

if (added) {
Expand Down Expand Up @@ -549,7 +549,6 @@ namespace hpx { namespace threads { namespace policies
new_tasks_wait_(0),
new_tasks_wait_count_(0),
#endif
memory_pool_(64),
thread_heap_small_(),
thread_heap_medium_(),
thread_heap_large_(),
Expand Down Expand Up @@ -744,7 +743,7 @@ namespace hpx { namespace threads { namespace policies

// this thread has to be in the map now
HPX_ASSERT(thread_map_.find(thrd) != thread_map_.end());
HPX_ASSERT(thrd->get_pool() == &memory_pool_);
HPX_ASSERT(&thrd->get_queue<thread_queue>() == this);

// push the new thread into the pending queue
if (initial_state == pending)
Expand Down Expand Up @@ -882,20 +881,16 @@ namespace hpx { namespace threads { namespace policies
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
void destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
{
if (thrd->get_pool() == &memory_pool_)
{
terminated_items_.push(thrd);
HPX_ASSERT(&thrd->get_queue<thread_queue>() == this);
terminated_items_.push(thrd);

std::int64_t count = ++terminated_items_count_;
if (count > max_terminated_threads)
{
cleanup_terminated(true); // clean up all terminated threads
}
return true;
std::int64_t count = ++terminated_items_count_;
if (count > max_terminated_threads)
{
cleanup_terminated(true); // clean up all terminated threads
}
return false;
}

///////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1128,9 +1123,6 @@ namespace hpx { namespace threads { namespace policies
///< overall number tasks waited
#endif

threads::thread_pool memory_pool_; ///< OS thread local memory pools for
///< HPX-threads

std::list<thread_id_type> thread_heap_small_;
std::list<thread_id_type> thread_heap_medium_;
std::list<thread_id_type> thread_heap_large_;
Expand Down
34 changes: 6 additions & 28 deletions hpx/runtime/threads/thread_data.hpp
Expand Up @@ -21,7 +21,6 @@
#include <hpx/util/atomic_count.hpp>
#include <hpx/util/backtrace.hpp>
#include <hpx/util/function.hpp>
#include <hpx/util/lockfree/freelist.hpp>
#include <hpx/util/logging.hpp>
#include <hpx/util/spinlock_pool.hpp>
#include <hpx/util/thread_description.hpp>
Expand Down Expand Up @@ -94,26 +93,6 @@ namespace hpx { namespace threads
struct tag {};
typedef util::spinlock_pool<tag> mutex_type;

typedef boost::lockfree::caching_freelist<thread_data> pool_type;

static thread_data* create(
thread_init_data& init_data, pool_type& pool,
thread_state_enum newstate)
{
thread_data* ret = pool.allocate();
if (ret == nullptr)
{
HPX_THROW_EXCEPTION(out_of_memory,
"thread_data::operator new",
"could not allocate memory for thread_data");
}
#ifdef HPX_DEBUG_THREAD_POOL
using namespace std; // some systems have memset in namespace std
memset (ret, initial_value, sizeof(thread_data));
#endif
return new (ret) thread_data(init_data, &pool, newstate);
}

~thread_data()
{
free_thread_exit_callbacks();
Expand Down Expand Up @@ -503,9 +482,10 @@ namespace hpx { namespace threads
return stacksize_;
}

pool_type* get_pool()
template <typename ThreadQueue>
ThreadQueue& get_queue()
{
return pool_;
return *static_cast<ThreadQueue *>(queue_);
}

/// \brief Execute the thread function
Expand Down Expand Up @@ -572,7 +552,7 @@ namespace hpx { namespace threads

/// Construct a new \a thread
thread_data(thread_init_data& init_data,
pool_type* pool, thread_state_enum newstate)
void* queue, thread_state_enum newstate)
: current_state_(thread_state(newstate, wait_signaled)),
#ifdef HPX_HAVE_THREAD_TARGET_ADDRESS
component_id_(init_data.lva),
Expand Down Expand Up @@ -600,7 +580,7 @@ namespace hpx { namespace threads
stacksize_(init_data.stacksize),
coroutine_(std::move(init_data.func),
thread_id_type(this_()), init_data.stacksize),
pool_(pool)
queue_(queue)
{
LTM_(debug) << "thread::thread(" << this << "), description("
<< get_description() << ")";
Expand Down Expand Up @@ -724,10 +704,8 @@ namespace hpx { namespace threads
std::ptrdiff_t stacksize_;

coroutine_type coroutine_;
pool_type* pool_;
void* queue_;
};

typedef thread_data::pool_type thread_pool;
}}

#include <hpx/config/warnings_suffix.hpp>
Expand Down

0 comments on commit 0eb4090

Please sign in to comment.