Skip to content

Commit

Permalink
gmp: Skip WAL (passthrough) if no PRW is specified, normal agent mode…
Browse files Browse the repository at this point in the history
… otherwise.

Signed-off-by: bwplotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Oct 7, 2024
1 parent b68bf66 commit 1722faf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 9 deletions.
32 changes: 31 additions & 1 deletion cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ func main() {
// This is passed to ruleManager.Update().
externalURL := cfg.web.ExternalURL.String()

gcmAgentWriteSkipper := &writeSkipperForNoRWConfig{logger: logger, noRWEndpointConfigured: atomic.NewBool(true)}
reloaders := []reloader{
{
name: "db_storage",
Expand All @@ -781,6 +782,15 @@ func main() {
}, {
name: "web_handler",
reloader: webHandler.ApplyConfig,
}, {
// NOTE(bwplotka): GMP forked logic.
name: "gmp_noopfornorwconfig_storage",
reloader: func(cfg *config.Config) error {
if agentMode {
return gcmAgentWriteSkipper.ApplyConfig(cfg)
}
return nil
},
}, {
name: "query_engine",
reloader: func(cfg *config.Config) error {
Expand Down Expand Up @@ -1115,7 +1125,7 @@ func main() {
func() error {
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
reloadReady.Close()
return nil
Expand Down Expand Up @@ -1196,6 +1206,7 @@ func main() {
if agentMode {
// WAL storage.
opts := cfg.agent.ToAgentOptions()
opts.SkipWrite = gcmAgentWriteSkipper.anyRWEndpointConfigured

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured) (typecheck)

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / Go tests on Windows

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured)

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / golangci-lint

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured) (typecheck)

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / codeql / Analyze (go)

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured) (typecheck)

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / Go tests on Windows

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured)

Check failure on line 1209 in cmd/prometheus/main.go

View workflow job for this annotation

GitHub Actions / codeql / Analyze (go)

gcmAgentWriteSkipper.anyRWEndpointConfigured undefined (type *writeSkipperForNoRWConfig has no field or method anyRWEndpointConfigured) (typecheck)
cancel := make(chan struct{})
g.Add(
func() error {
Expand Down Expand Up @@ -1770,3 +1781,22 @@ func deleteStorageData(agentMode bool, dataPath string) error {
}
return nil
}

type writeSkipperForNoRWConfig struct {
logger log.Logger
noRWEndpointConfigured *atomic.Bool
}

func (s *writeSkipperForNoRWConfig) ApplyConfig(conf *config.Config) error {
if len(conf.RemoteWriteConfigs) > 0 {
if s.noRWEndpointConfigured.Swap(false) {
level.Info(s.logger).Log("msg", "gmp forked logic: enabling agent storage appending given a new remote_write config entry")
}
} else {
if !s.noRWEndpointConfigured.Swap(true) {
level.Info(s.logger).Log("msg", "gmp forked logic: disabling agent storage appending given no remote_write was configured; no need to utilize agent WAL.")
// TODO(bwplotka): Remove left-over from WAL?
}
}
return nil
}
38 changes: 30 additions & 8 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type Options struct {

// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool

// NOTE: GCM forked logic, controls if we should skip WAL/WBL for GCM only mode.
SkipWrite *atomic.Bool
}

// DefaultOptions used for the WAL storage. They are reasonable for setups using
Expand All @@ -95,6 +98,7 @@ func DefaultOptions() *Options {
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
SkipWrite: atomic.NewBool(false),
}
}

Expand Down Expand Up @@ -298,6 +302,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
exportExemplars: make(map[storage.SeriesRef]record.RefExemplar, 10),
skipWrite: opts.SkipWrite.Load(),
}
}

Expand Down Expand Up @@ -349,6 +354,9 @@ func validateOptions(opts *Options) *Options {
if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t {
opts.MaxWALTime = t
}
if opts.SkipWrite == nil {
opts.SkipWrite = atomic.NewBool(false)
}
return opts
}

Expand Down Expand Up @@ -621,6 +629,8 @@ Loop:
if ts < 0 {
ts = 0
}
// TODO(bwplotka): Debug, remove later.
level.Warn(db.logger).Log("msg", "gmp: truncating", "lowest", db.rs.LowestSentTimestamp(), "minTime", db.opts.MinWALTime, "result", ts)

// Network issues can prevent the result of getRemoteWriteTimestamp from
// changing. We don't want data in the WAL to grow forever, so we set a cap
Expand All @@ -629,8 +639,8 @@ Loop:
if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {
ts = maxTS
}

level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
// TODO(bwplotka): Move back to debug.
level.Warn(db.logger).Log("msg", "truncating the WAL", "ts", ts)
if err := db.truncate(ts); err != nil {
level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
}
Expand Down Expand Up @@ -800,10 +810,13 @@ type appender struct {
// Series lock is not held on elements.
floatHistogramSeries []*memSeries

// NOTE: GCM forked logic
metadata gcm_export.MetadataFunc

// exemplars to be exported to GCM
// exemplars to be exported to GCM.
exportExemplars map[storage.SeriesRef]record.RefExemplar
// skipWrite ignore writes to WAL/WBL if true. This is to skip appending to
// WAL storage when no PRW is configured -- GCM export does not need that storage.
skipWrite bool
}

func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
Expand Down Expand Up @@ -831,7 +844,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
Labels: l,
})

a.metrics.numActiveSeries.Inc()
if !a.skipWrite {
a.metrics.numActiveSeries.Inc()
}
}
}

Expand Down Expand Up @@ -961,7 +976,9 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
Labels: l,
})

a.metrics.numActiveSeries.Inc()
if !a.skipWrite {
a.metrics.numActiveSeries.Inc()
}
}
}

Expand Down Expand Up @@ -1006,6 +1023,13 @@ func (a *appender) Commit() error {
a.mtx.RLock()
defer a.mtx.RUnlock()

// NOTE: GCM forked logic.
gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)
if a.skipWrite {
return a.Rollback()
}
// ----

var encoder record.Encoder
buf := a.bufPool.Get().([]byte)

Expand Down Expand Up @@ -1069,8 +1093,6 @@ func (a *appender) Commit() error {
}
}

gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)

//nolint:staticcheck
a.bufPool.Put(buf)
return a.Rollback()
Expand Down

0 comments on commit 1722faf

Please sign in to comment.