Skip to content

Commit

Permalink
Unify access_data trait for use in both, serialization and de-seriali…
Browse files Browse the repository at this point in the history
…zation

- adding daft example demonstrating the use of this trait for writing/reading to a file
  • Loading branch information
hkaiser committed May 28, 2017
1 parent 847d58f commit 46e915f
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 135 deletions.
1 change: 1 addition & 0 deletions examples/quickstart/CMakeLists.txt
Expand Up @@ -25,6 +25,7 @@ set(example_programs
fibonacci_one
fibonacci_futures
fibonacci_futures_distributed
file_serialization
fractals
fractals_struct
hello_world
Expand Down
120 changes: 120 additions & 0 deletions examples/quickstart/file_serialization.cpp
@@ -0,0 +1,120 @@
// Copyright (c) 2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This example demonstrates how the HPX serialization archives could be used
// to directly store/load to/from a file.

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>
#include <hpx/traits/serialization_access_data.hpp>

#include <stdexcept>
#include <fstream>

// Thin adapter around a std::fstream that offers the four operations
// (read/write/size/resize) used by the serialization_access_data
// specialization for file_wrapper. Besides the stream it keeps a logical
// size counter, which starts at the length of the file at open time.
struct file_wrapper
{
    // Opens the file `name` using `mode`.
    //
    // Throws std::runtime_error if the file could not be opened. If the
    // length of the file cannot be determined, the logical size stays 0.
    file_wrapper(std::string const& name, std::ios_base::openmode mode)
      : stream_(name.c_str(), mode), size_(0)
    {
        if (!stream_.is_open())
            throw std::runtime_error("Couldn't open file: " + name);

        // Measure the file by seeking to its end, then restore the position.
        std::fstream::pos_type const start = stream_.tellg();
        stream_.seekg(0, std::ios::end);
        std::fstream::pos_type const end = stream_.tellg();

        std::fstream::off_type const start_offset = start;
        std::fstream::off_type const length = end;
        if (start_offset < 0 || length < 0)
        {
            // tellg() reports failure as -1. Storing that in an unsigned
            // size would yield a huge bogus value, so keep size_ == 0 and
            // reset the error flags so that later I/O is not silently
            // rejected.
            stream_.clear();
            return;
        }

        size_ = static_cast<std::size_t>(length);
        stream_.seekg(start);
    }

    // Reads `count` bytes from the current stream position into `s`.
    // Declared const (the stream member is mutable) so that it can be
    // called through a const reference.
    std::istream& read(char* s, std::streamsize count) const
    {
        return stream_.read(s, count);
    }

    // Writes `count` bytes starting at `s` at the current stream position.
    std::ostream& write(char const* s, std::streamsize count)
    {
        return stream_.write(s, count);
    }

    // Returns the logical size in bytes.
    std::size_t size() const
    {
        return size_;
    }

    // Sets the logical size to `count` bytes (an absolute value, like
    // std::vector::resize). The access-data trait calls this as
    // resize(size() + count); treating `count` as an increment here would
    // add the previous size a second time on every call.
    void resize(std::size_t count)
    {
        size_ = count;
    }

private:
    mutable std::fstream stream_;    // mutable: read() is a const member
    std::size_t size_;               // logical size in bytes
};

namespace hpx { namespace traits
{
    // Specialization of the access-data trait for file_wrapper. Everything
    // not overridden here comes from default_serialization_access_data.
    template <>
    struct serialization_access_data<file_wrapper>
      : default_serialization_access_data<file_wrapper>
    {
        // Reports the size as tracked by the wrapper itself.
        static std::size_t size(file_wrapper const& cont)
        {
            return cont.size();
        }

        // Asks the wrapper to resize to its current size plus `count`.
        static void resize(file_wrapper& cont, std::size_t count)
        {
            std::size_t const requested = cont.size() + count;
            cont.resize(requested);
        }

        // functions related to output operations

        // Hands `count` bytes starting at `address` to the wrapper. The
        // position argument is not used; the wrapper writes at its own
        // current stream position.
        static void write(file_wrapper& cont, std::size_t count,
            std::size_t /*current*/, void const* address)
        {
            char const* const bytes = static_cast<char const*>(address);
            cont.write(bytes, count);
        }

        // functions related to input operations

        // Fills `address` with `count` bytes obtained from the wrapper. The
        // position argument is not used; the wrapper reads from its own
        // current stream position.
        static void read(file_wrapper const& cont, std::size_t count,
            std::size_t /*current*/, void* address)
        {
            char* const bytes = static_cast<char*>(address);
            cont.read(bytes, count);
        }
    };
}}

// Serializes a vector of doubles into a file through an output_archive,
// reads it back through an input_archive and compares the two vectors.
//
// Returns 0 if the data read back matches what was written, 1 otherwise.
int main(int argc, char* argv[])
{
    char const* const archive_name = "file_serialization_test.archive";

    // reference data
    std::vector<double> os;
    for (double c = -100.0; c < +100.0; c += 1.3)
    {
        os.push_back(c);
    }

    std::size_t size = 0;
    {
        file_wrapper buffer(archive_name,
            std::ios_base::out | std::ios_base::binary | std::ios_base::trunc);
        hpx::serialization::output_archive oarchive(buffer);
        oarchive << os;
        size = oarchive.bytes_written();
    }   // leaving the scope destroys the wrapper and with it the file stream

    bool matches = true;
    {
        file_wrapper buffer(archive_name,
            std::ios_base::in | std::ios_base::binary);
        hpx::serialization::input_archive iarchive(buffer, size);
        std::vector<double> is;
        iarchive >> is;

        // Compare the sizes first: indexing `is` with an index taken from
        // `os` is out of bounds if fewer elements were read than written.
        if (is.size() != os.size())
        {
            std::cerr << "Size mismatch: wrote " << os.size()
                      << " elements, read " << is.size() << "\n";
            matches = false;
        }

        std::size_t const common =
            is.size() < os.size() ? is.size() : os.size();
        for (std::size_t i = 0; i < common; ++i)
        {
            // exact comparison is intended, the values are expected to come
            // back unchanged
            if (os[i] != is[i])
            {
                std::cerr << "Mismatch for element " << i << ":"
                          << os[i] << " != " << is[i] << "\n";
                matches = false;
            }
        }
    }

    // report a failed round trip through the exit code as well
    return matches ? 0 : 1;
}


47 changes: 25 additions & 22 deletions hpx/runtime/serialization/detail/preprocess.hpp
Expand Up @@ -17,6 +17,7 @@
#include <hpx/lcos/future.hpp>
#include <hpx/lcos/local/promise.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/traits/serialization_access_data.hpp>

#include <cstddef>
#include <map>
Expand All @@ -26,13 +27,11 @@

namespace hpx { namespace serialization { namespace detail
{
template <typename Container>
struct access_data;

class preprocess
{
typedef hpx::lcos::local::spinlock mutex_type;
typedef std::map<const naming::gid_type*, naming::gid_type> split_gids_map;

public:
preprocess()
: size_(0)
Expand Down Expand Up @@ -137,51 +136,55 @@ namespace hpx { namespace serialization { namespace detail

hpx::lcos::local::promise<void> promise_;
};
}}}

namespace hpx { namespace traits
{
template <>
struct access_data<preprocess>
struct serialization_access_data<serialization::detail::preprocess>
: default_serialization_access_data<serialization::detail::preprocess>
{
typedef std::true_type preprocessing_only;

HPX_CONSTEXPR static bool is_preprocessing() { return true; }

static std::size_t size(serialization::detail::preprocess const& cont)
{
return cont.size();
}

static void resize(serialization::detail::preprocess& cont,
std::size_t count)
{
return cont.resize(cont.size() + count);
}

// functions related to output operations
static void await_future(
preprocess& cont
serialization::detail::preprocess& cont
, hpx::lcos::detail::future_data_refcnt_base & future_data)
{
cont.await_future(future_data);
}

static void add_gid(preprocess& cont,
static void add_gid(serialization::detail::preprocess& cont,
naming::gid_type const & gid,
naming::gid_type const & split_gid)
{
cont.add_gid(gid, split_gid);
}

static bool has_gid(preprocess& cont, naming::gid_type const& gid)
static bool has_gid(serialization::detail::preprocess& cont,
naming::gid_type const& gid)
{
return cont.has_gid(gid);
}

static void
write(preprocess& cont, std::size_t count,
std::size_t current, void const* address)
{
}

HPX_CONSTEXPR static bool
flush(binary_filter* filter, preprocess& cont,
std::size_t current, std::size_t size, std::size_t written)
{
return true;
}

static void reset(preprocess& cont)
static void reset(serialization::detail::preprocess& cont)
{
cont.reset();
}
};
}}}
}}

#endif
43 changes: 11 additions & 32 deletions hpx/runtime/serialization/input_container.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/runtime/serialization/container.hpp>
#include <hpx/runtime/serialization/serialization_chunk.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/traits/serialization_access_data.hpp>
#include <hpx/util/assert.hpp>

#include <cstddef> // for size_t
Expand All @@ -23,9 +24,11 @@
namespace hpx { namespace serialization
{
template <typename Container>
struct input_container: erased_input_container
struct input_container : erased_input_container
{
private:
typedef traits::serialization_access_data<Container> access_traits;

std::size_t get_chunk_size(std::size_t chunk) const
{
return (*chunks_)[chunk].size_;
Expand Down Expand Up @@ -73,8 +76,8 @@ namespace hpx { namespace serialization
{
filter_.reset(filter);
if (filter) {
current_ = filter->init_data(&cont_[current_],
cont_.size()-current_, decompressed_size_);
current_ = access_traits::init_data(cont_, filter_.get(),
current_, decompressed_size_);

if (decompressed_size_ < current_)
{
Expand All @@ -93,47 +96,23 @@ namespace hpx { namespace serialization
}
else {
std::size_t new_current = current_ + count;
if (new_current > cont_.size())
if (new_current > access_traits::size(cont_))
{
HPX_THROW_EXCEPTION(serialization_error
, "input_container::load_binary"
, "archive data bstream is too short");
return;
}

void const* src = &cont_[current_];
switch (count)
{
case 8:
*static_cast<std::uint64_t*>(address) =
*static_cast<std::uint64_t const*>(src);
break;

case 4:
*static_cast<std::uint32_t*>(address) =
*static_cast<std::uint32_t const*>(src);
break;

case 2:
*static_cast<std::uint16_t*>(address) =
*static_cast<std::uint16_t const*>(src);
break;

case 1:
*static_cast<std::uint8_t*>(address) =
*static_cast<std::uint8_t const*>(src);
break;

default:
std::memcpy(address, src, count);
break;
}
access_traits::read(cont_, count, current_, address);

current_ = new_current;

if (chunks_) {
current_chunk_size_ += count;
// make sure we switch to the next serialization_chunk if necessary

// make sure we switch to the next serialization_chunk if
// necessary
std::size_t current_chunk_size = get_chunk_size(current_chunk_);
if (current_chunk_size != 0 && current_chunk_size_ >=
current_chunk_size)
Expand Down
3 changes: 2 additions & 1 deletion hpx/runtime/serialization/output_archive.hpp
Expand Up @@ -120,7 +120,8 @@ namespace hpx { namespace serialization
binary_filter* filter = nullptr)
: base_type(make_flags(flags, chunks))
, buffer_(detail::create_output_container(buffer, chunks, filter,
typename detail::access_data<Container>::preprocessing_only()))
typename traits::serialization_access_data<Container>::
preprocessing_only()))
{
// endianness needs to be saved separately as it is needed to
// properly interpret the flags
Expand Down

0 comments on commit 46e915f

Please sign in to comment.