Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a7f9a2b0c7ca
Choose a base ref
...
head repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 1fb11386d2a1
Choose a head ref
  • 11 commits
  • 8 files changed
  • 1 contributor

Commits on Apr 24, 2018

  1. Copy the full SHA
    d8ad241 View commit details
  2. The old tests pass.

    pleroy committed Apr 24, 2018
    Copy the full SHA
    76c05ed View commit details
  3. Start to write a test.

    pleroy committed Apr 24, 2018
    Copy the full SHA
    d08eab8 View commit details

Commits on Apr 25, 2018

  1. The test passes.

    pleroy committed Apr 25, 2018
    Copy the full SHA
    ad705b4 View commit details
  2. Copy the full SHA
    11ff758 View commit details
  3. One new test.

    pleroy committed Apr 25, 2018
    Copy the full SHA
    7db5f34 View commit details
  4. Fix the client.

    pleroy committed Apr 25, 2018
    Copy the full SHA
    10cb4ed View commit details

Commits on Apr 26, 2018

  1. Comments.

    pleroy committed Apr 26, 2018
    Copy the full SHA
    8d055a7 View commit details
  2. Copy the full SHA
    dde7e55 View commit details

Commits on Apr 27, 2018

  1. Copy the full SHA
    6434c5b View commit details

Commits on Apr 29, 2018

  1. Merge pull request #1801 from pleroy/Gipfeli2

    Support for Gipfeli compression in the PushDeserializer
    pleroy authored Apr 29, 2018
    Copy the full SHA
    1fb1138 View commit details
3 changes: 2 additions & 1 deletion base/array_body.hpp
Original file line number Diff line number Diff line change
@@ -27,7 +27,8 @@ UniqueArray<Element>::UniqueArray() : size(0) {}
template<typename Element>
template<typename Size, typename>
UniqueArray<Element>::UniqueArray(Size const size)
: data(new std::uint8_t[static_cast<std::size_t>(size)]),
: data(size == 0 ? nullptr
: new std::uint8_t[static_cast<std::size_t>(size)]),
size(static_cast<std::int64_t>(size)) {}

template<typename Element>
6 changes: 4 additions & 2 deletions base/pull_serializer.hpp
Original file line number Diff line number Diff line change
@@ -64,7 +64,9 @@ class PullSerializer final {
// |number_of_chunks * (chunk_size + O(1)) + O(1)| bytes. Note that in the
// presence of compression |chunk_size| is replaced by |compressed_chunk_size|
// in this formula.
PullSerializer(int chunk_size, int number_of_chunks, Compressor* compressor);
PullSerializer(int chunk_size,
int number_of_chunks,
std::unique_ptr<Compressor> compressor);
~PullSerializer();

// Starts the serializer, which will proceed to serialize |message|. This
@@ -91,7 +93,7 @@ class PullSerializer final {

std::unique_ptr<google::protobuf::Message const> message_;

Compressor* const compressor_;
std::unique_ptr<Compressor> const compressor_;

// The chunk size passed at construction. The stream outputs chunks of that
// size.
11 changes: 6 additions & 5 deletions base/pull_serializer_body.hpp
Original file line number Diff line number Diff line change
@@ -64,14 +64,15 @@ inline std::int64_t DelegatingArrayOutputStream::ByteCount() const {

inline PullSerializer::PullSerializer(int const chunk_size,
int const number_of_chunks,
Compressor* const compressor)
: compressor_(compressor),
std::unique_ptr<Compressor> compressor)
: compressor_(std::move(compressor)),
chunk_size_(chunk_size),
compressed_chunk_size_(
compressor == nullptr ? chunk_size_
: compressor->MaxCompressedLength(chunk_size_)),
compressor_ == nullptr
? chunk_size_
: compressor_->MaxCompressedLength(chunk_size_)),
number_of_chunks_(number_of_chunks),
number_of_compression_chunks_(compressor == nullptr ? 0 : 1),
number_of_compression_chunks_(compressor_ == nullptr ? 0 : 1),
data_(std::make_unique<std::uint8_t[]>(compressed_chunk_size_ *
number_of_chunks_)),
stream_(Bytes(data_.get(), chunk_size_),
9 changes: 5 additions & 4 deletions base/pull_serializer_test.cpp
Original file line number Diff line number Diff line change
@@ -124,7 +124,6 @@ TEST_F(PullSerializerTest, SerializationSizes) {
}

TEST_F(PullSerializerTest, SerializationGipfeli) {
Compressor* compressor = google::compression::NewGipfeliCompressor();
std::string uncompressed1;
std::string uncompressed2;
{
@@ -142,11 +141,13 @@ TEST_F(PullSerializerTest, SerializationGipfeli) {
}
{
auto const compressed_pull_serializer =
std::make_unique<PullSerializer>(chunk_size,
/*number_of_chunks=*/4,
compressor);
std::make_unique<PullSerializer>(
chunk_size,
/*number_of_chunks=*/4,
google::compression::NewGipfeliCompressor());
auto trajectory = BuildTrajectory();
compressed_pull_serializer->Start(std::move(trajectory));
auto compressor = google::compression::NewGipfeliCompressor();
for (;;) {
Bytes const bytes = compressed_pull_serializer->Pull();
if (bytes.size == 0) {
22 changes: 20 additions & 2 deletions base/push_deserializer.hpp
Original file line number Diff line number Diff line change
@@ -11,14 +11,16 @@
#include "base/array.hpp"
#include "base/macros.hpp"
#include "base/not_null.hpp"
#include "gipfeli/gipfeli.h"
#include "gipfeli/compression.h"
#include "google/protobuf/message.h"
#include "google/protobuf/io/zero_copy_stream.h"

namespace principia {
namespace base {
namespace internal_push_deserializer {

using google::compression::Compressor;

// An input stream based on an array that delegates to a function the handling
// of the case where the array is empty. It calls the |on_empty| function
// passed at construction and proceeds with deserializing the array returned by
@@ -57,7 +59,9 @@ class PushDeserializer final {
// |chunk_size|. The internal queue holds at most |number_of_chunks| chunks.
// Therefore, this class uses at most
// |number_of_chunks * (chunk_size + O(1)) + O(1)| bytes.
PushDeserializer(int chunk_size, int number_of_chunks);
PushDeserializer(int chunk_size,
int number_of_chunks,
std::unique_ptr<Compressor> compressor);
~PushDeserializer();

// Starts the deserializer, which will proceed to deserialize data into
@@ -85,8 +89,22 @@ class PushDeserializer final {

std::unique_ptr<google::protobuf::Message> message_;

std::unique_ptr<Compressor> const compressor_;

// The chunk size passed at construction. The stream consumes chunks of that
// size.
int const chunk_size_;

// The maximum size of a chunk after compression. Greater than |chunk_size_|
// because the compressor will occasionally expand data. This is the
// maximum size of the chunks passed to |Push| by the client.
int const compressed_chunk_size_;

// The number of chunks passed at construction, used to size |data_|.
int const number_of_chunks_;

UniqueBytes uncompressed_data_;

DelegatingArrayInputStream stream_;
std::unique_ptr<std::thread> thread_;

47 changes: 39 additions & 8 deletions base/push_deserializer_body.hpp
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@

#include <algorithm>

#include "base/sink_source.hpp"
#include "glog/logging.h"
#include "google/protobuf/io/coded_stream_inl.h"

@@ -74,10 +75,18 @@ inline std::int64_t DelegatingArrayInputStream::ByteCount() const {
return byte_count_;
}

inline PushDeserializer::PushDeserializer(int const chunk_size,
int const number_of_chunks)
: chunk_size_(chunk_size),
inline PushDeserializer::PushDeserializer(
int const chunk_size,
int const number_of_chunks,
std::unique_ptr<Compressor> compressor)
: compressor_(std::move(compressor)),
chunk_size_(chunk_size),
compressed_chunk_size_(
compressor_ == nullptr
? chunk_size_
: compressor_->MaxCompressedLength(chunk_size_)),
number_of_chunks_(number_of_chunks),
uncompressed_data_(chunk_size_),
stream_(std::bind(&PushDeserializer::Pull, this)) {
// This sentinel ensures that the two queue are correctly out of step.
done_.push(nullptr);
@@ -128,22 +137,35 @@ inline void PushDeserializer::Push(Bytes const bytes,
// circumstances. The |done| callback is attached to the last chunk.
Bytes current = bytes;
CHECK_LE(0, bytes.size);

// Decide how much data we are going to push on the queue. In the presence of
// compression we have to respect the boundary of the incoming block. In the
// absence of compression we have a stream so we can cut into as many chunks
// as we like.
int queued_chunk_size;
if (compressor_ == nullptr) {
queued_chunk_size = chunk_size_;
} else {
CHECK_LE(bytes.size, compressed_chunk_size_);
queued_chunk_size = compressed_chunk_size_;
}

bool is_last;
do {
{
is_last = current.size <= chunk_size_;
is_last = current.size <= queued_chunk_size;
std::unique_lock<std::mutex> l(lock_);
queue_has_room_.wait(l, [this]() {
return queue_.size() < static_cast<std::size_t>(number_of_chunks_);
});
queue_.emplace(current.data,
std::min(current.size,
static_cast<std::int64_t>(chunk_size_)));
static_cast<std::int64_t>(queued_chunk_size)));
done_.emplace(is_last ? std::move(done) : nullptr);
}
queue_has_elements_.notify_all();
current.data = &current.data[chunk_size_];
current.size -= chunk_size_;
current.data = &current.data[queued_chunk_size];
current.size -= queued_chunk_size;
} while (!is_last);
}

@@ -161,7 +183,16 @@ inline Bytes PushDeserializer::Pull() {
}
done_.pop();
// Get the next |Bytes| object to process and remove it from |queue_|.
result = queue_.front();
// Uncompress it if needed.
auto const& front = queue_.front();
if (front.size == 0 || compressor_ == nullptr) {
result = front;
} else {
ArraySource<std::uint8_t> source(front);
ArraySink<std::uint8_t> sink(uncompressed_data_.get());
CHECK(compressor_->UncompressStream(&source, &sink));
result = sink.array();
}
queue_.pop();
}
queue_has_room_.notify_all();
166 changes: 127 additions & 39 deletions base/push_deserializer_test.cpp
Original file line number Diff line number Diff line change
@@ -6,15 +6,18 @@
#include <cstring>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "base/array.hpp"
#include "base/not_null.hpp"
#include "base/pull_serializer.hpp"
#include "gipfeli/gipfeli.h"
#include "gmock/gmock.h"
#include "serialization/physics.pb.h"
#include "testing_utilities/matchers.hpp"

namespace principia {
namespace base {
@@ -25,6 +28,7 @@ using serialization::DiscreteTrajectory;
using serialization::Pair;
using serialization::Point;
using serialization::Quantity;
using testing_utilities::EqualsProto;
using ::std::placeholders::_1;
using ::testing::ElementsAreArray;

@@ -45,7 +49,8 @@ class PushDeserializerTest : public ::testing::Test {
/*compressor=*/nullptr)),
push_deserializer_(
std::make_unique<PushDeserializer>(deserializer_chunk_size,
number_of_chunks)),
number_of_chunks,
/*compressor=*/nullptr)),
stream_(std::bind(&PushDeserializerTest::OnEmpty,
this,
std::ref(strings_))) {}
@@ -83,7 +88,6 @@ class PushDeserializerTest : public ::testing::Test {
EXPECT_EQ(actual_serialized, expected_serialized);
}


static void Stomp(Bytes const& bytes) {
std::memset(bytes.data, 0xCD, static_cast<std::size_t>(bytes.size));
}
@@ -98,6 +102,50 @@ class PushDeserializerTest : public ::testing::Test {
static_cast<std::int64_t>(front.size()));
}

// Exercises concurrent serialization and deserialization.
void TestSerializationDeserialization(
std::unique_ptr<Compressor> serializer_compressor,
std::unique_ptr<Compressor> deserializer_compressor) {
auto const trajectory = BuildTrajectory();
int const byte_size = trajectory->ByteSize();
for (int i = 0; i < runs_per_test; ++i) {
auto read_trajectory = make_not_null_unique<DiscreteTrajectory>();
auto written_trajectory = BuildTrajectory();
auto storage = std::make_unique<std::uint8_t[]>(byte_size);
std::uint8_t* data = &storage[0];

pull_serializer_ =
std::make_unique<PullSerializer>(serializer_chunk_size,
/*number_of_chunks=*/4,
std::move(serializer_compressor));
push_deserializer_ =
std::make_unique<PushDeserializer>(
deserializer_chunk_size,
number_of_chunks,
std::move(deserializer_compressor));

pull_serializer_->Start(std::move(written_trajectory));
push_deserializer_->Start(std::move(read_trajectory),
PushDeserializerTest::CheckSerialization);
for (;;) {
Bytes const bytes = pull_serializer_->Pull();
std::memcpy(data, bytes.data, static_cast<std::size_t>(bytes.size));
push_deserializer_->Push(
Bytes(data, bytes.size),
std::bind(&PushDeserializerTest::Stomp, Bytes(data, bytes.size)));
data = &data[bytes.size];
if (bytes.size == 0) {
break;
}
}

// Destroying the deserializer waits until deserialization is done. It is
// important that this happens before |storage| is destroyed.
pull_serializer_.reset();
push_deserializer_.reset();
}
}

std::unique_ptr<PullSerializer> pull_serializer_;
std::unique_ptr<PushDeserializer> push_deserializer_;
DelegatingArrayInputStream stream_;
@@ -149,6 +197,76 @@ TEST_F(PushDeserializerTest, Stream) {
EXPECT_EQ(23, stream_.ByteCount());
}

TEST_F(PushDeserializerTest, DeserializationGipfeli) {
DiscreteTrajectory read_trajectory1;
DiscreteTrajectory read_trajectory2;

auto const written_trajectory = BuildTrajectory();
int const byte_size = written_trajectory->ByteSize();

{
auto const serialized_trajectory =
std::make_unique<std::uint8_t[]>(byte_size);
written_trajectory->SerializePartialToArray(&serialized_trajectory[0],
byte_size);

auto read_trajectory = make_not_null_unique<DiscreteTrajectory>();
push_deserializer_->Start(
std::move(read_trajectory),
[&read_trajectory1](google::protobuf::Message const& read_trajectory) {
read_trajectory1.CopyFrom(read_trajectory);
});
Bytes bytes(serialized_trajectory.get(), byte_size);
push_deserializer_->Push(bytes, nullptr);
push_deserializer_->Push(Bytes(), nullptr);

// Destroying the deserializer waits until deserialization is done.
push_deserializer_.reset();
}
{
auto compressed_push_deserializer =
std::make_unique<PushDeserializer>(
deserializer_chunk_size,
number_of_chunks,
google::compression::NewGipfeliCompressor());
auto const written_trajectory = BuildTrajectory();
int const byte_size = written_trajectory->ByteSize();
auto const uncompressed = written_trajectory->SerializePartialAsString();

auto read_trajectory = make_not_null_unique<DiscreteTrajectory>();
compressed_push_deserializer->Start(
std::move(read_trajectory),
[&read_trajectory2](google::protobuf::Message const& read_trajectory) {
read_trajectory2.CopyFrom(read_trajectory);
});

std::vector<std::unique_ptr<std::uint8_t[]>> compressed_chunks;
auto compressor = google::compression::NewGipfeliCompressor();
for (int i = 0; i < uncompressed.size(); i += deserializer_chunk_size) {
std::string compressed;
compressor->Compress(
uncompressed.substr(
i,
std::min(deserializer_chunk_size,
static_cast<int>(uncompressed.size()) - i)),
&compressed);
compressed_chunks.push_back(
std::make_unique<std::uint8_t[]>(compressed.size()));
for (int j = 0; j < compressed.size(); ++j) {
compressed_chunks.back()[j] = compressed[j];
}
Bytes bytes(compressed_chunks.back().get(), compressed.size());
compressed_push_deserializer->Push(bytes, nullptr);
}
compressed_push_deserializer->Push(Bytes(), nullptr);

// Destroying the deserializer waits until deserialization is done.
compressed_push_deserializer.reset();
}

EXPECT_THAT(read_trajectory1, EqualsProto(read_trajectory2));
}

TEST_F(PushDeserializerTest, DeserializationThreading) {
auto const written_trajectory = BuildTrajectory();
int const byte_size = written_trajectory->ByteSize();
@@ -158,7 +276,7 @@ TEST_F(PushDeserializerTest, DeserializationThreading) {
for (int i = 0; i < runs_per_test; ++i) {
auto read_trajectory = make_not_null_unique<DiscreteTrajectory>();
push_deserializer_ = std::make_unique<PushDeserializer>(
deserializer_chunk_size, number_of_chunks);
deserializer_chunk_size, number_of_chunks, /*compressor=*/nullptr);

written_trajectory->SerializePartialToArray(&serialized_trajectory[0],
byte_size);
@@ -174,45 +292,15 @@ TEST_F(PushDeserializerTest, DeserializationThreading) {
}
}

// Exercise concurrent serialization and deserialization.
TEST_F(PushDeserializerTest, SerializationDeserialization) {
auto const trajectory = BuildTrajectory();
int const byte_size = trajectory->ByteSize();
for (int i = 0; i < runs_per_test; ++i) {
auto read_trajectory = make_not_null_unique<DiscreteTrajectory>();
auto written_trajectory = BuildTrajectory();
auto storage = std::make_unique<std::uint8_t[]>(byte_size);
std::uint8_t* data = &storage[0];

pull_serializer_ = std::make_unique<PullSerializer>(serializer_chunk_size,
number_of_chunks,
/*compressor=*/nullptr);
push_deserializer_ = std::make_unique<PushDeserializer>(
deserializer_chunk_size, number_of_chunks);

pull_serializer_->Start(std::move(written_trajectory));
push_deserializer_->Start(
std::move(read_trajectory), PushDeserializerTest::CheckSerialization);
for (;;) {
Bytes const bytes = pull_serializer_->Pull();
std::memcpy(data, bytes.data, static_cast<std::size_t>(bytes.size));
push_deserializer_->Push(Bytes(data, bytes.size),
std::bind(&PushDeserializerTest::Stomp,
Bytes(data, bytes.size)));
data = &data[bytes.size];
if (bytes.size == 0) {
break;
}
}

// Destroying the deserializer waits until deserialization is done. It is
// important that this happens before |storage| is destroyed.
pull_serializer_.reset();
push_deserializer_.reset();
}
TestSerializationDeserialization(/*serializer_compressor=*/nullptr,
/*deserializer_compressor=*/nullptr);
TestSerializationDeserialization(
/*serializer_compressor=*/google::compression::NewGipfeliCompressor(),
/*deserializer_compressor=*/google::compression::NewGipfeliCompressor());
}

// Check that deserialization fails if we stomp on one extra bytes.
// Check that deserialization fails if we stomp on one extra byte.
TEST_F(PushDeserializerDeathTest, Stomp) {
EXPECT_DEATH({
const int stomp_chunk = 77;
3 changes: 2 additions & 1 deletion ksp_plugin/interface.cpp
Original file line number Diff line number Diff line change
@@ -344,7 +344,8 @@ void principia__DeserializePlugin(char const* const serialization,
if (*deserializer == nullptr) {
LOG(INFO) << "Begin plugin deserialization";
*deserializer = new PushDeserializer(chunk_size,
number_of_chunks);
number_of_chunks,
/*compressor=*/nullptr);
auto message = make_not_null_unique<serialization::Plugin>();
(*deserializer)->Start(
std::move(message),