Skip to content

Commit

Permalink
Don't use boost::intrusive_ptr for thread_id_type
Browse files Browse the repository at this point in the history
This change gets rid of using boost::intrusive_ptr to refcount thread_data. The
lifetime of thread_data is perfectly managed by the scheduling policy. That is,
once a task terminates, it can be deleted/reused.
  • Loading branch information
Thomas Heller committed Jan 17, 2018
1 parent 3a711a6 commit bc74956
Show file tree
Hide file tree
Showing 29 changed files with 392 additions and 396 deletions.
2 changes: 1 addition & 1 deletion hpx/lcos/detail/future_data.hpp
Expand Up @@ -223,7 +223,7 @@ namespace detail
if (recurse_asynchronously)
{
error_code ec;
threads::thread_id_type id = threads::register_thread_nullary(
threads::register_thread_nullary(
compose_cb_impl(std::move(f1_), std::move(f2_)),
"compose_cb",
threads::pending, true, threads::thread_priority_boost,
Expand Down
4 changes: 2 additions & 2 deletions hpx/lcos/local/recursive_mutex.hpp
Expand Up @@ -50,7 +50,7 @@ namespace hpx { namespace lcos { namespace local
static thread_id_type call()
{
return hpx::threads::get_self_ptr() ?
(thread_id_type)hpx::threads::get_self_id().get() :
(thread_id_type)hpx::threads::get_self_id() :
thread_id_from_mutex<void>::call();
};
};
Expand Down Expand Up @@ -127,7 +127,7 @@ namespace hpx { namespace lcos { namespace local
// bool timed_lock(::boost::system_time const& wait_until);
// {
// threads::thread_id_repr_type const current_thread_id =
// threads::get_self_id().get();
// threads::get_self_id();
//
// return try_recursive_lock(current_thread_id) ||
// try_timed_lock(current_thread_id, wait_until);
Expand Down
3 changes: 0 additions & 3 deletions hpx/runtime/threads/coroutines/coroutine_fwd.hpp
Expand Up @@ -36,9 +36,6 @@ namespace hpx { namespace threads
{
class HPX_EXPORT thread_data;

HPX_EXPORT void intrusive_ptr_add_ref(thread_data* p);
HPX_EXPORT void intrusive_ptr_release(thread_data* p);

namespace coroutines
{
namespace detail
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/coroutines/detail/coroutine_impl.hpp
Expand Up @@ -62,7 +62,7 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
public:
typedef context_base super_type;
typedef context_base::thread_id_repr_type thread_id_repr_type;
typedef boost::intrusive_ptr<threads::thread_data> thread_id_type;
typedef threads::thread_data* thread_id_type;

typedef std::pair<thread_state_enum, thread_id_type> result_type;
typedef thread_state_ex_enum arg_type;
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/detail/create_thread.hpp
Expand Up @@ -59,7 +59,7 @@ namespace hpx { namespace threads { namespace detail
if (nullptr == data.parent_id) {
if (self)
{
data.parent_id = threads::get_self_id().get();
data.parent_id = threads::get_self_id();
data.parent_phase = self->get_thread_phase();
}
}
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/detail/create_work.hpp
Expand Up @@ -67,7 +67,7 @@ namespace hpx { namespace threads { namespace detail

if (self)
{
data.parent_id = threads::get_self_id().get();
data.parent_id = threads::get_self_id();
data.parent_phase = self->get_thread_phase();
}
}
Expand Down
46 changes: 30 additions & 16 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -37,7 +37,7 @@ namespace hpx { namespace threads { namespace detail
thread_data* thrd, thread_state_enum state, char const* info)
{
LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
<< "thread(" << thrd->get_thread_id().get() << "), "
<< "thread(" << thrd->get_thread_id() << "), "
<< "description(" << thrd->get_description() << "), "
<< "new state(" << get_thread_state_name(state) << "), "
<< info;
Expand All @@ -47,7 +47,7 @@ namespace hpx { namespace threads { namespace detail
{
// log this in any case
LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
<< "thread(" << thrd->get_thread_id().get() << "), "
<< "thread(" << thrd->get_thread_id() << "), "
<< "description(" << thrd->get_description() << "), "
<< "new state(" << get_thread_state_name(state) << "), "
<< info;
Expand All @@ -56,7 +56,7 @@ namespace hpx { namespace threads { namespace detail
thread_data* thrd, thread_state_enum state)
{
LTM_(debug) << "tfunc(" << num_thread << "): " //-V128
<< "thread(" << thrd->get_thread_id().get() << "), "
<< "thread(" << thrd->get_thread_id() << "), "
<< "description(" << thrd->get_description() << "), "
<< "old state(" << get_thread_state_name(state) << ")";
}
Expand Down Expand Up @@ -118,7 +118,7 @@ namespace hpx { namespace threads { namespace detail
thread_data* get_next_thread() const
{
// we know that the thread-id is just the pointer to the thread_data
return reinterpret_cast<thread_data*>(next_thread_id_.get());
return next_thread_id_;
}

private:
Expand Down Expand Up @@ -292,7 +292,7 @@ namespace hpx { namespace threads { namespace detail
scheduling_callbacks& callbacks, std::shared_ptr<bool>& background_running,
std::size_t num_thread, std::int64_t& idle_loop_count)
{
thread_id_type background_thread;
thread_id_type background_thread = nullptr;
background_running.reset(new bool(true));
thread_init_data background_init(
[&, background_running](thread_state_ex_enum) -> thread_result_type
Expand Down Expand Up @@ -350,7 +350,7 @@ namespace hpx { namespace threads { namespace detail
{
// tries to set state to active (only if state is still
// the same as 'state')
detail::switch_status thrd_stat (background_thread.get(), state);
detail::switch_status thrd_stat (background_thread, state);
if (HPX_LIKELY(thrd_stat.is_valid() &&
thrd_stat.get_previous() == pending))
{
Expand Down Expand Up @@ -379,7 +379,7 @@ namespace hpx { namespace threads { namespace detail
thrd_stat = (*background_thread)();
#endif
thread_data *next = thrd_stat.get_next_thread();
if (next != nullptr && next != background_thread.get())
if (next != nullptr && next != background_thread)
{
if (next_thrd == nullptr)
{
Expand All @@ -405,8 +405,8 @@ namespace hpx { namespace threads { namespace detail
scheduler.SchedulingPolicy::
decrement_background_thread_count();
scheduler.SchedulingPolicy::destroy_thread(
background_thread.get(), busy_count);
background_thread.reset();
background_thread, busy_count);
background_thread = nullptr;//.reset();
}

This comment has been minimized.

Copy link
@hkaiser

hkaiser Jan 17, 2018

Member

Please remove commented code.

else if(suspended == state_val)
{
Expand All @@ -428,11 +428,13 @@ namespace hpx { namespace threads { namespace detail
{
std::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
util::itt::stack_context ctx; // helper for itt support
util::itt::domain domain = hpx::get_thread_itt_domain();
util::itt::id threadid(domain, &scheduler);
util::itt::string_handle task_id("task_id");
util::itt::string_handle task_phase("task_phase");
#endif

// util::itt::frame_context fctx(domain);

Expand Down Expand Up @@ -462,7 +464,17 @@ namespace hpx { namespace threads { namespace detail

while (true) {
// Get the next HPX thread from the queue
thrd = next_thrd;
thrd = nullptr;
if (next_thrd)
{
if (next_thrd->get_scheduler_base() == &scheduler)
thrd = next_thrd;
else
{
next_thrd->get_scheduler_base()->schedule_thread(next_thrd, std::size_t(-1));
}
}

This comment has been minimized.

Copy link
@hkaiser

hkaiser Jan 17, 2018

Member

This is an unrelated changeset, is it?

next_thrd = nullptr;
bool running = this_state.load(
std::memory_order_relaxed) < state_pre_sleep;

Expand All @@ -471,6 +483,7 @@ namespace hpx { namespace threads { namespace detail
num_thread, running, idle_loop_count, thrd)))
{
tfunc_time_wrapper tfunc_time_collector(idle_rate);
HPX_ASSERT(thrd->get_scheduler_base() == &scheduler);

idle_loop_count = 0;
++busy_loop_count;
Expand Down Expand Up @@ -514,6 +527,7 @@ namespace hpx { namespace threads { namespace detail
// and add to aggregate execution time.
exec_time_wrapper exec_time_collector(idle_rate);


#if defined(HPX_HAVE_APEX)
// get the APEX data pointer, in case we are resuming the
// thread and have to restore any leaf timers from
Expand Down Expand Up @@ -635,7 +649,7 @@ namespace hpx { namespace threads { namespace detail
}
else if (HPX_UNLIKELY(active == state_val)) {
LTM_(warning) << "tfunc(" << num_thread << "): " //-V128
"thread(" << thrd->get_thread_id().get() << "), "
"thread(" << thrd->get_thread_id() << "), "
"description(" << thrd->get_description() << "), "
"rescheduling";

Expand Down Expand Up @@ -689,15 +703,15 @@ namespace hpx { namespace threads { namespace detail
if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
{
// If this is an inner scheduler, try to exit immediately
if (background_thread.get() != nullptr)
if (background_thread != nullptr)
{
HPX_ASSERT(background_running);
*background_running = false;
scheduler.SchedulingPolicy::
decrement_background_thread_count();
scheduler.SchedulingPolicy::schedule_thread(
background_thread.get(), num_thread);
background_thread.reset();
background_thread, num_thread);
background_thread = nullptr;//.reset();

This comment has been minimized.

Copy link
@hkaiser

hkaiser Jan 17, 2018

Member

Please remove commented code. More such spots are below.

background_running.reset();
}
else
Expand Down Expand Up @@ -795,8 +809,8 @@ namespace hpx { namespace threads { namespace detail
scheduler.SchedulingPolicy::
decrement_background_thread_count();
scheduler.SchedulingPolicy::schedule_thread(
background_thread.get(), num_thread);
background_thread.reset();
background_thread, num_thread);
background_thread = nullptr;//.reset();
background_running.reset();
}
else
Expand Down
16 changes: 8 additions & 8 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -63,7 +63,7 @@ namespace hpx { namespace threads { namespace detail
<< "set_active_state: thread is still active, however "
"it was non-active since the original set_state "
"request was issued, aborting state change, thread("
<< thrd.get() << "), description("
<< thrd << "), description("
<< thrd->get_description() << "), new state("
<< get_thread_state_name(newstate) << ")";
return thread_result_type(terminated, nullptr);
Expand Down Expand Up @@ -117,7 +117,7 @@ namespace hpx { namespace threads { namespace detail
LTM_(warning)
<< "set_thread_state: old thread state is the same as new "
"thread state, aborting state change, thread("
<< thrd.get() << "), description("
<< thrd << "), description("
<< thrd->get_description() << "), new state("
<< get_thread_state_name(new_state) << ")";

Expand All @@ -135,7 +135,7 @@ namespace hpx { namespace threads { namespace detail
// schedule a new thread to set the state
LTM_(warning)
<< "set_thread_state: thread is currently active, scheduling "
"new thread, thread(" << thrd.get() << "), description("
"new thread, thread(" << thrd << "), description("
<< thrd->get_description() << "), new state("
<< get_thread_state_name(new_state) << ")";

Expand All @@ -157,7 +157,7 @@ namespace hpx { namespace threads { namespace detail
{
LTM_(warning)
<< "set_thread_state: thread is terminated, aborting state "
"change, thread(" << thrd.get() << "), description("
"change, thread(" << thrd << "), description("
<< thrd->get_description() << "), new state("
<< get_thread_state_name(new_state) << ")";

Expand All @@ -177,7 +177,7 @@ namespace hpx { namespace threads { namespace detail
std::ostringstream strm;
strm << "set_thread_state: invalid new state, can't demote a "
"pending thread, "
<< "thread(" << thrd.get() << "), description("
<< "thread(" << thrd << "), description("
<< thrd->get_description() << "), new state("
<< get_thread_state_name(new_state) << ")";

Expand All @@ -203,7 +203,7 @@ namespace hpx { namespace threads { namespace detail
// at some point will ignore this thread by simply skipping it
// (if it's not pending anymore).

LTM_(info) << "set_thread_state: thread(" << thrd.get() << "), "
LTM_(info) << "set_thread_state: thread(" << thrd << "), "
"description(" << thrd->get_description() << "), "
"new state(" << get_thread_state_name(new_state) << "), "
"old state(" << get_thread_state_name(previous_state_val)
Expand All @@ -216,7 +216,7 @@ namespace hpx { namespace threads { namespace detail
// state has changed since we fetched it from the thread, retry
LTM_(error)
<< "set_thread_state: state has been changed since it was fetched, "
"retrying, thread(" << thrd.get() << "), "
"retrying, thread(" << thrd << "), "
"description(" << thrd->get_description() << "), "
"new state(" << get_thread_state_name(new_state) << "), "
"old state(" << get_thread_state_name(previous_state_val)
Expand All @@ -226,7 +226,7 @@ namespace hpx { namespace threads { namespace detail
if (new_state == pending) {
// REVIEW: Passing a specific target thread may interfere with the
// round robin queuing.
thrd->get_scheduler_base()->schedule_thread(thrd.get(),
thrd->get_scheduler_base()->schedule_thread(thrd,
thread_num, priority);
thrd->get_scheduler_base()->do_some_work(thread_num);
}
Expand Down
11 changes: 11 additions & 0 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
Expand Up @@ -334,6 +334,11 @@ namespace hpx { namespace threads { namespace policies
HPX_ASSERT(num_thread < queue_size);
queues_[num_thread]->create_thread(data, id, initial_state,
run_now, ec);

if (id)
{
HPX_ASSERT((*id)->get_scheduler_base() == this);
}

This comment has been minimized.

Copy link
@hkaiser

hkaiser Jan 17, 2018

Member

I'd rewrite this as:

HPX_ASSERT(id == nullptr || (*id)->get_scheduler_base() == this);
}

/// Return the next thread to be executed, return false if none is
Expand Down Expand Up @@ -460,6 +465,8 @@ namespace hpx { namespace threads { namespace policies
if (std::size_t(-1) == num_thread)
num_thread = curr_queue_++ % queues_.size();

HPX_ASSERT(thrd->get_scheduler_base() == this);

HPX_ASSERT(num_thread < queues_.size());
queues_[num_thread]->schedule_thread(thrd);
}
Expand All @@ -471,13 +478,17 @@ namespace hpx { namespace threads { namespace policies
if (std::size_t(-1) == num_thread)
num_thread = curr_queue_++ % queues_.size();

HPX_ASSERT(thrd->get_scheduler_base() == this);

HPX_ASSERT(num_thread < queues_.size());
queues_[num_thread]->schedule_thread(thrd, true);
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
{
HPX_ASSERT(thrd->get_scheduler_base() == this);

for (std::size_t i = 0; i != queues_.size(); ++i)
{
if (queues_[i]->destroy_thread(thrd, busy_count))
Expand Down
6 changes: 3 additions & 3 deletions hpx/runtime/threads/policies/queue_helpers.hpp
Expand Up @@ -65,7 +65,7 @@ namespace detail
typename Map::const_iterator end = tm.end();
for (typename Map::const_iterator it = tm.begin(); it != end; ++it)
{
threads::thread_data const* thrd = (*it).get();
threads::thread_data const* thrd = (*it);
threads::thread_state_enum state = thrd->get_state().state();
threads::thread_state_enum marked_state = thrd->get_marked_state();

Expand All @@ -89,7 +89,7 @@ namespace detail
LTM_(error) << "queue(" << num_thread << "): " //-V128
<< get_thread_state_name(state)
<< "(" << std::hex << std::setw(8)
<< std::setfill('0') << (*it).get()
<< std::setfill('0') << (*it)
<< "." << std::hex << std::setw(2)
<< std::setfill('0') << thrd->get_thread_phase()
<< "/" << std::hex << std::setw(8)
Expand All @@ -107,7 +107,7 @@ namespace detail
<< "queue(" << num_thread << "): "
<< get_thread_state_name(state)
<< "(" << std::hex << std::setw(8)
<< std::setfill('0') << (*it).get()
<< std::setfill('0') << (*it)
<< "." << std::hex << std::setw(2)
<< std::setfill('0') << thrd->get_thread_phase()
<< "/" << std::hex << std::setw(8)
Expand Down

1 comment on commit bc74956

@hkaiser
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I'd suggest wrapping the thread_id into a separate type with pointer semantics, at least to ensure proper (implicit) initialization.

Please sign in to comment.