Skip to content

Commit

Permalink
Fixing futures_factory
Browse files: browse the repository at this point in the history
  • Loading branch information
hkaiser committed Nov 29, 2017
1 parent ec7fc7f commit 43b1f20
Showing 1 changed file with 181 additions and 62 deletions.
243 changes: 181 additions & 62 deletions hpx/lcos/local/futures_factory.hpp
Expand Up @@ -20,7 +20,6 @@
#include <hpx/util/thread_description.hpp>

#include <hpx/parallel/executors/execution.hpp>
#include <hpx/parallel/executors/parallel_executor.hpp>

#include <boost/intrusive_ptr.hpp>

Expand All @@ -36,57 +35,34 @@ namespace hpx { namespace lcos { namespace local
///////////////////////////////////////////////////////////////////////
template <typename Result, typename F, typename Executor,
typename Base = lcos::detail::task_base<Result> >
struct task_object : Base
struct task_object;

template <typename Result, typename F, typename Base>
struct task_object<Result, F, void, Base>
: Base
{
typedef Base base_type;
typedef typename Base::result_type result_type;
typedef typename Base::init_no_addref init_no_addref;

F f_;
Executor* exec_;

task_object(F const& f)
: f_(f)
, exec_(nullptr)
{}

task_object(F&& f)
: f_(std::move(f))
, exec_(nullptr)
{}

task_object(Executor& exec, F const& f)
: f_(f)
, exec_(&exec)
{}

task_object(Executor& exec, F&& f)
: f_(std::move(f))
, exec_(&exec)
{}

task_object(F const& f, init_no_addref no_addref)
: base_type(no_addref)
, f_(f)
, exec_(nullptr)
{}

task_object(F&& f, init_no_addref no_addref)
: base_type(no_addref)
, f_(std::move(f))
, exec_(nullptr)
{}

task_object(Executor& exec, F const& f, init_no_addref no_addref)
: base_type(no_addref)
, f_(f)
, exec_(&exec)
{}

task_object(Executor& exec, F&& f, init_no_addref no_addref)
: base_type(no_addref)
, f_(std::move(f))
, exec_(&exec)
{}

void do_run()
Expand Down Expand Up @@ -116,6 +92,88 @@ namespace hpx { namespace lcos { namespace local
}
}

protected:
// run in a separate thread
threads::thread_id_type apply(launch policy,
threads::thread_priority priority,
threads::thread_stacksize stacksize, error_code& ec) override
{
this->check_started();

typedef typename Base::future_base_type future_base_type;
future_base_type this_(this);

if (policy == launch::fork) {
return threads::register_thread_nullary(
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost, get_worker_thread_num(),
stacksize, ec);
}
else {
threads::register_thread_nullary(
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
threads::pending, false, priority, std::size_t(-1),
stacksize, ec);
return threads::invalid_thread_id;
}
}
};

template <typename Result, typename F, typename Executor, typename Base>
struct task_object
: task_object<Result, F, void, Base>
{
typedef task_object<Result, F, void, Base> base_type;
typedef typename Base::result_type result_type;
typedef typename Base::init_no_addref init_no_addref;

Executor* exec_;

task_object(F const& f)
: base_type(f)
, exec_(nullptr)
{}

task_object(F&& f)
: base_type(std::move(f))
, exec_(nullptr)
{}

task_object(Executor& exec, F const& f)
: base_type(f)
, exec_(&exec)
{}

task_object(Executor& exec, F&& f)
: base_type(std::move(f))
, exec_(&exec)
{}

task_object(F const& f, init_no_addref no_addref)
: base_type(f, no_addref)
, exec_(nullptr)
{}

task_object(F&& f, init_no_addref no_addref)
: base_type(std::move(f), no_addref)
, exec_(nullptr)
{}

task_object(Executor& exec, F const& f, init_no_addref no_addref)
: base_type(f, no_addref)
, exec_(&exec)
{}

task_object(Executor& exec, F&& f, init_no_addref no_addref)
: base_type(std::move(f), no_addref)
, exec_(&exec)
{}

protected:
// run in a separate thread
threads::thread_id_type apply(launch policy,
Expand All @@ -137,7 +195,7 @@ namespace hpx { namespace lcos { namespace local
return threads::register_thread_nullary(
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
util::thread_description(this->f_, "task_object::apply"),
threads::pending_do_not_schedule, true,
threads::thread_priority_boost, get_worker_thread_num(),
stacksize, ec);
Expand All @@ -146,14 +204,47 @@ namespace hpx { namespace lcos { namespace local
threads::register_thread_nullary(
util::deferred_call(
&base_type::run_impl, std::move(this_)),
util::thread_description(f_, "task_object::apply"),
util::thread_description(this->f_, "task_object::apply"),
threads::pending, false, priority, std::size_t(-1),
stacksize, ec);
return threads::invalid_thread_id;
}
}
};

///////////////////////////////////////////////////////////////////////
// Forward declaration of the cancelable_task_object primary template.
// Declaring it first allows the Executor == void specialization to be
// defined ahead of the primary template's own definition.
template <typename Result, typename F, typename Executor>
struct cancelable_task_object;

/// Specialization of cancelable_task_object for the case where no executor
/// is associated with the task (Executor == void).
///
/// It derives from the executor-less task_object, using
/// lcos::detail::cancelable_task_base<Result> as the underlying shared-state
/// base, and adds no state or behavior of its own: every constructor simply
/// hands its arguments on to the base class.
template <typename Result, typename F>
struct cancelable_task_object<Result, F, void>
  : task_object<Result, F, void, lcos::detail::cancelable_task_base<Result> >
{
    using base_type = task_object<Result, F, void,
        lcos::detail::cancelable_task_base<Result> >;
    using result_type = typename base_type::result_type;
    using init_no_addref = typename base_type::init_no_addref;

    /// Construct from a callable that is copied into the base task object.
    cancelable_task_object(F const& func)
      : base_type(func)
    {
    }

    /// Construct from a callable that is moved into the base task object.
    cancelable_task_object(F&& func)
      : base_type(std::move(func))
    {
    }

    /// Same as the copying constructor, additionally passing the
    /// init_no_addref tag through to the base class.
    // NOTE(review): presumably the tag stops the base from taking an initial
    // reference on construction -- confirm against the base class.
    cancelable_task_object(F const& func, init_no_addref tag)
      : base_type(func, tag)
    {
    }

    /// Same as the moving constructor, additionally passing the
    /// init_no_addref tag through to the base class.
    cancelable_task_object(F&& func, init_no_addref tag)
      : base_type(std::move(func), tag)
    {
    }
};

template <typename Result, typename F, typename Executor>
struct cancelable_task_object
: task_object<Result, F, Executor,
Expand All @@ -174,12 +265,12 @@ namespace hpx { namespace lcos { namespace local
: base_type(std::move(f))
{}

cancelable_task_object(Executor& sched, F const& f)
: base_type(sched, f)
cancelable_task_object(Executor& exec, F const& f)
: base_type(exec, f)
{}

cancelable_task_object(Executor& sched, F && f)
: base_type(sched, std::move(f))
cancelable_task_object(Executor& exec, F && f)
: base_type(exec, std::move(f))
{}

cancelable_task_object(F const& f, init_no_addref no_addref)
Expand All @@ -190,14 +281,14 @@ namespace hpx { namespace lcos { namespace local
: base_type(std::move(f), no_addref)
{}

cancelable_task_object(Executor& sched, F const& f,
cancelable_task_object(Executor& exec, F const& f,
init_no_addref no_addref)
: base_type(sched, f, no_addref)
: base_type(exec, f, no_addref)
{}

cancelable_task_object(Executor& sched, F && f,
cancelable_task_object(Executor& exec, F && f,
init_no_addref no_addref)
: base_type(sched, std::move(f), no_addref)
: base_type(exec, std::move(f), no_addref)
{}
};
}
Expand All @@ -213,9 +304,42 @@ namespace hpx { namespace lcos { namespace local

namespace detail
{
template <typename Result, bool Cancelable,
typename Executor = parallel::execution::parallel_executor>
struct create_task_object
///////////////////////////////////////////////////////////////////////
// Forward declaration of the create_task_object factory template.
// Executor defaults to void, which selects the executor-less
// specializations; the Cancelable flag selects between the
// <Result, false, ...> and <Result, true, ...> specializations.
template <typename Result, bool Cancelable, typename Executor = void>
struct create_task_object;

/// Factory for non-cancelable tasks that are not bound to an executor
/// (Cancelable == false, Executor == void).
///
/// Both call() overloads heap-allocate a task_object<Result, ..., void> and
/// return it wrapped in a boost::intrusive_ptr to
/// lcos::detail::task_base<Result>.
template <typename Result>
struct create_task_object<Result, false, void>
{
    using return_type =
        boost::intrusive_ptr<lcos::detail::task_base<Result> >;
    using init_no_addref =
        typename lcos::detail::future_data_refcnt_base::init_no_addref;

    /// Create a task object wrapping an arbitrary callable.
    ///
    /// \param f  the callable, perfectly forwarded into the new task object
    /// \returns  an intrusive pointer to the newly allocated task
    // NOTE(review): F is used undecayed as the task_object template
    // argument, so an lvalue argument makes F an lvalue reference type.
    // Confirm callers only pass rvalues, or that this is intended.
    template <typename F>
    static return_type call(F&& f)
    {
        using task_type = task_object<Result, F, void>;

        task_type* raw =
            new task_type(std::forward<F>(f), init_no_addref());

        // The object is created with the init_no_addref tag, so the
        // intrusive_ptr is told (add_ref == false) not to increment the
        // reference count again when it takes the pointer.
        return return_type(raw, false);
    }

    /// Create a task object wrapping a plain nullary function pointer.
    ///
    /// \param f  pointer to the function the task will wrap
    /// \returns  an intrusive pointer to the newly allocated task
    template <typename R>
    static return_type call(R (*f)())
    {
        using task_type = task_object<Result, Result (*)(), void>;

        task_type* raw = new task_type(f, init_no_addref());

        // See above: add_ref == false, no additional reference is taken.
        return return_type(raw, false);
    }
};

template <typename Result, typename Executor>
struct create_task_object<Result, false, Executor>
: create_task_object<Result, false, void>
{
typedef
boost::intrusive_ptr<lcos::detail::task_base<Result> >
Expand All @@ -241,12 +365,24 @@ namespace hpx { namespace lcos { namespace local
exec, f, init_no_addref()),
false);
}
};

///////////////////////////////////////////////////////////////////////
template <typename Result>
struct create_task_object<Result, true, void>
{
typedef
boost::intrusive_ptr<lcos::detail::task_base<Result> >
return_type;
typedef
typename lcos::detail::future_data_refcnt_base::init_no_addref
init_no_addref;

template <typename F>
static return_type call(F&& f)
{
return return_type(
new task_object<Result, F, Executor>(
new cancelable_task_object<Result, F, void>(
std::forward<F>(f), init_no_addref()),
false);
}
Expand All @@ -255,14 +391,15 @@ namespace hpx { namespace lcos { namespace local
static return_type call(R (*f)())
{
return return_type(
new task_object<Result, Result (*)(), Executor>(
new cancelable_task_object<Result, Result (*)(), void>(
f, init_no_addref()),
false);
}
};

template <typename Result, typename Executor>
struct create_task_object<Result, true, Executor>
: create_task_object<Result, true, void>
{
typedef
boost::intrusive_ptr<lcos::detail::task_base<Result> >
Expand All @@ -288,24 +425,6 @@ namespace hpx { namespace lcos { namespace local
exec, f, init_no_addref()),
false);
}

template <typename F>
static return_type call(F&& f)
{
return return_type(
new cancelable_task_object<Result, F, Executor>(
std::forward<F>(f), init_no_addref()),
false);
}

template <typename R>
static return_type call(R (*f)())
{
return return_type(
new cancelable_task_object<Result, Result (*)(), Executor>(
f, init_no_addref()),
false);
}
};
}

Expand Down

0 comments on commit 43b1f20

Please sign in to comment.