Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 8477f25cdf28
Choose a base ref
...
head repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 9615cd83d21e
Choose a head ref
  • 19 commits
  • 15 files changed
  • 1 contributor

Commits on Apr 5, 2018

  1. Copy the full SHA
    a4170e0 View commit details

Commits on Apr 8, 2018

  1. Copy the full SHA
    4c50b14 View commit details
  2. A approximation of Push.

    pleroy committed Apr 8, 2018
    Copy the full SHA
    02e71cf View commit details
  3. Sink and source declarations.

    pleroy committed Apr 8, 2018
    Copy the full SHA
    375fa40 View commit details

Commits on Apr 9, 2018

  1. A sink/source implementation.

    pleroy committed Apr 9, 2018
    Copy the full SHA
    32062b9 View commit details
  2. Fix some compilation errors.

    pleroy committed Apr 9, 2018
    Copy the full SHA
    32b9ee1 View commit details

Commits on Apr 10, 2018

  1. Make the sink/source compile.

    pleroy committed Apr 10, 2018
    Copy the full SHA
    af6c51e View commit details

Commits on Apr 11, 2018

  1. Copy the full SHA
    b2d451e View commit details
  2. A test.

    pleroy committed Apr 11, 2018
    Copy the full SHA
    1d56d27 View commit details
  3. Preview 3.

    pleroy committed Apr 11, 2018
    Copy the full SHA
    eb127cc View commit details

Commits on Apr 13, 2018

  1. Copy the full SHA
    f1f8316 View commit details

Commits on Apr 22, 2018

  1. Merge.

    pleroy committed Apr 22, 2018
    Copy the full SHA
    a9763b3 View commit details
  2. Comments.

    pleroy committed Apr 22, 2018
    Copy the full SHA
    d093160 View commit details
  3. Cleanup.

    pleroy committed Apr 22, 2018
    Copy the full SHA
    5502946 View commit details
  4. Copy the full SHA
    68831ff View commit details
  5. Copy the full SHA
    fd941b2 View commit details
  6. Lint.

    pleroy committed Apr 22, 2018
    Copy the full SHA
    de29236 View commit details

Commits on Apr 25, 2018

  1. After egg's review.

    pleroy committed Apr 25, 2018
    Copy the full SHA
    157497d View commit details
  2. Merge pull request #1799 from pleroy/Gipfeli

    Support for Gipfeli compression in PullSerializer
    pleroy authored Apr 25, 2018
    Copy the full SHA
    9615cd8 View commit details
1 change: 1 addition & 0 deletions Principia.sln
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Project Property Sheets", "
ProjectSection(SolutionItems) = preProject
generate_version_translation_unit.props = generate_version_translation_unit.props
google_benchmark.props = google_benchmark.props
google_gipfeli.props = google_gipfeli.props
google_glog.props = google_glog.props
google_googlemock_main.props = google_googlemock_main.props
google_googletest.props = google_googletest.props
2 changes: 2 additions & 0 deletions base/base.vcxproj
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@
<ClInclude Include="ranges_body.hpp" />
<ClInclude Include="shared_lock_guard.hpp" />
<ClInclude Include="shared_lock_guard_body.hpp" />
<ClInclude Include="sink_source.hpp" />
<ClInclude Include="sink_source_body.hpp" />
<ClInclude Include="status.hpp" />
<ClInclude Include="status_or.hpp" />
<ClInclude Include="status_or_body.hpp" />
6 changes: 6 additions & 0 deletions base/base.vcxproj.filters
Original file line number Diff line number Diff line change
@@ -143,6 +143,12 @@
<ClInclude Include="ranges_body.hpp">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="sink_source.hpp">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="sink_source_body.hpp">
<Filter>Source Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="not_null_test.cpp">
31 changes: 28 additions & 3 deletions base/pull_serializer.hpp
Original file line number Diff line number Diff line change
@@ -11,13 +11,16 @@
#include "base/array.hpp"
#include "base/macros.hpp"
#include "base/not_null.hpp"
#include "gipfeli/compression.h"
#include "google/protobuf/message.h"
#include "google/protobuf/io/zero_copy_stream.h"

namespace principia {
namespace base {
namespace internal_pull_serializer {

using google::compression::Compressor;

// An output stream based on an array that delegates to a function the handling
// of the case where one array is full. It calls the |on_full| function passed
// at construction and proceeds with filling the array returned by that
@@ -55,11 +58,13 @@ class DelegatingArrayOutputStream
// irrespective of the size of the message to serialize.
class PullSerializer final {
public:
// The |size| of the data objects returned by |Pull| are never greater than
// The |size| of the data objects enqueued by |Push| is never greater than
// |chunk_size|. At most |number_of_chunks| chunks are held in the internal
// queue. This class uses at most
// |number_of_chunks * (chunk_size + O(1)) + O(1)| bytes.
PullSerializer(int chunk_size, int number_of_chunks);
// |number_of_chunks * (chunk_size + O(1)) + O(1)| bytes. Note that in the
// presence of compression |chunk_size| is replaced by |compressed_chunk_size|
// in this formula.
PullSerializer(int chunk_size, int number_of_chunks, Compressor* compressor);
~PullSerializer();

// Starts the serializer, which will proceed to serialize |message|. This
@@ -71,6 +76,11 @@ class PullSerializer final {
// available. Returns a |Bytes| object of |size| 0 at the end of the
// serialization. The returned object may become invalid the next time |Pull|
// is called.
// In the absence of compression, the data produced by |Pull| constitute a
// stream and the boundaries between chunks are irrelevant. In the presence
// of compression however, the data produced by |Pull| are made of blocks and
// the boundaries between chunks are relevant and must be preserved by the
// clients and used when feeding data back to the deserializer.
Bytes Pull();

private:
@@ -81,9 +91,24 @@ class PullSerializer final {

std::unique_ptr<google::protobuf::Message const> message_;

Compressor* const compressor_;

// The chunk size passed at construction. The stream outputs chunks of that
// size.
int const chunk_size_;

// The maximum size of a chunk after compression. Greater than |chunk_size_|
// because the compressor will occasionally expand data. This is the size of
// the chunks in |data_|.
int const compressed_chunk_size_;

// The number of chunks passed at construction, used to size |data_|.
int const number_of_chunks_;

// How many of the |number_of_chunks_| chunks in |data_| are reserved for
// compression.
int const number_of_compression_chunks_;

// The array supporting the stream and the stream itself.
std::unique_ptr<std::uint8_t[]> data_;
DelegatingArrayOutputStream stream_;
58 changes: 47 additions & 11 deletions base/pull_serializer_body.hpp
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@

#include <algorithm>

#include "base/sink_source.hpp"

namespace principia {
namespace base {
namespace internal_pull_serializer {
@@ -61,19 +63,32 @@ inline std::int64_t DelegatingArrayOutputStream::ByteCount() const {
}

inline PullSerializer::PullSerializer(int const chunk_size,
int const number_of_chunks)
: chunk_size_(chunk_size),
int const number_of_chunks,
Compressor* const compressor)
: compressor_(compressor),
chunk_size_(chunk_size),
compressed_chunk_size_(
compressor == nullptr ? chunk_size_
: compressor->MaxCompressedLength(chunk_size_)),
number_of_chunks_(number_of_chunks),
data_(std::make_unique<std::uint8_t[]>(chunk_size_ * number_of_chunks_)),
number_of_compression_chunks_(compressor == nullptr ? 0 : 1),
data_(std::make_unique<std::uint8_t[]>(compressed_chunk_size_ *
number_of_chunks_)),
stream_(Bytes(data_.get(), chunk_size_),
std::bind(&PullSerializer::Push, this, _1)) {
// Check the compatibility of the wait conditions in Push and Pull.
CHECK_GT(number_of_chunks_ - number_of_compression_chunks_ - 1, 1);

// Mark all the chunks as free except the last one which is a sentinel for the
// |queue_|. The 0th chunk has been passed to the stream, but it's still free
// until the first call to |on_full|.
// until the first call to |on_full|. Note that the last
// |compressed_chunk_size_ - chunk_size_| bytes of each chunk are not
// considered as free.
for (int i = 0; i < number_of_chunks_ - 1; ++i) {
free_.push(data_.get() + i * chunk_size_);
free_.push(data_.get() + i * compressed_chunk_size_);
}
queue_.push(Bytes(data_.get() + (number_of_chunks_ - 1) * chunk_size_, 0));
queue_.push(
Bytes(data_.get() + (number_of_chunks_ - 1) * compressed_chunk_size_, 0));
}

inline PullSerializer::~PullSerializer() {
@@ -107,7 +122,7 @@ inline Bytes PullSerializer::Pull() {
// The element at the front of the queue is the one that was last returned
// by |Pull| and must be dropped and freed.
queue_has_elements_.wait(l, [this]() { return queue_.size() > 1; });
CHECK_LE(2u, queue_.size());
CHECK_LE(2, queue_.size());
free_.push(queue_.front().data);
queue_.pop();
result = queue_.front();
@@ -117,18 +132,39 @@ inline Bytes PullSerializer::Pull() {
return result;
}

inline Bytes PullSerializer::Push(Bytes const bytes) {
inline Bytes PullSerializer::Push(Bytes bytes) {
Bytes result;
CHECK_GE(chunk_size_, bytes.size);
if (bytes.size > 0 && compressor_ != nullptr) {
Bytes compressed_bytes;
{
std::unique_lock<std::mutex> l(lock_);
CHECK_LE(1 + number_of_compression_chunks_, free_.size());
free_.pop();
compressed_bytes = Bytes(free_.front(), compressed_chunk_size_);
free_.push(bytes.data);
}
// We maintain the invariant that the chunk being filled is at the front of
// the |free_| queue.
ArraySource<std::uint8_t> source(bytes);
ArraySink<std::uint8_t> sink(compressed_bytes);
compressor_->CompressStream(&source, &sink);
{
std::unique_lock<std::mutex> l(lock_);
bytes = sink.array();
}
}
{
std::unique_lock<std::mutex> l(lock_);
queue_has_room_.wait(l, [this]() {
// -1 here is because we want to ensure that there is an entry in the
// (real) free list.
return queue_.size() < static_cast<std::size_t>(number_of_chunks_) - 1;
// free list, in addition to |result| and to
// |number_of_compression_chunks_| (if present).
return queue_.size() < static_cast<std::size_t>(number_of_chunks_) -
number_of_compression_chunks_ - 1;
});
queue_.emplace(bytes.data, bytes.size);
CHECK_LE(2u, free_.size());
CHECK_LE(2 + number_of_compression_chunks_, free_.size());
CHECK_EQ(free_.front(), bytes.data);
free_.pop();
result = Bytes(free_.front(), chunk_size_);
54 changes: 51 additions & 3 deletions base/pull_serializer_test.cpp
Original file line number Diff line number Diff line change
@@ -6,13 +6,16 @@
#include <string>
#include <vector>

#include "gipfeli/compression.h"
#include "gipfeli/gipfeli.h"
#include "gmock/gmock.h"
#include "serialization/physics.pb.h"

namespace principia {
namespace base {
namespace internal_pull_serializer {

using google::compression::Compressor;
using serialization::DiscreteTrajectory;
using serialization::Pair;
using serialization::Point;
@@ -32,7 +35,9 @@ class PullSerializerTest : public ::testing::Test {
protected:
PullSerializerTest()
: pull_serializer_(
std::make_unique<PullSerializer>(chunk_size, number_of_chunks)),
std::make_unique<PullSerializer>(chunk_size,
number_of_chunks,
/*compressor=*/nullptr)),
stream_(Bytes(data_, small_chunk_size),
std::bind(&PullSerializerTest::OnFull,
this,
@@ -118,6 +123,48 @@ TEST_F(PullSerializerTest, SerializationSizes) {
EXPECT_THAT(actual_sizes, ElementsAreArray(expected_sizes));
}

// Checks that serializing with Gipfeli compression and then uncompressing each
// pulled chunk individually yields the same bytes as serializing the same
// trajectory without compression.
TEST_F(PullSerializerTest, SerializationGipfeli) {
  // |NewGipfeliCompressor| hands back a raw pointer that the caller must
  // release; hold it in a |unique_ptr| so the test does not leak.  It is
  // declared first so that it outlives the serializer that borrows it below.
  std::unique_ptr<Compressor> const compressor(
      google::compression::NewGipfeliCompressor());
  std::string uncompressed1;
  std::string uncompressed2;
  {
    // Reference: the uncompressed serialization from the fixture's serializer.
    auto trajectory = BuildTrajectory();
    pull_serializer_->Start(std::move(trajectory));
    for (;;) {
      Bytes const bytes = pull_serializer_->Pull();
      if (bytes.size == 0) {
        break;
      }
      uncompressed1.append(reinterpret_cast<char const*>(bytes.data),
                           bytes.size);
    }
  }
  {
    auto const compressed_pull_serializer =
        std::make_unique<PullSerializer>(chunk_size,
                                         /*number_of_chunks=*/4,
                                         compressor.get());
    auto trajectory = BuildTrajectory();
    compressed_pull_serializer->Start(std::move(trajectory));
    for (;;) {
      Bytes const bytes = compressed_pull_serializer->Pull();
      if (bytes.size == 0) {
        break;
      }
      // Each pulled chunk is uncompressed on its own: the chunk boundaries are
      // preserved here rather than concatenating the compressed data.
      std::string const compressed(reinterpret_cast<char const*>(bytes.data),
                                   bytes.size);
      std::string uncompressed;
      // A failed decompression must fail the test instead of silently
      // appending whatever ended up in |uncompressed|.
      EXPECT_TRUE(compressor->Uncompress(compressed, &uncompressed));
      uncompressed2.append(uncompressed);
    }
  }

  EXPECT_EQ(uncompressed1, uncompressed2);
}

TEST_F(PullSerializerTest, SerializationThreading) {
DiscreteTrajectory read_trajectory;
auto const trajectory = BuildTrajectory();
@@ -136,8 +183,9 @@ TEST_F(PullSerializerTest, SerializationThreading) {
std::uint8_t* data = &actual_serialized_trajectory[0];

// The serialization happens concurrently with the test.
pull_serializer_ =
std::make_unique<PullSerializer>(chunk_size, number_of_chunks);
pull_serializer_ = std::make_unique<PullSerializer>(chunk_size,
number_of_chunks,
/*compressor=*/nullptr);
pull_serializer_->Start(std::move(trajectory));
for (;;) {
Bytes const bytes = pull_serializer_->Pull();
1 change: 1 addition & 0 deletions base/push_deserializer.hpp
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
#include "base/array.hpp"
#include "base/macros.hpp"
#include "base/not_null.hpp"
#include "gipfeli/gipfeli.h"
#include "google/protobuf/message.h"
#include "google/protobuf/io/zero_copy_stream.h"

2 changes: 0 additions & 2 deletions base/push_deserializer_body.hpp
Original file line number Diff line number Diff line change
@@ -12,8 +12,6 @@ namespace principia {
namespace base {
namespace internal_push_deserializer {

using std::swap;

inline DelegatingArrayInputStream::DelegatingArrayInputStream(
std::function<Bytes()> on_empty)
: on_empty_(std::move(on_empty)),
9 changes: 6 additions & 3 deletions base/push_deserializer_test.cpp
Original file line number Diff line number Diff line change
@@ -39,8 +39,10 @@ const char start[] = "START";
class PushDeserializerTest : public ::testing::Test {
protected:
PushDeserializerTest()
: pull_serializer_(std::make_unique<PullSerializer>(serializer_chunk_size,
number_of_chunks)),
: pull_serializer_(
std::make_unique<PullSerializer>(serializer_chunk_size,
number_of_chunks,
/*compressor=*/nullptr)),
push_deserializer_(
std::make_unique<PushDeserializer>(deserializer_chunk_size,
number_of_chunks)),
@@ -183,7 +185,8 @@ TEST_F(PushDeserializerTest, SerializationDeserialization) {
std::uint8_t* data = &storage[0];

pull_serializer_ = std::make_unique<PullSerializer>(serializer_chunk_size,
number_of_chunks);
number_of_chunks,
/*compressor=*/nullptr);
push_deserializer_ = std::make_unique<PushDeserializer>(
deserializer_chunk_size, number_of_chunks);

59 changes: 59 additions & 0 deletions base/sink_source.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

#pragma once

#include <cstdint>

#include "base/array.hpp"
#include "gipfeli/sinksource.h"

namespace principia {
namespace base {
namespace internal_sink_source {

// An implementation of |google::compression::Source| constructed from an
// |Array<Element>|.  Only the declarations are given here; the member
// functions are defined in the associated body file.
template<typename Element>
class ArraySource : public google::compression::Source {
 public:
  // NOTE(review): |array| is presumably copied into |array_|; confirm against
  // the definition in the body file.
  explicit ArraySource(Array<Element> const& array);
  ~ArraySource() override = default;

  // Overrides of the |google::compression::Source| interface.
  size_t Available() const override;
  char const* Peek(size_t* length) override;
  void Skip(size_t n) override;

 private:
  // The |Array| object is held by value and is never reassigned.
  Array<Element> const array_;
  // Starts at 0.  Presumably the position in |array_| of the next element to
  // be read; the code that updates it is not visible here.
  std::int64_t next_to_read_ = 0;
};

// An implementation of |google::compression::Sink| constructed from an
// |Array<Element>|.  Only the declarations are given here; the member
// functions are defined in the associated body file.
template<typename Element>
class ArraySink : public google::compression::Sink {
 public:
  // NOTE(review): |array| is presumably copied into |array_| and used as the
  // destination of the writes; confirm against the definition in the body
  // file.
  explicit ArraySink(Array<Element> const& array);
  ~ArraySink() override = default;

  // Returns an |Array<Element>| by value.  NOTE(review): presumably this
  // describes the part of the destination written so far; confirm against the
  // definition.
  Array<Element> array() const;

  // Overrides of the |google::compression::Sink| interface.
  void Append(char const* data, size_t n) override;
  char* GetAppendBuffer(size_t min_size,
                        size_t desired_size_hint,
                        char* scratch,
                        size_t scratch_size,
                        size_t* allocated_size) override;

 private:
  // The |Array| object is held by value and is never reassigned.
  Array<Element> const array_;
  // Starts at 0.  Presumably the position in |array_| where the next element
  // will be written; the code that updates it is not visible here.
  std::int64_t next_to_write_ = 0;
};

} // namespace internal_sink_source

using internal_sink_source::ArraySink;
using internal_sink_source::ArraySource;

} // namespace base
} // namespace principia

#include "base/sink_source_body.hpp"
Loading