Skip to content

Commit

Permalink
Improving UCX Parcelport
Browse files Browse the repository at this point in the history
- added ucx_context abstraction
- improved the way to make progress on the network
  • Loading branch information
sithhell committed Jan 25, 2017
1 parent c61baaa commit 00c495d
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 502 deletions.
91 changes: 69 additions & 22 deletions hpx/plugins/parcelport/ucx/receiver.hpp
Expand Up @@ -14,6 +14,10 @@
#include <hpx/runtime/parcelset/locality.hpp>

#include <hpx/plugins/parcelport/ucx/locality.hpp>
#include <hpx/plugins/parcelport/ucx/ucx_context.hpp>
#include <hpx/plugins/parcelport/ucx/ucx_error.hpp>

#include <hpx/util/detail/yield_k.hpp>

#define NVALGRIND

Expand All @@ -27,25 +31,25 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

template <typename Parcelport>
struct receiver
: uct_completion_t
{
typedef std::vector<char>
data_type;
typedef parcel_buffer<data_type, data_type> buffer_type;

// @FIXME: make header size configurable
receiver(
uct_md_h pd,
ucx_context& context,
std::size_t sender_handle,
std::size_t rpack_length,
std::uint64_t remote_address,
void *packed_key,
Parcelport& pp
)
: am_ep_(nullptr)
, rma_ep_(nullptr)
, pd_(pd)
, header_(pd, 512, rpack_length)
, rkey_(rpack_length)
, context_(context)
, header_(context.pd_, 512, context.pd_attr_.rkey_packed_size)
, rkey_(context.pd_attr_.rkey_packed_size)
, sender_handle_(sender_handle)
, remote_header_address_(remote_address)
, pp_(pp)
Expand Down Expand Up @@ -93,18 +97,16 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

// connect to iface...
void connect(
uct_iface_h am_iface,
uct_iface_addr_t *am_iface_addr,
uct_device_addr_t *am_device_addr,
uct_iface_h rma_iface,
uct_iface_addr_t *rma_iface_addr,
uct_device_addr_t *rma_device_addr
)
{
ucs_status_t status;
// Establish the connection to our AM endpoint
status = uct_ep_create_connected(
am_iface, am_device_addr, am_iface_addr, &am_ep_);
context_.am_iface_, am_device_addr, am_iface_addr, &am_ep_);

if (status != UCS_OK)
{
Expand All @@ -114,7 +116,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
}

status = uct_ep_create_connected(
rma_iface, rma_device_addr, rma_iface_addr, &rma_ep_);
context_.rma_iface_, rma_device_addr, rma_iface_addr, &rma_ep_);

if (status != UCS_OK)
{
Expand All @@ -126,18 +128,16 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

// connect to ep...
void connect(
uct_iface_h am_iface,
uct_iface_addr_t *am_iface_addr,
uct_device_addr_t *am_device_addr,
uct_iface_h rma_iface,
uct_device_addr_t *rma_dev_addr,
uct_ep_addr_t *rma_ep_addr
)
{
ucs_status_t status;
// Establish the connection to our AM endpoint
status = uct_ep_create_connected(
am_iface, am_device_addr, am_iface_addr, &am_ep_);
context_.am_iface_, am_device_addr, am_iface_addr, &am_ep_);

if (status != UCS_OK)
{
Expand All @@ -147,7 +147,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
}

// Establish the connection to our RMA endpoint...
status = uct_ep_create(rma_iface, &rma_ep_);
status = uct_ep_create(context_.rma_iface_, &rma_ep_);
if (status != UCS_OK)
{
std::string error = "receiver::connect, create RMA endpoint ";
Expand Down Expand Up @@ -208,7 +208,26 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
return true;
}

void read(std::uint64_t header_length, uct_completion_t *completion)
static void handle_header_completion(uct_completion_t *self, ucs_status_t status)
{
receiver* this_ = static_cast<receiver*>(self);
HPX_PARCELPORT_UCX_THROW_IF(status, UCS_OK);

if (this_->read_header_done())
{
for (std::size_t k = 0; !this_->read_done(); ++k)
{
this_->context_.progress();
hpx::util::detail::yield_k(k, "ucx::receiver::handle_data_completion");
}
}
else
{
this_->read_data();
}
}

void read(std::uint64_t header_length)
{
ucs_status_t status;

Expand All @@ -217,10 +236,14 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
header_iov_.length = header_length;

// std::cout << "reading header with length " << header_length << " " << std::hex << remote_header_address_ << "\n";
//
//
func = handle_header_completion;
count = 1;

status = uct_ep_get_zcopy(
rma_ep_, &header_iov_, 1,
remote_header_address_, remote_header_.rkey, completion);
remote_header_address_, remote_header_.rkey, this);
// If the status is in progress, the completion handle will get called.
if (status == UCS_INPROGRESS) return;

Expand Down Expand Up @@ -261,12 +284,24 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
return false;
}

void read_data(uct_completion_t *completion)
static void handle_data_completion(uct_completion_t *self, ucs_status_t status)
{
receiver* this_ = static_cast<receiver*>(self);
HPX_PARCELPORT_UCX_THROW_IF(status, UCS_OK);

for (std::size_t k = 0; !this_->read_done(); ++k)
{
this_->context_.progress();
hpx::util::detail::yield_k(k, "ucx::parcelport::handle_data_completion");
}
}

void read_data()
{
ucs_status_t status;

status =
uct_md_mem_reg(pd_, buffer_.data_.data(), buffer_.data_.size(),
uct_md_mem_reg(context_.pd_, buffer_.data_.data(), buffer_.data_.size(),
UCT_MD_MEM_FLAG_NONBLOCK, &uct_mem_);
if (status != UCS_OK)
{
Expand All @@ -291,9 +326,12 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
uct_rkey_bundle_t remote_data;
status = uct_rkey_unpack(rkey_.data(), &remote_data);

count = 1;
func = handle_data_completion;

status = uct_ep_get_zcopy(
rma_ep_, &data_iov_, 1,
remote_data_address, remote_data.rkey, completion);
remote_data_address, remote_data.rkey, this);
// If the status is in progress, the completion handle will get called.
if (status == UCS_INPROGRESS) return;

Expand All @@ -305,30 +343,39 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
}
}

void read_done()
bool read_done()
{
decode_parcels(pp_, std::move(buffer_), -1);
if (!buffer_.data_.empty())
{
decode_parcels(pp_, std::move(buffer_), std::size_t(-1));
buffer_.clear();
}

if (uct_mem_ != nullptr)
{
uct_md_mem_dereg(pd_, uct_mem_);
uct_md_mem_dereg(context_.pd_, uct_mem_);
uct_mem_ = nullptr;
}

ucs_status_t status;
status = uct_ep_am_short(
am_ep_, read_ack_message, sender_handle_, nullptr, 0);
if (status == UCS_ERR_NO_RESOURCE)
{
return false;
}
if (status != UCS_OK)
{
std::string error = "receiver::read_done: ";
error += ucs_status_string(status);
throw std::runtime_error(error);
}
return true;
}

uct_ep_h am_ep_;
uct_ep_h rma_ep_;
uct_md_h pd_;
ucx_context &context_;
header header_;
std::vector<char> rkey_;
std::uint64_t sender_handle_;
Expand Down
55 changes: 29 additions & 26 deletions hpx/plugins/parcelport/ucx/sender.hpp
Expand Up @@ -16,6 +16,7 @@
#include <hpx/plugins/parcelport/ucx/active_messages.hpp>
#include <hpx/plugins/parcelport/ucx/locality.hpp>
#include <hpx/plugins/parcelport/ucx/header.hpp>
#include <hpx/plugins/parcelport/ucx/ucx_context.hpp>

#define NVALGRIND

Expand All @@ -35,25 +36,22 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
// @FIXME: make header size configurable
sender(
parcelset::locality const& there,
uct_iface_h am_iface,
uct_iface_h rma_iface,
uct_md_h pd,
std::size_t rpack_length)
ucx_context& context,
bool rma_connect_to_ep)
: there_(there)
, am_iface_(am_iface)
, context_(context)
, am_ep_(nullptr)
, rma_ep_(nullptr)
, pd_(pd)
, header_(pd, 512, rpack_length)
, rkey_(rpack_length)
, header_(context.pd_, 512, context_.pd_attr_.rkey_packed_size)
, rkey_(context_.pd_attr_.rkey_packed_size)
, receive_handle_(0)
, rma_connect_to_ep_(rma_iface != nullptr)
, rma_connect_to_ep_(rma_connect_to_ep)
{
// std::cout << "sending to: " << there_ << '\n';
locality &lt = there_.get<locality>();
ucs_status_t status;
status = uct_ep_create_connected(
am_iface_, lt.am_addr().device_addr(), lt.am_addr().iface_addr(), &am_ep_);
context_.am_iface_, lt.am_addr().device_addr(), lt.am_addr().iface_addr(), &am_ep_);

if (status != UCS_OK)
{
Expand All @@ -63,7 +61,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

if (rma_connect_to_ep_)
{
status = uct_ep_create(rma_iface, &rma_ep_);
status = uct_ep_create(context_.rma_iface_, &rma_ep_);

if (status != UCS_OK)
{
Expand All @@ -81,12 +79,12 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

if (am_ep_ != nullptr)
{
if (receive_handle_ != 0)
{
status = uct_ep_am_short(
am_ep_, close_message, receive_handle_, nullptr, 0);
HPX_ASSERT(status == UCS_OK);
}
// if (receive_handle_ != 0)
// {
// status = uct_ep_am_short(
// am_ep_, close_message, receive_handle_, nullptr, 0);
// HPX_ASSERT(status == UCS_OK);
// }
// status = UCS_INPROGRESS;
// while (status == UCS_INPROGRESS)
// {
Expand Down Expand Up @@ -148,7 +146,6 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
// connection...
else
{
// std::cout << "need EP connection...\n";
am_iface_offset = rma_device_addr_len + rma_ep_addr_len;
size += am_iface_offset;
payload.resize(size);
Expand Down Expand Up @@ -222,7 +219,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
{
// @TODO: memory registration cache...
status =
uct_md_mem_reg(pd_, buffer_.data_.data(), buffer_.data_.size(),
uct_md_mem_reg(context_.pd_, buffer_.data_.data(), buffer_.data_.size(),
UCT_MD_MEM_FLAG_NONBLOCK, &uct_mem_);
if (status != UCS_OK)
{
Expand All @@ -234,16 +231,21 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx
std::memcpy(message, &data_ptr, sizeof(std::uint64_t));

message += sizeof(std::uint64_t);
uct_md_mkey_pack(pd_, uct_mem_, rkey_.data());
uct_md_mkey_pack(context_.pd_, uct_mem_, rkey_.data());
std::memcpy(message, rkey_.data(), rkey_.size());
}

// std::cout << "sending " << header_.size() << " " << header_.length() << "\n";

// Notify the receiver that the message is ready to be read
std::uint64_t payload = header_.length();
status = uct_ep_am_short(
am_ep_, read_message, receive_handle_, &payload, sizeof(std::uint64_t));
status = UCS_ERR_NO_RESOURCE;
while (status == UCS_ERR_NO_RESOURCE)
{
status = uct_ep_am_short(
am_ep_, read_message, receive_handle_, &payload, sizeof(std::uint64_t));
context_.progress();
}
if (status != UCS_OK)
{
std::cerr << ucs_status_string(status) << '\n';
Expand All @@ -264,27 +266,28 @@ namespace hpx { namespace parcelset { namespace policies { namespace ucx

if (uct_mem_ != nullptr)
{
uct_md_mem_dereg(pd_, uct_mem_);
uct_md_mem_dereg(context_.pd_, uct_mem_);
uct_mem_ = nullptr;
}

buffer_.clear();
postprocess_handler_(ec, there_, this_);

// This is needed to keep ourselv alive long enough...
std::shared_ptr<sender> res;
std::swap(this_, res);

postprocess_handler_(ec, there_, res);

return res;
}

// we use this to keep ourselves alive during async sends...
std::shared_ptr<sender> this_;

parcelset::locality there_;
uct_iface_h am_iface_;
ucx_context &context_;
uct_ep_h am_ep_;
uct_ep_h rma_ep_;
uct_md_h pd_;

header header_;
std::vector<char> rkey_;
Expand Down

0 comments on commit 00c495d

Please sign in to comment.