Skip to content

Commit

Permalink
Removing compose_cb
Browse files Browse the repository at this point in the history
Instead of compose_cb, we now use boost::container::small_vector. This has the
effect of reducing dynamic memory allocations when more than two callbacks are
attached to a future.
  • Loading branch information
Thomas Heller committed Feb 1, 2018
1 parent 9e199a2 commit ba143e4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 133 deletions.
142 changes: 20 additions & 122 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -28,6 +28,7 @@
#include <hpx/util/unused.hpp>

#include <boost/intrusive_ptr.hpp>
#include <boost/container/small_vector.hpp>

#include <chrono>
#include <cstddef>
Expand Down Expand Up @@ -66,10 +67,11 @@ namespace detail
///////////////////////////////////////////////////////////////////////
struct HPX_EXPORT future_data_refcnt_base
{
private:
public:
typedef util::unique_function_nonser<void()> completed_callback_type;
typedef boost::container::small_vector<completed_callback_type, 3>
completed_callback_vector_type;

public:
typedef void has_future_data_refcnt_base;

virtual ~future_data_refcnt_base();
Expand Down Expand Up @@ -178,91 +180,6 @@ namespace detail
typedef typename std::aligned_storage<max_size, max_alignment>::type type;
};

///////////////////////////////////////////////////////////////////////////
/// Scoped guard around the continuation recursion counter.
///
/// Construction binds count_ to the counter returned by
/// threads::get_continuation_recursion_count() and increments it; destruction
/// decrements the same counter again. The current value can be read through
/// count_ while the guard is alive.
struct handle_continuation_recursion_count
{
    handle_continuation_recursion_count()
      : count_(threads::get_continuation_recursion_count())
    {
        ++count_;
    }
    ~handle_continuation_recursion_count()
    {
        --count_;
    }

    // Each guard must pair exactly one increment with exactly one decrement.
    // A copy-constructed guard would skip the increment but still run the
    // destructor, leaving the counter unbalanced, so copying is disabled.
    // (Copy assignment is already ill-formed because of the reference member;
    // it is spelled out here for clarity.)
    handle_continuation_recursion_count(
        handle_continuation_recursion_count const&) = delete;
    handle_continuation_recursion_count& operator=(
        handle_continuation_recursion_count const&) = delete;

    /// Reference to the counter that this guard incremented.
    std::size_t& count_;
};

///////////////////////////////////////////////////////////////////////////
// Callable that stores two callbacks (f1_ and f2_) and, when invoked, runs
// f1_ followed by f2_. If the invocation decides that it must not run inline
// (see operator()), both callbacks are moved into a new compose_cb_impl which
// is handed to threads::register_thread_nullary instead.
template <typename F1, typename F2>
struct compose_cb_impl
{
// Forwards the two arguments into the stored callbacks.
template <typename A1, typename A2>
compose_cb_impl(A1 && f1, A2 && f2)
: f1_(std::forward<A1>(f1))
, f2_(std::forward<A2>(f2))
{}

// Move constructor: moves both stored callbacks out of 'other'.
compose_cb_impl(compose_cb_impl&& other)
: f1_(std::move(other.f1_))
, f2_(std::move(other.f2_))
{}

// Invokes f1_ and then f2_, either inline or on a newly registered thread.
void operator()()
{
// Initial decision: defer if get_self_ptr() yields nullptr (presumably
// meaning the caller is not an HPX thread -- confirm).
bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr;
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
// NOTE(review): this assignment replaces the value computed above instead
// of OR-ing with it (unlike the #else branch). Confirm that
// has_sufficient_stack_space() also reports 'false' when get_self_ptr()
// is nullptr; otherwise the check above is lost here.
recurse_asynchronously =
!this_thread::has_sufficient_stack_space();
#else
// The guard increments the continuation recursion counter for the
// lifetime of this call; defer once the depth exceeds
// HPX_CONTINUATION_MAX_RECURSION_DEPTH.
handle_continuation_recursion_count cnt;
recurse_asynchronously = recurse_asynchronously ||
cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH;
#endif
if (recurse_asynchronously)
{
// Move both callbacks into a fresh composed callable and register it as
// a new thread described as "compose_cb". After this f1_ and f2_ are
// moved-from and must not be invoked here, hence the early return.
// NOTE(review): 'ec' is passed to the call but never inspected
// afterwards, so any error it reports is dropped.
error_code ec;
threads::register_thread_nullary(
compose_cb_impl(std::move(f1_), std::move(f2_)),
"compose_cb",
threads::pending, true, threads::thread_priority_boost,
std::size_t(-1), threads::thread_stacksize_current, ec);
return;
}

// Inline path: run the callbacks in order, each inside its own
// annotate_function scope.
{
hpx::util::annotate_function annotate(f1_);
(void)annotate;
f1_();
}
{
hpx::util::annotate_function annotate(f2_);
(void)annotate;
f2_();
}
}

// The two wrapped callbacks; f1_ is invoked before f2_.
F1 f1_;
F2 f2_;
};

/// Combine two callbacks into a single util::unique_function_nonser<void()>.
///
/// If \a f1 tests false the result is just \a f2; otherwise, if \a f2 tests
/// false the result is just \a f1. Only when both test true is a
/// compose_cb_impl created that holds the two callbacks.
template <typename F1, typename F2>
static HPX_FORCEINLINE util::unique_function_nonser<void()>
compose_cb(F1 && f1, F2 && f2)
{
    // nothing to put in front of f2: hand it back unchanged
    if (!f1)
        return std::forward<F2>(f2);

    // nothing to append after f1: hand it back unchanged
    if (!f2)
        return std::forward<F1>(f1);

    // both callbacks are usable, wrap them into one combined callable
    return compose_cb_impl<
        typename util::decay<F1>::type, typename util::decay<F2>::type
    >(std::forward<F1>(f1), std::forward<F2>(f2));
}

///////////////////////////////////////////////////////////////////////////
template <typename Result>
struct future_data_base;
Expand All @@ -279,9 +196,10 @@ namespace detail
: future_data_refcnt_base(no_addref), state_(empty)
{}

using future_data_refcnt_base::completed_callback_type;
using future_data_refcnt_base::completed_callback_vector_type;
typedef lcos::local::spinlock mutex_type;
typedef util::unused_type result_type;
typedef util::unique_function_nonser<void()> completed_callback_type;
typedef future_data_refcnt_base::init_no_addref init_no_addref;

virtual ~future_data_base();
Expand Down Expand Up @@ -344,12 +262,12 @@ namespace detail
// continuation support

// deferred execution of a given continuation
bool run_on_completed(completed_callback_type && on_completed,
bool run_on_completed(completed_callback_vector_type && on_completed,
std::exception_ptr& ptr);

// make sure continuation invocation does not recurse deeper than
// allowed
void handle_on_completed(completed_callback_type && on_completed);
void handle_on_completed(completed_callback_vector_type && on_completed);

/// Set the callback which needs to be invoked when the future becomes
/// ready. If the future is ready the function will be invoked
Expand Down Expand Up @@ -379,7 +297,7 @@ namespace detail
protected:
mutable mutex_type mtx_;
state state_; // current state
completed_callback_type on_completed_;
completed_callback_vector_type on_completed_;
local::detail::condition_variable cond_; // threads waiting in read
};

Expand All @@ -390,10 +308,12 @@ namespace detail
HPX_NON_COPYABLE(future_data_base);

typedef typename future_data_result<Result>::type result_type;
typedef util::unique_function_nonser<void()> completed_callback_type;
typedef future_data_base<traits::detail::future_data_void> base_type;
typedef lcos::local::spinlock mutex_type;
typedef typename base_type::init_no_addref init_no_addref;
typedef typename base_type::completed_callback_type completed_callback_type;
typedef typename base_type::completed_callback_vector_type
completed_callback_vector_type;

future_data_base() = default;

Expand Down Expand Up @@ -477,7 +397,8 @@ namespace detail
return;
}

completed_callback_type on_completed = std::move(on_completed_);
completed_callback_vector_type on_completed;
on_completed.swap(on_completed_);

// set the data
result_type* value_ptr =
Expand All @@ -504,7 +425,7 @@ namespace detail
// it unlocked when returning.

// invoke the callback (continuation) function
if (on_completed)
if (!on_completed.empty())
handle_on_completed(std::move(on_completed));
}

Expand All @@ -522,7 +443,8 @@ namespace detail
return;
}

completed_callback_type on_completed = std::move(on_completed_);
completed_callback_vector_type on_completed;
on_completed.swap(on_completed_);

// set the data
std::exception_ptr* exception_ptr =
Expand All @@ -548,7 +470,7 @@ namespace detail
// it unlocked when returning.

// invoke the callback (continuation) function
if (on_completed)
if (!on_completed.empty())
handle_on_completed(std::move(on_completed));
}

Expand Down Expand Up @@ -614,7 +536,8 @@ namespace detail
}

state_ = empty;
on_completed_ = completed_callback_type();
on_completed_.clear();
on_completed_.shrink_to_fit();
}

std::exception_ptr get_exception_ptr() const override
Expand Down Expand Up @@ -1008,31 +931,6 @@ namespace hpx { namespace traits
typedef lcos::detail::future_data_allocator<R, Allocator> type;
};
}

#if defined(HPX_HAVE_THREAD_DESCRIPTION)
///////////////////////////////////////////////////////////////////////////
template <typename F1, typename F2>
struct get_function_annotation<lcos::detail::compose_cb_impl<F1, F2> >
{
static char const*
call(lcos::detail::compose_cb_impl<F1, F2> const& f) noexcept
{
return get_function_annotation<F1>::call(f.f1_);
}
};

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
/// ITT annotation lookup for a composed callback: the result is whatever
/// get_function_annotation_itt<F1> reports for the first wrapped callback
/// (f1_).
template <typename F1, typename F2>
struct get_function_annotation_itt<lcos::detail::compose_cb_impl<F1, F2> >
{
    static util::itt::string_handle
    call(lcos::detail::compose_cb_impl<F1, F2> const& composed) noexcept
    {
        // delegate to the first of the two stored callbacks
        F1 const& first = composed.f1_;
        return get_function_annotation_itt<F1>::call(first);
    }
};
#endif
#endif
}}

#include <hpx/config/warnings_suffix.hpp>
Expand Down
40 changes: 29 additions & 11 deletions src/lcos/detail/future_data.cpp
Expand Up @@ -20,6 +20,7 @@

#include <boost/intrusive_ptr.hpp>

#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>
Expand All @@ -29,6 +30,22 @@ namespace hpx { namespace lcos { namespace detail
{
future_data_refcnt_base::~future_data_refcnt_base() = default;

///////////////////////////////////////////////////////////////////////////
/// Scoped guard around the continuation recursion counter.
///
/// Construction binds count_ to the counter returned by
/// threads::get_continuation_recursion_count() and increments it; destruction
/// decrements the same counter again. The current value can be read through
/// count_ while the guard is alive.
struct handle_continuation_recursion_count
{
    handle_continuation_recursion_count()
      : count_(threads::get_continuation_recursion_count())
    {
        ++count_;
    }
    ~handle_continuation_recursion_count()
    {
        --count_;
    }

    // Each guard must pair exactly one increment with exactly one decrement.
    // A copy-constructed guard would skip the increment but still run the
    // destructor, leaving the counter unbalanced, so copying is disabled.
    // (Copy assignment is already ill-formed because of the reference member;
    // it is spelled out here for clarity.)
    handle_continuation_recursion_count(
        handle_continuation_recursion_count const&) = delete;
    handle_continuation_recursion_count& operator=(
        handle_continuation_recursion_count const&) = delete;

    /// Reference to the counter that this guard incremented.
    std::size_t& count_;
};

///////////////////////////////////////////////////////////////////////////
static bool run_on_completed_on_new_thread(
util::unique_function_nonser<bool()> && f, error_code& ec)
Expand Down Expand Up @@ -115,12 +132,15 @@ namespace hpx { namespace lcos { namespace detail

// deferred execution of a given continuation
bool future_data_base<traits::detail::future_data_void>::
run_on_completed(completed_callback_type && on_completed,
run_on_completed(completed_callback_vector_type && on_completed,
std::exception_ptr& ptr)
{
try {
hpx::util::annotate_function annotate(on_completed);
on_completed();
for (auto& func: on_completed)
{
hpx::util::annotate_function annotate(func);
func();
}
}
catch (...) {
ptr = std::current_exception();
Expand All @@ -132,7 +152,7 @@ namespace hpx { namespace lcos { namespace detail
// make sure continuation invocation does not recurse deeper than
// allowed
void future_data_base<traits::detail::future_data_void>::
handle_on_completed(completed_callback_type && on_completed)
handle_on_completed(completed_callback_vector_type && on_completed)
{
// We need to run the completion on a new thread if we are on a
// non HPX thread.
Expand Down Expand Up @@ -194,19 +214,17 @@ namespace hpx { namespace lcos { namespace detail

if (is_ready_locked(l)) {

HPX_ASSERT(!on_completed_);
HPX_ASSERT(on_completed_.empty());

// invoke the callback (continuation) function right away
l.unlock();

handle_on_completed(std::move(data_sink));
completed_callback_vector_type on_completed;
on_completed.push_back(std::move(data_sink));
handle_on_completed(std::move(on_completed));
}
else {
// store a combined callback wrapping the old and the new one
// make sure continuations are evaluated in the order they are
// attached
on_completed_ = compose_cb(
std::move(on_completed_), std::move(data_sink));
on_completed_.push_back(std::move(data_sink));
}
}

Expand Down

0 comments on commit ba143e4

Please sign in to comment.