Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: a798fabc15a0
Choose a base ref
...
head repository: mockingbirdnest/Principia
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 148c39e48c5e
Choose a head ref
  • 12 commits
  • 5 files changed
  • 1 contributor

Commits on Apr 21, 2021

  1. Copy the full SHA
    bfd5460 View commit details
  2. Lots of stuff removed.

    pleroy committed Apr 21, 2021
    Copy the full SHA
    6d59095 View commit details
  3. It compiles.

    pleroy committed Apr 21, 2021
    Copy the full SHA
    ca421bb View commit details
  4. Improved locking.

    pleroy committed Apr 21, 2021
    Copy the full SHA
    aaba17f View commit details
  5. Fix a bug.

    pleroy committed Apr 21, 2021
    Copy the full SHA
    66610cc View commit details

Commits on Apr 22, 2021

  1. Comments.

    pleroy committed Apr 22, 2021
    Copy the full SHA
    7b25619 View commit details

Commits on Apr 23, 2021

  1. Comments.

    pleroy committed Apr 23, 2021
    Copy the full SHA
    4d5449a View commit details
  2. Merge branch 'UseRecurringThread' of https://github.com/pleroy/Principia

     into UseRecurringThread
    pleroy committed Apr 23, 2021
    Copy the full SHA
    3d56c47 View commit details

Commits on Apr 25, 2021

  1. Copy the full SHA
    a8cbd22 View commit details

Commits on Apr 26, 2021

  1. Fix a test.

    pleroy committed Apr 26, 2021
    Copy the full SHA
    160f6bc View commit details

Commits on Apr 27, 2021

  1. After egg's review.

    pleroy committed Apr 27, 2021
    Copy the full SHA
    7ed8f08 View commit details
  2. Merge pull request #2963 from pleroy/UseRecurringThread

    Use RecurringThread for the prognosticator
    pleroy authored Apr 27, 2021
    Copy the full SHA
    148c39e View commit details
Showing with 96 additions and 127 deletions.
  1. +15 −6 base/recurring_thread.hpp
  2. +23 −10 base/recurring_thread_body.hpp
  3. +1 −0 base/recurring_thread_test.cpp
  4. +48 −82 ksp_plugin/vessel.cpp
  5. +9 −29 ksp_plugin/vessel.hpp
21 changes: 15 additions & 6 deletions base/recurring_thread.hpp
Original file line number Diff line number Diff line change
@@ -6,7 +6,9 @@

#include "absl/synchronization/mutex.h"
#include "base/jthread.hpp"
#include "base/macros.hpp"
#include "base/status.hpp"
#include "base/status_or.hpp"

namespace principia {
namespace base {
@@ -20,14 +22,20 @@ namespace internal_recurring_thread {
template<typename Input, typename Output>
class RecurringThread {
public:
using Action = std::function<Output(Input)>;
// If an action returns an error, no output in written to the output channel.
using Action = std::function<StatusOr<Output>(Input)>;

// Constructs a stoppable thread that executes the given |action| no more
// frequently than at the specified |period| (and less frequently if no input
// was provided).
// was provided). At construction the thread is in the stopped state.
RecurringThread(Action action,
std::chrono::milliseconds period);

// Starts or stops the thread. These functions are idempotent. Note that the
// thread is also stopped by the destruction of this object.
void Start();
void Stop();

// Overwrites the contents of the input channel. The |input| data will be
// either picked by the next execution of |action|, or overwritten by the next
// call to |Put|.
@@ -42,11 +50,12 @@ class RecurringThread {
Action const action_;
std::chrono::milliseconds const period_;

jthread jthread_;
absl::Mutex jthread_lock_;
jthread jthread_ GUARDED_BY(jthread_lock_);

absl::Mutex lock_;
std::optional<Input> input_;
std::optional<Output> output_;
absl::Mutex input_output_lock_;
std::optional<Input> input_ GUARDED_BY(input_output_lock_);
std::optional<Output> output_ GUARDED_BY(input_output_lock_);
};

} // namespace internal_recurring_thread
33 changes: 23 additions & 10 deletions base/recurring_thread_body.hpp
Original file line number Diff line number Diff line change
@@ -13,19 +13,32 @@ RecurringThread<Input, Output>::RecurringThread(
Action action,
std::chrono::milliseconds const period)
: action_(std::move(action)),
period_(period),
jthread_(MakeStoppableThread(
[this]() { Status const status = RepeatedlyRunAction(); })) {}
period_(period) {}

template<typename Input, typename Output>
void RecurringThread<Input, Output>::Start() {
absl::MutexLock l(&jthread_lock_);
if (!jthread_.joinable()) {
jthread_ = MakeStoppableThread(
[this]() { Status const status = RepeatedlyRunAction(); });
}
}

template<typename Input, typename Output>
void RecurringThread<Input, Output>::Stop() {
absl::MutexLock l(&jthread_lock_);
jthread_ = jthread();
}

template<typename Input, typename Output>
void RecurringThread<Input, Output>::Put(Input input) {
absl::MutexLock l(&lock_);
absl::MutexLock l(&input_output_lock_);
input_ = std::move(input);
}

template<typename Input, typename Output>
std::optional<Output> RecurringThread<Input, Output>::Get() {
absl::MutexLock l(&lock_);
absl::MutexLock l(&input_output_lock_);
std::optional<Output> result;
if (output_.has_value()) {
std::swap(result, output_);
@@ -42,7 +55,7 @@ Status RecurringThread<Input, Output>::RepeatedlyRunAction() {

std::optional<Input> input;
{
absl::MutexLock l(&lock_);
absl::MutexLock l(&input_output_lock_);
if (!input_.has_value()) {
// No input, let's wait for it to appear.
continue;
@@ -51,12 +64,12 @@ Status RecurringThread<Input, Output>::RepeatedlyRunAction() {
}
RETURN_IF_STOPPED;

Output output = action_(input.value());
StatusOr<Output> status_or_output = action_(input.value());
RETURN_IF_STOPPED;

{
absl::MutexLock l(&lock_);
output_ = std::move(output);
if (status_or_output.ok()) {
absl::MutexLock l(&input_output_lock_);
output_ = std::move(status_or_output.ValueOrDie());
}
RETURN_IF_STOPPED;
}
1 change: 1 addition & 0 deletions base/recurring_thread_test.cpp
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ TEST_F(RecurringThreadTest, Basic) {
};

ToyRecurringThread thread(std::move(add_one_half), 1ms);
thread.Start();

thread.Put(3);
{
130 changes: 48 additions & 82 deletions ksp_plugin/vessel.cpp
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
#include "ksp_plugin/vessel.hpp"

#include <algorithm>
#include <functional>
#include <limits>
#include <list>
#include <string>
@@ -31,6 +32,9 @@ using quantities::IsFinite;
using quantities::Length;
using quantities::Time;
using quantities::si::Metre;
using ::std::placeholders::_1;

using namespace std::chrono_literals;

bool operator!=(Vessel::PrognosticatorParameters const& left,
Vessel::PrognosticatorParameters const& right) {
@@ -59,6 +63,11 @@ Vessel::Vessel(GUID guid,
std::move(prediction_adaptive_step_parameters)),
parent_(parent),
ephemeris_(ephemeris),
prognosticator_(
[this](PrognosticatorParameters const& parameters) {
return FlowPrognostication(parameters);
},
20ms), // 50 Hz.
history_(make_not_null_unique<DiscreteTrajectory<Barycentric>>()) {
// Can't create the |psychohistory_| and |prediction_| here because |history_|
// is empty;
@@ -198,11 +207,6 @@ void Vessel::set_prediction_adaptive_step_parameters(
Ephemeris<Barycentric>::AdaptiveStepParameters const&
prediction_adaptive_step_parameters) {
prediction_adaptive_step_parameters_ = prediction_adaptive_step_parameters;
absl::MutexLock l(&prognosticator_lock_);
if (prognosticator_parameters_) {
prognosticator_parameters_->adaptive_step_parameters =
prediction_adaptive_step_parameters;
}
}

Ephemeris<Barycentric>::AdaptiveStepParameters const&
@@ -246,13 +250,14 @@ void Vessel::AdvanceTime() {
AppendToVesselTrajectory(&Part::psychohistory_begin,
&Part::psychohistory_end,
*psychohistory_);
{
absl::MutexLock l(&prognosticator_lock_);
if (prognostication_ == nullptr) {
AttachPrediction(std::move(prediction));
} else {
AttachPrediction(std::move(prognostication_));
}

// Attach the prognostication, if there is one. Otherwise fall back to the
// pre-existing prediction.
auto optional_prognostication = prognosticator_.Get();
if (optional_prognostication.has_value()) {
AttachPrediction(std::move(optional_prognostication.value()));
} else {
AttachPrediction(std::move(prediction));
}

for (auto const& [_, part] : parts_) {
@@ -318,27 +323,30 @@ Status Vessel::RebaseFlightPlan(Mass const& initial_mass) {
}

void Vessel::RefreshPrediction() {
absl::MutexLock l(&prognosticator_lock_);
// The |prognostication| is a root trajectory which is computed asynchronously
// and may be used as a prediction;
std::unique_ptr<DiscreteTrajectory<Barycentric>> prognostication;

// Note that we know that |RefreshPrediction| is called on the main thread,
// therefore the ephemeris currently covers the last time of the
// psychohistory. Were this to change, this code might have to change.
prognosticator_parameters_ =
PrognosticatorParameters{psychohistory_->back().time,
psychohistory_->back().degrees_of_freedom,
prediction_adaptive_step_parameters_};
PrognosticatorParameters prognosticator_parameters{
psychohistory_->back().time,
psychohistory_->back().degrees_of_freedom,
prediction_adaptive_step_parameters_};
if (synchronous_) {
std::unique_ptr<DiscreteTrajectory<Barycentric>> prognostication;
std::optional<PrognosticatorParameters> prognosticator_parameters;
std::swap(prognosticator_parameters, prognosticator_parameters_);
Status const status =
FlowPrognostication(std::move(*prognosticator_parameters),
prognostication);
SwapPrognostication(prognostication, status);
auto status_or_prognostication =
FlowPrognostication(std::move(prognosticator_parameters));
if (status_or_prognostication.ok()) {
prognostication = std::move(status_or_prognostication).ValueOrDie();
}
} else {
StartPrognosticatorIfNeeded();
prognosticator_.Put(std::move(prognosticator_parameters));
prognosticator_.Start();
prognostication = prognosticator_.Get().value_or(nullptr);
}
if (prognostication_ != nullptr) {
AttachPrediction(std::move(prognostication_));
if (prognostication != nullptr) {
AttachPrediction(std::move(prognostication));
}
}

@@ -348,7 +356,7 @@ void Vessel::RefreshPrediction(Instant const& time) {
}

void Vessel::StopPrognosticator() {
prognosticator_ = jthread();
prognosticator_.Stop();
}

std::string Vessel::ShortDebugString() const {
@@ -528,52 +536,13 @@ Vessel::Vessel()
prediction_adaptive_step_parameters_(DefaultPredictionParameters()),
parent_(testing_utilities::make_not_null<Celestial const*>()),
ephemeris_(testing_utilities::make_not_null<Ephemeris<Barycentric>*>()),
history_(make_not_null_unique<DiscreteTrajectory<Barycentric>>()) {}

void Vessel::StartPrognosticatorIfNeeded() {
prognosticator_lock_.AssertHeld();
if (!prognosticator_.joinable()) {
prognosticator_ = MakeStoppableThread(
std::bind(&Vessel::RepeatedlyFlowPrognostication, this));
}
}
history_(make_not_null_unique<DiscreteTrajectory<Barycentric>>()),
prognosticator_(nullptr, 20ms) {}

Status Vessel::RepeatedlyFlowPrognostication() {
for (std::chrono::steady_clock::time_point wakeup_time;;
std::this_thread::sleep_until(wakeup_time)) {
// No point in going faster than 50 Hz.
wakeup_time =
std::chrono::steady_clock::now() + std::chrono::milliseconds(20);
RETURN_IF_STOPPED;

std::optional<PrognosticatorParameters> prognosticator_parameters;
{
absl::MutexLock l(&prognosticator_lock_);
if (!prognosticator_parameters_) {
// No parameters, let's wait for them to appear.
continue;
}
std::swap(prognosticator_parameters, prognosticator_parameters_);
}
RETURN_IF_STOPPED;

std::unique_ptr<DiscreteTrajectory<Barycentric>> prognostication;
Status const status =
FlowPrognostication(std::move(*prognosticator_parameters),
prognostication);
RETURN_IF_STOPPED;
{
absl::MutexLock l(&prognosticator_lock_);
SwapPrognostication(prognostication, status);
}
}
return Status::OK;
}

Status Vessel::FlowPrognostication(
PrognosticatorParameters prognosticator_parameters,
std::unique_ptr<DiscreteTrajectory<Barycentric>>& prognostication) {
prognostication = std::make_unique<DiscreteTrajectory<Barycentric>>();
StatusOr<std::unique_ptr<DiscreteTrajectory<Barycentric>>>
Vessel::FlowPrognostication(
PrognosticatorParameters prognosticator_parameters) {
auto prognostication = std::make_unique<DiscreteTrajectory<Barycentric>>();
prognostication->Append(
prognosticator_parameters.first_time,
prognosticator_parameters.first_degrees_of_freedom);
@@ -598,15 +567,12 @@ Status Vessel::FlowPrognostication(
<< "Prognostication from " << prognosticator_parameters.first_time
<< " finished at " << prognostication->back().time << " with "
<< status.ToString() << " for " << ShortDebugString();
return status;
}

void Vessel::SwapPrognostication(
std::unique_ptr<DiscreteTrajectory<Barycentric>>& prognostication,
Status const& status) {
prognosticator_lock_.AssertHeld();
if (status.error() != Error::CANCELLED) {
prognostication_.swap(prognostication);
if (status.error() == Error::CANCELLED) {
return status;
} else {
// Unless we were stopped, ignore the status, which indicates a failure to
// reach |t_max|, and provide a short prognostication.
return std::move(prognostication);
}
}

38 changes: 9 additions & 29 deletions ksp_plugin/vessel.hpp
Original file line number Diff line number Diff line change
@@ -10,6 +10,8 @@

#include "absl/synchronization/mutex.h"
#include "base/jthread.hpp"
#include "base/recurring_thread.hpp"
#include "base/status_or.hpp"
#include "base/status.hpp"
#include "ksp_plugin/celestial.hpp"
#include "ksp_plugin/flight_plan.hpp"
@@ -27,7 +29,9 @@ namespace ksp_plugin {
namespace internal_vessel {

using base::not_null;
using base::RecurringThread;
using base::Status;
using base::StatusOr;
using geometry::Instant;
using geometry::Vector;
using physics::DegreesOfFreedom;
@@ -213,24 +217,10 @@ class Vessel {
using TrajectoryIterator =
DiscreteTrajectory<Barycentric>::Iterator (Part::*)();

// Starts the |prognosticator_| if it is not started already. The
// |prognosticator_parameters_| must have been set.
void StartPrognosticatorIfNeeded() REQUIRES(prognosticator_lock_);

// Run by the |prognosticator_| thread to periodically recompute the
// prognostication.
Status RepeatedlyFlowPrognostication();

// Runs the integrator to compute the |prognostication_| based on the given
// parameters.
Status FlowPrognostication(
PrognosticatorParameters prognosticator_parameters,
std::unique_ptr<DiscreteTrajectory<Barycentric>>& prognostication);

// Publishes the prognostication if the computation was not cancelled.
void SwapPrognostication(
std::unique_ptr<DiscreteTrajectory<Barycentric>>& prognostication,
Status const& status);
StatusOr<std::unique_ptr<DiscreteTrajectory<Barycentric>>>
FlowPrognostication(PrognosticatorParameters prognosticator_parameters);

// Appends to |trajectory| the centre of mass of the trajectories of the parts
// denoted by |part_trajectory_begin| and |part_trajectory_end|. Only the
@@ -257,26 +247,16 @@ class Vessel {
std::map<PartId, not_null<std::unique_ptr<Part>>> parts_;
std::set<PartId> kept_parts_;

mutable absl::Mutex prognosticator_lock_;
// This member only contains a value if |RefreshPrediction| has been called
// but the parameters have not been picked by the |prognosticator_|. It never
// contains a moved-from value, and is only read using |std::swap| to ensure
// that reading it clears it.
std::optional<PrognosticatorParameters> prognosticator_parameters_
GUARDED_BY(prognosticator_lock_);
base::jthread prognosticator_;

// See the comments in pile_up.hpp for an explanation of the terminology.
not_null<std::unique_ptr<DiscreteTrajectory<Barycentric>>> history_;
DiscreteTrajectory<Barycentric>* psychohistory_ = nullptr;

// The |prediction_| is forked off the end of the |psychohistory_|.
DiscreteTrajectory<Barycentric>* prediction_ = nullptr;

// The |prognostication_| is a root trajectory that's computed asynchronously
// and may or may not be used as a prediction;
std::unique_ptr<DiscreteTrajectory<Barycentric>> prognostication_
GUARDED_BY(prognosticator_lock_);
RecurringThread<PrognosticatorParameters,
std::unique_ptr<DiscreteTrajectory<Barycentric>>>
prognosticator_;

std::unique_ptr<FlightPlan> flight_plan_;