From 26028225c492141bd18ecd69070154ec1f305a9b Mon Sep 17 00:00:00 2001 From: Justin Gallardo Date: Mon, 18 Nov 2024 04:22:24 -0800 Subject: [PATCH] Add tracing support via open telemetry --- go.mod | 44 ++++++---- go.sum | 92 +++++++++++--------- internal/connector/connector.go | 18 ++++ pkg/cli/commands.go | 32 +++++-- pkg/cli/service_unix.go | 3 +- pkg/cli/service_windows.go | 10 ++- pkg/connectorbuilder/connectorbuilder.go | 63 ++++++++++++++ pkg/dotc1z/assets.go | 6 ++ pkg/dotc1z/c1file.go | 12 +++ pkg/dotc1z/clone_sync.go | 3 + pkg/dotc1z/dotc1z.go | 4 + pkg/dotc1z/entitlements.go | 9 ++ pkg/dotc1z/grants.go | 18 ++++ pkg/dotc1z/manager/local/local.go | 21 ++++- pkg/dotc1z/manager/s3/s3.go | 23 ++++- pkg/dotc1z/resouce_types.go | 12 ++- pkg/dotc1z/resources.go | 9 ++ pkg/dotc1z/sql_helpers.go | 12 +++ pkg/dotc1z/sync_runs.go | 48 +++++++++++ pkg/field/defaults.go | 2 + pkg/provisioner/provisioner.go | 27 ++++++ pkg/sync/syncer.go | 76 +++++++++++++++++ pkg/tasks/c1api/bulk_create_tickets.go | 3 + pkg/tasks/c1api/bulk_get_tickets.go | 10 ++- pkg/tasks/c1api/c1api.go | 5 ++ pkg/tasks/c1api/create_account.go | 3 + pkg/tasks/c1api/create_resource.go | 3 + pkg/tasks/c1api/create_ticket.go | 3 + pkg/tasks/c1api/debug.go | 3 + pkg/tasks/c1api/delete_resource.go | 3 + pkg/tasks/c1api/full_sync.go | 16 +++- pkg/tasks/c1api/get_ticket.go | 10 ++- pkg/tasks/c1api/grant.go | 3 + pkg/tasks/c1api/hello.go | 10 ++- pkg/tasks/c1api/list_ticket_schemas.go | 3 + pkg/tasks/c1api/manager.go | 10 +++ pkg/tasks/c1api/revoke.go | 3 + pkg/tasks/c1api/rotate_credentials.go | 3 + pkg/tasks/c1api/service_client.go | 15 ++++ pkg/tasks/c1api/task_helpers.go | 9 ++ pkg/tasks/local/accounter.go | 5 ++ pkg/tasks/local/deleter.go | 5 ++ pkg/tasks/local/event_feed.go | 4 + pkg/tasks/local/granter.go | 5 ++ pkg/tasks/local/local.go | 5 ++ pkg/tasks/local/revoker.go | 5 ++ pkg/tasks/local/rotator.go | 5 ++ pkg/tasks/local/syncer.go | 5 ++ pkg/tasks/local/ticket.go | 7 +- pkg/uotel/uotel.go | 102 +++++++++++++++++++++++ 50 files changed, 723 insertions(+), 84 deletions(-) create mode 100644 pkg/tasks/c1api/c1api.go create mode 100644 pkg/tasks/local/local.go create mode 100644 pkg/uotel/uotel.go diff --git a/go.mod b/go.mod index 724a56b9..c95bfae6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/conductorone/baton-sdk -go 1.21 +go 1.22.7 + +toolchain go1.22.9 require ( filippo.io/age v1.1.1 @@ -14,7 +16,7 @@ require ( github.com/aws/smithy-go v1.20.2 github.com/deckarep/golang-set/v2 v2.6.0 github.com/doug-martin/goqu/v9 v9.19.0 - github.com/envoyproxy/protoc-gen-validate v1.0.4 + github.com/envoyproxy/protoc-gen-validate v1.1.0 github.com/glebarez/go-sqlite v1.22.0 github.com/go-jose/go-jose/v3 v3.0.3 github.com/golang/protobuf v1.5.4 @@ -28,20 +30,24 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 + go.opentelemetry.io/otel v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0 - go.opentelemetry.io/otel/metric v1.27.0 - go.opentelemetry.io/otel/sdk/metric v1.27.0 + go.opentelemetry.io/otel/metric v1.32.0 + go.opentelemetry.io/otel/sdk v1.32.0 + go.opentelemetry.io/otel/sdk/metric v1.32.0 go.uber.org/ratelimit v0.3.1 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.24.0 - golang.org/x/net v0.26.0 - golang.org/x/oauth2 v0.20.0 - golang.org/x/sync v0.7.0 - golang.org/x/sys v0.21.0 - google.golang.org/genproto/googleapis/rpc v0.0.0-20240506185236-b8a5c65736ae - google.golang.org/grpc v1.63.2 - google.golang.org/protobuf v1.34.1 + golang.org/x/crypto v0.28.0 + golang.org/x/net v0.30.0 + golang.org/x/oauth2 v0.23.0 + golang.org/x/sync v0.9.0 + golang.org/x/sys v0.27.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 + google.golang.org/grpc v1.68.0 + google.golang.org/protobuf v1.35.1 gopkg.in/square/go-jose.v2 v2.6.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -61,13 +67,15 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect github.com/benbjohnson/clock v1.3.5 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect @@ -90,14 +98,16 @@ require ( github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/trace v1.27.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/text v0.20.0 // indirect golang.org/x/tools v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.50.5 // indirect diff --git a/go.sum b/go.sum index b34351be..fe234c35 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -68,8 +70,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -81,8 +83,8 @@ github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQr github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= @@ -111,6 +113,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -161,8 +165,8 @@ github.com/pquerna/xjwt v0.2.0/go.mod h1:xkrUYjBzqP3vBET2QdLkjLTcpPFa0bhPa3H445N github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= @@ -217,18 +221,28 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= -go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0 h1:qtFISDHKolvIxzSs0gIaiPUPR0Cucb0F2coHC7ZLdps= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0/go.mod h1:Y+Pop1Q6hCOnETWTW4NROK/q1hv50hM7yDaUTjG8lp8= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0 h1:j7ZSD+5yn+lo3sGV69nW04rRR0jhYnBwjuX3r0HvnK0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.32.0/go.mod h1:WXbYJTUaZXAbYd8lbgGuvih0yuCfOFC5RJoYnoLcGz8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0 h1:9kV11HXBHZAvuPUZxmMWrH8hZn/6UnHX4K0mu36vNsU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.32.0/go.mod h1:JyA0FHXe22E1NeNiHmVp7kFHglnexDQ7uRWDiiJ1hKQ= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0 h1:/jlt1Y8gXWiHG9FBx6cJaIC5hYx5Fe64nC8w5Cylt/0= go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.27.0/go.mod h1:bmToOGOBZ4hA9ghphIc1PAf66VA8KOtsuy3+ScStG20= -go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= -go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= -go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= -go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= -go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI= -go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= -go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= -go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -250,8 +264,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= @@ -277,11 +291,11 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= -golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -289,8 +303,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -310,23 +324,23 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -349,21 +363,23 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240506185236-b8a5c65736ae h1:c55+MER4zkBS14uJhSZMGGmya0yJx5iHV4x/fpOSNRk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240506185236-b8a5c65736ae/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= +google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= -google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= diff --git a/internal/connector/connector.go b/internal/connector/connector.go index 52c2759f..e12fe823 100644 --- a/internal/connector/connector.go +++ b/internal/connector/connector.go @@ -13,6 +13,8 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/propagation" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -152,6 +154,14 @@ func (cw *wrapper) Run(ctx context.Context, serverCfg *connectorwrapperV1.Server grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.ChainUnaryInterceptor(ugrpc.UnaryServerInterceptor(ctx)...), grpc.ChainStreamInterceptor(ugrpc.StreamServerInterceptors(ctx)...), + grpc.StatsHandler(otelgrpc.NewServerHandler( + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ), + )), ) connectorV2.RegisterConnectorServiceServer(server, cw.server) connectorV2.RegisterGrantsServiceServer(server, cw.server) @@ -311,6 +321,14 @@ func (cw *wrapper) C(ctx context.Context) (types.ConnectorClient, error) { grpc.WithTransportCredentials(credentials.NewTLS(clientTLSConfig)), grpc.WithBlock(), grpc.WithChainUnaryInterceptor(ratelimit2.UnaryInterceptor(cw.now, cw.rlDescriptors...)), + grpc.WithStatsHandler(otelgrpc.NewClientHandler( + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ), + )), ) if err != nil { dialErr = err diff --git a/pkg/cli/commands.go b/pkg/cli/commands.go index c4c261a3..6b4e65c7 100644 --- a/pkg/cli/commands.go +++ b/pkg/cli/commands.go @@ -7,13 +7,6 @@ import ( "fmt" "os" - "github.com/conductorone/baton-sdk/internal/connector" - v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" - v1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" - "github.com/conductorone/baton-sdk/pkg/connectorrunner" - "github.com/conductorone/baton-sdk/pkg/field" - "github.com/conductorone/baton-sdk/pkg/logging" - "github.com/conductorone/baton-sdk/pkg/types" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -21,6 +14,15 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + + "github.com/conductorone/baton-sdk/internal/connector" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + v1 "github.com/conductorone/baton-sdk/pb/c1/connector_wrapper/v1" + "github.com/conductorone/baton-sdk/pkg/connectorrunner" + "github.com/conductorone/baton-sdk/pkg/field" + "github.com/conductorone/baton-sdk/pkg/logging" + "github.com/conductorone/baton-sdk/pkg/types" + "github.com/conductorone/baton-sdk/pkg/uotel" ) type GetConnectorFunc func(context.Context, *viper.Viper) (types.ConnectorServer, error) @@ -53,6 +55,12 @@ func MakeMainCommand( return err } + otelShutdown, err := uotel.InitOtel(context.Background(), v.GetString("otel-collector-endpoint"), name) + if err != nil { + return err + } + defer otelShutdown(context.Background()) + l := ctxzap.Extract(runCtx) if isService() { @@ -210,6 +218,16 @@ func MakeGRPCServerCommand( return err } + otelShutdown, err := uotel.InitOtel( + context.Background(), + v.GetString("otel-collector-endpoint"), + fmt.Sprintf("%s-grpc", name), + ) + if err != nil { + return err + } + defer otelShutdown(context.Background()) + // validate required fields and relationship constraints if err := field.Validate(confschema, v); err != nil { return err diff --git a/pkg/cli/service_unix.go b/pkg/cli/service_unix.go index d9e8ad8f..48d6aee6 100644 --- a/pkg/cli/service_unix.go +++ b/pkg/cli/service_unix.go @@ -5,9 +5,10 @@ package cli import ( "context" + "github.com/spf13/cobra" + "github.com/conductorone/baton-sdk/pkg/field" "github.com/conductorone/baton-sdk/pkg/logging" - "github.com/spf13/cobra" ) func isService() bool { diff --git a/pkg/cli/service_windows.go b/pkg/cli/service_windows.go index 5ed0fed7..9d7d95bc 100644 --- a/pkg/cli/service_windows.go +++ b/pkg/cli/service_windows.go @@ -13,16 +13,18 @@ import ( "strconv" "time" - "github.com/conductorone/baton-sdk/pkg/field" - "github.com/conductorone/baton-sdk/pkg/logging" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "github.com/spf13/cobra" "go.uber.org/zap" "golang.org/x/sys/windows/svc" "golang.org/x/sys/windows/svc/debug" "golang.org/x/sys/windows/svc/eventlog" "golang.org/x/sys/windows/svc/mgr" "gopkg.in/yaml.v2" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/spf13/cobra" + + "github.com/conductorone/baton-sdk/pkg/field" + "github.com/conductorone/baton-sdk/pkg/logging" ) const ( diff --git a/pkg/connectorbuilder/connectorbuilder.go b/pkg/connectorbuilder/connectorbuilder.go index b85e1d9d..edd197da 100644 --- a/pkg/connectorbuilder/connectorbuilder.go +++ b/pkg/connectorbuilder/connectorbuilder.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -24,6 +25,8 @@ import ( "github.com/conductorone/baton-sdk/pkg/uhttp" ) +var tracer = otel.Tracer("baton-sdk/pkg.connectorbuilder") + type ResourceSyncer interface { ResourceType(ctx context.Context) *v2.ResourceType List(ctx context.Context, parentResourceID *v2.ResourceId, pToken *pagination.Token) ([]*v2.Resource, string, annotations.Annotations, error) @@ -96,6 +99,9 @@ type builderImpl struct { } func (b *builderImpl) BulkCreateTickets(ctx context.Context, request *v2.TicketsServiceBulkCreateTicketsRequest) (*v2.TicketsServiceBulkCreateTicketsResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.BulkCreateTickets") + defer span.End() + start := b.nowFunc() tt := tasks.BulkCreateTicketsType if b.ticketManager == nil { @@ -122,6 +128,9 @@ func (b *builderImpl) BulkCreateTickets(ctx context.Context, request *v2.Tickets } func (b *builderImpl) BulkGetTickets(ctx context.Context, request *v2.TicketsServiceBulkGetTicketsRequest) (*v2.TicketsServiceBulkGetTicketsResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.BulkGetTickets") + defer span.End() + start := b.nowFunc() tt := tasks.BulkGetTicketsType if b.ticketManager == nil { @@ -148,6 +157,9 @@ func (b *builderImpl) BulkGetTickets(ctx context.Context, request *v2.TicketsSer } func (b *builderImpl) ListTicketSchemas(ctx context.Context, request *v2.TicketsServiceListTicketSchemasRequest) (*v2.TicketsServiceListTicketSchemasResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListTicketSchemas") + defer span.End() + start := b.nowFunc() tt := tasks.ListTicketSchemasType if b.ticketManager == nil { @@ -177,6 +189,9 @@ func (b *builderImpl) ListTicketSchemas(ctx context.Context, request *v2.Tickets } func (b *builderImpl) CreateTicket(ctx context.Context, request *v2.TicketsServiceCreateTicketRequest) (*v2.TicketsServiceCreateTicketResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.CreateTicket") + defer span.End() + start := b.nowFunc() tt := tasks.CreateTicketType if b.ticketManager == nil { @@ -219,6 +234,9 @@ func (b *builderImpl) CreateTicket(ctx context.Context, request *v2.TicketsServi } func (b *builderImpl) GetTicket(ctx context.Context, request *v2.TicketsServiceGetTicketRequest) (*v2.TicketsServiceGetTicketResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.GetTicket") + defer span.End() + start := b.nowFunc() tt := tasks.GetTicketType if b.ticketManager == nil { @@ -247,6 +265,9 @@ func (b *builderImpl) GetTicket(ctx context.Context, request *v2.TicketsServiceG } func (b *builderImpl) GetTicketSchema(ctx context.Context, request *v2.TicketsServiceGetTicketSchemaRequest) (*v2.TicketsServiceGetTicketSchemaResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.GetTicketSchema") + defer span.End() + start := b.nowFunc() tt := tasks.GetTicketSchemaType if b.ticketManager == nil { @@ -402,6 +423,9 @@ func (b *builderImpl) ListResourceTypes( ctx context.Context, request *v2.ResourceTypesServiceListResourceTypesRequest, ) (*v2.ResourceTypesServiceListResourceTypesResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListResourceTypes") + defer span.End() + start := b.nowFunc() tt := tasks.ListResourceTypesType var out []*v2.ResourceType @@ -423,6 +447,9 @@ func (b *builderImpl) ListResourceTypes( // ListResources returns all available resources for a given resource type ID. func (b *builderImpl) ListResources(ctx context.Context, request *v2.ResourcesServiceListResourcesRequest) (*v2.ResourcesServiceListResourcesResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListResources") + defer span.End() + start := b.nowFunc() tt := tasks.ListResourcesType rb, ok := b.resourceBuilders[request.ResourceTypeId] @@ -455,6 +482,9 @@ func (b *builderImpl) ListResources(ctx context.Context, request *v2.ResourcesSe // ListEntitlements returns all the entitlements for a given resource. func (b *builderImpl) ListEntitlements(ctx context.Context, request *v2.EntitlementsServiceListEntitlementsRequest) (*v2.EntitlementsServiceListEntitlementsResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListEntitlements") + defer span.End() + start := b.nowFunc() tt := tasks.ListEntitlementsType rb, ok := b.resourceBuilders[request.Resource.Id.ResourceType] @@ -487,6 +517,9 @@ func (b *builderImpl) ListEntitlements(ctx context.Context, request *v2.Entitlem // ListGrants lists all the grants for a given resource. func (b *builderImpl) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListGrants") + defer span.End() + start := b.nowFunc() tt := tasks.ListGrantsType rid := request.Resource.Id @@ -522,6 +555,9 @@ func (b *builderImpl) ListGrants(ctx context.Context, request *v2.GrantsServiceL // GetMetadata gets all metadata for a connector. func (b *builderImpl) GetMetadata(ctx context.Context, request *v2.ConnectorServiceGetMetadataRequest) (*v2.ConnectorServiceGetMetadataResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.GetMetadata") + defer span.End() + start := b.nowFunc() tt := tasks.GetMetadataType md, err := b.cb.Metadata(ctx) @@ -602,6 +638,9 @@ func getCapabilities(ctx context.Context, b *builderImpl) *v2.ConnectorCapabilit // Validate validates the connector. func (b *builderImpl) Validate(ctx context.Context, request *v2.ConnectorServiceValidateRequest) (*v2.ConnectorServiceValidateResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.Validate") + defer span.End() + annos, err := b.cb.Validate(ctx) if err != nil { return nil, err @@ -611,6 +650,9 @@ func (b *builderImpl) Validate(ctx context.Context, request *v2.ConnectorService } func (b *builderImpl) Grant(ctx context.Context, request *v2.GrantManagerServiceGrantRequest) (*v2.GrantManagerServiceGrantResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.Grant") + defer span.End() + start := b.nowFunc() tt := tasks.GrantType l := ctxzap.Extract(ctx) @@ -648,6 +690,9 @@ func (b *builderImpl) Grant(ctx context.Context, request *v2.GrantManagerService } func (b *builderImpl) Revoke(ctx context.Context, request *v2.GrantManagerServiceRevokeRequest) (*v2.GrantManagerServiceRevokeResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.Revoke") + defer span.End() + start := b.nowFunc() tt := tasks.RevokeType @@ -686,10 +731,16 @@ func (b *builderImpl) Revoke(ctx context.Context, request *v2.GrantManagerServic // GetAsset streams the asset to the client. // FIXME(jirwin): Asset streaming is disabled. func (b *builderImpl) GetAsset(request *v2.AssetServiceGetAssetRequest, server v2.AssetService_GetAssetServer) error { + _, span := tracer.Start(server.Context(), "builderImpl.GetAsset") + defer span.End() + return nil } func (b *builderImpl) ListEvents(ctx context.Context, request *v2.ListEventsRequest) (*v2.ListEventsResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.ListEvents") + defer span.End() + start := b.nowFunc() tt := tasks.ListEventsType if b.eventFeed == nil { @@ -714,6 +765,9 @@ func (b *builderImpl) ListEvents(ctx context.Context, request *v2.ListEventsRequ } func (b *builderImpl) CreateResource(ctx context.Context, request *v2.CreateResourceRequest) (*v2.CreateResourceResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.CreateResource") + defer span.End() + start := b.nowFunc() tt := tasks.CreateResourceType l := ctxzap.Extract(ctx) @@ -735,6 +789,9 @@ func (b *builderImpl) CreateResource(ctx context.Context, request *v2.CreateReso } func (b *builderImpl) DeleteResource(ctx context.Context, request *v2.DeleteResourceRequest) (*v2.DeleteResourceResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.DeleteResource") + defer span.End() + start := b.nowFunc() tt := tasks.DeleteResourceType @@ -757,6 +814,9 @@ func (b *builderImpl) DeleteResource(ctx context.Context, request *v2.DeleteReso } func (b *builderImpl) RotateCredential(ctx context.Context, request *v2.RotateCredentialRequest) (*v2.RotateCredentialResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.RotateCredential") + defer span.End() + start := b.nowFunc() tt := tasks.RotateCredentialsType l := ctxzap.Extract(ctx) @@ -801,6 +861,9 @@ func (b *builderImpl) RotateCredential(ctx context.Context, request *v2.RotateCr } func (b *builderImpl) CreateAccount(ctx context.Context, request *v2.CreateAccountRequest) (*v2.CreateAccountResponse, error) { + ctx, span := tracer.Start(ctx, "builderImpl.CreateAccount") + defer span.End() + start := b.nowFunc() tt := tasks.CreateAccountType l := ctxzap.Extract(ctx) diff --git a/pkg/dotc1z/assets.go b/pkg/dotc1z/assets.go index 2347a5be..6fc9066f 100644 --- a/pkg/dotc1z/assets.go +++ b/pkg/dotc1z/assets.go @@ -51,6 +51,9 @@ func (r *assetsTable) Schema() (string, []interface{}) { // PutAsset stores the given asset in the database. func (c *C1File) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentType string, data []byte) error { + ctx, span := tracer.Start(ctx, "C1File.PutAsset") + defer span.End() + l := ctxzap.Extract(ctx) if len(data) == 0 { @@ -98,6 +101,9 @@ func (c *C1File) PutAsset(ctx context.Context, assetRef *v2.AssetRef, contentTyp // GetAsset fetches the specified asset from the database, and returns the content type and an io.Reader for the caller to // read the asset from. func (c *C1File) GetAsset(ctx context.Context, request *v2.AssetServiceGetAssetRequest) (string, io.Reader, error) { + ctx, span := tracer.Start(ctx, "C1File.GetAsset") + defer span.End() + err := c.validateDb(ctx) if err != nil { return "", nil, err diff --git a/pkg/dotc1z/c1file.go b/pkg/dotc1z/c1file.go index 04a35292..3aa2a000 100644 --- a/pkg/dotc1z/c1file.go +++ b/pkg/dotc1z/c1file.go @@ -56,6 +56,9 @@ func WithC1FPragma(name string, value string) C1FOption { // Returns a C1File instance for the given db filepath. func NewC1File(ctx context.Context, dbFilePath string, opts ...C1FOption) (*C1File, error) { + ctx, span := tracer.Start(ctx, "NewC1File") + defer span.End() + rawDB, err := sql.Open("sqlite", dbFilePath) if err != nil { return nil, err @@ -101,6 +104,9 @@ func WithPragma(name string, value string) C1ZOption { // Returns a new C1File instance with its state stored at the provided filename. func NewC1ZFile(ctx context.Context, outputFilePath string, opts ...C1ZOption) (*C1File, error) { + ctx, span := tracer.Start(ctx, "NewC1ZFile") + defer span.End() + options := &c1zOptions{} for _, opt := range opts { opt(options) @@ -164,6 +170,9 @@ func (c *C1File) Close() error { // init ensures that the database has all of the required schema. func (c *C1File) init(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "C1File.init") + defer span.End() + err := c.validateDb(ctx) if err != nil { return err @@ -190,6 +199,9 @@ func (c *C1File) init(ctx context.Context) error { // Stats introspects the database and returns the count of objects for the given sync run. func (c *C1File) Stats(ctx context.Context) (map[string]int64, error) { + ctx, span := tracer.Start(ctx, "C1File.Stats") + defer span.End() + counts := make(map[string]int64) syncID, err := c.LatestSyncID(ctx) diff --git a/pkg/dotc1z/clone_sync.go b/pkg/dotc1z/clone_sync.go index 03bb27c5..d6b366a3 100644 --- a/pkg/dotc1z/clone_sync.go +++ b/pkg/dotc1z/clone_sync.go @@ -49,6 +49,9 @@ func cloneTableQuery(tableName string) (string, error) { // 4. Select directly from the cloned db and insert directly into the new database. // 5. Close and save the new database as a c1z at the configured path. func (c *C1File) CloneSync(ctx context.Context, outPath string, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.CloneSync") + defer span.End() + // Be sure that the output path is empty else return an error _, err := os.Stat(outPath) if err == nil || !errors.Is(err, fs.ErrNotExist) { diff --git a/pkg/dotc1z/dotc1z.go b/pkg/dotc1z/dotc1z.go index bf94f593..32b118b2 100644 --- a/pkg/dotc1z/dotc1z.go +++ b/pkg/dotc1z/dotc1z.go @@ -5,9 +5,13 @@ import ( "errors" "io" + "go.opentelemetry.io/otel" + "github.com/conductorone/baton-sdk/pkg/connectorstore" ) +var tracer = otel.Tracer("baton-sdk/pkg.dotc1z") + // NewC1FileReader returns a connectorstore.Reader implementation for the given sqlite db file path. func NewC1FileReader(ctx context.Context, dbFilePath string) (connectorstore.Reader, error) { return NewC1File(ctx, dbFilePath) diff --git a/pkg/dotc1z/entitlements.go b/pkg/dotc1z/entitlements.go index 7191d217..7dd0d9d1 100644 --- a/pkg/dotc1z/entitlements.go +++ b/pkg/dotc1z/entitlements.go @@ -49,6 +49,9 @@ func (r *entitlementsTable) Schema() (string, []interface{}) { } func (c *C1File) ListEntitlements(ctx context.Context, request *v2.EntitlementsServiceListEntitlementsRequest) (*v2.EntitlementsServiceListEntitlementsResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListEntitlements") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, entitlements.Name(), request) if err != nil { return nil, fmt.Errorf("error listing entitlements: %w", err) @@ -71,6 +74,9 @@ func (c *C1File) ListEntitlements(ctx context.Context, request *v2.EntitlementsS } func (c *C1File) GetEntitlement(ctx context.Context, request *reader_v2.EntitlementsReaderServiceGetEntitlementRequest) (*reader_v2.EntitlementsReaderServiceGetEntitlementResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.GetEntitlement") + defer span.End() + ret := &v2.Entitlement{} err := c.getConnectorObject(ctx, entitlements.Name(), request.EntitlementId, ret) @@ -84,6 +90,9 @@ func (c *C1File) GetEntitlement(ctx context.Context, request *reader_v2.Entitlem } func (c *C1File) PutEntitlements(ctx context.Context, entitlementObjs ...*v2.Entitlement) error { + ctx, span := tracer.Start(ctx, "C1File.PutEntitlements") + defer span.End() + err := c.db.WithTx(func(tx *goqu.TxDatabase) error { err := bulkPutConnectorObjectTx(ctx, c, tx, entitlements.Name(), func(entitlement *v2.Entitlement) (goqu.Record, error) { diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index 0a3cf5a9..9d8b56e4 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -58,6 +58,9 @@ func (r *grantsTable) Schema() (string, []interface{}) { } func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListGrants") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants: %w", err) @@ -80,6 +83,9 @@ func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGr } func (c *C1File) GetGrant(ctx context.Context, request *reader_v2.GrantsReaderServiceGetGrantRequest) (*reader_v2.GrantsReaderServiceGetGrantResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.GetGrant") + defer span.End() + ret := &v2.Grant{} err := c.getConnectorObject(ctx, grants.Name(), request.GrantId, ret) @@ -96,6 +102,9 @@ func (c *C1File) ListGrantsForEntitlement( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListGrantsForEntitlement") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants for entitlement '%s': %w", request.GetEntitlement().GetId(), err) @@ -121,6 +130,9 @@ func (c *C1File) ListGrantsForPrincipal( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForEntitlementResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListGrantsForPrincipal") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants for principal '%s': %w", request.GetPrincipalId(), err) @@ -146,6 +158,9 @@ func (c *C1File) ListGrantsForResourceType( ctx context.Context, request *reader_v2.GrantsReaderServiceListGrantsForResourceTypeRequest, ) (*reader_v2.GrantsReaderServiceListGrantsForResourceTypeResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListGrantsForResourceType") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants for resource type '%s': %w", request.GetResourceTypeId(), err) @@ -168,6 +183,9 @@ func (c *C1File) ListGrantsForResourceType( } func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error { + ctx, span := tracer.Start(ctx, "C1File.PutGrants") + defer span.End() + err := c.db.WithTx(func(tx *goqu.TxDatabase) error { err := bulkPutConnectorObjectTx(ctx, c, tx, grants.Name(), func(grant *v2.Grant) (goqu.Record, error) { diff --git a/pkg/dotc1z/manager/local/local.go b/pkg/dotc1z/manager/local/local.go index 8232c26c..7f01e32a 100644 --- a/pkg/dotc1z/manager/local/local.go +++ b/pkg/dotc1z/manager/local/local.go @@ -6,11 +6,15 @@ import ( "io" "os" - "github.com/conductorone/baton-sdk/pkg/dotc1z" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel" "go.uber.org/zap" + + "github.com/conductorone/baton-sdk/pkg/dotc1z" ) +var tracer = otel.Tracer("baton-sdk/pkg.dotc1z.manager.local") + type localManager struct { filePath string tmpPath string @@ -26,6 +30,9 @@ func WithTmpDir(tmpDir string) Option { } func (l *localManager) copyFileToTmp(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "localManager.copyFileToTmp") + defer span.End() + tmp, err := os.CreateTemp(l.tmpDir, "sync-*.c1z") if err != nil { return err @@ -56,6 +63,9 @@ func (l *localManager) copyFileToTmp(ctx context.Context) error { // LoadRaw returns an io.Reader of the bytes in the c1z file. func (l *localManager) LoadRaw(ctx context.Context) (io.ReadCloser, error) { + ctx, span := tracer.Start(ctx, "localManager.LoadRaw") + defer span.End() + err := l.copyFileToTmp(ctx) if err != nil { return nil, err @@ -71,6 +81,9 @@ func (l *localManager) LoadRaw(ctx context.Context) (io.ReadCloser, error) { // LoadC1Z loads the C1Z file from the local file system. func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) { + ctx, span := tracer.Start(ctx, "localManager.LoadC1Z") + defer span.End() + log := ctxzap.Extract(ctx) err := l.copyFileToTmp(ctx) @@ -89,6 +102,9 @@ func (l *localManager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) { // SaveC1Z saves the C1Z file to the local file system. func (l *localManager) SaveC1Z(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "localManager.SaveC1Z") + defer span.End() + log := ctxzap.Extract(ctx) if l.tmpPath == "" { @@ -127,6 +143,9 @@ func (l *localManager) SaveC1Z(ctx context.Context) error { } func (l *localManager) Close(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "localManager.Close") + defer span.End() + err := os.Remove(l.tmpPath) if err != nil { return err diff --git a/pkg/dotc1z/manager/s3/s3.go b/pkg/dotc1z/manager/s3/s3.go index edb88597..3e27df90 100644 --- a/pkg/dotc1z/manager/s3/s3.go +++ b/pkg/dotc1z/manager/s3/s3.go @@ -8,12 +8,16 @@ import ( "os" "github.com/aws/smithy-go" - "github.com/conductorone/baton-sdk/pkg/dotc1z" - "github.com/conductorone/baton-sdk/pkg/us3" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel" "go.uber.org/zap" + + "github.com/conductorone/baton-sdk/pkg/dotc1z" + "github.com/conductorone/baton-sdk/pkg/us3" ) +var tracer = otel.Tracer("baton-sdk/pkg.dotc1z.manager.s3") + type s3Manager struct { client *us3.S3Client fileName string @@ -30,6 +34,9 @@ func WithTmpDir(tmpDir string) Option { } func (s *s3Manager) copyToTempFile(ctx context.Context, r io.Reader) error { + ctx, span := tracer.Start(ctx, "s3Manager.copyToTempFile") + defer span.End() + f, err := os.CreateTemp(s.tmpDir, "sync-*.c1z") if err != nil { return err @@ -51,6 +58,9 @@ func (s *s3Manager) copyToTempFile(ctx context.Context, r io.Reader) error { // LoadRaw loads the file from S3 and returns an io.Reader for the contents. func (s *s3Manager) LoadRaw(ctx context.Context) (io.ReadCloser, error) { + ctx, span := tracer.Start(ctx, "s3Manager.LoadRaw") + defer span.End() + out, err := s.client.Get(ctx, s.fileName) if err != nil { var ae smithy.APIError @@ -81,6 +91,9 @@ func (s *s3Manager) LoadRaw(ctx context.Context) (io.ReadCloser, error) { // LoadC1Z gets a file from the AWS S3 bucket and copies it to a temp file. func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) { + ctx, span := tracer.Start(ctx, "s3Manager.LoadC1Z") + defer span.End() + l := ctxzap.Extract(ctx) out, err := s.client.Get(ctx, s.fileName) @@ -108,6 +121,9 @@ func (s *s3Manager) LoadC1Z(ctx context.Context) (*dotc1z.C1File, error) { // SaveC1Z saves a file to the AWS S3 bucket. func (s *s3Manager) SaveC1Z(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "s3Manager.SaveC1Z") + defer span.End() + f, err := os.Open(s.tmpFile) if err != nil { return err @@ -130,6 +146,9 @@ func (s *s3Manager) SaveC1Z(ctx context.Context) error { } func (s *s3Manager) Close(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "s3Manager.Close") + defer span.End() + err := os.Remove(s.tmpFile) if err != nil { return err diff --git a/pkg/dotc1z/resouce_types.go b/pkg/dotc1z/resouce_types.go index a0bb4d71..d220a429 100644 --- a/pkg/dotc1z/resouce_types.go +++ b/pkg/dotc1z/resouce_types.go @@ -6,9 +6,10 @@ import ( "google.golang.org/protobuf/proto" + "github.com/doug-martin/goqu/v9" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" - "github.com/doug-martin/goqu/v9" ) const resourceTypesTableVersion = "1" @@ -44,6 +45,9 @@ func (r *resourceTypesTable) Schema() (string, []interface{}) { } func (c *C1File) ListResourceTypes(ctx context.Context, request *v2.ResourceTypesServiceListResourceTypesRequest) (*v2.ResourceTypesServiceListResourceTypesResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListResourceTypes") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, resourceTypes.Name(), request) if err != nil { return nil, fmt.Errorf("error listing resource types: %w", err) @@ -66,6 +70,9 @@ func (c *C1File) ListResourceTypes(ctx context.Context, request *v2.ResourceType } func (c *C1File) GetResourceType(ctx context.Context, request *reader_v2.ResourceTypesReaderServiceGetResourceTypeRequest) (*reader_v2.ResourceTypesReaderServiceGetResourceTypeResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.GetResourceType") + defer span.End() + ret := &v2.ResourceType{} err := c.getConnectorObject(ctx, resourceTypes.Name(), request.ResourceTypeId, ret) @@ -79,6 +86,9 @@ func (c *C1File) GetResourceType(ctx context.Context, request *reader_v2.Resourc } func (c *C1File) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error { + ctx, span := tracer.Start(ctx, "C1File.PutResourceTypes") + defer span.End() + err := c.db.WithTx(func(tx *goqu.TxDatabase) error { err := bulkPutConnectorObjectTx(ctx, c, tx, resourceTypes.Name(), func(resource *v2.ResourceType) (goqu.Record, error) { diff --git a/pkg/dotc1z/resources.go b/pkg/dotc1z/resources.go index dc235699..d0a0ab1c 100644 --- a/pkg/dotc1z/resources.go +++ b/pkg/dotc1z/resources.go @@ -56,6 +56,9 @@ func (r *resourcesTable) Schema() (string, []interface{}) { } func (c *C1File) ListResources(ctx context.Context, request *v2.ResourcesServiceListResourcesRequest) (*v2.ResourcesServiceListResourcesResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.ListResources") + defer span.End() + objs, nextPageToken, err := c.listConnectorObjects(ctx, resources.Name(), request) if err != nil { return nil, fmt.Errorf("error listing resources: %w", err) @@ -78,6 +81,9 @@ func (c *C1File) ListResources(ctx context.Context, request *v2.ResourcesService } func (c *C1File) GetResource(ctx context.Context, request *reader_v2.ResourcesReaderServiceGetResourceRequest) (*reader_v2.ResourcesReaderServiceGetResourceResponse, error) { + ctx, span := tracer.Start(ctx, "C1File.GetResource") + defer span.End() + ret := &v2.Resource{} annos := annotations.Annotations(request.GetAnnotations()) syncDetails := &c1zpb.SyncDetails{} @@ -98,6 +104,9 @@ func (c *C1File) GetResource(ctx context.Context, request *reader_v2.ResourcesRe } func (c *C1File) PutResources(ctx context.Context, resourceObjs ...*v2.Resource) error { + ctx, span := tracer.Start(ctx, "C1File.PutResources") + defer span.End() + err := c.db.WithTx(func(tx *goqu.TxDatabase) error { err := bulkPutConnectorObjectTx(ctx, c, tx, resources.Name(), func(resource *v2.Resource) (goqu.Record, error) { diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 968e9277..0993e223 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -73,6 +73,9 @@ type protoHasID interface { // listConnectorObjects uses a connector list request to fetch the corresponding data from the local db. // It returns the raw bytes that need to be unmarshalled into the correct proto message. func (c *C1File) listConnectorObjects(ctx context.Context, tableName string, req proto.Message) ([][]byte, string, error) { + ctx, span := tracer.Start(ctx, "C1File.listConnectorObjects") + defer span.End() + err := c.validateDb(ctx) if err != nil { return nil, "", err @@ -238,6 +241,9 @@ func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File, tableName string, extractFields func(m T) (goqu.Record, error), msgs ...T) error { + ctx, span := tracer.Start(ctx, "C1File.bulkPutConnectorObjectTx") + defer span.End() + err := c.validateSyncDb(ctx) if err != nil { return err @@ -284,6 +290,9 @@ func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File, } func (c *C1File) getResourceObject(ctx context.Context, resourceID *v2.ResourceId, m *v2.Resource, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.getResourceObject") + defer span.End() + err := c.validateDb(ctx) if err != nil { return err @@ -342,6 +351,9 @@ func (c *C1File) getResourceObject(ctx context.Context, resourceID *v2.ResourceI } func (c *C1File) getConnectorObject(ctx context.Context, tableName string, id string, m proto.Message) error { + ctx, span := tracer.Start(ctx, "C1File.getConnectorObject") + defer span.End() + err := c.validateDb(ctx) if err != nil { return err diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index 9d1f7a79..4aa47837 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -55,6 +55,9 @@ type syncRun struct { } func (c *C1File) getLatestUnfinishedSync(ctx context.Context) (*syncRun, error) { + ctx, span := tracer.Start(ctx, "C1File.getLatestUnfinishedSync") + defer span.End() + err := c.validateDb(ctx) if err != nil { return nil, err @@ -86,6 +89,9 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context) (*syncRun, error) } func (c *C1File) getFinishedSync(ctx context.Context, offset uint) (*syncRun, error) { + ctx, span := tracer.Start(ctx, "C1File.getFinishedSync") + defer span.End() + err := c.validateDb(ctx) if err != nil { return nil, err @@ -121,6 +127,9 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint) (*syncRun, er } func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize uint) ([]*syncRun, string, error) { + ctx, span := tracer.Start(ctx, "C1File.ListSyncRuns") + defer span.End() + err := c.validateDb(ctx) if err != nil { return nil, "", err @@ -179,6 +188,9 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } func (c *C1File) LatestSyncID(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.LatestSyncID") + defer span.End() + s, err := c.getFinishedSync(ctx, 0) if err != nil { return "", err @@ -202,6 +214,9 @@ func (c *C1File) ViewSync(ctx context.Context, syncID string) error { } func (c *C1File) PreviousSyncID(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.PreviousSyncID") + defer span.End() + s, err := c.getFinishedSync(ctx, 1) if err != nil { return "", err @@ -215,6 +230,9 @@ func (c *C1File) PreviousSyncID(ctx context.Context) (string, error) { } func (c *C1File) LatestFinishedSync(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.LatestFinishedSync") + defer span.End() + s, err := c.getFinishedSync(ctx, 0) if err != nil { return "", err @@ -228,6 +246,9 @@ func (c *C1File) LatestFinishedSync(ctx context.Context) (string, error) { } func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { + ctx, span := tracer.Start(ctx, "C1File.getSync") + defer span.End() + err := c.validateDb(ctx) if err != nil { return nil, err @@ -254,6 +275,9 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { } func (c *C1File) getCurrentSync(ctx context.Context) (*syncRun, error) { + ctx, span := tracer.Start(ctx, "C1File.getCurrentSync") + defer span.End() + if c.currentSyncID == "" { return nil, fmt.Errorf("c1file: sync must be running to checkpoint") } @@ -262,6 +286,9 @@ func (c *C1File) getCurrentSync(ctx context.Context) (*syncRun, error) { } func (c *C1File) CheckpointSync(ctx context.Context, syncToken string) error { + ctx, span := tracer.Start(ctx, "C1File.CheckpointSync") + defer span.End() + err := c.validateSyncDb(ctx) if err != nil { return err @@ -288,6 +315,9 @@ func (c *C1File) CheckpointSync(ctx context.Context, syncToken string) error { // StartSync generates a sync ID to be associated with all objects discovered during this run. func (c *C1File) StartSync(ctx context.Context) (string, bool, error) { + ctx, span := tracer.Start(ctx, "C1File.StartSync") + defer span.End() + if c.currentSyncID != "" { return c.currentSyncID, false, nil } @@ -316,6 +346,9 @@ func (c *C1File) StartSync(ctx context.Context) (string, bool, error) { } func (c *C1File) StartNewSync(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.StartNewSync") + defer span.End() + // Not sure if we want to do this here if c.currentSyncID != "" { return c.currentSyncID, nil @@ -347,6 +380,9 @@ func (c *C1File) StartNewSync(ctx context.Context) (string, error) { } func (c *C1File) CurrentSyncStep(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "C1File.CurrentSyncStep") + defer span.End() + sr, err := c.getCurrentSync(ctx) if err != nil { return "", err @@ -357,6 +393,9 @@ func (c *C1File) CurrentSyncStep(ctx context.Context) (string, error) { // EndSync updates the current sync_run row with the end time, and removes any other objects that don't have the current sync ID. func (c *C1File) EndSync(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "C1File.EndSync") + defer span.End() + err := c.validateSyncDb(ctx) if err != nil { return err @@ -386,6 +425,9 @@ func (c *C1File) EndSync(ctx context.Context) error { } func (c *C1File) Cleanup(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "C1File.Cleanup") + defer span.End() + l := ctxzap.Extract(ctx) if skipCleanup, _ := strconv.ParseBool(os.Getenv("BATON_SKIP_CLEANUP")); skipCleanup { @@ -453,6 +495,9 @@ func (c *C1File) Cleanup(ctx context.Context) error { // DeleteSyncRun removes all the objects with a given syncID from the database. func (c *C1File) DeleteSyncRun(ctx context.Context, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.DeleteSyncRun") + defer span.End() + err := c.validateDb(ctx) if err != nil { return err @@ -484,6 +529,9 @@ func (c *C1File) DeleteSyncRun(ctx context.Context, syncID string) error { // Vacuum runs a VACUUM on the database to reclaim space. func (c *C1File) Vacuum(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "C1File.Vacuum") + defer span.End() + err := c.validateDb(ctx) if err != nil { return err diff --git a/pkg/field/defaults.go b/pkg/field/defaults.go index 1ba36a3a..c069499d 100644 --- a/pkg/field/defaults.go +++ b/pkg/field/defaults.go @@ -31,6 +31,7 @@ var ( ticketTemplatePathField = StringField("ticket-template-path", WithHidden(true), WithDescription("A JSON file describing the ticket to create"), WithPersistent(true)) logLevelField = StringField("log-level", WithDefaultValue("info"), WithDescription("The log level: debug, info, warn, error"), WithPersistent(true)) skipFullSync = BoolField("skip-full-sync", WithDescription("This must be set to skip a full sync"), WithPersistent(true)) + otelCollectorEndpoint = StringField("otel-collector-endpoint", WithDescription("The endpoint of the OpenTelemetry collector to send observability data to"), WithPersistent(true)) ) // DefaultFields list the default fields expected in every single connector. @@ -62,6 +63,7 @@ var DefaultFields = []SchemaField{ ticketTemplatePathField, logLevelField, skipFullSync, + otelCollectorEndpoint, } func IsFieldAmongDefaultList(f SchemaField) bool { diff --git a/pkg/provisioner/provisioner.go b/pkg/provisioner/provisioner.go index bb8b9501..d1ac9b8e 100644 --- a/pkg/provisioner/provisioner.go +++ b/pkg/provisioner/provisioner.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel" "go.uber.org/zap" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" @@ -16,6 +17,8 @@ import ( "github.com/conductorone/baton-sdk/pkg/types" ) +var tracer = otel.Tracer("baton-sdk/pkg.provisioner") + type Provisioner struct { dbPath string connector types.ConnectorClient @@ -64,6 +67,9 @@ func makeCrypto(ctx context.Context) ([]byte, *v2.CredentialOptions, []*v2.Encry } func (p *Provisioner) Run(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.Run") + defer span.End() + switch { case p.revokeGrantID != "": return p.revoke(ctx) @@ -81,6 +87,9 @@ func (p *Provisioner) Run(ctx context.Context) error { } func (p *Provisioner) loadStore(ctx context.Context) (connectorstore.Reader, error) { + ctx, span := tracer.Start(ctx, "Provisioner.loadStore") + defer span.End() + if p.store != nil { return p.store, nil } @@ -103,6 +112,9 @@ func (p *Provisioner) loadStore(ctx context.Context) (connectorstore.Reader, err } func (p *Provisioner) Close(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.Close") + defer span.End() + var err error if p.store != nil { storeErr := p.store.Close() @@ -128,6 +140,9 @@ func (p *Provisioner) Close(ctx context.Context) error { } func (p *Provisioner) grant(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.grant") + defer span.End() + store, err := p.loadStore(ctx) if err != nil { return err @@ -172,6 +187,9 @@ func (p *Provisioner) grant(ctx context.Context) error { } func (p *Provisioner) revoke(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.revoke") + defer span.End() + store, err := p.loadStore(ctx) if err != nil { return err @@ -223,6 +241,9 @@ func (p *Provisioner) revoke(ctx context.Context) error { } func (p *Provisioner) createAccount(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.createAccount") + defer span.End() + l := ctxzap.Extract(ctx) var emails []*v2.AccountInfo_Email if p.createAccountEmail != "" { @@ -255,6 +276,9 @@ func (p *Provisioner) createAccount(ctx context.Context) error { } func (p *Provisioner) deleteResource(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.deleteResource") + defer span.End() + _, err := p.connector.DeleteResource(ctx, &v2.DeleteResourceRequest{ ResourceId: &v2.ResourceId{ Resource: p.deleteResourceID, @@ -268,6 +292,9 @@ func (p *Provisioner) deleteResource(ctx context.Context) error { } func (p *Provisioner) rotateCredentials(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "Provisioner.rotateCredentials") + defer span.End() + l := ctxzap.Extract(ctx) _, opts, config, err := makeCrypto(ctx) diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 63ad39dd..c3b586dc 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -12,6 +12,9 @@ import ( "strconv" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "github.com/conductorone/baton-sdk/pkg/sync/expand" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" @@ -31,6 +34,8 @@ import ( const maxDepth = 8 +var tracer = otel.Tracer("baton-sdk/sync") + var dontFixCycles, _ = strconv.ParseBool(os.Getenv("BATON_DONT_FIX_CYCLES")) var ( @@ -60,6 +65,9 @@ type syncer struct { // Checkpoint marshals the current state and stores it. func (s *syncer) Checkpoint(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.Checkpoint") + defer span.End() + checkpoint, err := s.state.Marshal() if err != nil { return err @@ -89,6 +97,9 @@ func (s *syncer) handleProgress(ctx context.Context, a *Action, c int) { var attempts = 0 func shouldWaitAndRetry(ctx context.Context, err error) bool { + ctx, span := tracer.Start(ctx, "syncer.shouldWaitAndRetry") + defer span.End() + if err == nil { attempts = 0 return true @@ -144,6 +155,9 @@ func shouldWaitAndRetry(ctx context.Context, err error) bool { // an action is completed, it is popped off of the queue. Before processing each action, we checkpoint the state object // into the datasource. This allows for graceful resumes if a sync is interrupted. func (s *syncer) Sync(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.Sync") + defer span.End() + if s.skipFullSync { return s.SkipSync(ctx) } @@ -174,6 +188,8 @@ func (s *syncer) Sync(ctx context.Context) error { return err } + span.SetAttributes(attribute.String("sync_id", syncID)) + if newSync { l.Debug("beginning new sync", zap.String("sync_id", syncID)) } else { @@ -299,6 +315,9 @@ func (s *syncer) Sync(ctx context.Context) error { } func (s *syncer) SkipSync(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SkipSync") + defer span.End() + l := ctxzap.Extract(ctx) l.Info("skipping sync") @@ -340,6 +359,9 @@ func (s *syncer) SkipSync(ctx context.Context) error { // SyncResourceTypes calls the ListResourceType() connector endpoint and persists the results in to the datasource. func (s *syncer) SyncResourceTypes(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncResourceTypes") + defer span.End() + pageToken := s.state.PageToken(ctx) if pageToken == "" { @@ -379,6 +401,9 @@ func (s *syncer) SyncResourceTypes(ctx context.Context) error { // getSubResources fetches the sub resource types from a resources' annotations. func (s *syncer) getSubResources(ctx context.Context, parent *v2.Resource) error { + ctx, span := tracer.Start(ctx, "syncer.getSubResources") + defer span.End() + for _, a := range parent.Annotations { if a.MessageIs((*v2.ChildResourceType)(nil)) { crt := &v2.ChildResourceType{} @@ -403,6 +428,9 @@ func (s *syncer) getSubResources(ctx context.Context, parent *v2.Resource) error // SyncResources handles fetching all of the resources from the connector given the provided resource types. For each // resource, we gather any child resource types it may emit, and traverse the resource tree. func (s *syncer) SyncResources(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncResources") + defer span.End() + if s.state.Current().ResourceTypeID == "" { pageToken := s.state.PageToken(ctx) @@ -437,6 +465,9 @@ func (s *syncer) SyncResources(ctx context.Context) error { // syncResources fetches a given resource from the connector, and returns a slice of new child resources to fetch. func (s *syncer) syncResources(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.syncResources") + defer span.End() + req := &v2.ResourcesServiceListResourcesRequest{ ResourceTypeId: s.state.ResourceTypeID(ctx), PageToken: s.state.PageToken(ctx), @@ -505,6 +536,9 @@ func (s *syncer) syncResources(ctx context.Context) error { } func (s *syncer) validateResourceTraits(ctx context.Context, r *v2.Resource) error { + ctx, span := tracer.Start(ctx, "syncer.validateResourceTraits") + defer span.End() + resourceTypeResponse, err := s.store.GetResourceType(ctx, &reader_v2.ResourceTypesReaderServiceGetResourceTypeRequest{ ResourceTypeId: r.Id.ResourceType, }) @@ -546,6 +580,9 @@ func (s *syncer) validateResourceTraits(ctx context.Context, r *v2.Resource) err // shouldSkipEntitlementsAndGrants determines if we should sync entitlements for a given resource. We cache the // result of this function for each resource type to avoid constant lookups in the database. func (s *syncer) shouldSkipEntitlementsAndGrants(ctx context.Context, r *v2.Resource) (bool, error) { + ctx, span := tracer.Start(ctx, "syncer.shouldSkipEntitlementsAndGrants") + defer span.End() + // We've checked this resource type, so we can return what we have cached directly. if skip, ok := s.skipEGForResourceType[r.Id.ResourceType]; ok { return skip, nil @@ -569,6 +606,9 @@ func (s *syncer) shouldSkipEntitlementsAndGrants(ctx context.Context, r *v2.Reso // SyncEntitlements fetches the entitlements from the connector. It first lists each resource from the datastore, // and pushes an action to fetch the entitlements for each resource. func (s *syncer) SyncEntitlements(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncEntitlements") + defer span.End() + if s.state.ResourceTypeID(ctx) == "" && s.state.ResourceID(ctx) == "" { pageToken := s.state.PageToken(ctx) @@ -619,6 +659,9 @@ func (s *syncer) SyncEntitlements(ctx context.Context) error { // syncEntitlementsForResource fetches the entitlements for a specific resource from the connector. func (s *syncer) syncEntitlementsForResource(ctx context.Context, resourceID *v2.ResourceId) error { + ctx, span := tracer.Start(ctx, "syncer.syncEntitlementsForResource") + defer span.End() + resourceResponse, err := s.store.GetResource(ctx, &reader_v2.ResourcesReaderServiceGetResourceRequest{ ResourceId: resourceID, }) @@ -658,6 +701,9 @@ func (s *syncer) syncEntitlementsForResource(ctx context.Context, resourceID *v2 // include references to an asset. For each AssetRef, we then call GetAsset on the connector and stream the asset from the connector. // Once we have the entire asset, we put it in the database. func (s *syncer) syncAssetsForResource(ctx context.Context, resourceID *v2.ResourceId) error { + ctx, span := tracer.Start(ctx, "syncer.syncAssetsForResource") + defer span.End() + l := ctxzap.Extract(ctx) resourceResponse, err := s.store.GetResource(ctx, &reader_v2.ResourcesReaderServiceGetResourceRequest{ ResourceId: resourceID, @@ -761,6 +807,9 @@ func (s *syncer) syncAssetsForResource(ctx context.Context, resourceID *v2.Resou // SyncAssets iterates each resource in the data store, and adds an action to fetch all of the assets for that resource. func (s *syncer) SyncAssets(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncAssets") + defer span.End() + if s.state.ResourceTypeID(ctx) == "" && s.state.ResourceID(ctx) == "" { pageToken := s.state.PageToken(ctx) @@ -806,6 +855,9 @@ func (s *syncer) SyncAssets(ctx context.Context) error { // SyncGrantExpansion // TODO(morgabra) Docs func (s *syncer) SyncGrantExpansion(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncGrantExpansion") + defer span.End() + l := ctxzap.Extract(ctx) entitlementGraph := s.state.EntitlementGraph(ctx) if !entitlementGraph.Loaded { @@ -929,6 +981,9 @@ func (s *syncer) SyncGrantExpansion(ctx context.Context) error { // SyncGrants fetches the grants for each resource from the connector. It iterates each resource // from the datastore, and pushes a new action to sync the grants for each individual resource. func (s *syncer) SyncGrants(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.SyncGrants") + defer span.End() + if s.state.ResourceTypeID(ctx) == "" && s.state.ResourceID(ctx) == "" { pageToken := s.state.PageToken(ctx) @@ -982,6 +1037,9 @@ type latestSyncFetcher interface { } func (s *syncer) fetchResourceForPreviousSync(ctx context.Context, resourceID *v2.ResourceId) (string, *v2.ETag, error) { + ctx, span := tracer.Start(ctx, "syncer.fetchResourceForPreviousSync") + defer span.End() + l := ctxzap.Extract(ctx) var previousSyncID string @@ -1039,6 +1097,9 @@ func (s *syncer) fetchEtaggedGrantsForResource( prevSyncID string, grantResponse *v2.GrantsServiceListGrantsResponse, ) ([]*v2.Grant, bool, error) { + ctx, span := tracer.Start(ctx, "syncer.fetchEtaggedGrantsForResource") + defer span.End() + respAnnos := annotations.Annotations(grantResponse.GetAnnotations()) etagMatch := &v2.ETagMatch{} hasMatch, err := respAnnos.Pick(etagMatch) @@ -1099,6 +1160,9 @@ func (s *syncer) fetchEtaggedGrantsForResource( // syncGrantsForResource fetches the grants for a specific resource from the connector. func (s *syncer) syncGrantsForResource(ctx context.Context, resourceID *v2.ResourceId) error { + ctx, span := tracer.Start(ctx, "syncer.syncGrantsForResource") + defer span.End() + resourceResponse, err := s.store.GetResource(ctx, &reader_v2.ResourcesReaderServiceGetResourceRequest{ ResourceId: resourceID, }) @@ -1193,6 +1257,9 @@ func (s *syncer) syncGrantsForResource(ctx context.Context, resourceID *v2.Resou } func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) { + ctx, span := tracer.Start(ctx, "syncer.runGrantExpandActions") + defer span.End() + l := ctxzap.Extract(ctx) graph := s.state.EntitlementGraph(ctx) @@ -1377,6 +1444,9 @@ func (s *syncer) newExpandedGrant(_ context.Context, descEntitlement *v2.Entitle // expandGrantsForEntitlements expands grants for the given entitlement. func (s *syncer) expandGrantsForEntitlements(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.expandGrantsForEntitlements") + defer span.End() + l := ctxzap.Extract(ctx) graph := s.state.EntitlementGraph(ctx) @@ -1442,6 +1512,9 @@ func (s *syncer) expandGrantsForEntitlements(ctx context.Context) error { } func (s *syncer) loadStore(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.loadStore") + defer span.End() + if s.store != nil { return nil } @@ -1466,6 +1539,9 @@ func (s *syncer) loadStore(ctx context.Context) error { // Close closes the datastorage to ensure it is updated on disk. func (s *syncer) Close(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "syncer.Close") + defer span.End() + var err error if s.store != nil { err = s.store.Close() diff --git a/pkg/tasks/c1api/bulk_create_tickets.go b/pkg/tasks/c1api/bulk_create_tickets.go index 3baab42d..24bb2b6c 100644 --- a/pkg/tasks/c1api/bulk_create_tickets.go +++ b/pkg/tasks/c1api/bulk_create_tickets.go @@ -25,6 +25,9 @@ type bulkCreateTicketTaskHandler struct { } func (c *bulkCreateTicketTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "bulkCreateTicketTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx) t := c.task.GetBulkCreateTickets() diff --git a/pkg/tasks/c1api/bulk_get_tickets.go b/pkg/tasks/c1api/bulk_get_tickets.go index 38bc09cb..703a5ef9 100644 --- a/pkg/tasks/c1api/bulk_get_tickets.go +++ b/pkg/tasks/c1api/bulk_get_tickets.go @@ -4,13 +4,14 @@ import ( "context" "errors" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/types" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) type bulkGetTicketsTaskHelpers interface { @@ -24,6 +25,9 @@ type bulkGetTicketTaskHandler struct { } func (c *bulkGetTicketTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "bulkGetTicketTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx) cc := c.helpers.ConnectorClient() diff --git a/pkg/tasks/c1api/c1api.go b/pkg/tasks/c1api/c1api.go new file mode 100644 index 00000000..74a7d3b8 --- /dev/null +++ b/pkg/tasks/c1api/c1api.go @@ -0,0 +1,5 @@ +package c1api + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("baton-sdk/pkg.tasks.c1api") diff --git a/pkg/tasks/c1api/create_account.go b/pkg/tasks/c1api/create_account.go index 4e9d4c5c..7f7b442d 100644 --- a/pkg/tasks/c1api/create_account.go +++ b/pkg/tasks/c1api/create_account.go @@ -26,6 +26,9 @@ type createAccountTaskHandler struct { } func (g *createAccountTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "createAccountTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", g.task.Id), zap.Stringer("task_type", tasks.GetType(g.task))) t := g.task.GetCreateAccount() diff --git a/pkg/tasks/c1api/create_resource.go b/pkg/tasks/c1api/create_resource.go index 3256ab55..8b97f2eb 100644 --- a/pkg/tasks/c1api/create_resource.go +++ b/pkg/tasks/c1api/create_resource.go @@ -26,6 +26,9 @@ type createResourceTaskHandler struct { } func (g *createResourceTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "createResourceTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", g.task.Id), zap.Stringer("task_type", tasks.GetType(g.task))) t := g.task.GetCreateResource() diff --git a/pkg/tasks/c1api/create_ticket.go b/pkg/tasks/c1api/create_ticket.go index b4af9525..3cd82079 100644 --- a/pkg/tasks/c1api/create_ticket.go +++ b/pkg/tasks/c1api/create_ticket.go @@ -25,6 +25,9 @@ type createTicketTaskHandler struct { } func (c *createTicketTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "createTicketTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx) t := c.task.GetCreateTicketTask() diff --git a/pkg/tasks/c1api/debug.go b/pkg/tasks/c1api/debug.go index c05c5bcd..26aa2c03 100644 --- a/pkg/tasks/c1api/debug.go +++ b/pkg/tasks/c1api/debug.go @@ -13,6 +13,9 @@ func newStartDebugging(tm *c1ApiTaskManager) *debugHandler { } func (c *debugHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "debugHandler.HandleTask") + defer span.End() + c.taskmanager.runnerShouldDebug = true return nil } diff --git a/pkg/tasks/c1api/delete_resource.go b/pkg/tasks/c1api/delete_resource.go index fce46d78..13c61576 100644 --- a/pkg/tasks/c1api/delete_resource.go +++ b/pkg/tasks/c1api/delete_resource.go @@ -26,6 +26,9 @@ type deleteResourceTaskHandler struct { } func (g *deleteResourceTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "deleteResourceTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", g.task.Id), zap.Stringer("task_type", tasks.GetType(g.task))) t := g.task.GetDeleteResource() diff --git a/pkg/tasks/c1api/full_sync.go b/pkg/tasks/c1api/full_sync.go index e214b769..ba362876 100644 --- a/pkg/tasks/c1api/full_sync.go +++ b/pkg/tasks/c1api/full_sync.go @@ -7,14 +7,15 @@ import ( "os" "path/filepath" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/annotations" sdkSync "github.com/conductorone/baton-sdk/pkg/sync" "github.com/conductorone/baton-sdk/pkg/tasks" "github.com/conductorone/baton-sdk/pkg/types" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) type fullSyncHelpers interface { @@ -32,6 +33,9 @@ type fullSyncTaskHandler struct { } func (c *fullSyncTaskHandler) sync(ctx context.Context, c1zPath string) error { + ctx, span := tracer.Start(ctx, "fullSyncTaskHandler.sync") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task))) syncOpts := []sdkSync.SyncOpt{ @@ -80,6 +84,9 @@ func (c *fullSyncTaskHandler) sync(ctx context.Context, c1zPath string) error { // task with a sync_id and it doesn't match our current state sync_id, we should reject the task. If we have a task // with a sync_id that does match our current state, we should resume our current sync, if possible. func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "fullSyncTaskHandler.HandleTask") + defer span.End() + ctx, cancel := context.WithCancel(ctx) defer cancel() l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task))) @@ -148,6 +155,9 @@ func newFullSyncTaskHandler(task *v1.Task, helpers fullSyncHelpers, skipFullSync } func uploadDebugLogs(ctx context.Context, helper fullSyncHelpers) error { + ctx, span := tracer.Start(ctx, "uploadDebugLogs") + defer span.End() + l := ctxzap.Extract(ctx) debugfilelocation := filepath.Join(helper.TempDir(), "debug.log") diff --git a/pkg/tasks/c1api/get_ticket.go b/pkg/tasks/c1api/get_ticket.go index 18ce23e8..3406f7ea 100644 --- a/pkg/tasks/c1api/get_ticket.go +++ b/pkg/tasks/c1api/get_ticket.go @@ -4,13 +4,14 @@ import ( "context" "errors" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/types" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) type getTicketTaskHelpers interface { @@ -24,6 +25,9 @@ type getTicketTaskHandler struct { } func (c *getTicketTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "getTicketTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx) cc := c.helpers.ConnectorClient() diff --git a/pkg/tasks/c1api/grant.go b/pkg/tasks/c1api/grant.go index 98294fc7..4d4c3801 100644 --- a/pkg/tasks/c1api/grant.go +++ b/pkg/tasks/c1api/grant.go @@ -26,6 +26,9 @@ type grantTaskHandler struct { } func (g *grantTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "grantTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", g.task.Id), zap.Stringer("task_type", tasks.GetType(g.task))) if g.task.GetGrant() == nil || g.task.GetGrant().GetEntitlement() == nil || g.task.GetGrant().GetPrincipal() == nil { diff --git a/pkg/tasks/c1api/hello.go b/pkg/tasks/c1api/hello.go index e3728433..da5b0223 100644 --- a/pkg/tasks/c1api/hello.go +++ b/pkg/tasks/c1api/hello.go @@ -5,13 +5,14 @@ import ( "errors" "runtime/debug" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/shirou/gopsutil/v3/host" + "go.uber.org/zap" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/tasks" "github.com/conductorone/baton-sdk/pkg/types" - "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" - "github.com/shirou/gopsutil/v3/host" - "go.uber.org/zap" ) type helloHelpers interface { @@ -66,6 +67,9 @@ func (c *helloTaskHandler) buildInfo(ctx context.Context) *v1.BatonServiceHelloR } func (c *helloTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "helloTaskHandler.HandleTask") + defer span.End() + if c.task == nil { return errors.New("cannot handle task: task is nil") } diff --git a/pkg/tasks/c1api/list_ticket_schemas.go b/pkg/tasks/c1api/list_ticket_schemas.go index 1c39af24..95579eb1 100644 --- a/pkg/tasks/c1api/list_ticket_schemas.go +++ b/pkg/tasks/c1api/list_ticket_schemas.go @@ -28,6 +28,9 @@ type listTicketSchemasTaskHandler struct { } func (c *listTicketSchemasTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "listTicketSchemasTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx) t := c.task.GetListTicketSchemas() diff --git a/pkg/tasks/c1api/manager.go b/pkg/tasks/c1api/manager.go index a7660576..b03eebcf 100644 --- a/pkg/tasks/c1api/manager.go +++ b/pkg/tasks/c1api/manager.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -79,6 +80,9 @@ func getNextPoll(d time.Duration) time.Duration { } func (c *c1ApiTaskManager) Next(ctx context.Context) (*v1.Task, time.Duration, error) { + ctx, span := tracer.Start(ctx, "c1ApiTaskManager.Next", trace.WithNewRoot()) + defer span.End() + l := ctxzap.Extract(ctx) c.mtx.Lock() @@ -130,6 +134,9 @@ func (c *c1ApiTaskManager) Next(ctx context.Context) (*v1.Task, time.Duration, e } func (c *c1ApiTaskManager) finishTask(ctx context.Context, task *v1.Task, resp proto.Message, annos annotations.Annotations, err error) error { + ctx, span := tracer.Start(ctx, "c1ApiTaskManager.finishTask") + defer span.End() + l := ctxzap.Extract(ctx) l = l.With( zap.String("task_id", task.GetId()), @@ -206,6 +213,9 @@ func (c *c1ApiTaskManager) ShouldDebug() bool { } func (c *c1ApiTaskManager) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "c1ApiTaskManager.Process", trace.WithNewRoot()) + defer span.End() + l := ctxzap.Extract(ctx) if task == nil { l.Debug("c1_api_task_manager.Process(): process called with nil task -- continuing") diff --git a/pkg/tasks/c1api/revoke.go b/pkg/tasks/c1api/revoke.go index 08dc93be..efe072cd 100644 --- a/pkg/tasks/c1api/revoke.go +++ b/pkg/tasks/c1api/revoke.go @@ -28,6 +28,9 @@ type revokeTaskHandler struct { } func (r *revokeTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "revokeTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", r.task.Id), zap.Stringer("task_type", tasks.GetType(r.task))) if r.task.GetRevoke() == nil || r.task.GetRevoke().GetGrant() == nil { diff --git a/pkg/tasks/c1api/rotate_credentials.go b/pkg/tasks/c1api/rotate_credentials.go index 2556f51f..36d5f803 100644 --- a/pkg/tasks/c1api/rotate_credentials.go +++ b/pkg/tasks/c1api/rotate_credentials.go @@ -26,6 +26,9 @@ type rotateCredentialsTaskHandler struct { } func (g *rotateCredentialsTaskHandler) HandleTask(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "rotateCredentialsTaskHandler.HandleTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", g.task.Id), zap.Stringer("task_type", tasks.GetType(g.task))) t := g.task.GetRotateCredentials() diff --git a/pkg/tasks/c1api/service_client.go b/pkg/tasks/c1api/service_client.go index d483cf2e..eba85cfc 100644 --- a/pkg/tasks/c1api/service_client.go +++ b/pkg/tasks/c1api/service_client.go @@ -83,6 +83,9 @@ func (c *c1ServiceClient) getClientConn(ctx context.Context) (v1.BatonServiceCli } func (c *c1ServiceClient) Hello(ctx context.Context, in *v1.BatonServiceHelloRequest) (*v1.BatonServiceHelloResponse, error) { + ctx, span := tracer.Start(ctx, "c1ServiceClient.Hello") + defer span.End() + client, done, err := c.getClientConn(ctx) if err != nil { return nil, err @@ -95,6 +98,9 @@ func (c *c1ServiceClient) Hello(ctx context.Context, in *v1.BatonServiceHelloReq } func (c *c1ServiceClient) GetTask(ctx context.Context, in *v1.BatonServiceGetTaskRequest) (*v1.BatonServiceGetTaskResponse, error) { + ctx, span := tracer.Start(ctx, "c1ServiceClient.GetTask") + defer span.End() + client, done, err := c.getClientConn(ctx) if err != nil { return nil, err @@ -107,6 +113,9 @@ func (c *c1ServiceClient) GetTask(ctx context.Context, in *v1.BatonServiceGetTas } func (c *c1ServiceClient) Heartbeat(ctx context.Context, in *v1.BatonServiceHeartbeatRequest) (*v1.BatonServiceHeartbeatResponse, error) { + ctx, span := tracer.Start(ctx, "c1ServiceClient.Heartbeat") + defer span.End() + client, done, err := c.getClientConn(ctx) if err != nil { return nil, err @@ -119,6 +128,9 @@ func (c *c1ServiceClient) Heartbeat(ctx context.Context, in *v1.BatonServiceHear } func (c *c1ServiceClient) FinishTask(ctx context.Context, in *v1.BatonServiceFinishTaskRequest) (*v1.BatonServiceFinishTaskResponse, error) { + ctx, span := tracer.Start(ctx, "c1ServiceClient.FinishTask") + defer span.End() + client, done, err := c.getClientConn(ctx) if err != nil { return nil, err @@ -131,6 +143,9 @@ func (c *c1ServiceClient) FinishTask(ctx context.Context, in *v1.BatonServiceFin } func (c *c1ServiceClient) Upload(ctx context.Context, task *v1.Task, r io.ReadSeeker) error { + ctx, span := tracer.Start(ctx, "c1ServiceClient.Upload") + defer span.End() + l := ctxzap.Extract(ctx) client, done, err := c.getClientConn(ctx) diff --git a/pkg/tasks/c1api/task_helpers.go b/pkg/tasks/c1api/task_helpers.go index 90bd99c9..3244c97c 100644 --- a/pkg/tasks/c1api/task_helpers.go +++ b/pkg/tasks/c1api/task_helpers.go @@ -31,6 +31,9 @@ func (t *taskHelpers) ConnectorClient() types.ConnectorClient { } func (t *taskHelpers) Upload(ctx context.Context, r io.ReadSeeker) error { + ctx, span := tracer.Start(ctx, "taskHelpers.Upload") + defer span.End() + if t.task == nil { return errors.New("cannot upload: task is nil") } @@ -38,6 +41,9 @@ func (t *taskHelpers) Upload(ctx context.Context, r io.ReadSeeker) error { } func (t *taskHelpers) FinishTask(ctx context.Context, resp proto.Message, annos annotations.Annotations, err error) error { + ctx, span := tracer.Start(ctx, "taskHelpers.FinishTask") + defer span.End() + if t.task == nil { return errors.New("cannot finish task: task is nil") } @@ -58,6 +64,9 @@ func (t *taskHelpers) TempDir() string { // If the heartbeat fails, this function will retry up to taskMaximumHeartbeatFailures times before cancelling the returned context with ErrTaskHeartbeatFailed. // If the task is cancelled by the server, the returned context will be cancelled with ErrTaskCancelled. func (t *taskHelpers) HeartbeatTask(ctx context.Context, annos annotations.Annotations) (context.Context, error) { + ctx, span := tracer.Start(ctx, "taskHelpers.HeartbeatTask") + defer span.End() + l := ctxzap.Extract(ctx).With(zap.String("task_id", t.task.GetId()), zap.Stringer("task_type", tasks.GetType(t.task))) rCtx, rCancel := context.WithCancelCause(ctx) diff --git a/pkg/tasks/local/accounter.go b/pkg/tasks/local/accounter.go index 4173c250..67d51f5e 100644 --- a/pkg/tasks/local/accounter.go +++ b/pkg/tasks/local/accounter.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/provisioner" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -38,6 +40,9 @@ func (m *localAccountManager) Next(ctx context.Context) (*v1.Task, time.Duration } func (m *localAccountManager) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localAccountManager.Process", trace.WithNewRoot()) + defer span.End() + accountManager := provisioner.NewCreateAccountManager(cc, m.dbPath, m.login, m.email) err := accountManager.Run(ctx) diff --git a/pkg/tasks/local/deleter.go b/pkg/tasks/local/deleter.go index 7f2ef18b..d5db8cb1 100644 --- a/pkg/tasks/local/deleter.go +++ b/pkg/tasks/local/deleter.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/provisioner" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -38,6 +40,9 @@ func (m *localResourceDeleter) Next(ctx context.Context) (*v1.Task, time.Duratio } func (m *localResourceDeleter) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localResourceDeleter.Process", trace.WithNewRoot()) + defer span.End() + accountManager := provisioner.NewResourceDeleter(cc, m.dbPath, m.resourceId, m.resourceType) err := accountManager.Run(ctx) diff --git a/pkg/tasks/local/event_feed.go b/pkg/tasks/local/event_feed.go index 87f9cbd2..0415b4e6 100644 --- a/pkg/tasks/local/event_feed.go +++ b/pkg/tasks/local/event_feed.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/encoding/protojson" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" @@ -39,6 +40,9 @@ func (m *localEventFeed) Next(ctx context.Context) (*v1.Task, time.Duration, err } func (m *localEventFeed) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localEventFeed.Process", trace.WithNewRoot()) + defer span.End() + var pageToken string for { resp, err := cc.ListEvents(ctx, &v2.ListEventsRequest{ diff --git a/pkg/tasks/local/granter.go b/pkg/tasks/local/granter.go index ce0f59a6..33c091a7 100644 --- a/pkg/tasks/local/granter.go +++ b/pkg/tasks/local/granter.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/provisioner" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -39,6 +41,9 @@ func (m *localGranter) Next(ctx context.Context) (*v1.Task, time.Duration, error } func (m *localGranter) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localGranter.Process", trace.WithNewRoot()) + defer span.End() + granter := provisioner.NewGranter(cc, m.dbPath, m.entitlementID, m.principalID, m.principalType) err := granter.Run(ctx) diff --git a/pkg/tasks/local/local.go b/pkg/tasks/local/local.go new file mode 100644 index 00000000..93cc6be0 --- /dev/null +++ b/pkg/tasks/local/local.go @@ -0,0 +1,5 @@ +package local + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("baton-sdk/pkg.tasks.local") diff --git a/pkg/tasks/local/revoker.go b/pkg/tasks/local/revoker.go index 828e0f54..b0ef0e9b 100644 --- a/pkg/tasks/local/revoker.go +++ b/pkg/tasks/local/revoker.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/provisioner" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -37,6 +39,9 @@ func (m *localRevoker) Next(ctx context.Context) (*v1.Task, time.Duration, error } func (m *localRevoker) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localRevoker.Process", trace.WithNewRoot()) + defer span.End() + granter := provisioner.NewRevoker(cc, m.dbPath, m.grantID) err := granter.Run(ctx) diff --git a/pkg/tasks/local/rotator.go b/pkg/tasks/local/rotator.go index 24a3d0fc..4984b6f1 100644 --- a/pkg/tasks/local/rotator.go +++ b/pkg/tasks/local/rotator.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/provisioner" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -38,6 +40,9 @@ func (m *localCredentialRotator) Next(ctx context.Context) (*v1.Task, time.Durat } func (m *localCredentialRotator) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localCredentialRotator.Process", trace.WithNewRoot()) + defer span.End() + accountManager := provisioner.NewCredentialRotator(cc, m.dbPath, m.resourceId, m.resourceType) err := accountManager.Run(ctx) diff --git a/pkg/tasks/local/syncer.go b/pkg/tasks/local/syncer.go index c2e1e49c..a6e51fff 100644 --- a/pkg/tasks/local/syncer.go +++ b/pkg/tasks/local/syncer.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/trace" + v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" sdkSync "github.com/conductorone/baton-sdk/pkg/sync" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -45,6 +47,9 @@ func (m *localSyncer) Next(ctx context.Context) (*v1.Task, time.Duration, error) } func (m *localSyncer) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localSyncer.Process", trace.WithNewRoot()) + defer span.End() + syncer, err := sdkSync.NewSyncer(ctx, cc, sdkSync.WithC1ZPath(m.dbPath), sdkSync.WithTmpDir(m.tmpDir)) if err != nil { return err diff --git a/pkg/tasks/local/ticket.go b/pkg/tasks/local/ticket.go index e3ab56d4..385e8b47 100644 --- a/pkg/tasks/local/ticket.go +++ b/pkg/tasks/local/ticket.go @@ -7,10 +7,12 @@ import ( "sync" "time" - "github.com/conductorone/baton-sdk/pkg/types/resource" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/conductorone/baton-sdk/pkg/types/resource" + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1" "github.com/conductorone/baton-sdk/pkg/tasks" @@ -64,6 +66,9 @@ func (m *localBulkCreateTicket) Next(ctx context.Context) (*v1.Task, time.Durati } func (m *localBulkCreateTicket) Process(ctx context.Context, task *v1.Task, cc types.ConnectorClient) error { + ctx, span := tracer.Start(ctx, "localBulkCreateTicket.Process", trace.WithNewRoot()) + defer span.End() + l := ctxzap.Extract(ctx) templates, err := m.loadTicketTemplate(ctx) diff --git a/pkg/uotel/uotel.go b/pkg/uotel/uotel.go new file mode 100644 index 00000000..b67bf0db --- /dev/null +++ b/pkg/uotel/uotel.go @@ -0,0 +1,102 @@ +package uotel + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func InitOtel(ctx context.Context, endpoint string, serviceName string) (func(context.Context) error, error) { + if endpoint == "" { + return func(context.Context) error { + return nil + }, nil + } + conn, err := grpc.NewClient(endpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceNameKey.String(serviceName), + ), + ) + + var shutdowns []func(context.Context) error + + tracerShutdown, err := initTracerProvider(ctx, res, conn) + if err != nil { + return nil, fmt.Errorf("failed to initialize tracer provider: %w", err) + } + shutdowns = append(shutdowns, tracerShutdown) + + //_, err = initMeterProvider(ctx, res, conn) + //if err != nil { + // return nil, fmt.Errorf("failed to initialize meter provider: %w", err) + //} + // TODO(jirwin): the meter shutdown causes an error on shutdown + //shutdowns = append(shutdowns, meterShutdown) + + shutdowns = append(shutdowns, func(context.Context) error { + return conn.Close() + }) + + return func(ctx context.Context) error { + var errs error + for _, shutdown := range shutdowns { + if err := shutdown(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + return errs + }, nil +} + +func initTracerProvider(ctx context.Context, res *resource.Resource, conn *grpc.ClientConn) (func(context.Context) error, error) { + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + ssp := sdktrace.NewSimpleSpanProcessor(traceExporter) + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(ssp), + ) + otel.SetTracerProvider(tracerProvider) + + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + return tracerProvider.Shutdown, nil +} + +//func initMeterProvider(ctx context.Context, res *resource.Resource, conn *grpc.ClientConn) (func(context.Context) error, error) { +// metricExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithGRPCConn(conn)) +// if err != nil { +// return nil, fmt.Errorf("failed to create metrics exporter: %w", err) +// } +// +// meterProvider := sdkmetric.NewMeterProvider( +// sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)), +// sdkmetric.WithResource(res), +// ) +// otel.SetMeterProvider(meterProvider) +// +// return meterProvider.Shutdown, nil +//}