Skip to content

Commit

Permalink
Fix RemoteStore::addToStore() latency
Browse files Browse the repository at this point in the history
Since 6185d25, this was very
latency-bound since it required a round-trip for every 32 KiB. So for
example copying a 514 MiB closure over a virtual ethernet device with
a articial delay of just 1 ms took 343s. Now it takes 2.7s.

Fixes #3372.
  • Loading branch information
edolstra committed Jul 28, 2020
1 parent c159f48 commit 4c0077a
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 19 deletions.
90 changes: 75 additions & 15 deletions src/libstore/daemon.cc
Expand Up @@ -86,7 +86,7 @@ struct TunnelLogger : public Logger
}

/* startWork() means that we're starting an operation for which we
want to send out stderr to the client. */
want to send out stderr to the client. */
void startWork()
{
auto state(state_.lock());
Expand Down Expand Up @@ -703,24 +703,84 @@ static void performOp(TunnelLogger * logger, ref<Store> store,
if (!trusted)
info.ultimate = false;

std::unique_ptr<Source> source;
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to);
else {
StringSink saved;
TeeSource tee { from, saved };
ParseSink ether;
parseDump(ether, tee);
source = std::make_unique<StringSource>(std::move(*saved.s));
if (GET_PROTOCOL_MINOR(clientVersion) >= 23) {

struct FramedSource : Source
{
Source & from;
bool eof = false;
std::vector<unsigned char> pending;
size_t pos = 0;

FramedSource(Source & from) : from(from)
{ }

~FramedSource()
{
if (!eof) {
while (true) {
auto n = readInt(from);
if (!n) break;
std::vector<unsigned char> data(n);
from(data.data(), n);
}
}
}

size_t read(unsigned char * data, size_t len) override
{
if (eof) throw EndOfFile("reached end of FramedSource");

if (pos >= pending.size()) {
size_t len = readInt(from);
if (!len) {
eof = true;
return 0;
}
pending = std::vector<unsigned char>(len);
pos = 0;
from(pending.data(), len);
}

auto n = std::min(len, pending.size() - pos);
memcpy(data, pending.data() + pos, n);
pos += n;
return n;
}
};

logger->startWork();

{
FramedSource source(from);
store->addToStore(info, source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs);
}

logger->stopWork();
}

logger->startWork();
else {
std::unique_ptr<Source> source;
if (GET_PROTOCOL_MINOR(clientVersion) >= 21)
source = std::make_unique<TunnelSource>(from, to);
else {
StringSink saved;
TeeSource tee { from, saved };
ParseSink ether;
parseDump(ether, tee);
source = std::make_unique<StringSource>(std::move(*saved.s));
}

// FIXME: race if addToStore doesn't read source?
store->addToStore(info, *source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs);
logger->startWork();

// FIXME: race if addToStore doesn't read source?
store->addToStore(info, *source, (RepairFlag) repair,
dontCheckSigs ? NoCheckSigs : CheckSigs);

logger->stopWork();
}

logger->stopWork();
break;
}

Expand Down
81 changes: 78 additions & 3 deletions src/libstore/remote-store.cc
Expand Up @@ -503,9 +503,84 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
conn->to << info.registrationTime << info.narSize
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;
bool tunnel = GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21;
if (!tunnel) copyNAR(source, conn->to);
conn.processStderr(0, tunnel ? &source : nullptr);

if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {

std::exception_ptr ex;

struct FramedSink : BufferedSink
{
ConnectionHandle & conn;
std::exception_ptr & ex;

FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
{ }

~FramedSink()
{
try {
conn->to << 0;
conn->to.flush();
} catch (...) {
ignoreException();
}
}

void write(const unsigned char * data, size_t len) override
{
/* Don't send more data if the remote has
encountered an error. */
if (ex) {
auto ex2 = ex;
ex = nullptr;
std::rethrow_exception(ex2);
}
conn->to << len;
conn->to(data, len);
};
};

/* Handle log messages / exceptions from the remote on a
separate thread. */
std::thread stderrThread([&]()
{
try {
conn.processStderr(0, nullptr);
} catch (...) {
ex = std::current_exception();
}
});

Finally joinStderrThread([&]()
{
if (stderrThread.joinable()) {
stderrThread.join();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (...) {
ignoreException();
}
}
}
});

{
FramedSink sink(conn, ex);
copyNAR(source, sink);
sink.flush();
}

stderrThread.join();
if (ex)
std::rethrow_exception(ex);

} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
conn.processStderr(0, &source);
} else {
copyNAR(source, conn->to);
conn.processStderr(0, nullptr);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/libstore/worker-protocol.hh
Expand Up @@ -6,7 +6,7 @@ namespace nix {
#define WORKER_MAGIC_1 0x6e697863
#define WORKER_MAGIC_2 0x6478696f

#define PROTOCOL_VERSION 0x116
#define PROTOCOL_VERSION 0x117
#define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00)
#define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff)

Expand Down

0 comments on commit 4c0077a

Please sign in to comment.