Skip to content

Commit

Permalink
Extract a generic computeClosure function
Browse files Browse the repository at this point in the history
Move the `closure` logic of `computeFSClosure` to its own (templated) function.

This doesn’t bring much by itself (apart from making it possible to
properly test the “closure” functionality independently of the rest),
but it allows the logic to be reused, in particular for realisations,
which will require a very similar closure computation.
  • Loading branch information
thufschmitt committed May 19, 2021
1 parent 57f321f commit 4bcb217
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 84 deletions.
143 changes: 59 additions & 84 deletions src/libstore/misc.cc
Expand Up @@ -6,98 +6,73 @@
#include "thread-pool.hh"
#include "topo-sort.hh"
#include "callback.hh"
#include "closure.hh"

namespace nix {


/* Add to `paths_` every store path reachable from `startPaths`.

   - `flipDirection`: when false, follow the references of each path;
     when true, follow its referrers instead.
   - `includeOutputs`: forward, also add the valid outputs of each
     derivation; flipped, also add the valid derivers of each path.
   - `includeDerivers`: forward, also add the deriver recorded for each
     path if it is valid; flipped, also add the valid outputs of a
     derivation whose recorded deriver is that derivation.

   The traversal itself (visited set, in-flight counter, exception
   forwarding) is delegated to computeClosure(); this function only
   supplies the edges of a single path. Paths already present in
   `paths_` are treated as visited. Any exception raised while looking
   up a path or its edges is rethrown to the caller by computeClosure(). */
void Store::computeFSClosure(const StorePathSet & startPaths,
    StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
    /* Direct neighbours of `path`, in the requested direction. `fut`
       carries the outcome of queryPathInfo(path). */
    auto queryDeps = [&](const StorePath & path,
        std::future<ref<const ValidPathInfo>> & fut) -> StorePathSet
    {
        // FIXME: calls to isValidPath() should be async

        StorePathSet res;

        /* Resolve the lookup before branching so that a failed
           queryPathInfo() is reported in both directions, not only when
           the references are actually needed. */
        auto info = fut.get();

        if (flipDirection) {

            StorePathSet referrers;
            queryReferrers(path, referrers);
            for (auto & referrer : referrers)
                if (referrer != path)
                    res.insert(referrer);

            if (includeOutputs)
                for (auto & deriver : queryValidDerivers(path))
                    res.insert(deriver);

            if (includeDerivers && path.isDerivation())
                for (auto & output : queryDerivationOutputs(path))
                    if (isValidPath(output) && queryPathInfo(output)->deriver == path)
                        res.insert(output);

        } else {

            for (auto & reference : info->references)
                if (reference != path)
                    res.insert(reference);

            if (includeOutputs && path.isDerivation())
                for (auto & output : queryDerivationOutputs(path))
                    if (isValidPath(output))
                        res.insert(output);

            if (includeDerivers && info->deriver && isValidPath(*info->deriver))
                res.insert(*info->deriver);

        }

        return res;
    };

    computeClosure<StorePath>(
        startPaths, paths_,
        [&](const StorePath & path,
            std::function<void(std::promise<std::set<StorePath>> &)> processEdges)
        {
            std::promise<std::set<StorePath>> promise;

            /* Turns the path info result into either the edge set or
               the exception that prevented computing it. */
            std::function<void(std::future<ref<const ValidPathInfo>>)> getDependencies =
                [&](std::future<ref<const ValidPathInfo>> fut) {
                    try {
                        promise.set_value(queryDeps(path, fut));
                    } catch (...) {
                        promise.set_exception(std::current_exception());
                    }
                };

            queryPathInfo(path, getDependencies);

            /* computeClosure's continuation waits on the promise's
               future, so `promise` and `path` are still alive when
               getDependencies runs. */
            processEdges(promise);
        });
}


void Store::computeFSClosure(const StorePath & startPath,
StorePathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
Expand Down
69 changes: 69 additions & 0 deletions src/libutil/closure.hh
@@ -0,0 +1,69 @@
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <set>

#include "sync.hh"

using std::set;

namespace nix {

/* Callback type used by computeClosure() to obtain the edges of a node.

   It is called with the node to expand and a continuation. The callee
   must hand the continuation a promise that holds (or will hold) either
   the set of nodes directly reachable from that node, or the exception
   that prevented computing it. */
template<typename T>
using GetEdgesAsync = std::function<
    void(
        const T & node,
        std::function<void(std::promise<std::set<T>> & edges)> processEdges
    )>;

/* Compute the transitive closure of `startElts` under the edge relation
   supplied by `getEdgesAsync`, adding every visited element to `res`.

   - `startElts`: the elements the traversal starts from.
   - `res`: receives every element reached. Elements already present
     on entry count as visited and are not expanded.
   - `getEdgesAsync`: called once per newly visited element with that
     element and a continuation; it must pass the continuation a
     promise carrying the element's direct successors (or an
     exception). The continuation may be invoked from another thread.

   The function returns only once every expansion it started has
   reported back. If any expansion failed -- through its promise or
   because `getEdgesAsync` itself threw -- the first recorded exception
   is rethrown and no further elements are expanded.

   Shared bookkeeping is only touched through `state_.lock()`. */
template<typename T>
void computeClosure(
    const set<T> startElts,
    set<T> & res,
    GetEdgesAsync<T> getEdgesAsync
)
{
    struct State
    {
        size_t pending;          // expansions started but not yet reported
        set<T> & res;
        std::exception_ptr exc;  // first failure seen, if any
    };

    Sync<State> state_(State{0, res, nullptr});

    std::condition_variable done;

    /* Give back one in-flight slot, remembering `failure` if it is the
       first one, and wake the waiter once nothing is left in flight. */
    auto release = [&](std::exception_ptr failure) {
        auto state(state_.lock());
        if (failure && !state->exc) state->exc = failure;
        assert(state->pending);
        if (!--state->pending) done.notify_one();
    };

    std::function<void(const T &)> enqueue;

    enqueue = [&](const T & current) -> void {
        {
            auto state(state_.lock());
            if (state->exc) return;
            if (!state->res.insert(current).second) return;
            state->pending++;
        }

        /* Records whether the slot taken above has been claimed, so it
           is released exactly once whichever of the continuation or
           the failure path below gets there first. Held by shared_ptr
           because the continuation may outlive this call. */
        auto settled = std::make_shared<std::atomic<bool>>(false);

        try {
            getEdgesAsync(current, [&, settled](std::promise<set<T>> & prom) {
                if (settled->exchange(true)) return;
                try {
                    auto children = prom.get_future().get();
                    for (auto & child : children)
                        enqueue(child);
                    release(nullptr);
                } catch (...) {
                    release(std::current_exception());
                }
            });
        } catch (...) {
            /* getEdgesAsync threw on its own. Without this the slot
               would stay taken forever when `current` was enqueued from
               another element's continuation, and the wait below would
               never finish. */
            if (!settled->exchange(true))
                release(std::current_exception());
            else {
                /* The continuation already owns the slot; only keep the
                   error. */
                auto state(state_.lock());
                if (!state->exc) state->exc = std::current_exception();
            }
        }
    };

    for (auto & startElt : startElts)
        enqueue(startElt);

    {
        auto state(state_.lock());
        while (state->pending) state.wait(done);
        if (state->exc) std::rethrow_exception(state->exc);
    }
}

}
70 changes: 70 additions & 0 deletions src/libutil/tests/closure.cc
@@ -0,0 +1,70 @@
#include "closure.hh"
#include <gtest/gtest.h>

namespace nix {

using namespace std;

// Fixture: adjacency list of a small directed graph, keyed by node name.
// The closure tests start from "A".
std::map<std::string, std::set<std::string>> testGraph = {
    // Nodes reachable from "A".
    { "A", { "B", "C", "G" } },
    { "B", { "A" } },           // cycle back to the start node
    { "C", { "F" } },           // makes "F" reachable only indirectly
    { "F", {} },
    { "G", { "G" } },           // points at itself

    // Nodes that are not reachable from "A".
    { "D", { "A" } },           // points into the closure, nothing points at it
    { "E", {} },                // entirely disconnected
};

// Starting from "A", the closure must contain exactly the nodes that
// testGraph makes reachable from it.
TEST(closure, correctClosure) {
    set<string> expectedClosure = {"A", "B", "C", "F", "G"};
    set<string> aClosure;

    // Edge provider that answers immediately from the fixture graph.
    auto edgesFromGraph = [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
        promise<set<string>> promisedNodes;
        promisedNodes.set_value(testGraph[currentNode]);
        processEdges(promisedNodes);
    };

    computeClosure<string>({"A"}, aClosure, edgesFromGraph);

    ASSERT_EQ(aClosure, expectedClosure);
}

// An exception thrown by the edge provider itself (before it ever calls
// the continuation) must reach the caller of computeClosure.
TEST(closure, properlyHandlesDirectExceptions) {
    struct TestExn {};
    set<string> aClosure;

    // Edge provider that fails outright and never produces a promise.
    auto throwingProvider = [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
        throw TestExn();
    };

    EXPECT_THROW(
        computeClosure<string>({"A"}, aClosure, throwingProvider),
        TestExn
    );
}

// An exception delivered through the promise (rather than thrown by the
// edge provider) must also reach the caller of computeClosure.
TEST(closure, properlyHandlesExceptionsInPromise) {
    struct TestExn {};
    set<string> aClosure;

    // Edge provider that hands over a promise already holding a TestExn.
    auto failingProvider = [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
        promise<set<string>> failedEdges;
        failedEdges.set_exception(std::make_exception_ptr(TestExn()));
        processEdges(failedEdges);
    };

    EXPECT_THROW(
        computeClosure<string>({"A"}, aClosure, failingProvider),
        TestExn
    );
}

}

0 comments on commit 4bcb217

Please sign in to comment.