Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

monitor: Allow launching container from non-exec op with on-error #1807

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Container) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig, s
}

func exec(ctx context.Context, resultCtx *ResultContext, cfg *controllerapi.InvokeConfig, ctr gateway.Container, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
processCfg, err := resultCtx.getProcessConfig(cfg, stdin, stdout, stderr)
processCfg, err := resultCtx.getProcessConfig(ctx, cfg, stdin, stdout, stderr)
if err != nil {
return err
}
Expand Down
159 changes: 153 additions & 6 deletions build/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (

controllerapi "github.com/docker/buildx/controller/pb"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/solver/result"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -101,11 +103,12 @@ func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt
})
return nil
})
resultCtx := ResultContext{}
resultCtx := ResultContext{def: targets}
if err := eg.Wait(); err != nil {
var se *errdefs.SolveError
if errors.As(err, &se) {
resultCtx.solveErr = se
resultCtx.rawErr = err
} else {
return nil, err
}
Expand Down Expand Up @@ -170,6 +173,9 @@ type ResultContext struct {

cleanups []func()
cleanupsMu sync.Mutex

def *result.Result[*pb.Definition]
rawErr error
}

func (r *ResultContext) Done() {
Expand All @@ -196,10 +202,115 @@ func (r *ResultContext) build(buildFunc gateway.BuildFunc) (err error) {
return err
}

func (r *ResultContext) solve(ctx context.Context, def *result.Result[*pb.Definition]) (*gateway.Result, error) {
resultCh := make(chan *gateway.Result)
errCh := make(chan error)
go func() {
err := r.build(func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// force evaluation of all targets in parallel
results := make(map[*pb.Definition]*gateway.Result)
resultsMu := sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
def.EachRef(func(def *pb.Definition) error {
eg.Go(func() error {
res2, err := c.Solve(egCtx, gateway.SolveRequest{
Evaluate: true,
Definition: def,
})
if err != nil {
return err
}
resultsMu.Lock()
results[def] = res2
resultsMu.Unlock()
return nil
})
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
res2, _ := result.ConvertResult(def, func(def *pb.Definition) (gateway.Reference, error) {
if res, ok := results[def]; ok {
return res.Ref, nil
}
return nil, nil
})
select {
case resultCh <- res2:
case <-ctx.Done():
return nil, ctx.Err()
}
<-ctx.Done()
return nil, nil
})
if err != nil {
errCh <- err
}
}()
select {
case req := <-resultCh:
return req, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (r *ResultContext) getParentResultOfError(ctx context.Context) (*gateway.Result, error) {
if r.def == nil || r.rawErr == nil || r.solveErr == nil {
return nil, errors.Errorf("no definition is provided")
}
var dgst digest.Digest
var ve *errdefs.VertexError
if errors.As(r.rawErr, &ve) {
dgst = digest.Digest(ve.Digest)
} else {
return nil, errors.Errorf("unsupported vertex: cannot get parent of error")
}

var parentSelector func(inputs []llb.Output) llb.Output
switch r.solveErr.Solve.Op.GetOp().(type) {
case *pb.Op_File:
parentSelector = func(inputs []llb.Output) llb.Output { return inputs[0] } // TODO: allow user selecting one input?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An op can have multiple inputs and the current implementation always uses the first input (input[0]). But maybe we need to introduce a way to ensure the input is actually the previous step's rootfs. Or maybe we can introduce a way to let the user choose one of the inputs.
Currently I don't have good idea for them so please let me know if there is a good way to solve it.

default:
return nil, errors.Errorf("unsupported error type: cannot create parent selector")
}
targets, err := result.ConvertResult(r.def, func(def *pb.Definition) (*pb.Definition, error) {
op, err := llb.NewDefinitionOp(def)
if err != nil {
return nil, err
}
stP, err := getStateOfDigest(ctx, llb.NewState(op), dgst)
if err != nil {
return nil, err
} else if stP == nil {
return nil, errors.Errorf("vertex %v not found", dgst)
}
st := *stP
if inputs := st.Output().Vertex(ctx, nil).Inputs(); len(inputs) > 0 {
def, err := llb.NewState(parentSelector(inputs)).Marshal(ctx)
if err != nil {
return nil, err
}
return def.ToPB(), nil
}
return nil, errors.Errorf("no input")
})
if err != nil {
return nil, err
}
return r.solve(ctx, targets)
}

func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client, cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build")
ccfg, err := containerConfigFromResult(ctx, r.res, c, *cfg)
ccfg, err := containerConfigFromResult(ctx, r.res, *cfg)
if err != nil {
return containerCfg, err
}
Expand All @@ -208,14 +319,22 @@ func (r *ResultContext) getContainerConfig(ctx context.Context, c gateway.Client
logrus.Debugf("creating container from failed build %+v", cfg)
ccfg, err := containerConfigFromError(r.solveErr, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
res, err := r.getParentResultOfError(ctx)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available")
}
cfg.Initial = false
ccfg, err = containerConfigFromResult(ctx, res, *cfg)
if err != nil {
return containerCfg, errors.Wrapf(err, "no result nor error is available. cannot fallback to parent of the error")
}
}
containerCfg = *ccfg
}
return containerCfg, nil
}

func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
func (r *ResultContext) getProcessConfig(ctx context.Context, cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
processCfg := newStartRequest(stdin, stdout, stderr)
if r.res != nil && r.solveErr == nil {
logrus.Debugf("creating container from successful build")
Expand All @@ -225,13 +344,20 @@ func (r *ResultContext) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin
} else {
logrus.Debugf("creating container from failed build %+v", cfg)
if err := populateProcessConfigFromError(&processCfg, r.solveErr, *cfg); err != nil {
return processCfg, err
res, err := r.getParentResultOfError(ctx)
if err != nil {
return processCfg, err
}
cfg.Initial = false
if err := populateProcessConfigFromResult(&processCfg, res, *cfg); err != nil {
return processCfg, err
}
}
}
return processCfg, nil
}

func containerConfigFromResult(ctx context.Context, res *gateway.Result, c gateway.Client, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
func containerConfigFromResult(ctx context.Context, res *gateway.Result, cfg controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
if cfg.Initial {
return nil, errors.Errorf("starting from the container from the initial state of the step is supported only on the failed steps")
}
Expand Down Expand Up @@ -397,3 +523,24 @@ func newStartRequest(stdin io.ReadCloser, stdout io.WriteCloser, stderr io.Write
Stderr: stderr,
}
}

func getStateOfDigest(ctx context.Context, st llb.State, dgst digest.Digest) (*llb.State, error) {
vtxDgst, _, _, _, err := st.Output().Vertex(ctx, nil).Marshal(ctx, nil)
if err != nil {
return nil, err
}
if vtxDgst.String() == dgst.String() {
return &st, nil
}
inputs := st.Output().Vertex(ctx, nil).Inputs()
for _, in := range inputs {
got, err := getStateOfDigest(ctx, llb.NewState(in), dgst)
if err != nil {
return nil, err
}
if got != nil {
return got, nil
}
}
return nil, nil
}