diff --git a/cmd/otelbenchctl/main.go b/cmd/otelbenchctl/main.go new file mode 100644 index 00000000..1060e59d --- /dev/null +++ b/cmd/otelbenchctl/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + "flag" + + "github.com/go-faster/errors" + "github.com/go-faster/sdk/app" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func newMetrics() pmetric.Metrics { + m := pmetric.NewMetrics() + rl := m.ResourceMetrics().AppendEmpty() + rl.Resource().Attributes().PutStr("host.name", "testHost") + rl.SetSchemaUrl("resource_schema") + il := rl.ScopeMetrics().AppendEmpty() + il.Scope().SetName("name") + il.Scope().SetVersion("version") + il.Scope().Attributes().PutStr("oteldb.name", "testDB") + il.Scope().SetDroppedAttributesCount(1) + il.SetSchemaUrl("scope_schema") + im := il.Metrics().AppendEmpty() + im.SetName("metric_name") + im.SetDescription("metric_description") + dp := im.SetEmptyGauge().DataPoints() + for i := 0; i < 10000; i++ { + dp.AppendEmpty().SetIntValue(0) + } + + return m +} + +func main() { + app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error { + var arg struct { + Jobs int + } + flag.IntVar(&arg.Jobs, "j", 16, "jobs") + flag.Parse() + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < arg.Jobs; i++ { + g.Go(func() error { + conn, err := grpc.DialContext(ctx, "127.0.0.1:3951", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return errors.Wrap(err, "dial oteldb") + } + client := pmetricotlp.NewGRPCClient(conn) + metrics := newMetrics() + req := pmetricotlp.NewExportRequestFromMetrics(metrics) + for { + if _, err := client.Export(ctx, req); err != nil { + return errors.Wrap(err, "send metrics") + } + } + }) + } + return g.Wait() + }) +} diff --git a/cmd/otelbenchsrv/main.go b/cmd/otelbenchsrv/main.go 
new file mode 100644 index 00000000..ef7e8b58 --- /dev/null +++ b/cmd/otelbenchsrv/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "net" + "time" + + "github.com/go-faster/sdk/app" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/atomic" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func newMetrics() pmetric.Metrics { + m := pmetric.NewMetrics() + rl := m.ResourceMetrics().AppendEmpty() + rl.Resource().Attributes().PutStr("host.name", "testHost") + rl.SetSchemaUrl("resource_schema") + il := rl.ScopeMetrics().AppendEmpty() + il.Scope().SetName("name") + il.Scope().SetVersion("version") + il.Scope().Attributes().PutStr("oteldb.name", "testDB") + il.Scope().SetDroppedAttributesCount(1) + il.SetSchemaUrl("scope_schema") + im := il.Metrics().AppendEmpty() + im.SetName("metric_name") + im.SetDescription("metric_description") + points := im.SetEmptyGauge().DataPoints().AppendEmpty() + points.SetIntValue(0) + return m +} + +type noopServer struct { + pmetricotlp.UnimplementedGRPCServer + count atomic.Uint64 +} + +func (n *noopServer) Export(ctx context.Context, request pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { + n.count.Add(1) + return pmetricotlp.NewExportResponse(), nil +} + +func main() { + app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error { + srv := grpc.NewServer() + h := &noopServer{} + go func() { + t := time.NewTicker(time.Second) + for range t.C { + lg.Info("Got", zap.Uint64("n", h.count.Swap(0))) + } + }() + go func() { + <-ctx.Done() + srv.GracefulStop() + }() + pmetricotlp.RegisterGRPCServer(srv, h) + ln, err := net.Listen("tcp", "127.0.0.1:3951") + if err != nil { + return err + } + return srv.Serve(ln) + }) +} diff --git a/cmd/oteltest/main.go b/cmd/oteltest/main.go new file mode 100644 index 00000000..31615156 --- /dev/null +++ b/cmd/oteltest/main.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + 
"crypto/sha1" + "flag" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/dustin/go-humanize" + "github.com/go-faster/errors" + "github.com/go-faster/sdk/app" + "go.uber.org/atomic" + "go.uber.org/zap" + "go.ytsaurus.tech/yt/go/yt" + "go.ytsaurus.tech/yt/go/yt/ythttp" + "golang.org/x/sync/errgroup" + + "github.com/go-faster/oteldb/internal/metricsharding" + "github.com/go-faster/oteldb/internal/metricstorage" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +func main() { + app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error { + var arg struct { + Proxy string + Token string + ProxyDiscovery bool + TotalPoints int + BatchSize int + TenantID int + Workers int + } + flag.IntVar(&arg.Workers, "j", 1, "concurrent jobs") + flag.IntVar(&arg.TenantID, "t", 222, "tenant id") + flag.IntVar(&arg.TotalPoints, "p", 1_000_000_000, "total points to insert") + flag.IntVar(&arg.BatchSize, "b", 5_000, "batch size") + flag.StringVar(&arg.Proxy, "proxy", "localhost:8000", "proxy address") + flag.StringVar(&arg.Token, "token", "admin", "token") + flag.BoolVar(&arg.ProxyDiscovery, "proxy-discovery", false, "proxy discovery") + flag.Parse() + + yc, err := ythttp.NewClient(&yt.Config{ + Proxy: arg.Proxy, + Token: arg.Token, + DisableProxyDiscovery: !arg.ProxyDiscovery, + }) + if err != nil { + return errors.Wrap(err, "yt.NewClient") + } + + sharder := metricsharding.NewSharder(yc, metricsharding.ShardingOptions{}) + if err := sharder.CreateTenant(ctx, metricsharding.TenantID(arg.TenantID), time.Now()); err != nil { + return errors.Wrap(err, "sharder.CreateTenant") + } + var ( + tenant = sharder.TenantPath(metricsharding.TenantID(arg.TenantID)) + active = tenant.Child("active") + now = time.Now() + ) + + var loaded atomic.Uint64 + data := make(chan []any, arg.Workers) + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < arg.Workers; i++ { + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case points, ok := 
<-data: + if !ok { + return nil + } + bo := backoff.NewExponentialBackOff() + fn := func() error { + return yc.InsertRows(ctx, active.Child("points"), points, nil) + } + if err := backoff.RetryNotify(fn, backoff.WithContext(bo, ctx), func(err error, duration time.Duration) { + lg.Warn("retrying", zap.Error(err), zap.Duration("duration", duration)) + }); err != nil { + return errors.Wrap(err, "yc.InsertRows") + } + loaded.Add(uint64(len(points))) + } + } + }) + } + g.Go(func() error { + defer close(data) + for i := 0; i < arg.TotalPoints/arg.BatchSize; i++ { + var points []any + rh := sha1.Sum([]byte(fmt.Sprintf("r%d", i))) + ah := sha1.Sum([]byte(fmt.Sprintf("a%d", i))) + for j := 0; j < arg.BatchSize; j++ { + delta := time.Duration(i+j) * time.Millisecond + ts := now.Add(delta) + points = append(points, metricstorage.Point{ + Metric: "some.metric.name", + ResourceHash: rh[:], + AttributeHash: ah[:], + Timestamp: otelstorage.NewTimestampFromTime(ts), + Point: float64(j), + }) + } + select { + case <-ctx.Done(): + return ctx.Err() + case data <- points: + } + } + return nil + }) + g.Go(func() error { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + value := float64(loaded.Swap(0)) + lg.Info("loaded", zap.String("points", humanize.SI(value, "P"))) + } + } + }) + + return g.Wait() + }) +} diff --git a/docs/architecture.md b/docs/architecture.md index 4abe6594..231bfe11 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -16,3 +16,42 @@ TODO: define attribute. ## Partition Data can be partitioned by tenant id and time. 
+ +``` +Having: + 1) closed data partitioned by day (Δ=1D) + 2) active attributes and resources partitioned by hour (δ=1H) + +/metrics + /tenant-1 + /active + points + attributes/ + 2021-01-01-T-20-00 + 2021-01-01-T-21-00 + 2021-01-01-T-22-00 + 2021-01-01-T-23-00 + resources/ + 2021-01-01-T-20-00 + 2021-01-01-T-21-00 + 2021-01-01-T-22-00 + 2021-01-01-T-23-00 + /closed + /2021-01-01 + points + attributes + resources + +/metrics/tenant/active/{attributes, resources}/*: + dynamic tables of 2Δ data that is partitioned by δ (1H) +/metrics/tenant/points: + dynamic table that stores 2Δ (2D) of point data + +Each Δ: +1) Create new directory in /metrics/tenant/closed: + e.g. /metrics/tenant1/closed/2021-01-01 +2) Copy [T-2Δ, T-Δ) data from active/points to points static table +3) Merge [T-2Δ, T-Δ) data from active/attributes/* to attributes static table +4) Merge [T-2Δ, T-Δ) data from active/resources/* to resources static table +5) Delete data that is older than T-Δ from active tables +``` diff --git a/docs/metrics.md b/docs/metrics.md index 7af0c522..9f16400f 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -35,7 +35,6 @@ in single time shard, so we don't add metric name to key. 2. Resource set 3. Attribute set - Data should be stored in a way that allows exploit low cardinality of metric name and resource set, subsequently reducing series cardinality. 
@@ -80,7 +79,7 @@ value: 10 We compute hashes from attribute sets: | name | value | -|------------|----------------------------------| +| ---------- | -------------------------------- | | attributes | 3b52e723db6a5e0aaee48e0a984d33f9 | | resource | 4bfe5ab10b4f64a45383b67c222d962c | @@ -89,41 +88,72 @@ Attribute set is represented by an ordered list attributes to make hash determin #### `resource` | key | value | -|----------------------------------|---------------------------------------------| +| -------------------------------- | ------------------------------------------- | | 4bfe5ab10b4f64a45383b67c222d962c | `{"tenant.id": "1", "service.name": "api"}` | #### `attributes` | metric | key | value | -|-----------------------|----------------------------------|----------------------------| +| --------------------- | -------------------------------- | -------------------------- | | http.request.duration | 3b52e723db6a5e0aaee48e0a984d33f9 | `{"foo": 1, "bar": "baz"}` | #### `points` | name | resource_hash | attribute_hash | timestamp | value | -|-----------------------|----------------------------------|----------------------------------|----------------------|-------| +| --------------------- | -------------------------------- | -------------------------------- | -------------------- | ----- | | http.request.duration | 4bfe5ab10b4f64a45383b67c222d962c | 3b52e723db6a5e0aaee48e0a984d33f9 | 2021-01-01T00:00:00Z | 10 | ## Partitioning and sharding Data should be sharded by time (e.g. per day, week or similar, can be heterogeneous) and tenant id. -For example: +Having: + +1. closed data partitioned by day (Δ=1D) +2. 
active attributes and resources partitioned by hour (δ=1H) ``` /metrics - /tenant01 - /2021-01-01 <- time shard - values - attributes - resources - /2021-01-02 - /tenant02 - /2021-01-01 + /tenant-1 + /active + points + attributes/ + 2021-01-01-T-20-00 + 2021-01-01-T-21-00 + 2021-01-01-T-22-00 + 2021-01-01-T-23-00 + resources/ + 2021-01-01-T-20-00 + 2021-01-01-T-21-00 + 2021-01-01-T-22-00 + 2021-01-01-T-23-00 + /closed + /2021-01-01 + points + attributes + resources ``` +- `/metrics/tenant/active/{attributes, resources}/*`: + + dynamic tables of 2Δ data that is partitioned by δ (1H) + +- `/metrics/tenant/points`: + + dynamic table that stores 2Δ (2D) of point data + +Each Δ: + +1. Create new directory in `/metrics/tenant/closed`: + e.g. `/metrics/tenant1/closed/2021-01-01` +2. Copy `[T-2Δ, T-Δ)` data from `active/points` to `points` static table +3. Merge `[T-2Δ, T-Δ)` data from `active/attributes/\*` to `attributes` static table +4. Merge `[T-2Δ, T-Δ)` data from `active/resources/\*` to `resources` static table +5. Delete data that is older than `T-Δ` from active tables + Query can be efficiently executed concurrently. Also, there are concurrent `LogQL` executors that can exploit that. 
+ - https://github.com/thanos-io/promql-engine ## Query execution diff --git a/go.mod b/go.mod index e4ae4bfb..76cdf995 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/fatih/color v1.15.0 github.com/go-faster/errors v0.6.1 github.com/go-faster/jx v1.1.0 - github.com/go-faster/sdk v0.8.2 + github.com/go-faster/sdk v0.9.2 github.com/go-faster/tcpproxy v0.1.0 github.com/go-logfmt/logfmt v0.6.0 github.com/google/uuid v1.3.1 @@ -44,6 +44,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.19.0 go.opentelemetry.io/otel/trace v1.19.0 go.opentelemetry.io/proto/otlp v1.0.0 + go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 go.ytsaurus.tech/library/go/core/log v0.0.3 @@ -156,14 +157,13 @@ require ( go.opentelemetry.io/collector/processor v0.86.0 // indirect go.opentelemetry.io/collector/semconv v0.86.0 // indirect go.opentelemetry.io/collector/service v0.86.0 // indirect - go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0 // indirect - go.opentelemetry.io/contrib/propagators/autoprop v0.42.0 // indirect - go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect + go.opentelemetry.io/contrib/instrumentation/runtime v0.43.0 // indirect + go.opentelemetry.io/contrib/propagators/autoprop v0.43.0 // indirect + go.opentelemetry.io/contrib/propagators/aws v1.18.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.19.0 // indirect - go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 // indirect - go.opentelemetry.io/contrib/propagators/ot v1.17.0 // indirect + go.opentelemetry.io/contrib/propagators/jaeger v1.18.0 // indirect + go.opentelemetry.io/contrib/propagators/ot v1.18.0 // indirect go.opentelemetry.io/otel/bridge/opencensus v0.41.0 // indirect - go.opentelemetry.io/otel/exporters/jaeger v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.41.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.41.0 // indirect 
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.41.0 // indirect @@ -173,7 +173,6 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.41.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.41.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.18.0 // indirect - go.uber.org/atomic v1.11.0 // indirect go.ytsaurus.tech/library/go/blockcodecs v0.0.2 // indirect go.ytsaurus.tech/library/go/core/xerrors v0.0.3 // indirect go.ytsaurus.tech/library/go/ptr v0.0.1 // indirect diff --git a/go.sum b/go.sum index 068e7959..2cec05f1 100644 --- a/go.sum +++ b/go.sum @@ -135,8 +135,8 @@ github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06F github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg= github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg= -github.com/go-faster/sdk v0.8.2 h1:lxg4+Kckfr4It42Dr3EM+UJ1G7K8IAaHY3FQ7Vdlx6w= -github.com/go-faster/sdk v0.8.2/go.mod h1:ezZalGGeuw9hQD843O0ZAYpAOC63H8iOzMVeu/skWJY= +github.com/go-faster/sdk v0.9.2 h1:W/Gfxk8xlj0s+DOrFhDv5RWsuiHfvCoJyABeHd3SICo= +github.com/go-faster/sdk v0.9.2/go.mod h1:gEgPq0HmgmBEAzEB5nAxAkb6YrkzkgJy30G3EMu8B+U= github.com/go-faster/tcpproxy v0.1.0 h1:DXWJlSc09p2YuaAmPT1FMOjR3bm78MZ4FTh7nALcYxs= github.com/go-faster/tcpproxy v0.1.0/go.mod h1:6znjpcjn87QLCv215RCCde6eUHbP7wQnGfOGfPCUaIY= github.com/go-faster/yaml v0.4.6 h1:lOK/EhI04gCpPgPhgt0bChS6bvw7G3WwI8xxVe0sw9I= @@ -481,7 +481,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 
h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -573,18 +572,18 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0/go.mod h1:vsh3ySueQCiKPxFLvjWC4Z135gIa34TQ/NSqkDTZYUM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 h1:x8Z78aZx8cOF0+Kkazoc7lwUNMGy0LrzEMxTm4BbTxg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0/go.mod h1:62CPTSry9QZtOaSsE3tOzhx6LzDhHnXJ6xHeMNNiM6Q= -go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0 h1:EbmAUG9hEAMXyfWEasIt2kmh/WmXUznUksChApTgBGc= -go.opentelemetry.io/contrib/instrumentation/runtime v0.42.0/go.mod h1:rD9feqRYP24P14t5kmhNMqsqm1jvKmpx2H2rKVw52V8= -go.opentelemetry.io/contrib/propagators/autoprop v0.42.0 h1:s2RzYOAqHVgG23q8fPWYChobUoZM6rJZ98EnylJr66w= -go.opentelemetry.io/contrib/propagators/autoprop v0.42.0/go.mod h1:Mv/tWNtZn+NbALDb2XcItP0OM3lWWZjAfSroINxfW+Y= -go.opentelemetry.io/contrib/propagators/aws v1.17.0 h1:IX8d7l2uRw61BlmZBOTQFaK+y22j6vytMVTs9wFrO+c= -go.opentelemetry.io/contrib/propagators/aws v1.17.0/go.mod h1:pAlCYRWff4uGqRXOVn3WP8pDZ5E0K56bEoG7a1VSL4k= +go.opentelemetry.io/contrib/instrumentation/runtime v0.43.0 h1:NunhgxcK14rU7Hw2gKtV6uCSyohkXPisqneRFjnZNKQ= +go.opentelemetry.io/contrib/instrumentation/runtime v0.43.0/go.mod h1:rwb7icgpDjIhhHqv1qPGw6dDjAdAR7IKAe4PQdzBbsg= +go.opentelemetry.io/contrib/propagators/autoprop v0.43.0 h1:5j/y9N1SKXDEnUWaNWqegOVDuYyeaCweyR2+tVFFWDk= +go.opentelemetry.io/contrib/propagators/autoprop v0.43.0/go.mod h1:EzMD9D3cdJTMDqh5gAjCODQ1+QlU8iREsqOfE28CgEA= +go.opentelemetry.io/contrib/propagators/aws v1.18.0 h1:8SFScyYfxZK/MaW1iW17h/RhHNogbDtpwNJ6Ce95h0A= 
+go.opentelemetry.io/contrib/propagators/aws v1.18.0/go.mod h1:0ssYM4GfgGWeoJKLcXduZowVIbIlWd8zCY0CdQqYA0w= go.opentelemetry.io/contrib/propagators/b3 v1.19.0 h1:ulz44cpm6V5oAeg5Aw9HyqGFMS6XM7untlMEhD7YzzA= go.opentelemetry.io/contrib/propagators/b3 v1.19.0/go.mod h1:OzCmE2IVS+asTI+odXQstRGVfXQ4bXv9nMBRK0nNyqQ= -go.opentelemetry.io/contrib/propagators/jaeger v1.17.0 h1:Zbpbmwav32Ea5jSotpmkWEl3a6Xvd4tw/3xxGO1i05Y= -go.opentelemetry.io/contrib/propagators/jaeger v1.17.0/go.mod h1:tcTUAlmO8nuInPDSBVfG+CP6Mzjy5+gNV4mPxMbL0IA= -go.opentelemetry.io/contrib/propagators/ot v1.17.0 h1:ufo2Vsz8l76eI47jFjuVyjyB3Ae2DmfiCV/o6Vc8ii0= -go.opentelemetry.io/contrib/propagators/ot v1.17.0/go.mod h1:SbKPj5XGp8K/sGm05XblaIABgMgw2jDczP8gGeuaVLk= +go.opentelemetry.io/contrib/propagators/jaeger v1.18.0 h1:T457dcPEUr4+wimXmIs+2lI8vpSnRpxEhSsY2n7+UjU= +go.opentelemetry.io/contrib/propagators/jaeger v1.18.0/go.mod h1:FTAfGYSYWANl3fOqHpZYeC7AAAv4sdYgJ724NnE1msY= +go.opentelemetry.io/contrib/propagators/ot v1.18.0 h1:VmzxO7BjUU6oo0ChcKuGdKaSR0vchPxwahHZl64zVUM= +go.opentelemetry.io/contrib/propagators/ot v1.18.0/go.mod h1:5VwcOJ7OjS0uPxaxuwKHwJtkt+EAC+cgjXleXMe51z4= go.opentelemetry.io/contrib/zpages v0.45.0 h1:jIwHHGoWzJoZdbIUtWdErjL85Gni6BignnAFqDtMRL4= go.opentelemetry.io/contrib/zpages v0.45.0/go.mod h1:4mIdA5hqH6hEx9sZgV50qKfQO8aIYolUZboHmz+G7vw= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= @@ -593,8 +592,6 @@ go.opentelemetry.io/otel/bridge/opencensus v0.41.0 h1:VBpeaTbrvLFHvRtsyCJXjsTaic go.opentelemetry.io/otel/bridge/opencensus v0.41.0/go.mod h1:yCQB5IKRhgjlbTLc91+ixcZc2/8BncGGJ+CS3dZJwtY= go.opentelemetry.io/otel/bridge/opentracing v1.19.0 h1:HCvsUi6uuhat/nAuxCl41A+OPxXXPxMNTRxKZx7hTW4= go.opentelemetry.io/otel/bridge/opentracing v1.19.0/go.mod h1:n46h+7L/lcSuHhpqJQiUdb4eux19NNxTuWJ/ZMnIQMg= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= -go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod 
h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.41.0 h1:k0k7hFNDd8K4iOMJXj7s8sHaC4mhTlAeppRmZXLgZ6k= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.41.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.41.0 h1:HgbDTD8pioFdY3NRc/YCvsWjqQPtweGyXxa32LgnTOw= diff --git a/internal/metricsharding/archive.go b/internal/metricsharding/archive.go new file mode 100644 index 00000000..255e5f79 --- /dev/null +++ b/internal/metricsharding/archive.go @@ -0,0 +1,142 @@ +package metricsharding + +import ( + "context" + "fmt" + "time" + + "github.com/go-faster/errors" + "github.com/go-faster/sdk/zctx" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "go.ytsaurus.tech/yt/go/mapreduce/spec" + "go.ytsaurus.tech/yt/go/schema" + "go.ytsaurus.tech/yt/go/ypath" + "go.ytsaurus.tech/yt/go/yt" + + "github.com/go-faster/oteldb/internal/metricstorage" +) + +// ArchiveTenant creates a new closed block, if needed. 
+func (s *Sharder) ArchiveTenant(ctx context.Context, tenant TenantID) (rerr error) { + var ( + tenantPath = s.tenantPath(tenant) + currentBlockStart = s.currentBlockStart() + + start = currentBlockStart.Add(-s.shardOpts.BlockDelta) + end = currentBlockStart + ) + + grp, grpCtx := errgroup.WithContext(ctx) + grp.Go(func() error { + ctx := grpCtx + return s.archivePoints(ctx, tenantPath, start, end) + }) + for _, block := range []struct { + Name string + Schema schema.Schema + }{ + {"attributes", metricstorage.Attributes{}.YTSchema()}, + {"resource", metricstorage.Resource{}.YTSchema()}, + } { + block := block + grp.Go(func() error { + ctx := grpCtx + return s.archiveAttributes(ctx, block.Name, block.Schema, tenantPath, start, end) + }) + } + + return grp.Wait() +} + +func (s *Sharder) archiveAttributes(ctx context.Context, + dir string, targetSchema schema.Schema, + tenantPath ypath.Path, + start, end time.Time, +) error { + var ( + activePath = tenantPath.Child("active").Child(dir) + targetPath = tenantPath.Child("closed").Child(start.Format(timeBlockLayout)).Child(dir) + ) + + if _, err := yt.CreateTable(ctx, s.yc, targetPath, + yt.WithSchema(targetSchema), + yt.WithRecursive(), + ); err != nil { + return errors.Wrapf(err, "create static table %q", targetPath) + } + + blocks, err := s.getBlocks(ctx, activePath, start, end) + if err != nil { + return errors.Wrap(err, "get attribute blocks to merge") + } + + opSpec := spec.Merge() + for _, block := range blocks { + opSpec = opSpec.AddInput(block.Root) + } + opSpec.OutputTablePath = targetPath + + lg := zctx.From(ctx) + op, err := s.mapreduce.Merge(opSpec) + if err != nil { + return errors.Wrap(err, "run merge operation") + } + lg.Info("Run merge operation", + zap.Stringer("id", op.ID()), + zap.Stringer("from", activePath), + zap.Stringer("to", targetPath), + ) + + if err := op.Wait(); err != nil { + return errors.Wrapf(err, "wait operation %q", op.ID()) + } + lg.Info("Merge operation done", zap.Stringer("id", 
op.ID())) + + return nil +} + +func (s *Sharder) archivePoints(ctx context.Context, + tenantPath ypath.Path, + start, end time.Time, +) (rerr error) { + const table = "points" + var ( + activePath = tenantPath.Child("active").Child(table) + targetPath = tenantPath.Child("closed").Child(start.Format(timeBlockLayout)).Child(table) + ) + + if _, err := yt.CreateTable(ctx, s.yc, targetPath, + yt.WithSchema(metricstorage.Point{}.YTSchema()), + yt.WithRecursive(), + ); err != nil { + return errors.Wrapf(err, "create static table %q", targetPath) + } + + opSpec := spec.Merge() + opSpec.InputTablePaths = []ypath.YPath{activePath} + opSpec.OutputTablePath = targetPath + opSpec.InputQuery = fmt.Sprintf( + "* FROM [%s] WHERE timestamp >= %d AND timestamp < %d", + activePath, start.UnixNano(), end.UnixNano(), + ) + + lg := zctx.From(ctx) + op, err := s.mapreduce.Merge(opSpec) + if err != nil { + return errors.Wrap(err, "run merge operation") + } + lg.Info("Run merge operation", + zap.Stringer("id", op.ID()), + zap.Stringer("from", activePath), + zap.Stringer("to", targetPath), + ) + + if err := op.Wait(); err != nil { + return errors.Wrapf(err, "wait operation %q", op.ID()) + } + lg.Info("Merge operation done", zap.Stringer("id", op.ID())) + + return nil +} diff --git a/internal/metricsharding/block.go b/internal/metricsharding/block.go new file mode 100644 index 00000000..98c23f80 --- /dev/null +++ b/internal/metricsharding/block.go @@ -0,0 +1,52 @@ +package metricsharding + +import ( + "time" + + "go.ytsaurus.tech/yt/go/ypath" +) + +// TenantID is a tenant ID. +type TenantID = int64 + +// Block is a metric points/attributes block. +type Block struct { + // Root is a root path. + Root ypath.Path + // At is a time block starting point. + At time.Time +} + +// Resource returns resource table path. +func (s Block) Resource() ypath.Path { + return s.Root.Child("resource") +} + +// Attributes returns attributes table path. 
+func (s Block) Attributes() ypath.Path { + return s.Root.Child("attributes") +} + +// Points returns points table path. +func (s Block) Points() ypath.Path { + return s.Root.Child("points") +} + +func newBlock(root ypath.Path, at time.Time) Block { + return Block{ + root, + at, + } +} + +// QueryBlocks describes block to query. +type QueryBlocks struct { + // Active is a list of dynamic tables with recent points. + Active []ypath.Path + // RecentAttributes is a list of block with recent attributes. + RecentAttributes []Block + // RecentResource is a list of block with recent resources. + RecentResource []Block + // Closed blocks with static tables. + Closed []Block +} diff --git a/internal/metricsharding/metricsharding.go b/internal/metricsharding/metricsharding.go new file mode 100644 index 00000000..ccb71269 --- /dev/null +++ b/internal/metricsharding/metricsharding.go @@ -0,0 +1,32 @@ +// Package metricsharding contains YT metric storage implementation. +package metricsharding + +import ( + "time" + + "go.ytsaurus.tech/yt/go/ypath" +) + +// ShardingOptions sets sharding options. +type ShardingOptions struct { + // Root path of storage. + Root ypath.Path + + // AttributeDelta defines partition (δ=1h) of the current block attributes. + AttributeDelta time.Duration + // BlockDelta defines partition (Δ=1d) of the closed blocks. + BlockDelta time.Duration +} + +// SetDefaults sets default options. 
+func (opts *ShardingOptions) SetDefaults() { + if opts.Root == "" { + opts.Root = ypath.Path("//oteldb/metrics") + } + if opts.AttributeDelta == 0 { + opts.AttributeDelta = time.Hour + } + if opts.BlockDelta == 0 { + opts.BlockDelta = 24 * time.Hour + } +} diff --git a/internal/metricsharding/ms_test.go b/internal/metricsharding/ms_test.go new file mode 100644 index 00000000..bde6e062 --- /dev/null +++ b/internal/metricsharding/ms_test.go @@ -0,0 +1,141 @@ +package metricsharding + +import ( + "context" + "crypto/sha1" + "fmt" + "io" + "math/rand" + "testing" + "time" + + "github.com/ClickHouse/ch-go" + "github.com/ClickHouse/ch-go/proto" + "github.com/stretchr/testify/require" + "go.ytsaurus.tech/yt/go/yt" + "go.ytsaurus.tech/yt/go/yt/ythttp" + + "github.com/go-faster/oteldb/internal/metricstorage" + "github.com/go-faster/oteldb/internal/otelstorage" +) + +const ( + totalTestPoints = 1_000_000_000 // 1B + uniqueRate = 5_000 // each 50K + totalBatches = totalTestPoints / uniqueRate +) + +func TestFoo(t *testing.T) { + ctx := context.Background() + + yc, err := ythttp.NewClient(&yt.Config{ + Proxy: "localhost:8000", + Token: "admin", + DisableProxyDiscovery: true, + }) + require.NoError(t, err) + + const tenantID = 222 + sharder := NewSharder(yc, ShardingOptions{}) + if err := sharder.CreateTenant(ctx, tenantID, time.Now()); err != nil { + t.Fatal(err) + } + + tenant := sharder.tenantPath(tenantID) + active := tenant.Child(`active`) + t.Logf("active: %#q", active) + + now := time.Now() + for i := 0; i < totalBatches; i++ { + var points []any + rh := sha1.Sum([]byte(fmt.Sprintf("r%d", i))) + ah := sha1.Sum([]byte(fmt.Sprintf("a%d", i))) + for j := 0; j < uniqueRate; j++ { + delta := time.Duration(i+j) * time.Millisecond + ts := now.Add(delta) + points = append(points, metricstorage.Point{ + Metric: "foo", + ResourceHash: rh[:], + AttributeHash: ah[:], + Timestamp: otelstorage.NewTimestampFromTime(ts), + Point: float64(j), + }) + } + if err := yc.InsertRows(ctx, 
active.Child("points"), points, nil); err != nil { + t.Fatal(err) + } + } +} + +func TestClickHouse(t *testing.T) { + ctx := context.Background() + c, err := ch.Dial(ctx, ch.Options{}) + require.NoError(t, err) + + ddl := ch.Query{ + Body: `CREATE TABLE IF NOT EXISTS metrics +( + name LowCardinality(String), + resource UInt128, + attributes UInt128, + timestamp DateTime64(9) CODEC(DoubleDelta), + value Float64 CODEC(Gorilla) +) + ENGINE = MergeTree() + PARTITION BY toYearWeek(timestamp) + ORDER BY (name, resource, attributes, timestamp);`, + } + require.NoError(t, c.Do(ctx, ddl)) + + var ( + cName = new(proto.ColStr).LowCardinality() + cResource = new(proto.ColUInt128) + cAttr = new(proto.ColUInt128) + cTime = new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano) + cValue = new(proto.ColFloat64) + ) + rows := proto.Input{ + {Name: "name", Data: cName}, + {Name: "resource", Data: cResource}, + {Name: "attributes", Data: cAttr}, + {Name: "timestamp", Data: cTime}, + {Name: "value", Data: cValue}, + } + var total int + rnd := rand.New(rand.NewSource(1)) + fill := func() { + rows.Reset() + ts := time.Now() + res := proto.UInt128{ + Low: rnd.Uint64(), + High: rnd.Uint64(), + } + attr := proto.UInt128{ + Low: rnd.Uint64(), + High: rnd.Uint64(), + } + for i := 0; i < uniqueRate; i++ { + cName.Append("foo") + cResource.Append(res) + cAttr.Append(attr) + cTime.Append(ts.Add(time.Duration(i) * time.Millisecond)) + cValue.Append(float64(i)) + } + } + fill() + q := ch.Query{ + Body: rows.Into("metrics"), + Input: rows, + OnInput: func(ctx context.Context) error { + total++ + if total >= totalBatches { + return io.EOF + } + fill() + return nil + }, + } + if err := c.Do(ctx, q); err != nil { + t.Fatal(err) + } +} diff --git a/internal/metricsharding/sharder.go b/internal/metricsharding/sharder.go new file mode 100644 index 00000000..e9335ef2 --- /dev/null +++ b/internal/metricsharding/sharder.go @@ -0,0 +1,242 @@ +package metricsharding + +import ( + "context" + "fmt" + 
"slices" + "sync" + "time" + + "github.com/go-faster/errors" + "github.com/go-faster/sdk/zctx" + "go.uber.org/zap" + "go.ytsaurus.tech/yt/go/mapreduce" + "go.ytsaurus.tech/yt/go/migrate" + "go.ytsaurus.tech/yt/go/ypath" + "go.ytsaurus.tech/yt/go/yt" + "golang.org/x/sync/errgroup" + + "github.com/go-faster/oteldb/internal/metricstorage" +) + +// Sharder controls sharding. +type Sharder struct { + yc yt.Client + mapreduce mapreduce.Client + + shardOpts ShardingOptions +} + +// NewSharder creates new [Sharder]. +func NewSharder(yc yt.Client, shardOpts ShardingOptions) *Sharder { + shardOpts.SetDefaults() + + return &Sharder{ + yc: yc, + mapreduce: mapreduce.New(yc), + shardOpts: shardOpts, + } +} + +const timeBlockLayout = "2006-01-02_15-04-05" + +func (s *Sharder) TenantPath(id TenantID) ypath.Path { + return s.tenantPath(id) +} + +func (s *Sharder) tenantPath(id TenantID) ypath.Path { + return s.shardOpts.Root.Child(fmt.Sprintf("tenant_%v", id)) +} + +func (s *Sharder) currentBlockStart() time.Time { + return time.Now().UTC().Truncate(s.shardOpts.BlockDelta) +} + +// CreateTenant creates storage structure for given tenant. 
+func (s *Sharder) CreateTenant(ctx context.Context, tenant TenantID, at time.Time) error {
+	var (
+		activePath = s.tenantPath(tenant).Child("active")
+		// Partition name is the attribute-delta-aligned start of the block containing `at`.
+		timePartition = at.UTC().Truncate(s.shardOpts.AttributeDelta).Format(timeBlockLayout)
+	)
+	return migrate.EnsureTables(ctx, s.yc,
+		map[ypath.Path]migrate.Table{
+			activePath.Child("resource").Child(timePartition): {
+				Schema: metricstorage.Resource{}.YTSchema(),
+				Attributes: map[string]any{
+					"optimize_for": "scan",
+				},
+			},
+			activePath.Child("attributes").Child(timePartition): {
+				Schema: metricstorage.Attributes{}.YTSchema(),
+				Attributes: map[string]any{
+					"optimize_for": "scan",
+				},
+			},
+			activePath.Child("points"): {
+				Schema: metricstorage.Point{}.YTSchema(),
+				Attributes: map[string]any{
+					"optimize_for": "scan",
+				},
+			},
+		},
+		migrate.OnConflictTryAlter(ctx, s.yc),
+	)
+}
+
+// GetBlocksForQuery returns list of blocks to query.
+//
+// Active (open) and closed block listings are fetched concurrently per tenant;
+// partial results are merged into qb under the mutexes below.
+func (s *Sharder) GetBlocksForQuery(ctx context.Context, tenants []TenantID, start, end time.Time) (qb QueryBlocks, _ error) {
+	var (
+		currentBlockStart = s.currentBlockStart()
+
+		// Query closed blocks only if range includes points before start of the active block.
+		needClosed = start.Before(currentBlockStart)
+		// Query current blocks only if range includes points after start of the active block.
+		needActive = end.After(currentBlockStart)
+	)
+
+	// Goroutines below append to distinct fields of qb; the mutexes guard those appends.
+	var (
+		attributeMux sync.Mutex
+		closedMux    sync.Mutex
+	)
+	grp, grpCtx := errgroup.WithContext(ctx)
+	for _, tenant := range tenants {
+		tenant := tenant
+		tenantPath := s.tenantPath(tenant)
+
+		if needActive {
+			activePath := tenantPath.Child("active")
+			qb.Active = append(qb.Active, activePath.Child("points"))
+
+			grp.Go(func() error {
+				ctx := grpCtx
+
+				blocks, err := s.getBlocks(ctx, activePath.Child("attributes"), start, end)
+				if err != nil {
+					return errors.Wrapf(err, "get attributes block for tenant %v", tenant)
+				}
+
+				attributeMux.Lock()
+				qb.RecentAttributes = append(qb.RecentAttributes, blocks...)
+				attributeMux.Unlock()
+				return nil
+			})
+			grp.Go(func() error {
+				ctx := grpCtx
+
+				blocks, err := s.getBlocks(ctx, activePath.Child("resource"), start, end)
+				if err != nil {
+					return errors.Wrapf(err, "get resource block for tenant %v", tenant)
+				}
+
+				attributeMux.Lock()
+				qb.RecentResource = append(qb.RecentResource, blocks...)
+				attributeMux.Unlock()
+				return nil
+			})
+		}
+		if needClosed {
+			closedPath := tenantPath.Child("closed")
+			grp.Go(func() error {
+				ctx := grpCtx
+
+				blocks, err := s.getBlocks(ctx, closedPath, start, end)
+				if err != nil {
+					return errors.Wrapf(err, "get closed block for tenant %v", tenant)
+				}
+
+				closedMux.Lock()
+				qb.Closed = append(qb.Closed, blocks...)
+				closedMux.Unlock()
+				return nil
+			})
+		}
+	}
+	if err := grp.Wait(); err != nil {
+		return qb, err
+	}
+	// BUGFIX: return the accumulated result, not a fresh zero value —
+	// returning QueryBlocks{} here silently discarded everything collected above.
+	return qb, nil
+}
+
+// getBlocks lists time-block subdirectories of dir and returns the blocks
+// overlapping [start, end]. Entries whose names do not parse as
+// timeBlockLayout are skipped with a warning.
+func (s *Sharder) getBlocks(ctx context.Context,
+	dir ypath.Path,
+	start, end time.Time,
+) ([]Block, error) {
+	var (
+		lg = zctx.From(ctx)
+
+		dirs []string
+	)
+	if err := s.yc.ListNode(ctx, dir, &dirs, &yt.ListNodeOptions{}); err != nil {
+		return nil, errors.Wrapf(err, "get %q dirs", dir)
+	}
+	if len(dirs) == 0 {
+		// Tenant has no data.
+		return nil, nil
+	}
+
+	blocks := make([]timeBlock, 0, len(dirs))
+	// Note: loop variable renamed from "dir" to avoid shadowing the dir parameter.
+	for _, name := range dirs {
+		t, err := time.Parse(timeBlockLayout, name)
+		if err != nil {
+			lg.Warn("Invalid time block format", zap.String("block_dir", name))
+			continue
+		}
+		blocks = append(blocks, timeBlock{
+			start: t,
+			dir:   name,
+		})
+	}
+
+	var result []Block
+	for _, block := range timeBlocksForRange(blocks, start, end) {
+		result = append(result,
+			newBlock(dir.Child(block.dir), block.start),
+		)
+	}
+	return result, nil
+}
+
+// timeBlock is a parsed block directory: start is parsed from dir,
+// end is filled in by timeBlocksForRange (zero means still open).
+type timeBlock struct {
+	start time.Time
+	end   time.Time
+	dir   string
+}
+
+// timeBlocksForRange returns the sub-slice of blocks overlapping [start, end].
+// A zero start or end leaves the corresponding side unbounded.
+func timeBlocksForRange(blocks []timeBlock, start, end time.Time) []timeBlock {
+	if len(blocks) == 0 {
+		return blocks
+	}
+
+	// Sort blocks in ascending order.
+	slices.SortFunc(blocks, func(a, b timeBlock) int {
+		return a.start.Compare(b.start)
+	})
+	// Each block ends where the next one starts; the last block keeps a zero
+	// end, meaning it is still open (continues up to now).
+	for i := range blocks {
+		if i < len(blocks)-1 {
+			next := blocks[i+1]
+			blocks[i].end = next.start
+		}
+	}
+
+	// Find the leftmost block: the first one that is open or ends after start.
+	if !start.IsZero() {
+		for idx, block := range blocks {
+			if block.end.IsZero() || block.end.After(start) {
+				blocks = blocks[idx:]
+				break
+			}
+		}
+	}
+
+	// Find the rightmost block: cut everything that starts after end.
+	if !end.IsZero() {
+		for idx, block := range blocks {
+			if block.start.After(end) {
+				blocks = blocks[:idx]
+				break
+			}
+		}
+	}
+
+	return blocks
+}
diff --git a/internal/metricsharding/sharder_test.go b/internal/metricsharding/sharder_test.go
new file mode 100644
index 00000000..243a04ab
--- /dev/null
+++ b/internal/metricsharding/sharder_test.go
@@ -0,0 +1,146 @@
+package metricsharding
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_timeBlocksForRange(t *testing.T) {
+	someStart := time.Date(2023, time.September, 2, 23, 0, 0, 0, time.UTC)
+	someEnd := time.Date(2023, time.September, 4, 23, 0, 0, 0, time.UTC)
+
+	tests := []struct {
+		dirs   []string
+		start  time.Time
+		end    time.Time
+		expect []string
+	}{
+		{
+			nil,
+			someStart,
+			someEnd,
+			nil,
+		},
+		// No matching.
+		{
+			[]string{
+				// Too far in the future -> drop.
+				`2077-01-01_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			nil,
+		},
+		{
+			[]string{
+				// Too far in the future -> drop.
+				`2076-01-01_00-00-00`,
+				`2077-01-01_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			nil,
+		},
+		// Match
+		{
+			[]string{
+				// Starts before given start and still continues up to now -> keep.
+				`2023-09-01_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			[]string{
+				`2023-09-01_00-00-00`,
+			},
+		},
+		{
+			[]string{
+				// Starts before given start and still continues up to now -> keep.
+				`2023-09-01_00-00-00`,
+				// Starts after given end -> drop.
+				`2023-09-05_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			[]string{
+				`2023-09-01_00-00-00`,
+			},
+		},
+		{
+			[]string{
+				// Starts before given start and ends before given end -> keep.
+				`2023-09-01_00-00-00`,
+				// Starts after given start and still continues up to now -> keep.
+				`2023-09-03_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			[]string{
+				`2023-09-01_00-00-00`,
+				`2023-09-03_00-00-00`,
+			},
+		},
+		{
+			[]string{
+				// Starts before given start and ends before given end -> keep.
+				`2023-09-01_00-00-00`,
+				// Starts after given start and ends before given end -> keep.
+				`2023-09-03_00-00-00`,
+				// Starts after given end -> drop.
+				`2023-09-06_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			[]string{
+				`2023-09-01_00-00-00`,
+				`2023-09-03_00-00-00`,
+			},
+		},
+		{
+			[]string{
+				// Ends before start -> drop.
+				`2001-01-01_00-00-00`,
+				`2023-09-01_00-00-00`,
+
+				// Within range -> keep.
+				`2023-09-02_00-00-00`,
+				`2023-09-03_00-00-00`,
+				`2023-09-04_00-00-00`,
+
+				// Too far in the future -> drop.
+				`2023-09-05_00-00-00`,
+				`2077-01-01_00-00-00`,
+			},
+			someStart,
+			someEnd,
+			[]string{
+				`2023-09-02_00-00-00`,
+				`2023-09-03_00-00-00`,
+				`2023-09-04_00-00-00`,
+			},
+		},
+	}
+	for i, tt := range tests {
+		// Pin the range variable for the subtest closure.
+		tt := tt
+		t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
+			// Build timeBlock inputs from directory names; end is left zero,
+			// timeBlocksForRange fills it in.
+			timeShards := make([]timeBlock, len(tt.dirs))
+			for i, dir := range tt.dirs {
+				at, err := time.Parse(timeBlockLayout, dir)
+				require.NoError(t, err)
+				timeShards[i] = timeBlock{
+					start: at,
+					dir:   dir,
+				}
+			}
+
+			var got []string
+			for _, shard := range timeBlocksForRange(timeShards, tt.start, tt.end) {
+				got = append(got, shard.dir)
+			}
+			require.Equal(t, tt.expect, got)
+		})
+	}
+}
diff --git a/internal/metricstorage/metricstorage.go b/internal/metricstorage/metricstorage.go
new file mode 100644
index 00000000..8bfe9940
--- /dev/null
+++ b/internal/metricstorage/metricstorage.go
@@ -0,0 +1,2 @@
+// Package metricstorage defines storage structure for metric storage.
+package metricstorage
diff --git a/internal/metricstorage/schema.go b/internal/metricstorage/schema.go
new file mode 100644
index 00000000..dced0e1a
--- /dev/null
+++ b/internal/metricstorage/schema.go
@@ -0,0 +1,77 @@
+package metricstorage
+
+import (
+	"go.ytsaurus.tech/yt/go/schema"
+
+	"github.com/go-faster/oteldb/internal/otelstorage"
+)
+
+// Point is a data structure for metric points.
+//
+// NOTE(review): hashes here are []byte, while Resource/Attributes store them
+// as string — confirm both sides use the same representation for joins.
+type Point struct {
+	Metric        string                `json:"metric_name" yson:"metric_name"`
+	ResourceHash  []byte                `json:"resource_hash" yson:"resource_hash"`
+	AttributeHash []byte                `json:"attr_hash" yson:"attr_hash"`
+	Timestamp     otelstorage.Timestamp `json:"timestamp" yson:"timestamp"`
+	Point         float64               `json:"point" yson:"point"`
+}
+
+// YTSchema returns YTsaurus table schema for this structure.
+//
+// Sort key: (metric_name, resource_hash, attr_hash, timestamp).
+func (Point) YTSchema() schema.Schema {
+	var (
+		tsType   = schema.TypeUint64
+		hashType = schema.TypeBytes
+	)
+	return schema.Schema{
+		Columns: []schema.Column{
+			{Name: "metric_name", ComplexType: schema.TypeString, SortOrder: schema.SortAscending},
+			{Name: "resource_hash", ComplexType: hashType, SortOrder: schema.SortAscending},
+			{Name: "attr_hash", ComplexType: hashType, SortOrder: schema.SortAscending},
+			{Name: "timestamp", ComplexType: tsType, SortOrder: schema.SortAscending},
+			{Name: "point", ComplexType: schema.TypeFloat64},
+		},
+	}
+}
+
+// Resource is a data structure for resource.
+type Resource struct {
+	Hash  string            `json:"hash" yson:"hash"`
+	Attrs otelstorage.Attrs `json:"attrs" yson:"attrs"`
+}
+
+// YTSchema returns YTsaurus table schema for this structure.
+//
+// Sort key: hash. Attrs is stored as an optional "any" column.
+func (Resource) YTSchema() schema.Schema {
+	var (
+		hashType  = schema.TypeString
+		attrsType = schema.Optional{Item: schema.TypeAny}
+	)
+
+	return schema.Schema{
+		Columns: []schema.Column{
+			{Name: "hash", ComplexType: hashType, SortOrder: schema.SortAscending},
+			{Name: "attrs", ComplexType: attrsType},
+		},
+	}
+}
+
+// Attributes is a data structure for attributes.
+type Attributes struct {
+	Metric string            `json:"metric_name" yson:"metric_name"`
+	Hash   string            `json:"hash" yson:"hash"`
+	Attrs  otelstorage.Attrs `json:"attrs" yson:"attrs"`
+}
+
+// YTSchema returns YTsaurus table schema for this structure.
+//
+// Sort key: (metric_name, hash). Attrs is stored as an optional "any" column.
+func (Attributes) YTSchema() schema.Schema {
+	var (
+		hashType  = schema.TypeString
+		attrsType = schema.Optional{Item: schema.TypeAny}
+	)
+
+	return schema.Schema{
+		Columns: []schema.Column{
+			{Name: "metric_name", ComplexType: schema.TypeString, SortOrder: schema.SortAscending},
+			{Name: "hash", ComplexType: hashType, SortOrder: schema.SortAscending},
+			{Name: "attrs", ComplexType: attrsType},
+		},
+	}
+}
diff --git a/oteltest.Dockerfile b/oteltest.Dockerfile
new file mode 100644
index 00000000..907f0d0c
--- /dev/null
+++ b/oteltest.Dockerfile
@@ -0,0 +1,17 @@
+# Build stage: compile a static binary so it can run on alpine.
+FROM golang:latest as builder
+
+WORKDIR /app
+
+# Cache module downloads separately from source changes.
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . ./
+RUN CGO_ENABLED=0 GOOS=linux go build -o /app/oteltest ./cmd/oteltest
+
+# Runtime stage: minimal image with CA certificates only.
+FROM alpine:latest
+RUN apk --no-cache add ca-certificates
+
+WORKDIR /app
+COPY --from=builder /app/oteltest /oteltest
+
+ENTRYPOINT ["/oteltest"]