Skip to content

Commit

Permalink
Merge pull request #2627 from STEllAR-GROUP/fix_serialization
Browse files Browse the repository at this point in the history
Fix serialization
  • Loading branch information
hkaiser committed May 18, 2017
2 parents 2546c6b + b31a628 commit c24d021
Show file tree
Hide file tree
Showing 17 changed files with 627 additions and 167 deletions.
8 changes: 8 additions & 0 deletions hpx/runtime/actions/action_support.hpp
Expand Up @@ -145,6 +145,14 @@ namespace hpx { namespace actions
}
#endif

/// Returns the id of the action type \a Action.
///
/// The id is obtained by passing the action's name (get_action_name<Action>())
/// to get_action_id_from_name. The result is held in a function-local
/// static, so the lookup runs only when that static is first initialized;
/// subsequent calls return the stored value.
template <typename Action>
std::uint32_t get_action_id()
{
    static std::uint32_t const cached_id =
        get_action_id_from_name(get_action_name<Action>());
    return cached_id;
}

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
template <typename Action>
util::itt::string_handle const& get_action_name_itt()
Expand Down
6 changes: 5 additions & 1 deletion hpx/runtime/actions/base_action.hpp
Expand Up @@ -24,8 +24,8 @@
#include <hpx/util/itt_notify.hpp>
#endif

#include <cstdint>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -121,6 +121,10 @@ namespace hpx { namespace actions
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread, bool& deferred_schedule) = 0;

/// The function \a get_action_id returns the id which has been
/// associated with this action (mainly used for serialization purposes).
virtual std::uint32_t get_action_id() const = 0;

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
/// The function \a get_action_name_itt returns the name of this action
/// as a ITT string_handle
Expand Down
7 changes: 7 additions & 0 deletions hpx/runtime/actions/transfer_base_action.hpp
Expand Up @@ -136,6 +136,13 @@ namespace hpx { namespace actions
return detail::get_action_name<derived_type>();
}

/// The function \a get_action_id returns the id which has been
/// associated with this action (mainly used for serialization purposes).
/// The value is the one detail::get_action_id yields for \a derived_type.
std::uint32_t get_action_id() const
{
    std::uint32_t const action_id = detail::get_action_id<derived_type>();
    return action_id;
}

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
/// The function \a get_action_name_itt returns the name of this action
/// as a ITT string_handle
Expand Down
10 changes: 10 additions & 0 deletions hpx/runtime/actions_fwd.hpp
Expand Up @@ -9,6 +9,8 @@
#include <hpx/config.hpp>
#include <hpx/runtime/actions/continuation_fwd.hpp>

#include <cstdint>

namespace hpx { namespace actions
{
/// \cond NOINTERNAL
Expand All @@ -24,6 +26,14 @@ namespace hpx { namespace actions

template <typename Component, typename Signature, typename Derived>
struct basic_action;

namespace detail
{
// Looks up the numeric (std::uint32_t) id for the action with the given
// name. Only the declaration lives here; the function is marked
// HPX_API_EXPORT and defined elsewhere.
// NOTE(review): how unknown action names are handled is not visible from
// this declaration -- confirm against the definition.
HPX_API_EXPORT std::uint32_t get_action_id_from_name(
char const* action_name);
}

/// \endcond
}}

#endif
Expand Down
1 change: 1 addition & 0 deletions hpx/runtime/serialization/basic_archive.hpp
Expand Up @@ -28,6 +28,7 @@ namespace hpx { namespace serialization

enum archive_flags
{
no_archive_flags = 0x00000000,
enable_compression = 0x00002000,
endian_big = 0x00004000,
endian_little = 0x00008000,
Expand Down
11 changes: 7 additions & 4 deletions hpx/runtime/serialization/detail/preprocess.hpp
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <map>
#include <mutex>
#include <type_traits>
#include <utility>

namespace hpx { namespace serialization { namespace detail
Expand Down Expand Up @@ -113,7 +114,7 @@ namespace hpx { namespace serialization { namespace detail
{
std::lock_guard<mutex_type> l(mtx_);
done_ = true;
if(num_futures_ == triggered_futures_)
if (num_futures_ == triggered_futures_)
{
promise_.set_value();
}
Expand All @@ -140,7 +141,9 @@ namespace hpx { namespace serialization { namespace detail
template <>
struct access_data<preprocess>
{
static bool is_preprocessing() { return true; }
typedef std::true_type preprocessing_only;

HPX_CONSTEXPR static bool is_preprocessing() { return true; }

static void await_future(
preprocess& cont
Expand All @@ -161,13 +164,13 @@ namespace hpx { namespace serialization { namespace detail
return cont.has_gid(gid);
}

static void
HPX_CONSTEXPR static void
write(preprocess& cont, std::size_t count,
std::size_t current, void const* address)
{
}

static bool
HPX_CONSTEXPR static bool
flush(binary_filter* filter, preprocess& cont,
std::size_t current, std::size_t size, std::size_t written)
{
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/serialization/input_archive.hpp
Expand Up @@ -41,8 +41,8 @@ namespace hpx { namespace serialization

template <typename Container>
input_archive(Container & buffer,
std::size_t inbound_data_size = 0,
const std::vector<serialization_chunk>* chunks = nullptr)
std::size_t inbound_data_size = 0,
const std::vector<serialization_chunk>* chunks = nullptr)
: base_type(0U)
, buffer_(new input_container<Container>(buffer, chunks, inbound_data_size))
{
Expand Down
49 changes: 39 additions & 10 deletions hpx/runtime/serialization/input_container.hpp
Expand Up @@ -50,15 +50,17 @@ namespace hpx { namespace serialization
input_container(Container const& cont, std::size_t inbound_data_size)
: cont_(cont), current_(0), filter_(),
decompressed_size_(inbound_data_size),
chunks_(nullptr), current_chunk_(std::size_t(-1)), current_chunk_size_(0)
chunks_(nullptr), current_chunk_(std::size_t(-1)),
current_chunk_size_(0)
{}

input_container(Container const& cont,
std::vector<serialization_chunk> const* chunks,
std::size_t inbound_data_size)
: cont_(cont), current_(0), filter_(),
decompressed_size_(inbound_data_size),
chunks_(nullptr), current_chunk_(std::size_t(-1)), current_chunk_size_(0)
chunks_(nullptr), current_chunk_(std::size_t(-1)),
current_chunk_size_(0)
{
if (chunks && chunks->size() != 0)
{
Expand Down Expand Up @@ -90,19 +92,44 @@ namespace hpx { namespace serialization
filter_->load(address, count);
}
else {
if (current_+count > cont_.size())
std::size_t new_current = current_ + count;
if (new_current > cont_.size())
{
HPX_THROW_EXCEPTION(serialization_error
, "input_container::load_binary"
, "archive data bstream is too short");
return;
}

if (count == 1)
*static_cast<unsigned char*>(address) = cont_[current_];
else
std::memcpy(address, &cont_[current_], count);
current_ += count;
void const* src = &cont_[current_];
switch (count)
{
case 8:
*static_cast<std::uint64_t*>(address) =
*static_cast<std::uint64_t const*>(src);
break;

case 4:
*static_cast<std::uint32_t*>(address) =
*static_cast<std::uint32_t const*>(src);
break;

case 2:
*static_cast<std::uint16_t*>(address) =
*static_cast<std::uint16_t const*>(src);
break;

case 1:
*static_cast<std::uint8_t*>(address) =
*static_cast<std::uint8_t const*>(src);
break;

default:
std::memcpy(address, src, count);
break;
}

current_ = new_current;

if (chunks_) {
current_chunk_size_ += count;
Expand Down Expand Up @@ -130,8 +157,10 @@ namespace hpx { namespace serialization
{
HPX_ASSERT((std::int64_t)count >= 0);

if (filter_.get() || chunks_ == nullptr ||
count < HPX_ZERO_COPY_SERIALIZATION_THRESHOLD) {
if (chunks_ == nullptr ||
count < HPX_ZERO_COPY_SERIALIZATION_THRESHOLD ||
filter_)
{
// fall back to serialization_chunk-less archive
this->input_container::load_binary(address, count);
}
Expand Down
92 changes: 83 additions & 9 deletions hpx/runtime/serialization/output_archive.hpp
Expand Up @@ -33,20 +33,94 @@

namespace hpx { namespace serialization
{
namespace detail
{
template <typename Container>
inline std::unique_ptr<erased_output_container>
create_output_container(Container& buffer,
std::vector<serialization_chunk>* chunks,
binary_filter* filter, std::false_type)
{
std::unique_ptr<erased_output_container> res;
if (filter == nullptr)
{
if (chunks == nullptr)
{
res.reset(new output_container<Container, basic_chunker>(
buffer));
}
else
{
res.reset(new output_container<Container, vector_chunker>(
buffer, chunks));
}
}
else
{
if (chunks == nullptr)
{
res.reset(
new filtered_output_container<Container, basic_chunker>(
buffer));
}
else
{
res.reset(
new filtered_output_container<Container, vector_chunker>(
buffer, chunks));
}
}
return res;
}

template <typename Container>
inline std::unique_ptr<erased_output_container>
create_output_container(Container& buffer,
std::vector<serialization_chunk>* chunks,
binary_filter* filter, std::true_type)
{
std::unique_ptr<erased_output_container> res;
if (filter == nullptr)
{
res.reset(new output_container<Container, counting_chunker>(
buffer, chunks));
}
else
{
res.reset(
new filtered_output_container<Container, counting_chunker>(
buffer, chunks));
}
return res;
}
}

struct HPX_EXPORT output_archive
: basic_archive<output_archive>
{
private:
// Computes the flags handed to the base archive: the caller's flags,
// with disable_data_chunking OR-ed in when no chunk vector was supplied.
static std::uint32_t make_flags(std::uint32_t flags,
    std::vector<serialization_chunk>* chunks)
{
    if (chunks == nullptr)
    {
        // no chunk vector -> chunking must be switched off
        return flags | archive_flags::disable_data_chunking;
    }
    return flags | archive_flags::no_archive_flags;
}

public:
typedef basic_archive<output_archive> base_type;

typedef std::map<const naming::gid_type*, naming::gid_type> split_gids_type;

template <typename Container>
output_archive(Container & buffer,
std::uint32_t flags = 0U,
std::vector<serialization_chunk>* chunks = nullptr,
binary_filter* filter = nullptr)
: base_type(flags)
, buffer_(new output_container<Container>(buffer, chunks, filter))
std::uint32_t flags = 0U,
std::vector<serialization_chunk>* chunks = nullptr,
binary_filter* filter = nullptr)
: base_type(make_flags(flags, chunks))
, buffer_(detail::create_output_container(buffer, chunks, filter,
typename detail::access_data<Container>::preprocessing_only()))
{
// endianness needs to be saved separately as it is needed to
// properly interpret the flags
Expand All @@ -57,7 +131,7 @@ namespace hpx { namespace serialization

// send flags sent by the other end to make sure both ends have
// the same assumptions about the archive format
save(flags);
save(this->flags_);

bool has_filter = filter != nullptr;
save(has_filter);
Expand Down Expand Up @@ -284,10 +358,10 @@ namespace hpx { namespace serialization
{
if(count == 0) return;
size_ += count;
if(disable_data_chunking())
buffer_->save_binary(address, count);
if (disable_data_chunking())
buffer_->save_binary(address, count);
else
buffer_->save_binary_chunk(address, count);
buffer_->save_binary_chunk(address, count);
}

typedef std::map<const void *, std::uint64_t> pointer_tracker;
Expand Down

0 comments on commit c24d021

Please sign in to comment.