diff --git a/go.mod b/go.mod index 1af6780..bb89db0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/integrii/flaggy v1.4.4 - github.com/lawl/pulseaudio v0.0.0-20210928141934-ed754c0c6618 + github.com/noisetorch/pulseaudio v0.0.0-20220603053345-9303200c3861 github.com/nsf/termbox-go v1.1.1 github.com/pkg/errors v0.9.1 gonum.org/v1/gonum v0.11.0 diff --git a/go.sum b/go.sum index 33c4c31..655f20b 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,9 @@ github.com/integrii/flaggy v1.4.4 h1:8fGyiC14o0kxhTqm2VBoN19fDKPZsKipP7yggreTMDc= github.com/integrii/flaggy v1.4.4/go.mod h1:tnTxHeTJbah0gQ6/K0RW0J7fMUBk9MCF5blhm43LNpI= -github.com/lawl/pulseaudio v0.0.0-20210928141934-ed754c0c6618 h1:lktbhQBHluc1oWEDow4DEv13qkWJ8zm/dTUSKer2iKk= -github.com/lawl/pulseaudio v0.0.0-20210928141934-ed754c0c6618/go.mod h1:9h36x4KH7r2V8DOCKoPMt87IXZ++X90y8D5nnuwq290= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/noisetorch/pulseaudio v0.0.0-20220603053345-9303200c3861 h1:Xng5X+MlNK7Y/Ede75B86wJgaFMFvuey1K4Suh9k2E4= +github.com/noisetorch/pulseaudio v0.0.0-20220603053345-9303200c3861/go.mod h1:/zosM8PSkhuVyfJ9c/qzBhPSm3k06m9U4y4SDfH0jeA= github.com/nsf/termbox-go v1.1.1 h1:nksUPLCb73Q++DwbYUBEglYBRPZyoXJdrj5L+TkjyZY= github.com/nsf/termbox-go v1.1.1/go.mod h1:T0cTdVuOwf7pHQNtfhnEbzHbcNyCEcVU4YPpouCbVxo= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/input/all/all.go b/input/all/all.go index f00c86b..8c5eeed 100644 --- a/input/all/all.go +++ b/input/all/all.go @@ -4,4 +4,5 @@ package all import ( _ "github.com/noriah/catnip/input/ffmpeg" _ "github.com/noriah/catnip/input/parec" + _ "github.com/noriah/catnip/input/pipewire" ) diff --git a/input/common/execread/execread.go b/input/common/execread/execread.go index b661434..5006224 100644 --- a/input/common/execread/execread.go +++ b/input/common/execread/execread.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "sync" + "time" "github.com/noriah/catnip/input" "github.com/pkg/errors" @@ -16,6 +17,9 @@ import ( // Session is a session that reads floating-point audio values from a Cmd. type Session struct { + // OnStart is called when the session starts. Nil by default. + OnStart func(ctx context.Context, cmd *exec.Cmd) error + argv []string cfg input.SessionConfig @@ -26,9 +30,9 @@ type Session struct { } // NewSession creates a new execread session. It never returns an error. -func NewSession(argv []string, f32mode bool, cfg input.SessionConfig) (*Session, error) { +func NewSession(argv []string, f32mode bool, cfg input.SessionConfig) *Session { if len(argv) < 1 { - return nil, errors.New("argv has no arg0") + panic("argv has no arg0") } return &Session{ @@ -36,7 +40,7 @@ func NewSession(argv []string, f32mode bool, cfg input.SessionConfig) (*Session, cfg: cfg, f32mode: f32mode, samples: cfg.SampleSize * cfg.FrameSize, - }, nil + } } func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan bool, mu *sync.Mutex) error { @@ -44,10 +48,8 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan return errors.New("invalid dst length given") } - // Take argv and free it soon after, since we won't be needing it again. cmd := exec.CommandContext(ctx, s.argv[0], s.argv[1:]...) cmd.Stderr = os.Stderr - s.argv = nil o, err := cmd.StdoutPipe() if err != nil { @@ -55,9 +57,20 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan } defer o.Close() - bufsz := s.samples * 4 - if !s.f32mode { - bufsz *= 2 + // We need o as an *os.File for SetWriteDeadline. + of, ok := o.(*os.File) + if !ok { + return errors.New("stdout pipe is not an *os.File (bug)") + } + + if err := cmd.Start(); err != nil { + return errors.Wrap(err, "failed to start "+s.argv[0]) + } + + if s.OnStart != nil { + if err := s.OnStart(ctx, cmd); err != nil { + return err + } } framesz := s.cfg.FrameSize @@ -66,36 +79,72 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan f64: !s.f32mode, } - // Allocate 4 times the buffer. We should ensure that we can read some of - // the overflow. - raw := make([]byte, bufsz) - - if err := cmd.Start(); err != nil { - return errors.Wrap(err, "failed to start ffmpeg") + bufsz := s.samples + if !s.f32mode { + bufsz *= 2 } - for { - reader.reset(raw) + raw := make([]byte, bufsz*4) - mu.Lock() - for n := 0; n < s.samples; n++ { - dst[n%framesz][n/framesz] = reader.next() - } - mu.Unlock() + // We double this as a workaround because sampleDuration is less than the + // actual time that ReadFull blocks for some reason, probably because the + // process decides to discard audio when it overflows. + sampleDuration := time.Duration( + float64(s.cfg.SampleSize) / s.cfg.SampleRate * float64(time.Second)) + // We also keep track of whether the deadline was hit once so we can half + // the sample duration. This smooths out the jitter. + var readExpired bool - select { - case <-ctx.Done(): - return ctx.Err() - // default: - case kickChan <- true: + for { + // Set us a read deadline. If the deadline is reached, we'll write zeros + // to the buffer. + timeout := sampleDuration + if !readExpired { + timeout *= 2 + } + if err := of.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return errors.Wrap(err, "failed to set read deadline") } _, err := io.ReadFull(o, raw) if err != nil { - if errors.Is(err, io.EOF) { + switch { + case errors.Is(err, io.EOF): return nil + case errors.Is(err, os.ErrDeadlineExceeded): + readExpired = true + default: + return err } - return err + } else { + readExpired = false + } + + if readExpired { + mu.Lock() + // We can write directly to dst just so we can avoid parsing zero + // bytes to floats. + for _, buf := range dst { + // Go should optimize this to a memclr. + for i := range buf { + buf[i] = 0 + } + } + mu.Unlock() + } else { + reader.reset(raw) + mu.Lock() + for n := 0; n < s.samples; n++ { + dst[n%framesz][n/framesz] = reader.next() + } + mu.Unlock() + } + + // Signal that we've written to dst. + select { + case <-ctx.Done(): + return ctx.Err() + case kickChan <- true: } } } @@ -103,23 +152,21 @@ func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan type floatReader struct { order binary.ByteOrder buf []byte - n int64 f64 bool } func (f *floatReader) reset(b []byte) { - f.n = 0 f.buf = b } func (f *floatReader) next() float64 { - n := f.n - if f.f64 { - f.n += 8 - return math.Float64frombits(f.order.Uint64(f.buf[n:])) + b := f.buf[:8] + f.buf = f.buf[8:] + return math.Float64frombits(f.order.Uint64(b)) } - f.n += 4 - return float64(math.Float32frombits(f.order.Uint32(f.buf[n:]))) + b := f.buf[:4] + f.buf = f.buf[4:] + return float64(math.Float32frombits(f.order.Uint32(b))) } diff --git a/input/ffmpeg/ffmpeg.go b/input/ffmpeg/ffmpeg.go index 0f545df..7a70846 100644 --- a/input/ffmpeg/ffmpeg.go +++ b/input/ffmpeg/ffmpeg.go @@ -21,5 +21,5 @@ func NewSession(b FFmpegBackend, cfg input.SessionConfig) (*execread.Session, er "-", ) - return execread.NewSession(args, false, cfg) + return execread.NewSession(args, false, cfg), nil } diff --git a/input/parec/parec.go b/input/parec/parec.go index 628c81e..bd121f4 100644 --- a/input/parec/parec.go +++ b/input/parec/parec.go @@ -3,7 +3,7 @@ package parec import ( "fmt" - "github.com/lawl/pulseaudio" + "github.com/noisetorch/pulseaudio" "github.com/noriah/catnip/input" "github.com/noriah/catnip/input/common/execread" "github.com/pkg/errors" @@ -83,5 +83,5 @@ func NewSession(cfg input.SessionConfig) (*execread.Session, error) { args = append(args, "-d", dv.String()) } - return execread.NewSession(args, true, cfg) + return execread.NewSession(args, true, cfg), nil } diff --git a/input/pipewire/dump.go b/input/pipewire/dump.go new file mode 100644 index 0000000..11b0378 --- /dev/null +++ b/input/pipewire/dump.go @@ -0,0 +1,140 @@ +package pipewire + +import ( + "context" + "encoding/json" + "os" + "os/exec" + + "github.com/pkg/errors" +) + +type pwObjects []pwObject + +func pwDump(ctx context.Context) (pwObjects, error) { + cmd := exec.CommandContext(ctx, "pw-dump") + cmd.Stderr = os.Stderr + + dumpOutput, err := cmd.Output() + if err != nil { + var execErr *exec.ExitError + if errors.As(err, &execErr) { + return nil, errors.Wrapf(err, "failed to run pw-dump: %s", execErr.Stderr) + } + return nil, errors.Wrap(err, "failed to run pw-dump") + } + + var dump pwObjects + if err := json.Unmarshal(dumpOutput, &dump); err != nil { + return nil, errors.Wrap(err, "failed to parse pw-dump output") + } + + return dump, nil +} + +// Filter filters for the devices that satisfies f. +func (d pwObjects) Filter(fns ...func(pwObject) bool) pwObjects { + filtered := make(pwObjects, 0, len(d)) +loop: + for _, device := range d { + for _, f := range fns { + if !f(device) { + continue loop + } + } + filtered = append(filtered, device) + } + return filtered +} + +// Find returns the first object that satisfies f. +func (d pwObjects) Find(f func(pwObject) bool) *pwObject { + for i, device := range d { + if f(device) { + return &d[i] + } + } + return nil +} + +// ResolvePorts returns all PipeWire port objects that belong to the given +// object. +func (d pwObjects) ResolvePorts(object *pwObject, dir pwPortDirection) pwObjects { + return d.Filter( + func(o pwObject) bool { return o.Type == pwInterfacePort }, + func(o pwObject) bool { + return o.Info.Props.NodeID == object.ID && o.Info.Props.PortDirection == dir + }, + ) +} + +type pwObjectID int64 + +type pwObjectType string + +const ( + pwInterfaceDevice pwObjectType = "PipeWire:Interface:Device" + pwInterfaceNode pwObjectType = "PipeWire:Interface:Node" + pwInterfacePort pwObjectType = "PipeWire:Interface:Port" + pwInterfaceLink pwObjectType = "PipeWire:Interface:Link" +) + +type pwObject struct { + ID pwObjectID `json:"id"` + Type pwObjectType `json:"type"` + Info struct { + Props pwInfoProps `json:"props"` + } `json:"info"` +} + +type pwInfoProps struct { + pwDeviceProps + pwNodeProps + pwPortProps + MediaClass string `json:"media.class"` + + JSON json.RawMessage `json:"-"` +} + +func (p *pwInfoProps) UnmarshalJSON(data []byte) error { + type Alias pwInfoProps + if err := json.Unmarshal(data, (*Alias)(p)); err != nil { + return err + } + p.JSON = append([]byte(nil), data...) + return nil +} + +type pwDeviceProps struct { + DeviceName string `json:"device.name"` +} + +// pwNodeProps is for Audio/Sink only. +type pwNodeProps struct { + NodeName string `json:"node.name"` + NodeNick string `json:"node.nick"` + NodeDescription string `json:"node.description"` +} + +// Constants for MediaClass. +const ( + pwAudioDevice string = "Audio/Device" + pwAudioSink string = "Audio/Sink" + pwStreamOutputAudio string = "Stream/Output/Audio" +) + +type pwPortDirection string + +const ( + pwPortIn = "in" + pwPortOut = "out" +) + +type pwPortProps struct { + PortID pwObjectID `json:"port.id"` + PortName string `json:"port.name"` + PortAlias string `json:"port.alias"` + PortDirection pwPortDirection `json:"port.direction"` + NodeID pwObjectID `json:"node.id"` + ObjectPath string `json:"object.path"` +} diff --git a/input/pipewire/link.go b/input/pipewire/link.go new file mode 100644 index 0000000..6848d7d --- /dev/null +++ b/input/pipewire/link.go @@ -0,0 +1,127 @@ +package pipewire + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +func pwLink(outPortID, inPortID pwObjectID) error { + cmd := exec.Command("pw-link", "-L", fmt.Sprint(outPortID), fmt.Sprint(inPortID)) + if err := cmd.Run(); err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) && exitErr.Stderr != nil { + return errors.Wrapf(err, "failed to run pw-link: %s", exitErr.Stderr) + } + return err + } + return nil +} + +type pwLinkObject struct { + ID pwObjectID + DeviceName string + PortName string // usually like {input,output}_{FL,FR} +} + +func pwLinkObjectParse(line string) (pwLinkObject, error) { + var obj pwLinkObject + + idStr, portStr, ok := strings.Cut(line, " ") + if !ok { + return obj, fmt.Errorf("failed to parse pw-link object %q", line) + } + + id, err := strconv.Atoi(idStr) + if err != nil { + return obj, errors.Wrapf(err, "failed to parse pw-link object id %q", idStr) + } + + name, port, ok := strings.Cut(portStr, ":") + if !ok { + return obj, fmt.Errorf("failed to parse pw-link port string %q", portStr) + } + + obj = pwLinkObject{ + ID: pwObjectID(id), + DeviceName: name, + PortName: port, + } + + return obj, nil +} + +type pwLinkType string + +const ( + pwLinkInputPorts pwLinkType = "i" + pwLinkOutputPorts pwLinkType = "o" +) + +type pwLinkEvent interface { + pwLinkEvent() +} + +type pwLinkAdd pwLinkObject +type pwLinkRemove pwLinkObject + +func (pwLinkAdd) pwLinkEvent() {} +func (pwLinkRemove) pwLinkEvent() {} + +func pwLinkMonitor(ctx context.Context, typ pwLinkType, ch chan<- pwLinkEvent) error { + cmd := exec.CommandContext(ctx, "pw-link", "-mI"+string(typ)) + cmd.Stderr = os.Stderr + + o, err := cmd.StdoutPipe() + if err != nil { + return errors.Wrap(err, "failed to get stdout pipe") + } + defer o.Close() + + if err := cmd.Start(); err != nil { + return errors.Wrap(err, "pw-link -m") + } + + scanner := bufio.NewScanner(o) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + + mark := line[0] + + line = strings.TrimSpace(line[1:]) + + obj, err := pwLinkObjectParse(line) + if err != nil { + continue + } + + var ev pwLinkEvent + switch mark { + case '=': + fallthrough + case '+': + ev = pwLinkAdd(obj) + case '-': + ev = pwLinkRemove(obj) + default: + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- ev: + } + } + + return errors.Wrap(cmd.Wait(), "pw-link exited") +} diff --git a/input/pipewire/pipewire.go b/input/pipewire/pipewire.go new file mode 100644 index 0000000..f980c4c --- /dev/null +++ b/input/pipewire/pipewire.go @@ -0,0 +1,267 @@ +package pipewire + +import ( + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/noriah/catnip/input" + "github.com/noriah/catnip/input/common/execread" + "github.com/pkg/errors" +) + +func init() { + input.RegisterBackend("pipewire", Backend{}) +} + +type Backend struct{} + +func (p Backend) Init() error { + return nil +} + +func (p Backend) Close() error { + return nil +} + +func (p Backend) Devices() ([]input.Device, error) { + pwObjs, err := pwDump(context.Background()) + if err != nil { + return nil, err + } + + pwSinks := pwObjs.Filter(func(o pwObject) bool { + return o.Type == pwInterfaceNode && + o.Info.Props.MediaClass == pwAudioSink || + o.Info.Props.MediaClass == pwStreamOutputAudio + }) + + devices := make([]input.Device, len(pwSinks)) + for i, device := range pwSinks { + devices[i] = AudioDevice{device.Info.Props.NodeName} + } + + return devices, nil +} + +func (p Backend) DefaultDevice() (input.Device, error) { + return AudioDevice{"auto"}, nil +} + +func (p Backend) Start(cfg input.SessionConfig) (input.Session, error) { + return NewSession(cfg) +} + +type AudioDevice struct { + name string +} + +func (d AudioDevice) String() string { + return d.name +} + +type catnipProps struct { + ApplicationName string `json:"application.name"` + CatnipID string `json:"catnip.id"` +} + +// Session is a PipeWire session. +type Session struct { + session execread.Session + props catnipProps + targetName string +} + +// NewSession creates a new PipeWire session. +func NewSession(cfg input.SessionConfig) (*Session, error) { + currentProps := catnipProps{ + ApplicationName: "catnip", + CatnipID: generateID(), + } + + propsJSON, err := json.Marshal(currentProps) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal props") + } + + dv, ok := cfg.Device.(AudioDevice) + if !ok { + return nil, fmt.Errorf("invalid device type %T", cfg.Device) + } + + target := "0" + if dv.name == "auto" { + target = dv.name + } + + args := []string{ + "pw-cat", + "--record", + "--format", "f32", + "--rate", fmt.Sprint(cfg.SampleRate), + "--latency", fmt.Sprint(cfg.SampleSize), + "--channels", fmt.Sprint(cfg.FrameSize), + "--target", target, // see .relink comment below + "--quality", "0", + "--media-category", "Capture", + "--media-role", "DSP", + "--properties", string(propsJSON), + "-", + } + + return &Session{ + session: *execread.NewSession(args, true, cfg), + props: currentProps, + targetName: dv.name, + }, nil +} + +// Start starts the session. It implements input.Session. +func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan bool, mu *sync.Mutex) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errCh := make(chan error, 1) + setErr := func(err error) { + select { + case errCh <- err: + default: + } + } + + var wg sync.WaitGroup + defer wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + setErr(s.session.Start(ctx, dst, kickChan, mu)) + }() + + // No relinking needed if we're not connecting to a specific device. + if s.targetName != "auto" { + wg.Add(1) + go func() { + defer wg.Done() + setErr(s.startRelinker(ctx)) + }() + } + + return <-errCh +} + +// We do a bit of tomfoolery here. Wireplumber actually is pretty incompetent at +// handling target.device, so our --target flag is pretty much useless. We have +// to do the node links ourselves. +// +// Relevant issues: +// +// - https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/2731 +// - https://gitlab.freedesktop.org/pipewire/wireplumber/-/issues/358 +// +func (s *Session) startRelinker(ctx context.Context) error { + var catnipPorts map[string]pwObjectID + var err error + // Employ this awful hack to get the needed port IDs for our session. We + // won't rely on the pwLinkMonitor below, since it may appear out of order. + for i := 0; i < 20; i++ { + catnipPorts, err = findCatnipPorts(ctx, s.props) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + if err != nil { + return errors.Wrap(err, "failed to find catnip's input ports") + } + + linkEvents := make(chan pwLinkEvent) + linkError := make(chan error, 1) + go func() { linkError <- pwLinkMonitor(ctx, pwLinkOutputPorts, linkEvents) }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-linkError: + return err + case event := <-linkEvents: + switch event := event.(type) { + case pwLinkAdd: + if event.DeviceName == s.targetName { + catnipPort := "input_" + strings.TrimPrefix(event.PortName, "output_") + catnipPortID := catnipPorts[catnipPort] + targetPortID := event.ID + + // Link the catnip node to the device node. + if err := pwLink(targetPortID, catnipPortID); err != nil { + log.Printf( + "failed to link catnip port %d to device port %d: %v", + catnipPortID, targetPortID, err) + } + } + } + } + } +} + +func findCatnipPorts(ctx context.Context, ourProps catnipProps) (map[string]pwObjectID, error) { + objs, err := pwDump(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get pw-dump") + } + + // Find the catnip node. + nodeObj := objs.Find(func(obj pwObject) bool { + if obj.Type != pwInterfaceNode { + return false + } + var props catnipProps + err := json.Unmarshal(obj.Info.Props.JSON, &props) + return err == nil && props == ourProps + }) + if nodeObj == nil { + return nil, errors.New("failed to find catnip node in PipeWire") + } + + // Find all of catnip's ports. We want catnip's input ports. + portObjs := objs.ResolvePorts(nodeObj, pwPortIn) + if len(portObjs) == 0 { + return nil, errors.New("failed to find any catnip port in PipeWire") + } + + portMap := make(map[string]pwObjectID) + for _, obj := range portObjs { + portMap[obj.Info.Props.PortName] = obj.ID + } + + return portMap, nil +} + +var sessionCounter uint64 + +// generateID generates a unique ID for this session. +func generateID() string { + return fmt.Sprintf( + "%d@%s#%d", + os.Getpid(), + shortEpoch(), + atomic.AddUint64(&sessionCounter, 1), + ) +} + +// shortEpoch generates a small string that is unique to the current epoch. +func shortEpoch() string { + now := time.Now().Unix() + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], uint64(now)) + return base64.RawURLEncoding.EncodeToString(buf[:]) +}