Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Improve connect op metrics #186

Merged
merged 4 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '^1.20'
go-version: '^1.21'

- name: Get dependencies
run: go mod download
Expand All @@ -46,7 +46,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '^1.20'
go-version: '^1.21'

- name: Get dependencies
run: go mod download
Expand Down
6 changes: 3 additions & 3 deletions crew/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (t *Tunnel) connectWorker(ctx context.Context) (err error) {
// TODO: Clean this up.
t.connInfo.Lock()
defer t.connInfo.Unlock()
t.connInfo.Failed(fmt.Sprintf("failed to establish route: %s", err), "")
t.connInfo.Failed(fmt.Sprintf("SPN failed to establish route: %s", err), "")
t.connInfo.Save()

tracer.Warningf("spn/crew: failed to establish route for %s: %s", t.connInfo, err)
Expand All @@ -97,11 +97,11 @@ func (t *Tunnel) connectWorker(ctx context.Context) (err error) {

t.connInfo.Lock()
defer t.connInfo.Unlock()
t.connInfo.Failed(tErr.Error(), "")
t.connInfo.Failed(fmt.Sprintf("SPN failed to initialize data tunnel (connect op): %s", tErr.Error()), "")
t.connInfo.Save()

// TODO: try with another route?
tracer.Warningf("spn/crew: failed to initialize tunnel for %s: %s", t.connInfo, err)
tracer.Warningf("spn/crew: failed to initialize data tunnel (connect op) for %s: %s", t.connInfo, err)
return tErr
}

Expand Down
75 changes: 72 additions & 3 deletions crew/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
)

var (
newConnectOp *metrics.Counter
connectOpCnt *metrics.Counter
connectOpCntError *metrics.Counter
connectOpCntBadRequest *metrics.Counter
connectOpCntCanceled *metrics.Counter
connectOpCntFailed *metrics.Counter
connectOpCntConnected *metrics.Counter
connectOpCntRateLimited *metrics.Counter

connectOpIncomingBytes *metrics.Counter
connectOpOutgoingBytes *metrics.Counter

Expand All @@ -29,9 +36,9 @@ func registerMetrics() (err error) {
return nil
}

// Connect Op Stats.
// Connect Op Stats on client.

newConnectOp, err = metrics.NewCounter(
connectOpCnt, err = metrics.NewCounter(
"spn/op/connect/total",
nil,
&metrics.Options{
Expand All @@ -45,6 +52,68 @@ func registerMetrics() (err error) {
return err
}

// Connect Op Stats on server.

connectOpCntOptions := &metrics.Options{
Name: "SPN Total Connect Operations",
Permission: api.PermitUser,
Persist: true,
}

connectOpCntError, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "error"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntBadRequest, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "bad_request"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntCanceled, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "canceled"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntFailed, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "failed"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntConnected, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "connected"},
connectOpCntOptions,
)
if err != nil {
return err
}

connectOpCntRateLimited, err = metrics.NewCounter(
"spn/op/connect/total",
map[string]string{"result": "rate_limited"},
connectOpCntOptions,
)
if err != nil {
return err
}

_, err = metrics.NewGauge(
"spn/op/connect/active",
nil,
Expand Down
75 changes: 59 additions & 16 deletions crew/op_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func init() {
// NewConnectOp starts a new connect operation.
func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()
connectOpCnt.Inc()

// Create request.
request := &ConnectRequest{
Expand Down Expand Up @@ -168,9 +168,6 @@ func NewConnectOp(tunnel *Tunnel) (*ConnectOp, *terminal.Error) {
}

func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container) (terminal.Operation, *terminal.Error) {
// Submit metrics.
newConnectOp.Inc()

// Check if we are running a public hub.
if !conf.PublicHub() {
return nil, terminal.ErrPermissionDenied.With("connecting is only allowed on public hubs")
Expand All @@ -180,14 +177,17 @@ func startConnectOp(t terminal.Terminal, opID uint32, data *container.Container)
request := &ConnectRequest{}
_, err := dsd.Load(data.CompileData(), request)
if err != nil {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrMalformedData.With("failed to parse connect request: %w", err)
}
if request.QueueSize == 0 || request.QueueSize > terminal.MaxQueueSize {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("invalid queue size of %d", request.QueueSize)
}

// Check if IP seems valid.
if len(request.IP) != net.IPv4len && len(request.IP) != net.IPv6len {
connectOpCntError.Inc() // More like a protocol/system error than a bad request.
return nil, terminal.ErrInvalidOptions.With("ip address is not valid")
}

Expand All @@ -213,6 +213,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
if sessionTerm, ok := op.t.(terminal.SessionTerminal); ok {
session = sessionTerm.GetSession()
} else {
connectOpCntError.Inc()
log.Errorf("spn/crew: %T is not a session terminal, aborting op %s#%d", op.t, op.t.FmtID(), op.ID())
op.Stop(op, terminal.ErrInternalError.With("no session available"))
return nil
Expand All @@ -225,6 +226,7 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {

// If context was canceled, stop operation.
if cancelErr != nil {
connectOpCntCanceled.Inc()
op.Stop(op, terminal.ErrCanceled.With(cancelErr.Error()))
}

Expand All @@ -235,11 +237,14 @@ func (op *ConnectOp) handleSetup(_ context.Context) error {
func (op *ConnectOp) setup(session *terminal.Session) {
// Rate limit before connecting.
if tErr := session.RateLimit(); tErr != nil {
// Fake connection error when rate limited.
// Add rate limit info to error.
if tErr.Is(terminal.ErrRateLimited) {
connectOpCntRateLimited.Inc()
op.Stop(op, tErr.With(session.RateLimitInfo()))
return
}

connectOpCntError.Inc()
op.Stop(op, tErr)
return
}
Expand All @@ -248,27 +253,31 @@ func (op *ConnectOp) setup(session *terminal.Session) {
ipScope := netutils.GetIPScope(op.request.IP)
if ipScope != netutils.Global {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrPermissionDenied.With("denied request to connect to non-global IP %s", op.request.IP))
return
}

// Check exit policy.
if tErr := checkExitPolicy(op.request); tErr != nil {
session.ReportSuspiciousActivity(terminal.SusFactorQuiteUnusual)
connectOpCntBadRequest.Inc()
op.Stop(op, tErr)
return
}

// Check one last time before connecting if operation was not canceled.
if op.Ctx().Err() != nil {
op.Stop(op, terminal.ErrCanceled.With(op.Ctx().Err().Error()))
connectOpCntCanceled.Inc()
return
}

// Connect to destination.
dialNet := op.request.DialNetwork()
if dialNet == "" {
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntBadRequest.Inc()
op.Stop(op, terminal.ErrIncorrectUsage.With("protocol %s is not supported", op.request.Protocol))
return
}
Expand All @@ -285,10 +294,13 @@ func (op *ConnectOp) setup(session *terminal.Session) {
switch {
case errors.As(err, &netError) && netError.Timeout():
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntFailed.Inc()
case errors.Is(err, context.Canceled):
session.ReportSuspiciousActivity(terminal.SusFactorCommon)
connectOpCntCanceled.Inc()
default:
session.ReportSuspiciousActivity(terminal.SusFactorWeirdButOK)
connectOpCntFailed.Inc()
}

op.Stop(op, terminal.ErrConnectionError.With("failed to connect to %s: %w", op.request, err))
Expand All @@ -301,6 +313,7 @@ func (op *ConnectOp) setup(session *terminal.Session) {
module.StartWorker("connect op conn writer", op.connWriter)
module.StartWorker("connect op flow handler", op.dfq.FlowHandler)

connectOpCntConnected.Inc()
log.Infof("spn/crew: connected op %s#%d to %s", op.t.FmtID(), op.ID(), op.request)
}

Expand Down Expand Up @@ -516,18 +529,48 @@ func (op *ConnectOp) HandleStop(err *terminal.Error) (errorToSend *terminal.Erro
// Cancel workers.
op.cancelCtx()

// Avoid connecting to destination via this Hub if the was a connection
// error and no data was received.
if op.entry && // On clients only.
err.Is(terminal.ErrConnectionError) &&
op.outgoingTraffic.Load() == 0 {
// Only if no data was received (ie. sent to local application).
op.tunnel.avoidDestinationHub()
}
// Special client-side handling.
if op.entry {
// Mark the connection as failed if there was an error and no data was sent to the app yet.
if err.IsError() && op.outgoingTraffic.Load() == 0 {
// Set connection to failed and save it to propagate the update.
dhaavi marked this conversation as resolved.
Show resolved Hide resolved
c := op.tunnel.connInfo
func() {
c.Lock()
defer c.Unlock()

if err.IsExternal() {
c.Failed(fmt.Sprintf(
"the exit node reported an error: %s", err,
), "")
} else {
c.Failed(fmt.Sprintf(
"connection failed locally: %s", err,
), "")
}

// If we are on the client, don't leak local errors to the server.
if op.entry && !err.IsExternal() {
return terminal.ErrStopping
c.Save()
}()
}

// Avoid connecting to the destination via this Hub if:
// - The error is external - ie. from the server.
// - The error is a connection error.
// - No data was received.
// This indicates that there is some network level issue that we can
// possibly work around by using another exit node.
if err.IsError() && err.IsExternal() &&
err.Is(terminal.ErrConnectionError) &&
op.outgoingTraffic.Load() == 0 {
op.tunnel.avoidDestinationHub()
}

// Don't leak local errors to the server.
if !err.IsExternal() {
// Change error that is reported.
return terminal.ErrStopping
}
}

return err
}
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/safing/spn

go 1.19
go 1.21

require (
github.com/awalterschulze/gographviz v2.0.3+incompatible
Expand All @@ -11,12 +11,12 @@ require (
github.com/rot256/pblind v0.0.0-20230622102829-4dc2c6e4b857
github.com/safing/jess v0.3.1
github.com/safing/portbase v0.18.2
github.com/safing/portmaster v1.4.9
github.com/safing/portmaster v1.4.10-0.20231006102818-4f0adc87e70c
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
github.com/tevino/abool v1.2.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/net v0.15.0
golang.org/x/exp v0.0.0-20231005195138-3e424a577f31
golang.org/x/net v0.16.0
)

require (
Expand Down Expand Up @@ -68,13 +68,13 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gvisor.dev/gvisor v0.0.0-20220817001344-846276b3dbc5 // indirect
gvisor.dev/gvisor v0.0.0-20231006032704-15cc3fcbbd77 // indirect
)
Loading
Loading