Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: otelbench #183

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions cmd/otelbenchctl/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"
"flag"

"github.com/go-faster/errors"
"github.com/go-faster/sdk/app"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func newMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
rl := m.ResourceMetrics().AppendEmpty()
rl.Resource().Attributes().PutStr("host.name", "testHost")
rl.SetSchemaUrl("resource_schema")
il := rl.ScopeMetrics().AppendEmpty()
il.Scope().SetName("name")
il.Scope().SetVersion("version")
il.Scope().Attributes().PutStr("oteldb.name", "testDB")
il.Scope().SetDroppedAttributesCount(1)
il.SetSchemaUrl("scope_schema")
im := il.Metrics().AppendEmpty()
im.SetName("metric_name")
im.SetDescription("metric_description")
dp := im.SetEmptyGauge().DataPoints()
for i := 0; i < 10000; i++ {
dp.AppendEmpty().SetIntValue(0)
}

return m
}

func main() {
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error {
var arg struct {
Jobs int
}
flag.IntVar(&arg.Jobs, "j", 16, "jobs")
flag.Parse()
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < arg.Jobs; i++ {
g.Go(func() error {
conn, err := grpc.DialContext(ctx, "127.0.0.1:3951",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return errors.Wrap(err, "dial oteldb")
}
client := pmetricotlp.NewGRPCClient(conn)
metrics := newMetrics()
req := pmetricotlp.NewExportRequestFromMetrics(metrics)
for {
if _, err := client.Export(ctx, req); err != nil {
return errors.Wrap(err, "send metrics")
}
}
})
}
return g.Wait()
})
}
66 changes: 66 additions & 0 deletions cmd/otelbenchsrv/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"net"
"time"

"github.com/go-faster/sdk/app"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
)

func newMetrics() pmetric.Metrics {
m := pmetric.NewMetrics()
rl := m.ResourceMetrics().AppendEmpty()
rl.Resource().Attributes().PutStr("host.name", "testHost")
rl.SetSchemaUrl("resource_schema")
il := rl.ScopeMetrics().AppendEmpty()
il.Scope().SetName("name")
il.Scope().SetVersion("version")
il.Scope().Attributes().PutStr("oteldb.name", "testDB")
il.Scope().SetDroppedAttributesCount(1)
il.SetSchemaUrl("scope_schema")
im := il.Metrics().AppendEmpty()
im.SetName("metric_name")
im.SetDescription("metric_description")
points := im.SetEmptyGauge().DataPoints().AppendEmpty()
points.SetIntValue(0)
return m
}

type noopServer struct {
pmetricotlp.UnimplementedGRPCServer
count atomic.Uint64
}

func (n *noopServer) Export(ctx context.Context, request pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) {
n.count.Add(1)
return pmetricotlp.NewExportResponse(), nil
}

func main() {
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error {
srv := grpc.NewServer()
h := &noopServer{}
go func() {
t := time.NewTicker(time.Second)
for range t.C {
lg.Info("Got", zap.Uint64("n", h.count.Swap(0)))
}
}()
go func() {
<-ctx.Done()
srv.GracefulStop()
}()
pmetricotlp.RegisterGRPCServer(srv, h)
ln, err := net.Listen("tcp", "127.0.0.1:3951")
if err != nil {
return err
}
return srv.Serve(ln)
})
}
131 changes: 131 additions & 0 deletions cmd/oteltest/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"crypto/sha1"
"flag"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dustin/go-humanize"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/app"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.ytsaurus.tech/yt/go/yt"
"go.ytsaurus.tech/yt/go/yt/ythttp"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/metricsharding"
"github.com/go-faster/oteldb/internal/metricstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
)

func main() {
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Metrics) error {
var arg struct {
Proxy string
Token string
ProxyDiscovery bool
TotalPoints int
BatchSize int
TenantID int
Workers int
}
flag.IntVar(&arg.Workers, "j", 1, "concurrent jobs")
flag.IntVar(&arg.TenantID, "t", 222, "tenant id")
flag.IntVar(&arg.TotalPoints, "p", 1_000_000_000, "total points to insert")
flag.IntVar(&arg.BatchSize, "b", 5_000, "batch size")
flag.StringVar(&arg.Proxy, "proxy", "localhost:8000", "proxy address")
flag.StringVar(&arg.Token, "token", "admin", "token")
flag.BoolVar(&arg.ProxyDiscovery, "proxy-discovery", false, "proxy discovery")
flag.Parse()

yc, err := ythttp.NewClient(&yt.Config{
Proxy: arg.Proxy,
Token: arg.Token,
DisableProxyDiscovery: !arg.ProxyDiscovery,
})
if err != nil {
return errors.Wrap(err, "yt.NewClient")
}

sharder := metricsharding.NewSharder(yc, metricsharding.ShardingOptions{})
if err := sharder.CreateTenant(ctx, metricsharding.TenantID(arg.TenantID), time.Now()); err != nil {
return errors.Wrap(err, "sharder.CreateTenant")
}
var (
tenant = sharder.TenantPath(metricsharding.TenantID(arg.TenantID))
active = tenant.Child("active")
now = time.Now()
)

var loaded atomic.Uint64
data := make(chan []any, arg.Workers)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < arg.Workers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case points, ok := <-data:
if !ok {
return nil
}
bo := backoff.NewExponentialBackOff()
fn := func() error {
return yc.InsertRows(ctx, active.Child("points"), points, nil)
}
if err := backoff.RetryNotify(fn, backoff.WithContext(bo, ctx), func(err error, duration time.Duration) {
lg.Warn("retrying", zap.Error(err), zap.Duration("duration", duration))
}); err != nil {
return errors.Wrap(err, "yc.InsertRows")
}
loaded.Add(uint64(len(points)))
}
}
})
}
g.Go(func() error {
defer close(data)
for i := 0; i < arg.TotalPoints/arg.BatchSize; i++ {
var points []any
rh := sha1.Sum([]byte(fmt.Sprintf("r%d", i)))
ah := sha1.Sum([]byte(fmt.Sprintf("a%d", i)))
for j := 0; j < arg.BatchSize; j++ {
delta := time.Duration(i+j) * time.Millisecond
ts := now.Add(delta)
points = append(points, metricstorage.Point{
Metric: "some.metric.name",
ResourceHash: rh[:],
AttributeHash: ah[:],
Timestamp: otelstorage.NewTimestampFromTime(ts),
Point: float64(j),
})
}
select {
case <-ctx.Done():
return ctx.Err()
case data <- points:
}
}
return nil
})
g.Go(func() error {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
value := float64(loaded.Swap(0))
lg.Info("loaded", zap.String("points", humanize.SI(value, "P")))
}
}
})

return g.Wait()
})
}
39 changes: 39 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,42 @@ TODO: define attribute.
## Partition

Data can be partitioned by tenant id and time.

```
Having:
1) closed data partitioned by day (Δ=1D)
2) active attributes and resources partitioned by hour (δ=1H)

/metrics
/tenant-1
/active
points
attributes/
2021-01-01-T-20-00
2021-01-01-T-21-00
2021-01-01-T-22-00
2021-01-01-T-23-00
resources/
2021-01-01-T-20-00
2021-01-01-T-21-00
2021-01-01-T-22-00
2021-01-01-T-23-00
/closed
/2021-01-01
points
attributes
resources

/metrics/tenant/active/{attributes, resources}/*:
dynamic tables of 2Δ data that is partitioned by δ (1H)
/metrics/tenant/points:
dynamic table that stores 2Δ (2D) of point data

Each Δ:
1) Create new directory in /metrics/tenant/closed:
e.g. /metrics/tenant1/closed/2021-01-01
2) Copy [T-2Δ, T-Δ) data from active/points to points static table
3) Merge [T-2Δ, T-Δ) data from active/attributes/* to attributes static table
4) Merge [T-2Δ, T-Δ) data from active/resources/* to resources static table
5) Delete data that is older than T-Δ from active tables
```
Loading