chore: remove trace batcher (#174)
makeavish authored Sep 20, 2023
1 parent 3a7486b commit e251846
Showing 4 changed files with 59 additions and 149 deletions.
12 changes: 6 additions & 6 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
@@ -385,6 +385,7 @@ func newStructuredSpan(otelSpan ptrace.Span, ServiceName string, resource pcommo
func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {

rss := td.ResourceSpans()
var batchOfSpans []*Span
for i := 0; i < rss.Len(); i++ {
// fmt.Printf("ResourceSpans #%d\n", i)
rs := rss.At(i)
@@ -400,16 +401,15 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {

for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
// traceID := hex.EncodeToString(span.TraceID())
structuredSpan := newStructuredSpan(span, serviceName, rs.Resource(), s.config)
err := s.Writer.WriteSpan(structuredSpan)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
}
batchOfSpans = append(batchOfSpans, structuredSpan)
}
}
}

err := s.Writer.WriteBatchOfSpans(batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
}
return nil
}
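For context, the change above swaps the per-span WriteSpan call for a single WriteBatchOfSpans call issued after the loops. Below is a minimal, self-contained sketch of that collect-then-write pattern; Span, Writer and stdoutWriter are simplified stand-ins for the exporter's types, not the real ones.

package main

import "fmt"

// Span and Writer are stand-ins; only the WriteBatchOfSpans method name
// mirrors the interface introduced by this commit.
type Span struct {
	TraceID string
	Name    string
}

type Writer interface {
	WriteBatchOfSpans(batch []*Span) error
}

// stdoutWriter is a toy implementation; the real SpanWriter writes to ClickHouse.
type stdoutWriter struct{}

func (stdoutWriter) WriteBatchOfSpans(batch []*Span) error {
	fmt.Printf("writing %d spans in one batch\n", len(batch))
	return nil
}

func main() {
	// Collect first, write once -- the pattern pushTraceData now follows.
	var batch []*Span
	batch = append(batch,
		&Span{TraceID: "a", Name: "GET /"},
		&Span{TraceID: "b", Name: "POST /items"},
	)

	var w Writer = stdoutWriter{}
	if err := w.WriteBatchOfSpans(batch); err != nil {
		fmt.Println("write failed:", err)
	}
}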

6 changes: 1 addition & 5 deletions exporter/clickhousetracesexporter/clickhouse_factory.go
@@ -43,7 +43,7 @@ type Factory struct {

// Writer writes spans to storage.
type Writer interface {
WriteSpan(span *Span) error
WriteBatchOfSpans(span []*Span) error
}

type writerMaker func(WriterOptions) (Writer, error)
@@ -281,8 +281,6 @@ func (f *Factory) CreateSpanWriter() (Writer, error) {
attributeTable: cfg.AttributeTable,
attributeKeyTable: cfg.AttributeKeyTable,
encoding: cfg.Encoding,
delay: cfg.WriteBatchDelay,
size: cfg.WriteBatchSize,
})
}

@@ -302,8 +300,6 @@ func (f *Factory) CreateArchiveSpanWriter() (Writer, error) {
attributeTable: cfg.AttributeTable,
attributeKeyTable: cfg.AttributeKeyTable,
encoding: cfg.Encoding,
delay: cfg.WriteBatchDelay,
size: cfg.WriteBatchSize,
})
}

63 changes: 19 additions & 44 deletions exporter/clickhousetracesexporter/options.go
@@ -19,34 +19,31 @@ import (
"flag"
"fmt"
"net/url"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/spf13/viper"
)

const (
defaultDatasource string = "tcp://127.0.0.1:9000/?database=signoz_traces"
defaultTraceDatabase string = "signoz_traces"
defaultMigrations string = "/migrations"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
localIndexTable string = "signoz_index_v2"
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultSpansTable string = "distributed_signoz_spans"
defaultAttributeTable string = "distributed_span_attributes"
defaultAttributeKeyTable string = "distributed_span_attributes_keys"
defaultDurationSortTable string = "durationSort"
defaultDurationSortMVTable string = "durationSortMV"
defaultArchiveSpansTable string = "signoz_archive_spans"
defaultClusterName string = "cluster"
defaultDependencyGraphTable string = "dependency_graph_minutes"
defaultDependencyGraphServiceMV string = "dependency_graph_minutes_service_calls_mv"
defaultDependencyGraphDbMV string = "dependency_graph_minutes_db_calls_mv"
DependencyGraphMessagingMV string = "dependency_graph_minutes_messaging_calls_mv"
defaultWriteBatchDelay time.Duration = 2 * time.Second
defaultWriteBatchSize int = 100000
defaultEncoding Encoding = EncodingJSON
defaultDatasource string = "tcp://127.0.0.1:9000/?database=signoz_traces"
defaultTraceDatabase string = "signoz_traces"
defaultMigrations string = "/migrations"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
localIndexTable string = "signoz_index_v2"
defaultErrorTable string = "distributed_signoz_error_index_v2"
defaultSpansTable string = "distributed_signoz_spans"
defaultAttributeTable string = "distributed_span_attributes"
defaultAttributeKeyTable string = "distributed_span_attributes_keys"
defaultDurationSortTable string = "durationSort"
defaultDurationSortMVTable string = "durationSortMV"
defaultArchiveSpansTable string = "signoz_archive_spans"
defaultClusterName string = "cluster"
defaultDependencyGraphTable string = "dependency_graph_minutes"
defaultDependencyGraphServiceMV string = "dependency_graph_minutes_service_calls_mv"
defaultDependencyGraphDbMV string = "dependency_graph_minutes_db_calls_mv"
DependencyGraphMessagingMV string = "dependency_graph_minutes_messaging_calls_mv"
defaultEncoding Encoding = EncodingJSON
)

const (
@@ -57,8 +54,6 @@ const (
suffixOperationsTable = ".operations-table"
suffixIndexTable = ".index-table"
suffixSpansTable = ".spans-table"
suffixWriteBatchDelay = ".write-batch-delay"
suffixWriteBatchSize = ".write-batch-size"
suffixEncoding = ".encoding"
)

@@ -84,8 +79,6 @@ type namespaceConfig struct {
DependencyGraphMessagingMV string
DependencyGraphTable string
DockerMultiNodeCluster bool
WriteBatchDelay time.Duration
WriteBatchSize int
Encoding Encoding
Connector Connector
}
@@ -161,8 +154,6 @@ func NewOptions(migrations string, datasource string, dockerMultiNodeCluster boo
DependencyGraphDbMV: defaultDependencyGraphDbMV,
DependencyGraphMessagingMV: DependencyGraphMessagingMV,
DockerMultiNodeCluster: dockerMultiNodeCluster,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
},
@@ -178,8 +169,6 @@ func NewOptions(migrations string, datasource string, dockerMultiNodeCluster boo
OperationsTable: "",
IndexTable: "",
SpansTable: defaultArchiveSpansTable,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
}
@@ -233,18 +222,6 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
"Clickhouse spans table name.",
)

flagSet.Duration(
nsConfig.namespace+suffixWriteBatchDelay,
nsConfig.WriteBatchDelay,
"A duration after which spans are flushed to Clickhouse",
)

flagSet.Int(
nsConfig.namespace+suffixWriteBatchSize,
nsConfig.WriteBatchSize,
"A number of spans buffered before they are flushed to Clickhouse",
)

flagSet.String(
nsConfig.namespace+suffixEncoding,
string(nsConfig.Encoding),
@@ -267,8 +244,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.IndexTable = v.GetString(cfg.namespace + suffixIndexTable)
cfg.SpansTable = v.GetString(cfg.namespace + suffixSpansTable)
cfg.OperationsTable = v.GetString(cfg.namespace + suffixOperationsTable)
cfg.WriteBatchDelay = v.GetDuration(cfg.namespace + suffixWriteBatchDelay)
cfg.WriteBatchSize = v.GetInt(cfg.namespace + suffixWriteBatchSize)
cfg.Encoding = Encoding(v.GetString(cfg.namespace + suffixEncoding))
}

127 changes: 33 additions & 94 deletions exporter/clickhousetracesexporter/writer.go
@@ -20,7 +20,6 @@ import (
"fmt"
"math"
"strings"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
@@ -52,11 +51,6 @@ type SpanWriter struct {
attributeTable string
attributeKeyTable string
encoding Encoding
delay time.Duration
size int
spans chan *Span
finish chan bool
done sync.WaitGroup
}

type WriterOptions struct {
@@ -69,8 +63,6 @@ type WriterOptions struct {
attributeTable string
attributeKeyTable string
encoding Encoding
delay time.Duration
size int
}

// NewSpanWriter returns a SpanWriter for the database
@@ -88,91 +80,11 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
attributeTable: options.attributeTable,
attributeKeyTable: options.attributeKeyTable,
encoding: options.encoding,
delay: options.delay,
size: options.size,
spans: make(chan *Span, options.size),
finish: make(chan bool),
}

go writer.backgroundWriter()

return writer
}

func (w *SpanWriter) backgroundWriter() {
batch := make([]*Span, 0, w.size)

timer := time.After(w.delay)
last := time.Now()

for {
w.done.Add(1)

flush := false
finish := false

select {
case span := <-w.spans:
batch = append(batch, span)
flush = len(batch) == cap(batch)
case <-timer:
timer = time.After(w.delay)
flush = time.Since(last) > w.delay && len(batch) > 0
case <-w.finish:
finish = true
flush = len(batch) > 0
}

if flush {
if err := w.writeBatch(batch); err != nil {
w.logger.Error("Could not write a batch of spans", zap.Error(err))
}

batch = make([]*Span, 0, w.size)
last = time.Now()
}

w.done.Done()

if finish {
break
}
}
}

func (w *SpanWriter) writeBatch(batch []*Span) error {

if w.spansTable != "" {
if err := w.writeModelBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to model table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.indexTable != "" {
if err := w.writeIndexBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to index table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.errorTable != "" {
if err := w.writeErrorBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to error table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.attributeTable != "" && w.attributeKeyTable != "" {
if err := w.writeTagBatch(batch); err != nil {
w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
return err
}
}

return nil
}

func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error {

ctx := context.Background()
@@ -439,15 +351,42 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {
return nil
}

// WriteSpan writes the encoded span
func (w *SpanWriter) WriteSpan(span *Span) error {
w.spans <- span
// WriteBatchOfSpans writes the encoded batch of spans
func (w *SpanWriter) WriteBatchOfSpans(batch []*Span) error {
if w.spansTable != "" {
if err := w.writeModelBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to model table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.indexTable != "" {
if err := w.writeIndexBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to index table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.errorTable != "" {
if err := w.writeErrorBatch(batch); err != nil {
logBatch := batch[:int(math.Min(10, float64(len(batch))))]
w.logger.Error("Could not write a batch of spans to error table: ", zap.Any("batch", logBatch), zap.Error(err))
return err
}
}
if w.attributeTable != "" && w.attributeKeyTable != "" {
if err := w.writeTagBatch(batch); err != nil {
w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
return err
}
}
return nil
}

// Close Implements io.Closer and closes the underlying storage
// Close closes the writer
func (w *SpanWriter) Close() error {
w.finish <- true
w.done.Wait()
if w.db != nil {
return w.db.Close()
}
return nil
}
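With the background batcher gone, the writer's lifecycle is fully synchronous. The following is a hypothetical in-package sketch of how the reworked API is driven; exampleLifecycle, opts and batches are illustrative and not part of this commit.

// exampleLifecycle is a hypothetical helper in the clickhousetracesexporter
// package showing the simplified lifecycle after the batcher removal.
func exampleLifecycle(opts WriterOptions, batches [][]*Span) error {
	// NewSpanWriter no longer starts a backgroundWriter goroutine.
	w := NewSpanWriter(opts)

	for _, batch := range batches {
		// Each call writes the batch synchronously to the model, index,
		// error and attribute tables (where configured).
		if err := w.WriteBatchOfSpans(batch); err != nil {
			return err
		}
	}

	// Close now only closes the ClickHouse connection; there is no finish
	// channel or WaitGroup left to drain.
	return w.Close()
}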
