Skip to content

Commit

Permalink
Partially reverting changes to parcel_await
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Jun 12, 2017
1 parent 512a318 commit b3e7c33
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 138 deletions.
7 changes: 3 additions & 4 deletions hpx/runtime/agas/big_boot_barrier.hpp
Expand Up @@ -119,12 +119,11 @@ struct HPX_EXPORT big_boot_barrier
p.parcel_id() = parcelset::parcel::generate_unique_id(source_locality_id);
}
#endif

parcelset::detail::parcel_await(std::move(p), parcelset::write_handler_type(), 0,
[this, dest](parcelset::parcel&& p, parcelset::write_handler_type&&)
auto f = [this, dest](parcelset::parcel&& p)
{
pp->send_early_parcel(dest, std::move(p));
}).apply();
};
parcelset::detail::parcel_await(std::move(p), 0, std::move(f)).apply();
} // }}}

template <typename Action, typename... Args>
Expand Down
7 changes: 3 additions & 4 deletions hpx/runtime/applier/apply.hpp
Expand Up @@ -403,13 +403,12 @@ namespace hpx
action_type_(), priority
);

parcelset::detail::parcel_await(std::move(p),
parcelset::write_handler_type(), 0,
[](parcelset::parcel&& p, parcelset::write_handler_type&&)
auto f = [](parcelset::parcel&& p)
{
hpx::get_runtime().get_parcel_handler()
.sync_put_parcel(std::move(p));
}).apply();
};
parcelset::detail::parcel_await(std::move(p), 0, std::move(f)).apply();
return false; // destination is remote
}

Expand Down
80 changes: 8 additions & 72 deletions hpx/runtime/parcelset/detail/parcel_await.hpp
Expand Up @@ -18,87 +18,23 @@
#include <vector>

namespace hpx { namespace parcelset { namespace detail {

template <typename Parcel, typename Handler, typename Derived>
struct parcel_await_base : std::enable_shared_from_this<Derived>
struct parcel_await
: std::enable_shared_from_this<parcel_await>
{
typedef hpx::util::unique_function_nonser<void(Parcel&&, Handler&&)>
put_parcel_type;

parcel_await_base(Parcel&& parcel, Handler&& handler, int archive_flags,
put_parcel_type pp)
: put_parcel_(std::move(pp))
, parcel_(std::move(parcel))
, handler_(std::move(handler))
, archive_(preprocess_, archive_flags)
, overhead_(archive_.bytes_written())
{}
typedef hpx::util::unique_function_nonser<void(parcel&&)> put_parcel_type;

void done()
{
put_parcel_(std::move(parcel_), std::move(handler_));
}
parcel_await(parcel&& p, int archive_flags, put_parcel_type pp);

bool apply_single(parcel &p)
{
archive_.reset();
archive_ << p;

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// id needs to be split.
if(preprocess_.has_futures())
{
auto this_ = this->shared_from_this();
preprocess_([this_](){ this_->apply(); });
return false;
}
archive_.flush();
p.size() = preprocess_.size() + overhead_;
p.num_chunks() = archive_.get_num_chunks();
hpx::serialization::detail::preprocess::split_gids_map split_gids;
std::swap(split_gids, preprocess_.split_gids_);
p.set_split_gids(std::move(split_gids));
parcel_await(std::vector<parcel>&& parcels, int archive_flags,
put_parcel_type pp);

return true;
}
HPX_EXPORT void apply();

put_parcel_type put_parcel_;
Parcel parcel_;
Handler handler_;
std::vector<parcel> parcels_;
hpx::serialization::detail::preprocess preprocess_;
hpx::serialization::output_archive archive_;
std::size_t overhead_;
};

struct parcel_await
: parcel_await_base<parcel, write_handler_type, parcel_await>
{
typedef parcel_await_base<parcel, write_handler_type, parcel_await>
base_type;
parcel_await(parcel&& p, write_handler_type&& f, int archive_flags,
put_parcel_type pp);

HPX_EXPORT void apply();
};

struct parcels_await
: parcel_await_base<std::vector<parcel>, std::vector<write_handler_type>,
parcels_await>
{
typedef parcel_await_base<std::vector<parcel>,
std::vector<write_handler_type>, parcels_await>
base_type;

parcels_await(std::vector<parcel>&& p, std::vector<write_handler_type>&& f,
int archive_flags, put_parcel_type pp);

HPX_EXPORT void apply();

std::size_t idx_;
};
}}}
Expand Down
126 changes: 87 additions & 39 deletions hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -206,32 +206,99 @@ namespace hpx { namespace parcelset
}

public:
void put_parcel(locality const & dest, parcel p, write_handler_type f)
// this is the handler used by put_parcel - it deals with a single parcel
// at a time
struct parcel_await_handler
{
HPX_ASSERT(dest.type() == type());
parcelport_impl& this_;
locality dest_;
write_handler_type f_;

// Hands one parcel over to the owning parcelport. The parcel is sent
// right away when the connection handler's traits enable immediate sends
// and can_send_immediate_impl() agrees; otherwise it is queued together
// with its write handler and a send on the destination is triggered.
void operator()(parcel&& p)
{
    bool const send_now =
        connection_handler_traits<ConnectionHandler>::
            send_immediate_parcels::value &&
        this_.can_send_immediate_impl<ConnectionHandler>();

    if (!send_now)
    {
        // enqueue the outgoing parcel ...
        this_.enqueue_parcel(dest_, std::move(p), std::move(f_));

        // ... and kick off sending whatever is queued for dest_
        this_.get_connection_and_send_parcels(dest_);
        return;
    }

    // immediate path: exactly one handler/parcel pair is passed along
    this_.send_immediate_impl<ConnectionHandler>(
        this_, dest_, &f_, &p, 1);
}
};

// this is the handler used by put_parcels - this version handles a vector
// of parcels rather than just a single one
struct parcel_await_handlers
{
parcelport_impl* this_;
locality dest_;
std::vector<write_handler_type> handler_;
std::vector<parcel> parcels_;

// Binds the handler to its parcelport (stored as a pointer) and to the
// destination locality, and takes ownership of the write handlers.
// parcels_ is left empty here.
parcel_await_handlers(
    parcelport_impl& pp,
    locality dest,
    std::vector<write_handler_type>&& handler)
  : this_(&pp)
  , dest_(std::move(dest))
  , handler_(std::move(handler))
{
}

// Move constructor: copies the parcelport pointer and moves the
// destination, the write handlers and the collected parcels out of
// 'other'.
parcel_await_handlers(parcel_await_handlers&& other)
  : this_(other.this_)
  , dest_(std::move(other.dest_))
  , handler_(std::move(other.handler_))
  , parcels_(std::move(other.parcels_))
{
}

// Move assignment: copies the parcelport pointer and moves the
// destination, the write handlers and the collected parcels out of
// 'other'.
//
// Self-assignment is a no-op. Without the guard, 'x = std::move(x)' would
// self-move dest_, handler_ and parcels_, which leaves them in an
// unspecified state.
parcel_await_handlers& operator=(parcel_await_handlers&& other)
{
    if (this != &other)
    {
        this_ = other.this_;
        dest_ = std::move(other.dest_);
        handler_ = std::move(other.handler_);
        parcels_ = std::move(other.parcels_);
    }
    return *this;
}

void operator()(parcel&& p)
{
parcels_.push_back(std::move(p));
// enqueue the outgoing parcels ...

// We create a shared pointer of the parcels_await object since it
// needs to be kept alive as long as there are futures not ready
// or GIDs to be split. This is necessary to preserve the identity
// of the this pointer.
std::make_shared<detail::parcel_await>(std::move(p), std::move(f),
archive_flags_, [this, dest](parcel&& p, write_handler_type&& f)
if (parcels_.size() == handler_.size())
{
if (connection_handler_traits<ConnectionHandler>::
send_immediate_parcels::value &&
can_send_immediate_impl<ConnectionHandler>())
this_->can_send_immediate_impl<ConnectionHandler>())
{
send_immediate_impl<ConnectionHandler>(
*this, dest, &f, &p, 1);
this_->send_immediate_impl<ConnectionHandler>(
*this_, dest_, handler_.data(), parcels_.data(), parcels_.size());
}
else
{
// enqueue the outgoing parcel ...
enqueue_parcel(dest, std::move(p), std::move(f));
this_->enqueue_parcels(
dest_, std::move(parcels_), std::move(handler_));

get_connection_and_send_parcels(dest);
this_->get_connection_and_send_parcels(dest_);
}
})->apply();
}
}
};

// Sends a single parcel to 'dest'. The parcel is first run through a
// detail::parcel_await object; once that is done with it, the parcel is
// handed to a parcel_await_handler carrying this parcelport, the
// destination and the write handler 'f'.
void put_parcel(locality const & dest, parcel p, write_handler_type f)
{
    HPX_ASSERT(dest.type() == type());

    // callback invoked by parcel_await with the processed parcel
    parcel_await_handler on_ready{*this, dest, std::move(f)};

    // parcel_await is created through make_shared; its apply() relies on
    // shared_from_this() to keep itself alive while it has to wait
    auto awaiter = std::make_shared<detail::parcel_await>(
        std::move(p), archive_flags_, std::move(on_ready));
    awaiter->apply();
}

void put_parcels(locality const& dest, std::vector<parcel> parcels,
Expand All @@ -253,31 +320,12 @@ namespace hpx { namespace parcelset
parcels[i].destination_locality());
}
#endif
// We create a shared pointer of the parcels_await object since it
// needs to be kept alive as long as there are futures not ready
// or GIDs to be split. This is necessary to preserve the identity
// of the this pointer.
std::make_shared<detail::parcels_await>(std::move(parcels),
std::move(handlers), archive_flags_,
[this, dest](std::vector<parcel>&& parcels,
std::vector<write_handler_type>&& handlers)
{
if (connection_handler_traits<ConnectionHandler>::
send_immediate_parcels::value &&
can_send_immediate_impl<ConnectionHandler>())
{
send_immediate_impl<ConnectionHandler>(
*this, dest, handlers.data(), parcels.data(),
parcels.size());
}
else
{
enqueue_parcels(
dest, std::move(parcels), std::move(handlers));
parcel_await_handlers handler(
*this, dest, std::move(handlers));
handler.parcels_.reserve(parcels.size());

get_connection_and_send_parcels(dest);
}
})->apply();
std::make_shared<detail::parcel_await>(
std::move(parcels), archive_flags_, std::move(handler))->apply();
}

void send_early_parcel(locality const & dest, parcel p)
Expand Down
1 change: 1 addition & 0 deletions hpx/runtime/serialization/detail/preprocess.hpp
Expand Up @@ -122,6 +122,7 @@ namespace hpx { namespace serialization { namespace detail
promise_.set_value();

hpx::future<void> fut = promise_.get_future();

// we don't call f directly to avoid possible stack overflow.
auto shared_state_ = hpx::traits::future_access<hpx::future<void> >::
get_shared_state(fut);
Expand Down
54 changes: 35 additions & 19 deletions src/runtime/parcelset/detail/parcel_await.cpp
Expand Up @@ -14,36 +14,52 @@

namespace hpx { namespace parcelset { namespace detail {

parcel_await::parcel_await(parcel&& p, write_handler_type&& f,
int archive_flags, parcel_await::put_parcel_type pp)
: base_type(std::move(p), std::move(f), archive_flags, std::move(pp))
// Single-parcel constructor. Stores the callback 'pp', sets up the output
// archive on top of preprocess_ with the given flags, records the number
// of bytes the freshly constructed archive reports as written in
// overhead_, and starts processing at index 0. The one parcel becomes the
// only element of parcels_.
parcel_await::parcel_await(parcel&& p, int archive_flags,
    parcel_await::put_parcel_type pp)
  : put_parcel_(std::move(pp))
  , archive_(preprocess_, archive_flags)
  , overhead_(archive_.bytes_written())
  , idx_(0)
{
    parcels_.emplace_back(std::move(p));
}

void parcel_await::apply()
{
if (apply_single(parcel_))
{
done();
}
}

parcels_await::parcels_await(std::vector<parcel>&& p,
std::vector<write_handler_type>&& f, int archive_flags,
parcels_await::put_parcel_type pp)
: base_type(std::move(p), std::move(f), archive_flags, std::move(pp)),
// Multi-parcel constructor. Takes ownership of the whole vector of
// parcels, stores the callback 'pp', sets up the output archive on top of
// preprocess_ with the given flags, records the number of bytes the
// freshly constructed archive reports as written in overhead_, and starts
// processing at index 0.
parcel_await::parcel_await(std::vector<parcel>&& parcels, int archive_flags,
    parcel_await::put_parcel_type pp)
  : put_parcel_(std::move(pp))
  , parcels_(std::move(parcels))
  , archive_(preprocess_, archive_flags)
  , overhead_(archive_.bytes_written())
  , idx_(0)
{
}

void parcels_await::apply()
void parcel_await::apply()
{
for (/*idx_*/; idx_ != parcel_.size(); ++idx_)
for (/*idx_*/; idx_ != parcels_.size(); ++idx_)
{
if(!apply_single(parcel_[idx_]))
archive_.reset();
archive_ << parcels_[idx_];

// We are doing a fixed point iteration until we are sure that the
// serialization process requires nothing more to wait on ...
// Things where we need waiting:
// - (shared_)future<id_type>: when the future wasn't ready yet, we
// need to do another await round for the id splitting
// - id_type: we need to await, if and only if, the credit of the
// id needs to be split.
if(preprocess_.has_futures())
{
auto this_ = this->shared_from_this();
preprocess_([this_](){ this_->apply(); });
return;
}
archive_.flush();
parcels_[idx_].size() = preprocess_.size() + overhead_;
parcels_[idx_].num_chunks() = archive_.get_num_chunks();
parcels_[idx_].set_split_gids(std::move(preprocess_.split_gids_));
put_parcel_(std::move(parcels_[idx_]));
}
done();
}

}}}

0 comments on commit b3e7c33

Please sign in to comment.