Skip to content

Commit

Permalink
Move future_data_void functionality to source file
Browse files Browse the repository at this point in the history
  • Loading branch information
K-ballo committed Nov 9, 2017
1 parent 00d477f commit 5c8cdaf
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 192 deletions.
202 changes: 13 additions & 189 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -10,7 +10,6 @@
#include <hpx/error_code.hpp>
#include <hpx/lcos/local/detail/condition_variable.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/runtime/get_worker_thread_num.hpp>
#include <hpx/runtime/launch_policy.hpp>
#include <hpx/runtime/threads/coroutines/detail/get_stack_pointer.hpp>
#include <hpx/runtime/threads/thread_executor.hpp>
Expand All @@ -24,7 +23,6 @@
#include <hpx/util/atomic_count.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/decay.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/steady_clock.hpp>
#include <hpx/util/unique_function.hpp>
#include <hpx/util/unused.hpp>
Expand All @@ -34,7 +32,6 @@
#include <chrono>
#include <cstddef>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -264,16 +261,12 @@ namespace detail
return result_type(std::forward<F1>(f1), std::forward<F2>(f2));
}

///////////////////////////////////////////////////////////////////////////
HPX_EXPORT bool run_on_completed_on_new_thread(
util::unique_function_nonser<bool()> && f, error_code& ec);

///////////////////////////////////////////////////////////////////////////
template <typename Result>
struct future_data_base;

template <>
struct future_data_base<traits::detail::future_data_void>
struct HPX_EXPORT future_data_base<traits::detail::future_data_void>
: future_data_refcnt_base
{
future_data_base()
Expand All @@ -289,7 +282,7 @@ namespace detail
typedef util::unique_function_nonser<void()> completed_callback_type;
typedef future_data_refcnt_base::init_no_addref init_no_addref;

virtual ~future_data_base() noexcept {}
virtual ~future_data_base();

enum state
{
Expand Down Expand Up @@ -340,188 +333,31 @@ namespace detail
"this future does not support cancellation");
}

/// Wait until the shared state has been set and report its outcome.
///
/// \param storage  Only read when the state is `exception`; it is then
///                 interpreted as a `std::exception_ptr const*`.
/// \param ec       Error sink. If it is `throws`, a stored exception is
///                 rethrown; otherwise it is converted into `ec`.
///
/// \returns A pointer to a function-local placeholder object on success,
///          `nullptr` if waiting failed, if the state is `empty`, or if an
///          exception was stored and reported through `ec`.
result_type* get_result_void(void const* storage, error_code& ec = throws)
{
    // yields control if needed
    wait(ec);
    if (ec)
        return nullptr;

    // No lock is taken below. Once a future has been made ready (a
    // postcondition of wait) there is either a single writer (future) or
    // there are readers only (shared_future), where a lock would merely
    // hurt concurrency.
    switch (state_)
    {
    case empty:
    {
        // the value has already been moved out of this future
        HPX_THROWS_IF(ec, no_state,
            "future_data_base::get_result",
            "this future has no valid shared state");
        return nullptr;
    }

    case exception:
    {
        // The thread was re-activated because an error was reported in
        // the meantime (see promise::set_event and
        // promise::set_exception): throw it or fill in the error code.
        std::exception_ptr const* stored_error =
            static_cast<std::exception_ptr const*>(storage);

        if (&ec == &throws)
            std::rethrow_exception(*stored_error);    // never returns

        ec = make_error_code(*stored_error);
        return nullptr;
    }

    default:
        break;
    }

    static util::unused_type unused_;
    return &unused_;
}
result_type* get_result_void(void const* storage, error_code& ec = throws);
virtual result_type* get_result_void(error_code& ec = throws) = 0;

virtual void set_exception(std::exception_ptr data,
error_code& ec = throws) = 0;

// continuation support

// Invoke the given continuation, converting any exception it throws into
// an exception_ptr.
//
// Returns true if on_completed ran to completion; returns false and
// stores the active exception in ptr if it threw. ptr is left untouched
// on success.
bool run_on_completed(completed_callback_type && on_completed,
    std::exception_ptr& ptr)
{
    bool succeeded = true;

    try
    {
        // scope object constructed from the callback for the duration of
        // the call
        hpx::util::annotate_function annotate(on_completed);
        on_completed();
    }
    catch (...)
    {
        ptr = std::current_exception();
        succeeded = false;
    }

    return succeeded;
}
std::exception_ptr& ptr);

// Run a completion callback while making sure continuation invocation
// does not recurse deeper than allowed.
//
// The callback is executed directly on the calling thread unless that
// thread is not an HPX thread or the recursion/stack budget is exhausted;
// in those cases it is handed to run_on_completed_on_new_thread instead.
//
// If the callback fails on the direct path, set_exception is called on
// this shared state. If spawning the new thread fails, the error is stored
// through set_exception as well; if the callback itself failed on the new
// thread, its exception is rethrown in the caller's context.
void handle_on_completed(completed_callback_type && on_completed)
{
    // We need to run the completion on a new thread if we are on a
    // non HPX thread.
    bool recurse_asynchronously = hpx::threads::get_self_ptr() == nullptr;
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
    // Combine with the non-HPX-thread decision above instead of
    // overwriting it, exactly as the #else branch does. The short-circuit
    // also avoids querying the stack of a thread that is not an HPX thread.
    recurse_asynchronously = recurse_asynchronously ||
        !this_thread::has_sufficient_stack_space();
#else
    handle_continuation_recursion_count cnt;
    recurse_asynchronously = recurse_asynchronously ||
        cnt.count_ > HPX_CONTINUATION_MAX_RECURSION_DEPTH;
#endif
    if (!recurse_asynchronously)
    {
        // directly execute continuation on this thread
        std::exception_ptr ptr;
        if (!run_on_completed(std::move(on_completed), ptr))
        {
            // NOTE(review): the exception captured in `ptr` is not used
            // here; a freshly constructed `ec` is passed on instead.
            // Confirm whether `set_exception(ptr)` was intended.
            error_code ec(lightweight);
            set_exception(hpx::detail::access_exception(ec));
        }
    }
    else
    {
        // re-spawn continuation on a new thread; the intrusive pointer
        // keeps this shared state alive while the callback is pending
        boost::intrusive_ptr<future_data_base> this_(this);

        error_code ec(lightweight);
        std::exception_ptr ptr;
        if (!run_on_completed_on_new_thread(
                util::deferred_call(
                    &future_data_base::run_on_completed,
                    std::move(this_), std::move(on_completed),
                    std::ref(ptr)),
                ec))
        {
            // thread creation went wrong
            if (ec) {
                set_exception(hpx::detail::access_exception(ec));
                return;
            }

            // re-throw exception in this context
            HPX_ASSERT(ptr);        // exception should have been set
            std::rethrow_exception(ptr);
        }
    }
}
void handle_on_completed(completed_callback_type && on_completed);

/// Register a callback to be invoked once the future becomes ready.
///
/// An empty callback is ignored. If the future is already ready, the
/// callback is invoked right away (after releasing the lock); otherwise it
/// is appended to the callbacks registered so far.
void set_on_completed(completed_callback_type data_sink)
{
    if (!data_sink)
        return;

    std::unique_lock<mutex_type> l(mtx_);

    if (!is_ready_locked(l))
    {
        // Not ready yet: chain the new callback behind the stored one so
        // that continuations are evaluated in the order they are attached.
        on_completed_ = compose_cb(
            std::move(on_completed_), std::move(data_sink));
        return;
    }

    HPX_ASSERT(!on_completed_);

    // Already ready: release the lock, then run the continuation now.
    l.unlock();
    handle_on_completed(std::move(data_sink));
}

// Block on the given condition variable while the state is still empty.
//
// On a failed wait the error is left in ec and the function returns
// early. Otherwise ec is reset to success, unless it is the throws
// sentinel, which is never written to.
void wait(local::detail::condition_variable& cond, error_code& ec = throws)
{
    std::unique_lock<mutex_type> guard(mtx_);

    bool const must_block = (state_ == empty);
    if (must_block)
    {
        cond.wait(guard, "future_data_base::wait", ec);
        if (ec)
            return;
    }

    if (&ec == &throws)
        return;

    ec = make_success_code();
}
virtual void wait(error_code& = throws) = 0;

future_status wait_until(local::detail::condition_variable& cond,
util::steady_clock::time_point const& abs_time, error_code& ec = throws)
{
std::unique_lock<mutex_type> l(mtx_);

// block if this entry is empty
if (state_ == empty) {
threads::thread_state_ex_enum const reason =
cond.wait_until(l, abs_time,
"future_data_base::wait_until", ec);
if (ec) return future_status::uninitialized;

if (reason == threads::wait_timeout)
return future_status::timeout;

return future_status::ready;
}
void set_on_completed(completed_callback_type data_sink);

if (&ec != &throws)
ec = make_success_code();
virtual void wait(error_code& ec = throws);

return future_status::ready; //-V110
}
virtual future_status wait_until(util::steady_clock::time_point const&,
error_code& = throws) = 0;
virtual future_status wait_until(
util::steady_clock::time_point const& abs_time, error_code& ec = throws);

virtual std::exception_ptr get_exception_ptr() const = 0;

Expand All @@ -542,6 +378,7 @@ namespace detail
mutable mutex_type mtx_;
state state_; // current state
completed_callback_type on_completed_;
local::detail::condition_variable cond_; // threads waiting in read
};

template <typename Result>
Expand Down Expand Up @@ -777,19 +614,6 @@ namespace detail
on_completed_ = completed_callback_type();
}

// continuation support

// Wait for the shared state by delegating to the base class
// implementation, using this object's condition variable.
virtual void wait(error_code& ec = throws)
{
    base_type::wait(cond_, ec);
}

// Timed wait for the shared state: delegates to the base class
// implementation with this object's condition variable and returns its
// status unchanged.
virtual future_status wait_until(
    util::steady_clock::time_point const& abs_time, error_code& ec = throws)
{
    future_status const status =
        base_type::wait_until(cond_, abs_time, ec);
    return status;
}

std::exception_ptr get_exception_ptr() const
{
HPX_ASSERT(state_ == exception);
Expand All @@ -802,7 +626,7 @@ namespace detail
using base_type::on_completed_;

private:
local::detail::condition_variable cond_; // threads waiting in read
using base_type::cond_;
typename future_data_storage<Result>::type storage_;
};

Expand Down
1 change: 1 addition & 0 deletions hpx/lcos/local/futures_factory.hpp
Expand Up @@ -9,6 +9,7 @@
#include <hpx/config.hpp>
#include <hpx/lcos/detail/future_data.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/get_worker_thread_num.hpp>
#include <hpx/runtime/launch_policy.hpp>
#include <hpx/runtime/threads/thread_data_fwd.hpp>
#include <hpx/runtime/threads/thread_enums.hpp>
Expand Down

0 comments on commit 5c8cdaf

Please sign in to comment.