Skip to content

Commit

Permalink
Properly releasing parcelport write handlers.
Browse files Browse the repository at this point in the history
This patch properly releases the handlers used when sending a parcel, which avoids
possible memory leaks. In addition, it fixes a race between sending a new parcel and
calling the postprocess handler.
  • Loading branch information
Thomas Heller committed Jun 16, 2017
1 parent 0025fba commit 71ef3ee
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 5 deletions.
10 changes: 9 additions & 1 deletion hpx/plugins/parcelport/mpi/sender.hpp
Expand Up @@ -70,7 +70,15 @@ namespace hpx { namespace parcelset { namespace policies { namespace mpi
if (connection->send())
{
error_code ec;
connection->postprocess_handler_(
util::unique_function_nonser<
void(
error_code const&
, parcelset::locality const&
, connection_ptr
)
> postprocess_handler;
std::swap(postprocess_handler, connection->postprocess_handler_);
postprocess_handler(
ec, connection->destination(), connection);
}
else
Expand Down
4 changes: 4 additions & 0 deletions hpx/plugins/parcelport/mpi/sender_connection.hpp
Expand Up @@ -96,6 +96,8 @@ namespace hpx { namespace parcelset { namespace policies { namespace mpi
template <typename Handler, typename ParcelPostprocess>
void async_write(Handler && handler, ParcelPostprocess && parcel_postprocess)
{
HPX_ASSERT(!handler_);
HPX_ASSERT(!postprocess_handler_);
HPX_ASSERT(!buffer_.data_.empty());
request_ptr_ = nullptr;
chunks_idx_ = 0;
Expand All @@ -115,6 +117,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace mpi
}
else
{
HPX_ASSERT(!handler_);
error_code ec;
parcel_postprocess(ec, there_, shared_from_this());
}
Expand Down Expand Up @@ -259,6 +262,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace mpi

error_code ec;
handler_(ec);
handler_.reset();
buffer_.data_point_.time_ =
util::high_resolution_clock::now() - buffer_.data_point_.time_;
pp_->add_sent_data(buffer_.data_point_);
Expand Down
28 changes: 26 additions & 2 deletions hpx/plugins/parcelport/tcp/sender.hpp
Expand Up @@ -99,10 +99,17 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
void async_write(Handler && handler,
ParcelPostprocess && parcel_postprocess)
{
#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
HPX_ASSERT(state_ == state_send_pending);
#endif
HPX_ASSERT(!buffer_.data_.empty());
HPX_ASSERT(!handler_);
HPX_ASSERT(!postprocess_handler_);

handler_ = std::forward<Handler>(handler);
postprocess_handler_ = std::forward<ParcelPostprocess>(parcel_postprocess);
HPX_ASSERT(handler_);
HPX_ASSERT(postprocess_handler_);

#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
state_ = state_async_write;
Expand Down Expand Up @@ -165,10 +172,19 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
#endif
// just call initial handler
handler_(e);
handler_.reset();
if (e)
{
// inform post-processing handler of error as well
postprocess_handler_(e, there_, shared_from_this());
util::unique_function_nonser<
void(
boost::system::error_code const&
, parcelset::locality const&
, std::shared_ptr<sender>
)
> postprocess_handler;
std::swap(postprocess_handler, postprocess_handler_);
postprocess_handler(e, there_, shared_from_this());
return;
}

Expand Down Expand Up @@ -202,7 +218,15 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
// Call post-processing handler, which will send remaining pending
// parcels. Pass along the connection so it can be reused if more
// parcels have to be sent.
postprocess_handler_(e, there_, shared_from_this());
util::unique_function_nonser<
void(
boost::system::error_code const&
, parcelset::locality const&
, std::shared_ptr<sender>
)
> postprocess_handler;
std::swap(postprocess_handler, postprocess_handler_);
postprocess_handler(e, there_, shared_from_this());
}

/// Socket for the parcelport_connection.
Expand Down
6 changes: 6 additions & 0 deletions hpx/runtime/parcelset/parcelport_connection.hpp
Expand Up @@ -81,6 +81,12 @@ namespace hpx { namespace parcelset {
#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
/// Store \a newstate as the current state of this connection.
///
/// When the requested state is state_send_pending, the current state is
/// first checked with HPX_ASSERT to be one of state_initialized,
/// state_reinitialized, or state_handle_read_ack. The new state is stored
/// afterwards in every case.
void set_state(state newstate)
{
    bool const entering_send_pending = (newstate == state_send_pending);
    if (entering_send_pending)
    {
        // Same predicate as before, kept verbatim so the assertion text
        // reported on failure does not change.
        HPX_ASSERT(state_ == state_initialized ||
            state_ == state_reinitialized ||
            state_ == state_handle_read_ack);
    }

    state_ = newstate;
}
#endif
Expand Down
8 changes: 6 additions & 2 deletions hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -717,13 +717,17 @@ namespace hpx { namespace parcelset
#endif
{
HPX_ASSERT(it->first == locality_id);
HPX_ASSERT(handlers.size() == 0);
HPX_ASSERT(handlers.size() == parcels.size());
#if defined(HPX_PARCELSET_PENDING_PARCELS_WORKAROUND)
std::swap(parcels, *util::get<0>(it->second));
HPX_ASSERT(util::get<0>(it->second)->size() == 0);
#else
std::swap(parcels, util::get<0>(it->second));
HPX_ASSERT(util::get<0>(it->second).size() == 0);
#endif
std::swap(handlers, util::get<1>(it->second));
HPX_ASSERT(handlers.size() == parcels.size());

HPX_ASSERT(!handlers.empty());
}
Expand Down Expand Up @@ -877,7 +881,7 @@ namespace hpx { namespace parcelset
--operations_in_flight_;

#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
sender_connection->set_state(parcelport_connection::state_scheduled_thread);
sender_connection->set_state(connection::state_scheduled_thread);
#endif
if (!ec)
{
Expand Down Expand Up @@ -916,7 +920,7 @@ namespace hpx { namespace parcelset
std::vector<write_handler_type>&& handlers)
{
#if defined(HPX_TRACK_STATE_OF_OUTGOING_TCP_CONNECTION)
sender_connection->set_state(parcelport_connection::state_send_pending);
sender_connection->set_state(connection::state_send_pending);
#endif

#if defined(HPX_DEBUG)
Expand Down
1 change: 1 addition & 0 deletions plugins/parcelport/libfabric/parcelport_libfabric.cpp
Expand Up @@ -382,6 +382,7 @@ namespace libfabric
LOG_DEBUG_MSG("parcelport::async_write using sender " << hexpointer(snd));
snd->dst_addr_ = addr;
snd->buffer_ = std::move(buffer);
HPX_ASSERT(!snd->handler_);
snd->handler_ = std::forward<Handler>(handler);
snd->async_write_impl();
// after a send poll to make progress on the network and
Expand Down
1 change: 1 addition & 0 deletions plugins/parcelport/libfabric/sender.cpp
Expand Up @@ -235,6 +235,7 @@ namespace libfabric

error_code ec;
handler_(ec);
handler_.reset();

// cleanup header and message region
memory_pool_->deallocate(message_region_);
Expand Down

0 comments on commit 71ef3ee

Please sign in to comment.