Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make goals use C++20 coroutines #11005

Merged
merged 18 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 67 additions & 74 deletions src/libstore/build/derivation-goal.cc

Large diffs are not rendered by default.

32 changes: 14 additions & 18 deletions src/libstore/build/derivation-goal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ struct DerivationGoal : public Goal
*/
std::optional<DerivationType> derivationType;

typedef void (DerivationGoal::*GoalState)();
GoalState state;

BuildMode buildMode;

std::unique_ptr<MaintainCount<uint64_t>> mcExpectedBuilds, mcRunningBuilds;
Expand Down Expand Up @@ -227,8 +224,6 @@ struct DerivationGoal : public Goal

std::string key() override;

void work() override;

/**
* Add wanted outputs to an already existing derivation goal.
*/
Expand All @@ -237,18 +232,19 @@ struct DerivationGoal : public Goal
/**
* The states.
*/
void getDerivation();
void loadDerivation();
void haveDerivation();
void outputsSubstitutionTried();
void gaveUpOnSubstitution();
void closureRepaired();
void inputsRealised();
void tryToBuild();
virtual void tryLocalBuild();
void buildDone();
Co init() override;
Co getDerivation();
Co loadDerivation();
Co haveDerivation();
Co outputsSubstitutionTried();
Co gaveUpOnSubstitution();
Co closureRepaired();
Co inputsRealised();
Co tryToBuild();
virtual Co tryLocalBuild();
Co buildDone();

void resolvedFinished();
Co resolvedFinished();

/**
* Is the build hook willing to perform the build?
Expand Down Expand Up @@ -329,11 +325,11 @@ struct DerivationGoal : public Goal
*/
virtual void killChild();

void repairClosure();
Co repairClosure();

void started();

void done(
Done done(
BuildResult::Status status,
SingleDrvOutputs builtOutputs = {},
std::optional<Error> ex = {});
Expand Down
208 changes: 96 additions & 112 deletions src/libstore/build/drv-output-substitution-goal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,146 +14,135 @@ DrvOutputSubstitutionGoal::DrvOutputSubstitutionGoal(
: Goal(worker, DerivedPath::Opaque { StorePath::dummy })
, id(id)
{
state = &DrvOutputSubstitutionGoal::init;
name = fmt("substitution of '%s'", id.to_string());
trace("created");
}


void DrvOutputSubstitutionGoal::init()
Goal::Co DrvOutputSubstitutionGoal::init()
{
trace("init");

/* If the derivation already exists, we’re done */
if (worker.store.queryRealisation(id)) {
amDone(ecSuccess);
return;
co_return amDone(ecSuccess);
}

subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
tryNext();
}

void DrvOutputSubstitutionGoal::tryNext()
{
trace("trying next substituter");

if (subs.size() == 0) {
/* None left. Terminate this goal and let someone else deal
with it. */
debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string());

/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
amDone(substituterFailed ? ecFailed : ecNoSubstituters);

if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
auto subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();

bool substituterFailed = false;

for (auto sub : subs) {
trace("trying next substituter");

/* The callback of the curl download below can outlive `this` (if
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
auto outPipe = std::make_shared<MuxablePipe>();
#ifndef _WIN32
outPipe->create();
#else
outPipe->createAsyncPipe(worker.ioport.get());
#endif

auto promise = std::make_shared<std::promise<std::shared_ptr<const Realisation>>>();

sub->queryRealisation(
id,
{ [outPipe(outPipe), promise(promise)](std::future<std::shared_ptr<const Realisation>> res) {
try {
Finally updateStats([&]() { outPipe->writeSide.close(); });
promise->set_value(res.get());
} catch (...) {
promise->set_exception(std::current_exception());
}
} });

worker.childStarted(shared_from_this(), {
#ifndef _WIN32
outPipe->readSide.get()
#else
&outPipe
#endif
}, true, false);

co_await Suspend{};

worker.childTerminated(this);

/*
* The realisation corresponding to the given output id.
* Will be filled once we can get it.
*/
std::shared_ptr<const Realisation> outputInfo;

try {
outputInfo = promise->get_future().get();
} catch (std::exception & e) {
printError(e.what());
substituterFailed = true;
}

return;
}

sub = subs.front();
subs.pop_front();

// FIXME: Make async
// outputInfo = sub->queryRealisation(id);

/* The callback of the curl download below can outlive `this` (if
some other error occurs), so it must not touch `this`. So put
the shared state in a separate refcounted object. */
downloadState = std::make_shared<DownloadState>();
#ifndef _WIN32
downloadState->outPipe.create();
#else
downloadState->outPipe.createAsyncPipe(worker.ioport.get());
#endif

sub->queryRealisation(
id,
{ [downloadState(downloadState)](std::future<std::shared_ptr<const Realisation>> res) {
try {
Finally updateStats([&]() { downloadState->outPipe.writeSide.close(); });
downloadState->promise.set_value(res.get());
} catch (...) {
downloadState->promise.set_exception(std::current_exception());
if (!outputInfo) continue;

bool failed = false;

for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId);
localOutputInfo && localOutputInfo->outPath != depPath) {
warn(
"substituter '%s' has an incompatible realisation for '%s', ignoring.\n"
"Local: %s\n"
"Remote: %s",
sub->getUri(),
depId.to_string(),
worker.store.printStorePath(localOutputInfo->outPath),
worker.store.printStorePath(depPath)
);
failed = true;
break;
}
addWaitee(worker.makeDrvOutputSubstitutionGoal(depId));
}
} });

worker.childStarted(shared_from_this(), {
#ifndef _WIN32
downloadState->outPipe.readSide.get()
#else
&downloadState->outPipe
#endif
}, true, false);

state = &DrvOutputSubstitutionGoal::realisationFetched;
}
}

void DrvOutputSubstitutionGoal::realisationFetched()
{
worker.childTerminated(this);
if (failed) continue;

try {
outputInfo = downloadState->promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
substituterFailed = true;
co_return realisationFetched(outputInfo, sub);
}

if (!outputInfo) {
return tryNext();
}
/* None left. Terminate this goal and let someone else deal
with it. */
debug("derivation output '%s' is required, but there is no substituter that can provide it", id.to_string());

for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
if (depId != id) {
if (auto localOutputInfo = worker.store.queryRealisation(depId);
localOutputInfo && localOutputInfo->outPath != depPath) {
warn(
"substituter '%s' has an incompatible realisation for '%s', ignoring.\n"
"Local: %s\n"
"Remote: %s",
sub->getUri(),
depId.to_string(),
worker.store.printStorePath(localOutputInfo->outPath),
worker.store.printStorePath(depPath)
);
tryNext();
return;
}
addWaitee(worker.makeDrvOutputSubstitutionGoal(depId));
}
if (substituterFailed) {
worker.failedSubstitutions++;
worker.updateProgress();
}

/* Hack: don't indicate failure if there were no substituters.
In that case the calling derivation should just do a
build. */
co_return amDone(substituterFailed ? ecFailed : ecNoSubstituters);
}

Goal::Co DrvOutputSubstitutionGoal::realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub) {
addWaitee(worker.makePathSubstitutionGoal(outputInfo->outPath));

if (waitees.empty()) outPathValid();
else state = &DrvOutputSubstitutionGoal::outPathValid;
}
if (!waitees.empty()) co_await Suspend{};

void DrvOutputSubstitutionGoal::outPathValid()
{
assert(outputInfo);
trace("output path substituted");

if (nrFailed > 0) {
debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed);
return;
co_return amDone(nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed);
}

worker.store.registerDrvOutput(*outputInfo);
finished();
}

void DrvOutputSubstitutionGoal::finished()
{
trace("finished");
amDone(ecSuccess);
co_return amDone(ecSuccess);
}

std::string DrvOutputSubstitutionGoal::key()
Expand All @@ -163,14 +152,9 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}

void DrvOutputSubstitutionGoal::work()
{
(this->*state)();
}

void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd)
{
if (fd == downloadState->outPipe.readSide.get()) worker.wakeUp(shared_from_this());
worker.wakeUp(shared_from_this());
}


Expand Down
37 changes: 2 additions & 35 deletions src/libstore/build/drv-output-substitution-goal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,52 +27,19 @@ class DrvOutputSubstitutionGoal : public Goal {
*/
DrvOutput id;

/**
* The realisation corresponding to the given output id.
* Will be filled once we can get it.
*/
std::shared_ptr<const Realisation> outputInfo;

/**
* The remaining substituters.
*/
std::list<ref<Store>> subs;

/**
* The current substituter.
*/
std::shared_ptr<Store> sub;

struct DownloadState
{
MuxablePipe outPipe;
std::promise<std::shared_ptr<const Realisation>> promise;
};

std::shared_ptr<DownloadState> downloadState;

/**
* Whether a substituter failed.
*/
bool substituterFailed = false;

public:
DrvOutputSubstitutionGoal(const DrvOutput& id, Worker & worker, RepairFlag repair = NoRepair, std::optional<ContentAddress> ca = std::nullopt);

typedef void (DrvOutputSubstitutionGoal::*GoalState)();
GoalState state;

void init();
void tryNext();
void realisationFetched();
void outPathValid();
void finished();
Co init() override;
Co realisationFetched(std::shared_ptr<const Realisation> outputInfo, nix::ref<nix::Store> sub);

void timedOut(Error && ex) override { abort(); };

std::string key() override;

void work() override;
void handleEOF(Descriptor fd) override;

JobCategory jobCategory() const override {
Expand Down
Loading
Loading