Skip to content

Commit

Permalink
Merge pull request #2619 from STEllAR-GROUP/lf_multiple_parcels
Browse files Browse the repository at this point in the history
LF multiple parcels
  • Loading branch information
hkaiser committed Jun 7, 2017
2 parents 5e88a63 + 5ac0d87 commit bb7ab04
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 198 deletions.
2 changes: 1 addition & 1 deletion hpx/config.hpp
Expand Up @@ -457,7 +457,7 @@
# if defined(__powerpc__) || defined(__INTEL_COMPILER)
# define HPX_SMALL_STACK_SIZE 0x20000 // 128kByte
# else
# define HPX_SMALL_STACK_SIZE 0xC000 // 48kByte
# define HPX_SMALL_STACK_SIZE 0x10000 // 64kByte
# endif
# endif
# endif
Expand Down
7 changes: 4 additions & 3 deletions hpx/runtime/agas/big_boot_barrier.hpp
Expand Up @@ -119,11 +119,12 @@ struct HPX_EXPORT big_boot_barrier
p.parcel_id() = parcelset::parcel::generate_unique_id(source_locality_id);
}
#endif
auto f = [this, dest](parcelset::parcel&& p)

parcelset::detail::parcel_await(std::move(p), parcelset::write_handler_type(), 0,
[this, dest](parcelset::parcel&& p, parcelset::write_handler_type&&)
{
pp->send_early_parcel(dest, std::move(p));
};
parcelset::detail::parcel_await(std::move(p), 0, std::move(f)).apply();
}).apply();
} // }}}

template <typename Action, typename... Args>
Expand Down
7 changes: 4 additions & 3 deletions hpx/runtime/applier/apply.hpp
Expand Up @@ -403,12 +403,13 @@ namespace hpx
action_type_(), priority
);

auto f = [](parcelset::parcel&& p)
parcelset::detail::parcel_await(std::move(p),
parcelset::write_handler_type(), 0,
[](parcelset::parcel&& p, parcelset::write_handler_type&&)
{
hpx::get_runtime().get_parcel_handler()
.sync_put_parcel(std::move(p));
};
parcelset::detail::parcel_await(std::move(p), 0, std::move(f)).apply();
}).apply();
return false; // destination is remote
}

Expand Down
80 changes: 72 additions & 8 deletions hpx/runtime/parcelset/detail/parcel_await.hpp
Expand Up @@ -18,23 +18,87 @@
#include <vector>

namespace hpx { namespace parcelset { namespace detail {
struct parcel_await
: std::enable_shared_from_this<parcel_await>

template <typename Parcel, typename Handler, typename Derived>
struct parcel_await_base : std::enable_shared_from_this<Derived>
{
typedef hpx::util::unique_function_nonser<void(parcel&&)> put_parcel_type;
typedef hpx::util::unique_function_nonser<void(Parcel&&, Handler&&)>
put_parcel_type;

parcel_await(parcel&& p, int archive_flags, put_parcel_type pp);
parcel_await_base(Parcel&& parcel, Handler&& handler, int archive_flags,
put_parcel_type pp)
: put_parcel_(std::move(pp))
, parcel_(std::move(parcel))
, handler_(std::move(handler))
, archive_(preprocess_, archive_flags)
, overhead_(archive_.bytes_written())
{}

parcel_await(std::vector<parcel>&& parcels, int archive_flags,
put_parcel_type pp);
void done()
{
put_parcel_(std::move(parcel_), std::move(handler_));
}

HPX_EXPORT void apply();
bool apply_single(parcel &p)
{
archive_.reset();
archive_ << p;

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// needs to split.
if(preprocess_.has_futures())
{
auto this_ = this->shared_from_this();
preprocess_([this_](){ this_->apply(); });
return false;
}
archive_.flush();
p.size() = preprocess_.size() + overhead_;
p.num_chunks() = archive_.get_num_chunks();
hpx::serialization::detail::preprocess::split_gids_map split_gids;
std::swap(split_gids, preprocess_.split_gids_);
p.set_split_gids(std::move(split_gids));

return true;
}

put_parcel_type put_parcel_;
std::vector<parcel> parcels_;
Parcel parcel_;
Handler handler_;
hpx::serialization::detail::preprocess preprocess_;
hpx::serialization::output_archive archive_;
std::size_t overhead_;
};

struct parcel_await
: parcel_await_base<parcel, write_handler_type, parcel_await>
{
typedef parcel_await_base<parcel, write_handler_type, parcel_await>
base_type;
parcel_await(parcel&& p, write_handler_type&& f, int archive_flags,
put_parcel_type pp);

HPX_EXPORT void apply();
};

struct parcels_await
: parcel_await_base<std::vector<parcel>, std::vector<write_handler_type>,
parcels_await>
{
typedef parcel_await_base<std::vector<parcel>,
std::vector<write_handler_type>, parcels_await>
base_type;

parcels_await(std::vector<parcel>&& p, std::vector<write_handler_type>&& f,
int archive_flags, put_parcel_type pp);

HPX_EXPORT void apply();

std::size_t idx_;
};
}}}
Expand Down

0 comments on commit bb7ab04

Please sign in to comment.