Skip to content

Commit

Permalink
pkg/eval: Simplify how the lifecycle of ports are tracked.
Browse files Browse the repository at this point in the history
The lifecycle of a port is often tied to the execution of a form - for example,
in "echo > out", port 1 is redirected to a file that should be closed when the
form finishes execution.

Previously, this is tracked with the following scheme:

- The port field has two fields - closeFile and closeChan - indicating whether
  the file and chan components should be closed when the form finishes execution.

- However, a port could be used in multiple forms: for example, in

  { echo foo; echo bar } > out

  The same port 1 is used for both echo commands, but the file in it should only
  be closed when the overall form finishes execution.

- To handle this correct, the idea of "forking" a port is introduced - the forked
  ports always have their closeFile and closeChan set to false. Similarly, when a
  Frame gets "forked", all its ports are forked as well.

This commit switches to a different approach based on the observation that there
are only two places that care about closeFile and closeChan: pipelineOp.exec and
redirOp.exec, and they are very close in the call frame (with only formOp.exec
between them).

So instead of embedding the ownership information to the port struct, track
it in a separate formOwnedPort struct, and thread a *[]formOwnedPort as an
additional argument of pipelineOp.exec, formOp.exec and redirOp.exec.

This allows us to get rid of concept of "forking" a port. The Frame.Fork method
still exists because it needs to make a shallow copy of the ports slice, but the
implementation is now simpler.
  • Loading branch information
xiaq committed Sep 10, 2024
1 parent adaf5f6 commit 8a99e2c
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 81 deletions.
2 changes: 0 additions & 2 deletions pkg/eval/builtin_fn_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func each(fm *Frame, f Callable, inputs Inputs) error {
}
newFm := fm.Fork()
ex := f.Call(newFm, []any{v}, NoOpts)
newFm.Close()

if ex != nil {
switch Reason(ex) {
Expand Down Expand Up @@ -111,7 +110,6 @@ func peach(fm *Frame, opts peachOpt, f Callable, inputs Inputs) error {
newFm := fm.Fork()
newFm.ports[0] = DummyInputPort
ex := f.Call(newFm, []any{v}, NoOpts)
newFm.Close()

if ex != nil {
switch Reason(ex) {
Expand Down
84 changes: 54 additions & 30 deletions pkg/eval/compile_effect.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ func (cp *compiler) pipelineOp(n *parse.Pipeline) *pipelineOp {

const pipelineChanBufferSize = 32

// Keeps track of whether the File and Port parts of a port are owned by a form
// and should be closed when the form finishes execution.
type formOwnedPort struct {
File bool
Chan bool
}

func (fop formOwnedPort) close(p *Port) {
if fop.File {
p.File.Close()
}
if fop.Chan {
close(p.Chan)
}
}

func (op *pipelineOp) exec(fm *Frame) Exception {
if fm.Canceled() {
return fm.errorp(op, ErrInterrupted)
Expand All @@ -89,10 +105,12 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
// For each form, create a dedicated evalCtx and run asynchronously
for i, form := range op.forms {
newFm := fm.Fork()
var fops []formOwnedPort
inputIsPipe := i > 0
outputIsPipe := i < nforms-1
if inputIsPipe {
newFm.ports[0] = nextIn
growAccess(&fops, 0).File = true
}
if outputIsPipe {
// Each internal port pair consists of a (byte) pipe pair and a
Expand All @@ -108,16 +126,15 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
readerGone := new(atomic.Bool)
newFm.ports[1] = &Port{
File: writer, Chan: ch,
closeFile: true, closeChan: true,
sendStop: sendStop, sendError: sendError, readerGone: readerGone}
*growAccess(&fops, 1) = formOwnedPort{File: true, Chan: true}
nextIn = &Port{
File: reader, Chan: ch,
closeFile: true, closeChan: false,
// Store in input port for ease of retrieval later
sendStop: sendStop, sendError: sendError, readerGone: readerGone}
}
f := func(form *formOp, pexc *Exception) {
exc := form.exec(newFm)
f := func(form *formOp, fops []formOwnedPort, pexc *Exception) {
exc := form.exec(newFm, &fops)
if exc != nil && !(outputIsPipe && isReaderGone(exc)) {
*pexc = exc
}
Expand All @@ -127,13 +144,15 @@ func (op *pipelineOp) exec(fm *Frame) Exception {
close(input.sendStop)
input.readerGone.Store(true)
}
newFm.Close()
for i, fop := range fops {
fop.close(newFm.ports[i])
}
wg.Done()
}
if i == nforms-1 && !op.bg {
f(form, &excs[i])
f(form, fops, &excs[i])
} else {
go f(form, &excs[i])
go f(form, fops, &excs[i])
}
}

Expand Down Expand Up @@ -236,13 +255,13 @@ func (cp *compiler) formBody(n *parse.Form) formBody {
return formBody{ordinaryCmd: ordinaryCmd{headOp, argOps, optsOp}}
}

func (op *formOp) exec(fm *Frame) (errRet Exception) {
func (op *formOp) exec(fm *Frame, fops *[]formOwnedPort) (errRet Exception) {
// fm here is always a sub-frame created in compiler.pipeline, so it can
// be safely modified.

// Redirections.
for _, redirOp := range op.redirs {
exc := redirOp.exec(fm)
exc := redirOp.exec(fm, fops)
if exc != nil {
return exc
}
Expand Down Expand Up @@ -377,7 +396,7 @@ type InvalidFD struct{ FD int }

func (err InvalidFD) Error() string { return fmt.Sprintf("invalid fd: %d", err.FD) }

func (op *redirOp) exec(fm *Frame) Exception {
func (op *redirOp) exec(fm *Frame, fops *[]formOwnedPort) Exception {
var dst int
if op.dstOp == nil {
// No explicit FD destination specified; use default destinations
Expand All @@ -398,8 +417,12 @@ func (op *redirOp) exec(fm *Frame) Exception {
}
}

growPorts(&fm.ports, dst+1)
fm.ports[dst].close()
dstPort := growAccess(&fm.ports, dst)
dstFop := growAccess(fops, dst)
if *dstPort != nil {
dstFop.close(*dstPort)
*dstFop = formOwnedPort{File: false, Chan: false}
}

if op.srcIsFd {
src, err := evalForFd(fm, op.srcOp, true, "redirection source")
Expand All @@ -409,13 +432,13 @@ func (op *redirOp) exec(fm *Frame) Exception {
switch {
case src == -1:
// close
fm.ports[dst] = &Port{
*dstPort = &Port{
// Ensure that writing to value output throws an exception
sendStop: closedSendStop, sendError: &ErrPortDoesNotSupportValueOutput}
case src >= len(fm.ports) || fm.ports[src] == nil:
return fm.errorp(op, InvalidFD{FD: src})
default:
fm.ports[dst] = fm.ports[src].fork()
*dstPort = fm.ports[src]
}
return nil
}
Expand All @@ -429,9 +452,10 @@ func (op *redirOp) exec(fm *Frame) Exception {
if err != nil {
return fm.errorpf(op, "failed to open file %s: %s", vals.ReprPlain(src), err)
}
fm.ports[dst] = fileRedirPort(op.mode, f, true)
*dstPort = fileRedirPort(op.mode, f)
dstFop.File = true
case vals.File:
fm.ports[dst] = fileRedirPort(op.mode, src, false)
*dstPort = fileRedirPort(op.mode, src)
default:
if _, isMap := src.(vals.Map); !isMap && !vals.IsFieldMap(src) {
return fm.errorp(op.srcOp, errs.BadValue{
Expand Down Expand Up @@ -463,39 +487,29 @@ func (op *redirOp) exec(fm *Frame) Exception {
default:
return fm.errorpf(op, "can only use < or > with maps")
}
fm.ports[dst] = fileRedirPort(op.mode, srcFile, false)
*dstPort = fileRedirPort(op.mode, srcFile)
}
return nil
}

// Creates a port that only have a file component, populating the
// channel-related fields with suitable values depending on the redirection
// mode.
func fileRedirPort(mode parse.RedirMode, f *os.File, closeFile bool) *Port {
func fileRedirPort(mode parse.RedirMode, f *os.File) *Port {
if mode == parse.Read {
return &Port{
File: f, closeFile: closeFile,
File: f,
// ClosedChan produces no values when reading.
Chan: ClosedChan,
}
}
return &Port{
File: f, closeFile: closeFile,
File: f,
// Throws errValueOutputIsClosed when writing.
Chan: nil, sendStop: closedSendStop, sendError: &ErrPortDoesNotSupportValueOutput,
}
}

// Makes the size of *ports at least n, adding nil's if necessary.
func growPorts(ports *[]*Port, n int) {
if len(*ports) >= n {
return
}
oldPorts := *ports
*ports = make([]*Port, n)
copy(*ports, oldPorts)
}

func evalForFd(fm *Frame, op valuesOp, closeOK bool, what string) (int, error) {
value, err := evalForValue(fm, op, what)
if err != nil {
Expand Down Expand Up @@ -538,3 +552,13 @@ func (op seqOp) exec(fm *Frame) Exception {
type nopOp struct{}

func (nopOp) exec(fm *Frame) Exception { return nil }

// Accesses s[i], growing the slice with zero values if necessary.
func growAccess[T any](s *[]T, i int) *T {
if i >= len(*s) {
old := *s
*s = make([]T, i+1)
copy(*s, old)
}
return &(*s)[i]
}
2 changes: 1 addition & 1 deletion pkg/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (ev *Evaler) prepareFrame(src parse.Source, cfg EvalCfg) (*Frame, func()) {
}

func fillDefaultDummyPorts(ports []*Port) []*Port {
growPorts(&ports, 3)
growAccess(&ports, 2)
if ports[0] == nil {
ports[0] = DummyInputPort
}
Expand Down
28 changes: 5 additions & 23 deletions pkg/eval/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"slices"
"sync"

"src.elv.sh/pkg/diag"
Expand Down Expand Up @@ -87,15 +88,6 @@ func (fm *Frame) Eval(src parse.Source, r diag.Ranger, ns *Ns) (*Ns, error) {
return newLocal, exec()
}

// Close releases resources allocated for this frame. It always returns a nil
// error. It may be called only once.
func (fm *Frame) Close() error {
for _, port := range fm.ports {
port.close()
}
return nil
}

// InputChan returns a channel from which input can be read.
func (fm *Frame) InputChan() chan any {
return fm.ports[0].Chan
Expand Down Expand Up @@ -190,21 +182,11 @@ func (fm *Frame) Canceled() bool {
}
}

// Fork returns a modified copy of fm. The ports are forked, and the name is
// changed to the given value. Other fields are copied shallowly.
// Fork returns a copy of fm, with the ports cloned.
func (fm *Frame) Fork() *Frame {
newPorts := make([]*Port, len(fm.ports))
for i, p := range fm.ports {
if p != nil {
newPorts[i] = p.fork()
}
}
return &Frame{
fm.Evaler, fm.src,
fm.local, fm.up, fm.defers,
fm.ctx, newPorts,
fm.traceback, fm.background,
}
newFm := *fm
newFm.ports = slices.Clone(fm.ports)
return &newFm
}

// A shorthand for forking a frame and setting the output port.
Expand Down
29 changes: 5 additions & 24 deletions pkg/eval/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (

// Port conveys data stream. It always consists of a byte band and a channel band.
type Port struct {
File *os.File
Chan chan any
closeFile bool
closeChan bool
File *os.File
Chan chan any

// The following two fields are populated as an additional control mechanism
// for output ports. When no more value should be send on Chan, sendError is
Expand All @@ -45,24 +43,6 @@ var closedSendStop = make(chan struct{})

func init() { close(closedSendStop) }

// Returns a copy of the Port with the Close* flags unset.
func (p *Port) fork() *Port {
return &Port{p.File, p.Chan, false, false, p.sendStop, p.sendError, p.readerGone}
}

// Closes a Port.
func (p *Port) close() {
if p == nil {
return
}
if p.closeFile {
p.File.Close()
}
if p.closeChan {
close(p.Chan)
}
}

var (
// ClosedChan is a closed channel, suitable as a placeholder input channel.
ClosedChan = getClosedChan()
Expand Down Expand Up @@ -131,9 +111,10 @@ func PipePort(vCb func(<-chan any), bCb func(*os.File)) (*Port, func(), error) {
bCb(r)
}()

port := &Port{Chan: ch, closeChan: true, File: w, closeFile: true}
port := &Port{File: w, Chan: ch}
done := func() {
port.close()
w.Close()
close(ch)
wg.Wait()
}
return port, done, nil
Expand Down
1 change: 0 additions & 1 deletion pkg/mods/re/re.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func awk(fm *eval.Frame, opts awkOpt, f eval.Callable, inputs eval.Inputs) error
newFm := fm.Fork()
// TODO: Close port 0 of newFm.
ex := f.Call(newFm, args, eval.NoOpts)
newFm.Close()

if ex != nil {
switch eval.Reason(ex) {
Expand Down

0 comments on commit 8a99e2c

Please sign in to comment.