Optimizing thread scheduling (WIP)
TBD
Thomas Heller committed Jan 19, 2018
1 parent 77fdf0d commit 1cdb0f6
Showing 16 changed files with 434 additions and 1,066 deletions.
4 changes: 4 additions & 0 deletions hpx/runtime/threads/coroutines/detail/context_base.hpp
@@ -419,12 +419,16 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
// Nothrow.
void do_yield() noexcept
{
// Lazy initialization to initialize the full stack...
init();
swap_context(*this, m_caller, detail::yield_hint());
}

// Nothrow.
void do_invoke() throw ()
{
// Lazy initialization to initialize the full stack...
init();
HPX_ASSERT(is_ready() || waiting());
#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
++m_phase;
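The change above is the heart of the commit: stack setup moves out of the coroutine constructor and into an idempotent init() that runs on first entry, so contexts that are created but never resumed pay no allocation cost. A minimal standalone sketch of the pattern (illustrative only, not HPX code; std::malloc stands in for the real stack allocator):

#include <cassert>
#include <cstddef>
#include <cstdlib>

struct lazy_context
{
    void* stack_;
    std::size_t stack_size_;

    explicit lazy_context(std::size_t size)
      : stack_(nullptr), stack_size_(size)   // note: no allocation here
    {}

    // Idempotent: allocates and prepares the stack on first use only.
    void init()
    {
        if (stack_ == nullptr)
        {
            stack_ = std::malloc(stack_size_);   // stand-in for alloc_stack()
            assert(stack_ != nullptr);
            // ... set up the machine context on the fresh stack here ...
        }
    }

    void do_yield() noexcept
    {
        init();   // lazy initialization before the first context switch
        // swap_context(*this, m_caller, yield_hint());
    }

    ~lazy_context() { std::free(stack_); }
};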
37 changes: 23 additions & 14 deletions hpx/runtime/threads/coroutines/detail/context_generic_context.hpp
@@ -230,21 +230,8 @@ namespace hpx { namespace threads { namespace coroutines
(stack_size == -1) ?
alloc_.minimum_stacksize() : std::size_t(stack_size)
)
, stack_pointer_(alloc_.allocate(stack_size_))
, stack_pointer_(nullptr)
{
#if BOOST_VERSION < 105600
boost::context::fcontext_t* ctx =
boost::context::make_fcontext(stack_pointer_, stack_size_, funp_);

std::swap(*ctx, ctx_);
#elif BOOST_VERSION < 106100
ctx_ =
boost::context::make_fcontext(stack_pointer_, stack_size_, funp_);
#else
ctx_ =
boost::context::detail::make_fcontext(
stack_pointer_, stack_size_, funp_);
#endif
}

~fcontext_context_impl()
@@ -263,6 +250,28 @@ namespace hpx { namespace threads { namespace coroutines
}
}

void init()
{
if (stack_pointer_ == nullptr)
{
stack_pointer_ = alloc_.allocate(stack_size_);

#if BOOST_VERSION < 105600
boost::context::fcontext_t* ctx =
boost::context::make_fcontext(stack_pointer_, stack_size_, funp_);

std::swap(*ctx, ctx_);
#elif BOOST_VERSION < 106100
ctx_ =
boost::context::make_fcontext(stack_pointer_, stack_size_, funp_);
#else
ctx_ =
boost::context::detail::make_fcontext(
stack_pointer_, stack_size_, funp_);
#endif
}
}

// Return the size of the reserved stack address space.
std::ptrdiff_t get_stacksize() const
{
52 changes: 32 additions & 20 deletions hpx/runtime/threads/coroutines/detail/context_linux_x86.hpp
@@ -187,7 +187,8 @@ namespace hpx { namespace threads { namespace coroutines
: m_stack_size(stack_size == -1
? static_cast<std::ptrdiff_t>(default_stack_size)
: stack_size),
m_stack(nullptr)
m_stack(nullptr),
cb_(&cb)
{
if (0 != (m_stack_size % EXEC_PAGESIZE))
{
Expand All @@ -204,27 +205,10 @@ namespace hpx { namespace threads { namespace coroutines
m_stack_size));
}

m_stack = posix::alloc_stack(static_cast<std::size_t>(m_stack_size));
HPX_ASSERT(m_stack);
posix::watermark_stack(m_stack, static_cast<std::size_t>(m_stack_size));

typedef void fun(Functor*);
fun * funp = trampoline;

m_sp = (static_cast<void**>(m_stack)
+ static_cast<std::size_t>(m_stack_size) / sizeof(void*))
- context_size;

m_sp[backup_cb_idx] = m_sp[cb_idx] = &cb;
m_sp[backup_funp_idx] = m_sp[funp_idx] = nasty_cast<void*>(funp);

#if defined(HPX_HAVE_VALGRIND) && !defined(NVALGRIND)
{
void * eos = static_cast<char*>(m_stack) + m_stack_size;
m_sp[valgrind_id_idx] = reinterpret_cast<void*>(
VALGRIND_STACK_REGISTER(m_stack, eos));
}
#endif
funp_ = nasty_cast<void*>(funp);

#if defined(HPX_HAVE_STACKOVERFLOW_DETECTION)
// concept inspired by the following links:
@@ -247,6 +231,32 @@
#endif
}

void init()
{
if (m_stack == nullptr)
{
m_stack = posix::alloc_stack(static_cast<std::size_t>(m_stack_size));
HPX_ASSERT(m_stack);
posix::watermark_stack(m_stack, static_cast<std::size_t>(m_stack_size));

m_sp = (static_cast<void**>(m_stack)
+ static_cast<std::size_t>(m_stack_size) / sizeof(void*))
- context_size;

m_sp[backup_cb_idx] = m_sp[cb_idx] = cb_;
m_sp[backup_funp_idx] = m_sp[funp_idx] = funp_;

#if defined(HPX_HAVE_VALGRIND) && !defined(NVALGRIND)
{
void * eos = static_cast<char*>(m_stack) + m_stack_size;
m_sp[valgrind_id_idx] = reinterpret_cast<void*>(
VALGRIND_STACK_REGISTER(m_stack, eos));
}
#endif
}
}

#if defined(HPX_HAVE_STACKOVERFLOW_DETECTION)

// heuristic value 1 kilobyte
@@ -442,10 +452,12 @@ namespace hpx { namespace threads { namespace coroutines
static const std::size_t cb_idx = 6;
static const std::size_t funp_idx = 4;
#endif

std::ptrdiff_t m_stack_size;
void* m_stack;

void * cb_;
void * funp_;

#if defined(HPX_HAVE_STACKOVERFLOW_DETECTION)
struct sigaction action;
stack_t segv_stack;
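The two new members are a direct consequence of the deferred allocation: the constructor used to write the callback and trampoline pointers straight onto the freshly allocated stack, but with lazy initialization no stack exists at construction time, so both pointers are parked in the object until init() lays out the frame. A hedged sketch of that split (names are illustrative, not the actual HPX ones):

#include <cstddef>

struct deferred_frame
{
    void* cb_;               // captured at construction time
    void* funp_;             // captured at construction time
    void** m_sp = nullptr;   // filled in lazily once the stack exists

    deferred_frame(void* cb, void* funp) : cb_(cb), funp_(funp) {}

    // Called from init(): the pointers saved by the constructor are
    // finally written into the newly allocated stack frame.
    void lay_out(void** stack_top, std::size_t cb_idx, std::size_t funp_idx)
    {
        m_sp = stack_top;
        m_sp[cb_idx] = cb_;
        m_sp[funp_idx] = funp_;
    }
};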
21 changes: 15 additions & 6 deletions hpx/runtime/threads/coroutines/detail/context_posix.hpp
@@ -236,15 +236,10 @@ namespace hpx { namespace threads { namespace coroutines
explicit ucontext_context_impl(Functor & cb, std::ptrdiff_t stack_size)
: m_stack_size(stack_size == -1 ? (std::ptrdiff_t)default_stack_size
: stack_size),
m_stack(alloc_stack(m_stack_size)),
m_stack(nullptr),
cb_(&cb)
{
HPX_ASSERT(m_stack);
funp_ = &trampoline<Functor>;
int error = HPX_COROUTINE_MAKE_CONTEXT(
&m_ctx, m_stack, m_stack_size, funp_, cb_, nullptr);
HPX_UNUSED(error);
HPX_ASSERT(error == 0);

#if defined(HPX_HAVE_STACKOVERFLOW_DETECTION)
// concept inspired by the following links:
@@ -267,6 +262,20 @@
#endif
}

void init()
{
if (m_stack == nullptr)
{
m_stack = alloc_stack(m_stack_size);
HPX_ASSERT(m_stack);

int error = HPX_COROUTINE_MAKE_CONTEXT(
&m_ctx, m_stack, m_stack_size, funp_, cb_, nullptr);
HPX_UNUSED(error);
HPX_ASSERT(error == 0);
}
}

#if defined(HPX_HAVE_STACKOVERFLOW_DETECTION)

// heuristic value 1 kilobyte
33 changes: 13 additions & 20 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
@@ -405,7 +405,7 @@ namespace hpx { namespace threads { namespace detail
scheduler.SchedulingPolicy::
decrement_background_thread_count();
scheduler.SchedulingPolicy::destroy_thread(
background_thread, busy_count);
background_thread, num_thread, busy_count);
background_thread = nullptr;
}
else if(suspended == state_val)
@@ -586,11 +586,11 @@
// now we just keep it in the map of threads.
if (HPX_UNLIKELY(state_val == pending))
{
if (HPX_LIKELY(next_thrd == nullptr)) {
// schedule other work
scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count);
}
// if (HPX_LIKELY(next_thrd == nullptr)) {
// // schedule other work
// scheduler.SchedulingPolicy::wait_or_add_new(
// num_thread, running, idle_loop_count);
// }

// schedule this thread again, make sure it ends up at
// the end of the queue
@@ -614,8 +614,8 @@
else
{
// schedule other work
scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count);
// scheduler.SchedulingPolicy::wait_or_add_new(
// num_thread, running, idle_loop_count);

// schedule this thread again immediately with
// boosted priority
@@ -661,23 +661,22 @@
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
++counters.executed_threads_;
#endif
scheduler.SchedulingPolicy::destroy_thread(thrd, busy_loop_count);
scheduler.destroy_thread(thrd, num_thread, busy_loop_count);
}
}

// if nothing else has to be done either wait or terminate
else
{
next_thrd = nullptr;
++idle_loop_count;

if (scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count))
// if (scheduler.SchedulingPolicy::wait_or_add_new(
// num_thread, running, idle_loop_count))
if (!running)
{
// clean up terminated threads one more time before sleeping
bool can_exit =
!running &&
scheduler.SchedulingPolicy::cleanup_terminated(
num_thread, true) &&
scheduler.SchedulingPolicy::get_thread_count(
suspended, thread_priority_default, num_thread) == 0;

@@ -806,8 +805,6 @@
{
bool can_exit =
!running &&
scheduler.SchedulingPolicy::cleanup_terminated(
true) &&
scheduler.SchedulingPolicy::get_thread_count(
suspended, thread_priority_default,
num_thread) == 0;
Expand All @@ -821,10 +818,6 @@ namespace hpx { namespace threads { namespace detail

may_exit = false;
}
else
{
scheduler.SchedulingPolicy::cleanup_terminated(true);
}
}
}
}
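Two coordinated changes run through these hunks: destroy_thread() now receives the calling worker's number, and the wait_or_add_new() maintenance calls are taken out of the hot path (commented out in this WIP state). Passing num_thread lets a scheduler recycle a terminated thread into the calling worker's own structures rather than a shared one. A toy sketch of that idea (illustrative only, not the HPX scheduler interface):

#include <cstdint>
#include <cstddef>
#include <deque>
#include <vector>

struct toy_scheduler
{
    // One recycle bin per worker, indexed by the num_thread argument that
    // this commit threads through destroy_thread().
    std::vector<std::deque<int*>> bins_;

    explicit toy_scheduler(std::size_t workers) : bins_(workers) {}

    bool destroy_thread(int* thrd, std::size_t num_thread,
        std::int64_t& busy_count)
    {
        // Queue-local recycling: the calling worker owns bins_[num_thread],
        // so no cross-worker synchronization is needed here.
        bins_[num_thread].push_back(thrd);
        ++busy_count;
        return true;
    }
};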
23 changes: 1 addition & 22 deletions hpx/runtime/threads/policies/hierarchy_scheduler.hpp
@@ -710,7 +710,7 @@ namespace hpx { namespace threads { namespace policies
}

/// Destroy the passed thread as it has been terminated
bool destroy_thread(threads::thread_data* thrd, std::int64_t& busy_count)
bool destroy_thread(threads::thread_data* thrd, std::size_t, std::int64_t& busy_count)
{
for(size_type i = 0; i < tree.size(); ++i)
{
@@ -770,27 +770,6 @@
);
}

/// This is a function which gets called periodically by the thread
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
bool wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
HPX_ASSERT(tree.size());
HPX_ASSERT(num_thread < tree.at(0).size());
std::size_t added = 0;

thread_queue_type * tq = tree[0][num_thread];
if(tq->get_staged_queue_length() == 0)
{
transfer_tasks(num_thread/d, num_thread, 1);
}

bool result = tq->wait_or_add_new(running, idle_loop_count, added);
return result && 0 == added;
}

///////////////////////////////////////////////////////////////////////
void on_start_thread(std::size_t num_thread)
{
