Skip to content

Commit

Permalink
print stats
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell committed Jul 30, 2024
1 parent 2ea792d commit 347a37d
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
11 changes: 11 additions & 0 deletions node-manager/app/firehose_reader/console_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"os"
"time"
)

type FirehoseReader struct {
Expand All @@ -22,6 +23,7 @@ type FirehoseReader struct {
callOpts []grpc.CallOption
zlogger *zap.Logger
cursorStateFile string
stats *firehoseReaderStats
}

func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) {
Expand All @@ -45,6 +47,7 @@ func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, z
closeFunc: closeFunc,
callOpts: callOpts,
zlogger: zlogger,
stats: newFirehoseReaderStats(),
}

return res, nil
Expand Down Expand Up @@ -73,6 +76,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string)

f.firehoseStream = stream
f.cursorStateFile = cursorFile
f.stats.StartPeriodicLogToZap(context.Background(), f.zlogger, 10*time.Second)

return nil
}
Expand All @@ -93,6 +97,12 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) {
return nil, fmt.Errorf("failed to write cursor to state file: %w", err)
}

BlockReadCount.Inc()
f.stats.lastBlock = pbbstream.BlockRef{
Num: res.Metadata.Num,
Id: res.Metadata.Id,
}

return &pbbstream.Block{
Number: res.Metadata.Num,
Id: res.Metadata.Id,
Expand All @@ -110,5 +120,6 @@ func (f *FirehoseReader) Done() <-chan interface{} {
}

func (f *FirehoseReader) Close() error {
f.stats.StopPeriodicLogToZap()
return f.closeFunc()
}
11 changes: 11 additions & 0 deletions node-manager/app/firehose_reader/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package firehose_reader

import "github.com/streamingfast/dmetrics"

var metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("reader_node_firehose"))

func init() {
metrics.Register()
}

var BlockReadCount = metrics.NewCounter("block_read_count", "The number of blocks read by the Firehose reader")
55 changes: 55 additions & 0 deletions node-manager/app/firehose_reader/reader_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package firehose_reader

import (
"context"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetrics"
"go.uber.org/zap"
"time"
)

type firehoseReaderStats struct {
lastBlock pbbstream.BlockRef
blockRate *dmetrics.AvgRatePromCounter

cancelPeriodicLogger context.CancelFunc
}

func newFirehoseReaderStats() *firehoseReaderStats {
return &firehoseReaderStats{
lastBlock: pbbstream.BlockRef{},
blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockReadCount, 1*time.Second, 30*time.Second, "blocks"),
}
}

func (s *firehoseReaderStats) StartPeriodicLogToZap(ctx context.Context, logger *zap.Logger, logEach time.Duration) {
ctx, s.cancelPeriodicLogger = context.WithCancel(ctx)

go func() {
ticker := time.NewTicker(logEach)
for {
select {
case <-ticker.C:
logger.Info("reader node statistics", s.ZapFields()...)
case <-ctx.Done():
return
}
}
}()
}

func (s *firehoseReaderStats) StopPeriodicLogToZap() {
if s.cancelPeriodicLogger != nil {
s.cancelPeriodicLogger()
}
}

func (s *firehoseReaderStats) ZapFields() []zap.Field {
fields := []zap.Field{
zap.Stringer("block_rate", s.blockRate),
zap.Uint64("last_block_num", s.lastBlock.Num),
zap.String("last_block_id", s.lastBlock.Id),
}

return fields
}

0 comments on commit 347a37d

Please sign in to comment.