From e9e0f744fa892ce2f7936ff8f2bcbf286938c6a0 Mon Sep 17 00:00:00 2001 From: Geoffrey Wossum Date: Mon, 1 Jul 2024 14:43:20 -0500 Subject: [PATCH] fix: prevent retention service from hanging (#25121) Fix issue that can cause the retention service to hang waiting on a `Shard.Close` call. When this occurs, no other shards will be deleted by the retention service. This is usually noticed as an increase in disk usage because old shards are not cleaned up. The fix adds to new methods to `Store`, `SetShardNewReadersBlocked` and `InUse`. `InUse` can be used to poll if a shard has active readers, which the retention service uses to skip over in-use shards to prevent the service from hanging. `SetShardNewReadersBlocked` determines if new read access may be granted to a shard. This is required to prevent race conditions around the use of `InUse` and the deletion of shards. If the retention service skips over a shard because it is in-use, the shard will be checked again the next time the retention service is run. It can be deleted on subsequent checks if it is no longer in-use. If the shards is stuck in-use, the retention service will not be able to delete the shards, which can be observed in the logs for manual intervention. Other shards can still be deleted by the retention service even if a shard is stuck with readers. This is a port of ad68ec8 from master-1.x to main-2.x. closes: #25118 (cherry picked from commit b4bd607eefe4af3c38a73a5883866a66daecd7db) (cherry picked from commit cb8cfe3510cf6e147be7277cea1ba6ae2b74839b) --- internal/tsdb_store.go | 72 ++++++----- tsdb/engine.go | 3 + tsdb/engine/tsm1/compact.go | 7 +- tsdb/engine/tsm1/compact_test.go | 4 +- tsdb/engine/tsm1/engine.go | 18 +++ tsdb/engine/tsm1/file_store.go | 87 ++++++++++++- tsdb/engine/tsm1/file_store_test.go | 148 ++++++++++++++++++++++ tsdb/shard.go | 18 +++ tsdb/shard_test.go | 85 ++++++++++++- tsdb/store.go | 25 ++++ tsdb/store_test.go | 79 +++++++++++- v1/services/retention/service.go | 175 +++++++++++++++----------- v1/services/retention/service_test.go | 94 +++++++++++++- 13 files changed, 694 insertions(+), 121 deletions(-) diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go index 8e2f13d8b4f..411c5d48073 100644 --- a/internal/tsdb_store.go +++ b/internal/tsdb_store.go @@ -14,38 +14,40 @@ import ( // TSDBStoreMock is a mockable implementation of tsdb.Store. type TSDBStoreMock struct { - BackupShardFn func(id uint64, since time.Time, w io.Writer) error - BackupSeriesFileFn func(database string, w io.Writer) error - ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error - CloseFn func() error - CreateShardFn func(database, policy string, shardID uint64, enabled bool) error - CreateShardSnapshotFn func(id uint64) (string, error) - DatabasesFn func() []string - DeleteDatabaseFn func(name string) error - DeleteMeasurementFn func(ctx context.Context, database, name string) error - DeleteRetentionPolicyFn func(database, name string) error - DeleteSeriesFn func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error - DeleteShardFn func(id uint64) error - DiskSizeFn func() (int64, error) - ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) - ImportShardFn func(id uint64, r io.Reader) error - MeasurementsCardinalityFn func(database string) (int64, error) - MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) - OpenFn func() error - PathFn func() string - RestoreShardFn func(id uint64, r io.Reader) error - SeriesCardinalityFn func(database string) (int64, error) - SetShardEnabledFn func(shardID uint64, enabled bool) error - ShardFn func(id uint64) *tsdb.Shard - ShardGroupFn func(ids []uint64) tsdb.ShardGroup - ShardIDsFn func() []uint64 - ShardNFn func() int - ShardRelativePathFn func(id uint64) (string, error) - ShardsFn func(ids []uint64) []*tsdb.Shard - TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) - TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) - WithLoggerFn func(log *zap.Logger) - WriteToShardFn func(shardID uint64, points []models.Point) error + BackupShardFn func(id uint64, since time.Time, w io.Writer) error + BackupSeriesFileFn func(database string, w io.Writer) error + ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error + CloseFn func() error + CreateShardFn func(database, policy string, shardID uint64, enabled bool) error + CreateShardSnapshotFn func(id uint64) (string, error) + DatabasesFn func() []string + DeleteDatabaseFn func(name string) error + DeleteMeasurementFn func(ctx context.Context, database, name string) error + DeleteRetentionPolicyFn func(database, name string) error + DeleteSeriesFn func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error + DeleteShardFn func(id uint64) error + DiskSizeFn func() (int64, error) + ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error) + ImportShardFn func(id uint64, r io.Reader) error + MeasurementsCardinalityFn func(database string) (int64, error) + MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) + OpenFn func() error + PathFn func() string + RestoreShardFn func(id uint64, r io.Reader) error + SeriesCardinalityFn func(database string) (int64, error) + SetShardEnabledFn func(shardID uint64, enabled bool) error + SetShardNewReadersBlockedFn func(shardID uint64, blocked bool) error + ShardFn func(id uint64) *tsdb.Shard + ShardGroupFn func(ids []uint64) tsdb.ShardGroup + ShardIDsFn func() []uint64 + ShardInUseFn func(shardID uint64) (bool, error) + ShardNFn func() int + ShardRelativePathFn func(id uint64) (string, error) + ShardsFn func(ids []uint64) []*tsdb.Shard + TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) + TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) + WithLoggerFn func(log *zap.Logger) + WriteToShardFn func(shardID uint64, points []models.Point) error } func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error { @@ -112,6 +114,9 @@ func (s *TSDBStoreMock) SeriesCardinality(database string) (int64, error) { func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error { return s.SetShardEnabledFn(shardID, enabled) } +func (s *TSDBStoreMock) SetShardNewReadersBlocked(shardID uint64, blocked bool) error { + return s.SetShardNewReadersBlockedFn(shardID, blocked) +} func (s *TSDBStoreMock) Shard(id uint64) *tsdb.Shard { return s.ShardFn(id) } @@ -121,6 +126,9 @@ func (s *TSDBStoreMock) ShardGroup(ids []uint64) tsdb.ShardGroup { func (s *TSDBStoreMock) ShardIDs() []uint64 { return s.ShardIDsFn() } +func (s *TSDBStoreMock) ShardInUse(shardID uint64) (bool, error) { + return s.ShardInUseFn(shardID) +} func (s *TSDBStoreMock) ShardN() int { return s.ShardNFn() } diff --git a/tsdb/engine.go b/tsdb/engine.go index 8d87955840b..9c0287641bb 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -34,6 +34,9 @@ type Engine interface { SetCompactionsEnabled(enabled bool) ScheduleFullCompaction() error + SetNewReadersBlocked(blocked bool) error + InUse() (bool, error) + WithLogger(*zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index a2502ab10bc..9ea5b28e2a3 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -712,7 +712,7 @@ type Compactor struct { FileStore interface { NextGeneration() int - TSMReader(path string) *TSMReader + TSMReader(path string) (*TSMReader, error) } // RateLimit is the limit for disk writes for all concurrent compactions. @@ -943,7 +943,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([ default: } - tr := c.FileStore.TSMReader(file) + tr, err := c.FileStore.TSMReader(file) + if err != nil { + return nil, errCompactionAborted{fmt.Errorf("error creating reader for %q: %w", file, err)} + } if tr == nil { // This would be a bug if this occurred as tsmFiles passed in should only be // assigned to one compaction at any one time. A nil tr would mean the file diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 4c711fa1f72..39aeb45d652 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -3074,11 +3074,11 @@ func (w *fakeFileStore) BlockCount(path string, idx int) int { return w.blockCount } -func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader { +func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) { r := MustOpenTSMReader(path) w.readers = append(w.readers, r) r.Ref() - return r + return r, nil } func (w *fakeFileStore) Close() { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 0191a3b9f7b..633d65c15d7 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -507,6 +507,24 @@ func (e *Engine) disableSnapshotCompactions() { } } +// SetNewReadersBlocked sets if new readers can access the shard. If blocked +// is true, the number of reader blocks is incremented and new readers will +// receive an error instead of shard access. If blocked is false, the number +// of reader blocks is decremented. If the reader blocks drops to 0, then +// new readers will be granted access to the shard. +func (e *Engine) SetNewReadersBlocked(blocked bool) error { + e.mu.Lock() + defer e.mu.Unlock() + return e.FileStore.SetNewReadersBlocked(blocked) +} + +// InUse returns true if the shard is in-use by readers. +func (e *Engine) InUse() (bool, error) { + e.mu.RLock() + defer e.mu.RUnlock() + return e.FileStore.InUse() +} + // ScheduleFullCompaction will force the engine to fully compact all data stored. // This will cancel and running compactions and snapshot any data in the cache to // TSM files. This is an expensive operation. diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 148b7c15d10..731db0de955 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -35,6 +35,10 @@ const ( BadTSMFileExtension = "bad" ) +var ( + ErrNewReadersBlocked = errors.New("new readers blocked") +) + // TSMFile represents an on-disk TSM file. type TSMFile interface { // Path returns the underlying file path for the TSMFile. If the file @@ -190,6 +194,10 @@ type FileStore struct { obs tsdb.FileStoreObserver copyFiles bool + + // newReaderBlockCount keeps track of the current new reader block requests. + // If non-zero, no new TSMReader objects may be created. + newReaderBlockCount int } // FileStat holds information about a TSM file on disk. @@ -391,6 +399,10 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) e f.mu.RUnlock() return nil } + if f.newReadersBlocked() { + f.mu.RUnlock() + return fmt.Errorf("WalkKeys: %q: %w", f.dir, ErrNewReadersBlocked) + } // Ensure files are not unmapped while we're iterating over them. for _, r := range f.files { @@ -449,6 +461,10 @@ func (f *FileStore) Apply(ctx context.Context, fn func(r TSMFile) error) error { limiter := limiter.NewFixed(runtime.GOMAXPROCS(0)) f.mu.RLock() + if f.newReadersBlocked() { + f.mu.RUnlock() + return fmt.Errorf("Apply: %q: %w", f.dir, ErrNewReadersBlocked) + } errC := make(chan error, len(f.files)) for _, f := range f.files { @@ -725,22 +741,85 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost { // Reader returns a TSMReader for path if one is currently managed by the FileStore. // Otherwise it returns nil. If it returns a file, you must call Unref on it when // you are done, and never use it after that. -func (f *FileStore) TSMReader(path string) *TSMReader { +func (f *FileStore) TSMReader(path string) (*TSMReader, error) { f.mu.RLock() defer f.mu.RUnlock() + if f.newReadersBlocked() { + return nil, fmt.Errorf("TSMReader: %q (%q): %w", f.dir, path, ErrNewReadersBlocked) + } for _, r := range f.files { if r.Path() == path { r.Ref() - return r.(*TSMReader) + return r.(*TSMReader), nil } } + return nil, nil +} + +// SetNewReadersBlocked sets if new readers can access the files in this FileStore. +// If blocked is true, the number of reader blocks is incremented and new readers will +// receive an error instead of reader access. If blocked is false, the number +// of reader blocks is decremented. If the reader blocks drops to 0, then +// new readers will be granted access to the files. +func (f *FileStore) SetNewReadersBlocked(block bool) error { + f.mu.Lock() + defer f.mu.Unlock() + if block { + if f.newReaderBlockCount < 0 { + return fmt.Errorf("newReaderBlockCount for %q was %d before block operation, block failed", f.dir, f.newReaderBlockCount) + } + f.newReaderBlockCount++ + } else { + if f.newReaderBlockCount <= 0 { + return fmt.Errorf("newReadersBlockCount for %q was %d before unblock operation, unblock failed", f.dir, f.newReaderBlockCount) + } + f.newReaderBlockCount-- + } return nil } +// newReadersBlocked returns true if new references to TSMReader objects are not allowed. +// Must be called with f.mu lock held (reader or writer). +// See SetNewReadersBlocked for interface to allow and block access to TSMReader objects. +func (f *FileStore) newReadersBlocked() bool { + return f.newReaderBlockCount > 0 +} + +// InUse returns true if any files in this FileStore are in-use. +// InUse can only be called if a new readers have been blocked using SetNewReadersBlocked. +// This is to avoid a race condition between calling InUse and attempting an operation +// that requires no active readers. Calling InUse without a new readers block results +// in an error. +func (f *FileStore) InUse() (bool, error) { + f.mu.RLock() + defer f.mu.RUnlock() + if !f.newReadersBlocked() { + return false, fmt.Errorf("InUse called without a new reader block for %q", f.dir) + } + for _, r := range f.files { + if r.InUse() { + return true, nil + } + } + return false, nil +} + // KeyCursor returns a KeyCursor for key and t across the files in the FileStore. func (f *FileStore) KeyCursor(ctx context.Context, key []byte, t int64, ascending bool) *KeyCursor { f.mu.RLock() defer f.mu.RUnlock() + if f.newReadersBlocked() { + // New readers are blocked for this FileStore because there is a delete attempt in progress. + // Return an empty cursor to appease the callers since they generally don't handle + // a nil KeyCursor gracefully. + return &KeyCursor{ + key: key, + seeks: nil, + ctx: ctx, + col: metrics.GroupFromContext(ctx), + ascending: ascending, + } + } return newKeyCursor(ctx, f, key, t, ascending) } @@ -1216,6 +1295,10 @@ func (f *FileStore) CreateSnapshot() (string, error) { f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir)) f.mu.Lock() + if f.newReadersBlocked() { + f.mu.Unlock() + return "", fmt.Errorf("CreateSnapshot: %q: %w", f.dir, ErrNewReadersBlocked) + } // create a copy of the files slice and ensure they aren't closed out from // under us, nor the slice mutated. files := make([]TSMFile, len(f.files)) diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index b09bf0b2788..c83446bd2ab 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -2784,6 +2785,153 @@ func newTestFileStore(tb testing.TB, dir string) *tsm1.FileStore { return fs } +func TestFileStore_ReaderBlocking(t *testing.T) { + dir := t.TempDir() + + // Create 3 TSM files... + data := []keyValues{ + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}, + keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}}, + keyValues{"mem", []tsm1.Value{tsm1.NewValue(0, 1.0)}}, + } + + files, err := newFileDir(t, dir, data...) + require.NoError(t, err) + + fs := newTestFileStore(t, dir) + require.NoError(t, fs.Open(context.Background())) + + fsInUse := func() bool { + t.Helper() + require.NoError(t, fs.SetNewReadersBlocked(true)) + defer func() { + t.Helper() + require.NoError(t, fs.SetNewReadersBlocked(false)) + }() + inUse, err := fs.InUse() + require.NoError(t, err) + return inUse + } + + checkUnblocked := func() { + t.Helper() + + require.False(t, fsInUse()) + + var applyCount atomic.Uint32 + err = fs.Apply(context.Background(), func(r tsm1.TSMFile) error { + applyCount.Add(1) + return nil + }) + require.NoError(t, err) + require.Equal(t, uint32(len(files)), applyCount.Load(), "Apply should be called for all files") + + snap, err := fs.CreateSnapshot() + require.NoError(t, err) + require.NotEmpty(t, snap) + + buf := make([]tsm1.FloatValue, 1000) + c := fs.KeyCursor(context.Background(), []byte("cpu"), 0, true) + // closeC exists because we want to call c.Close() if a test fails in a defer, + // but we also need to call c.Close() as part of the test. closeC makes sure we + // don't double close it. + closeC := func() { + t.Helper() + if c != nil { + c.Close() // Close does not return anything + c = nil + } + } + defer closeC() + require.NotNil(t, c) + require.True(t, fsInUse()) + + values, err := c.ReadFloatBlock(&buf) + require.NoError(t, err) + require.Len(t, values, 1) + c.Next() + values, err = c.ReadFloatBlock(&buf) + require.NoError(t, err) + require.Len(t, values, 1) + c.Next() + values, err = c.ReadFloatBlock(&buf) + require.NoError(t, err) + require.Empty(t, values) + closeC() + require.False(t, fsInUse()) + + r, err := fs.TSMReader(files[0]) + require.NoError(t, err) + require.NotNil(t, r) + require.True(t, fsInUse()) + r.Unref() + require.False(t, fsInUse()) + + // Keys() will call WalkKeys() which will can be blocked. + keys := fs.Keys() + require.NotNil(t, keys) + require.Len(t, keys, 2) + + require.False(t, fsInUse()) + } + + checkBlocked := func() { + t.Helper() + + require.False(t, fsInUse()) + + applyCount := 0 + err := fs.Apply(context.Background(), func(r tsm1.TSMFile) error { + applyCount++ + return nil + }) + require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked) + require.Zero(t, applyCount, "Apply should not be called for any files when new readers are blocked") + + snap, err := fs.CreateSnapshot() + require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked) + require.Empty(t, snap) + + buf := make([]tsm1.FloatValue, 1000) + c := fs.KeyCursor(context.Background(), []byte("cpu"), 0, true) + require.NotNil(t, c) + defer c.Close() + values, err := c.ReadFloatBlock(&buf) + require.NoError(t, err) + require.Empty(t, values) + + r, err := fs.TSMReader(files[0]) + require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked) + require.Nil(t, r) + + // Keys() will call WalkKeys() which will can be blocked. + keys := fs.Keys() + require.Nil(t, keys) + + require.False(t, fsInUse()) + } + + checkUnblocked() + require.NoError(t, fs.SetNewReadersBlocked(true)) + checkBlocked() + + // nested block + require.NoError(t, fs.SetNewReadersBlocked(true)) + checkBlocked() + + // still blocked + require.NoError(t, fs.SetNewReadersBlocked(false)) + checkBlocked() + + // unblocked + require.NoError(t, fs.SetNewReadersBlocked(false)) + checkUnblocked() + + // Too many unblocks, sir, too many. + require.Error(t, fs.SetNewReadersBlocked(false)) + checkUnblocked() +} + type mockObserver struct { fileFinishing func(path string) error fileUnlinking func(path string) error diff --git a/tsdb/shard.go b/tsdb/shard.go index 363dcc9869a..7059cd9f93c 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -200,6 +200,24 @@ func (s *Shard) setEnabledNoLock(enabled bool) { } } +// SetNewReadersBlocked sets if new readers can access the shard. If blocked +// is true, the number of reader blocks is incremented and new readers will +// receive an error instead of shard access. If blocked is false, the number +// of reader blocks is decremented. If the reader blocks drops to 0, then +// new readers will be granted access to the shard. +func (s *Shard) SetNewReadersBlocked(blocked bool) error { + s.mu.Lock() + defer s.mu.Unlock() + return s._engine.SetNewReadersBlocked(blocked) +} + +// InUse returns true if this shard is in-use. +func (s *Shard) InUse() (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s._engine.InUse() +} + // ScheduleFullCompaction forces a full compaction to be schedule on the shard. func (s *Shard) ScheduleFullCompaction() error { engine, err := s.Engine() diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 9c3bbc90f0e..f03fc7ff749 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -17,6 +17,9 @@ import ( "testing" "time" + assert2 "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/davecgh/go-spew/spew" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -29,7 +32,6 @@ import ( _ "github.com/influxdata/influxdb/v2/tsdb/engine" _ "github.com/influxdata/influxdb/v2/tsdb/index" "github.com/influxdata/influxql" - assert2 "github.com/stretchr/testify/assert" ) func TestShardWriteAndIndex(t *testing.T) { @@ -1151,6 +1153,87 @@ _reserved,region=uswest value="foo" 0 } } +func TestShard_ReadersBlocked(t *testing.T) { + setup := func(index string) Shards { + shards := NewShards(t, index, 2) + shards.MustOpen() + + shards[0].MustWritePointsString(`cpu,host=serverA,region=uswest a=2.2,b=33.3,value=100 0`) + + shards[1].MustWritePointsString(` + cpu,host=serverA,region=uswest a=2.2,c=12.3,value=100,z="hello" 0 + disk q=100 0 + `) + + shards[0].Shard.ScheduleFullCompaction() + shards[1].Shard.ScheduleFullCompaction() + + return shards + } + + shardInUse := func(sh *tsdb.Shard) bool { + t.Helper() + require.NoError(t, sh.SetNewReadersBlocked(true)) + defer sh.SetNewReadersBlocked(false) + inUse, err := sh.InUse() + require.NoError(t, err) + return inUse + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(fmt.Sprintf("%s_readers_blocked", index), func(t *testing.T) { + shards := setup(index) + defer shards.Close() + + s1 := shards[0].Shard + m := influxql.Measurement{ + Database: "db0", + RetentionPolicy: "rp0", + Name: "cpu", + } + opts := query.IteratorOptions{ + Aux: []influxql.VarRef{{Val: "a", Type: influxql.Float}, {Val: "b", Type: influxql.Float}}, + StartTime: models.MinNanoTime, + EndTime: models.MaxNanoTime, + Ascending: false, + Limit: 5, + Ordered: true, + Authorizer: query.OpenAuthorizer, + } + + // Block new readers. Due to internal interfaces, CreateIterator won't return an error but + // it should return a faux iterator. + require.NoError(t, s1.SetNewReadersBlocked(true)) + require.False(t, shardInUse(s1)) + it, err := s1.CreateIterator(context.Background(), &m, opts) + require.NoError(t, err) // It would be great to get an error, but alas that's major internal replumbing. + require.NotNil(t, it) + require.False(t, shardInUse(s1)) // Remember, it isn't a real iterator. + fit, ok := it.(query.FloatIterator) + require.True(t, ok) + p, err := fit.Next() + require.NoError(t, err) + require.Nil(t, p) + require.NoError(t, fit.Close()) + require.NoError(t, s1.SetNewReadersBlocked(false)) + require.False(t, shardInUse(s1)) + + // CreateIterator, make sure shard shows as in-use during iterator life. + require.False(t, shardInUse(s1)) + it, err = s1.CreateIterator(context.Background(), &m, opts) + require.NoError(t, err) + fit, ok = it.(query.FloatIterator) + require.True(t, ok) + p, err = fit.Next() + require.NoError(t, err) + require.NotNil(t, p) + require.True(t, shardInUse(s1)) + require.NoError(t, fit.Close()) + require.False(t, shardInUse(s1)) + }) + } +} + func TestShards_FieldKeysByMeasurement(t *testing.T) { var shards Shards diff --git a/tsdb/store.go b/tsdb/store.go index 6904d3abab0..14214d02bc4 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -757,6 +757,31 @@ func (s *Store) DeleteShards() error { return nil } +// SetShardNewReadersBlocked sets if new readers can access the shard. If blocked +// is true, the number of reader blocks is incremented and new readers will +// receive an error instead of shard access. If blocked is false, the number +// of reader blocks is decremented. If the reader blocks drops to 0, then +// new readers will be granted access to the shard. +func (s *Store) SetShardNewReadersBlocked(shardID uint64, blocked bool) error { + sh := s.Shard(shardID) + if sh == nil { + return fmt.Errorf("SetShardNewReadersBlocked: shardID=%d, blocked=%t: %w", shardID, blocked, ErrShardNotFound) + } + return sh.SetNewReadersBlocked(blocked) +} + +// ShardInUse returns true if a shard is in-use (e.g. has active readers). +// SetShardNewReadersBlocked(id, true) should be called before checking +// ShardInUse to prevent race conditions where a reader could gain +// access to the shard immediately after ShardInUse is called. +func (s *Store) ShardInUse(shardID uint64) (bool, error) { + sh := s.Shard(shardID) + if sh == nil { + return false, fmt.Errorf("ShardInUse: shardID=%d: %w", shardID, ErrShardNotFound) + } + return sh.InUse() +} + // DeleteShard removes a shard from disk. func (s *Store) DeleteShard(shardID uint64) error { sh := s.Shard(shardID) diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 89c8e0fc98b..fc40684b734 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -6,7 +6,6 @@ import ( "context" "errors" "fmt" - "github.com/influxdata/influxdb/v2/predicate" "math" "math/rand" "os" @@ -19,6 +18,8 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/v2/predicate" + "github.com/davecgh/go-spew/spew" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/internal" @@ -653,6 +654,82 @@ func TestShards_CreateIterator(t *testing.T) { } } +// Test new reader blocking. +func TestStore_NewReadersBlocked(t *testing.T) { + //t.Parallel() + + test := func(index string) { + t.Helper() + s := MustOpenStore(t, index) + defer s.Close() + + shardInUse := func(shardID uint64) bool { + t.Helper() + require.NoError(t, s.SetShardNewReadersBlocked(shardID, true)) + inUse, err := s.ShardInUse(shardID) + require.NoError(t, err) + require.NoError(t, s.SetShardNewReadersBlocked(shardID, false)) + return inUse + } + + // Create shard #0 with data. + s.MustCreateShardWithData("db0", "rp0", 0, + `cpu,host=serverA value=1 0`, + `cpu,host=serverA value=2 10`, + `cpu,host=serverB value=3 20`, + ) + + // Flush WAL to TSM files. + sh0 := s.Shard(0) + require.NotNil(t, sh0) + sh0.ScheduleFullCompaction() + + // Retrieve shard group. + shards := s.ShardGroup([]uint64{0}) + + m := &influxql.Measurement{Name: "cpu"} + opts := query.IteratorOptions{ + Expr: influxql.MustParseExpr(`value`), + Dimensions: []string{"host"}, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + } + + // Block new readers, iterator we get will be a faux iterator with no data. + require.NoError(t, s.SetShardNewReadersBlocked(0, true)) + require.False(t, shardInUse(0)) + itr, err := shards.CreateIterator(context.Background(), m, opts) + require.NoError(t, err) + require.False(t, shardInUse(0)) // Remember, itr is a faux iterator. + fitr, ok := itr.(query.FloatIterator) + require.True(t, ok) + p, err := fitr.Next() + require.NoError(t, err) + require.Nil(t, p) + require.NoError(t, itr.Close()) + require.False(t, shardInUse(0)) + require.NoError(t, s.SetShardNewReadersBlocked(0, false)) + + // Create iterator, no blocks present. + require.False(t, shardInUse(0)) + itr, err = shards.CreateIterator(context.Background(), m, opts) + require.NoError(t, err) + require.True(t, shardInUse(0)) + fitr, ok = itr.(query.FloatIterator) + require.True(t, ok) + p, err = fitr.Next() + require.NoError(t, err) + require.NotNil(t, p) + require.NoError(t, itr.Close()) + require.False(t, shardInUse(0)) + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(fmt.Sprintf("TestStore_NewReadersBlocked_%s", index), func(t *testing.T) { test(index) }) + } +} + // Ensure the store can backup a shard and another store can restore it. func TestStore_BackupRestoreShard(t *testing.T) { test := func(t *testing.T, index string) { diff --git a/v1/services/retention/service.go b/v1/services/retention/service.go index 92232091d56..c81825c7b68 100644 --- a/v1/services/retention/service.go +++ b/v1/services/retention/service.go @@ -28,6 +28,9 @@ type Service struct { TSDBStore interface { ShardIDs() []uint64 DeleteShard(shardID uint64) error + + SetShardNewReadersBlocked(shardID uint64, blocked bool) error + ShardInUse(shardID uint64) (bool, error) } // DropShardRef is a function that takes a shard ID and removes the @@ -140,13 +143,13 @@ func (s *Service) run(ctx context.Context) { return case <-ticker.C: - s.DeletionCheck() + s.DeletionCheck(ctx) } } } -func (s *Service) DeletionCheck() { - log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check") +func (s *Service) DeletionCheck(ctx context.Context) { + log, logEnd := logger.NewOperation(ctx, s.logger, "Retention policy deletion check", "retention_delete_check") defer logEnd() type deletionInfo struct { @@ -163,18 +166,6 @@ func (s *Service) DeletionCheck() { } deletedShardIDs := make(map[uint64]deletionInfo) - dropShardMetaRef := func(id uint64, info deletionInfo) error { - if err := s.DropShardMetaRef(id, info.owners); err != nil { - log.Error("Failed to drop shard meta reference", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp), - zap.Error(err)) - return err - } - return nil - } - // Mark down if an error occurred during this function so we can inform the // user that we will try again on the next interval. // Without the message, they may see the error message and assume they @@ -192,25 +183,26 @@ func (s *Service) DeletionCheck() { // Determine all shards that have expired and need to be deleted. for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { - if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - log.Info("Failed to delete shard group", - logger.Database(d.Name), - logger.ShardGroup(g.ID), - logger.RetentionPolicy(r.Name), - zap.Error(err)) - retryNeeded = true - continue - } - - log.Info("Deleted shard group", - logger.Database(d.Name), - logger.ShardGroup(g.ID), - logger.RetentionPolicy(r.Name)) - - // Store all the shard IDs that may possibly need to be removed locally. - for _, sh := range g.Shards { - deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh) - } + func() { + log, logEnd := logger.NewOperation(ctx, log, "Deleting expired shard group", "retention_delete_expired_shard_group", + logger.Database(d.Name), logger.ShardGroup(g.ID), logger.RetentionPolicy(r.Name)) + defer logEnd() + if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { + log.Error("Failed to delete shard group", zap.Error(err)) + retryNeeded = true + return + } + + log.Info("Deleted shard group") + + // Store all the shard IDs that may possibly need to be removed locally. + groupShards := make([]uint64, len(g.Shards)) + for _, sh := range g.Shards { + groupShards = append(groupShards, sh.ID) + deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh) + } + log.Info("Group's shards will be removed from local storage if found", zap.Uint64s("shards", groupShards)) + }() } } } @@ -219,58 +211,91 @@ func (s *Service) DeletionCheck() { for _, id := range s.TSDBStore.ShardIDs() { if info, ok := deletedShardIDs[id]; ok { delete(deletedShardIDs, id) - log.Info("Attempting deletion of shard from store", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp)) - if err := s.TSDBStore.DeleteShard(id); err != nil { - log.Error("Failed to delete shard", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp), - zap.Error(err)) - if errors.Is(err, tsdb.ErrShardNotFound) { - // At first you wouldn't think this could happen, we're iterating over shards - // in the store. However, if this has been a very long running operation the - // shard could have been dropped from the store while we were working on other shards. - log.Warn("Shard does not exist in store, continuing retention removal", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp)) - } else { - retryNeeded = true - continue + + err := func() (rErr error) { + log, logEnd := logger.NewOperation(ctx, log, "Deleting shard from shard group deleted based on retention policy", "retention_delete_shard", + logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp)) + defer func() { + if rErr != nil { + // Log the error before logEnd(). + log.Error("Error deleting shard", zap.Error(rErr)) + } + logEnd() + }() + + // Block new readers for shard and check if it is in-use before deleting. This is to prevent + // an issue where a shard that is stuck in-use will block the retention service. + if err := s.TSDBStore.SetShardNewReadersBlocked(id, true); err != nil { + return fmt.Errorf("error blocking new readers for shard: %w", err) } - } - log.Info("Deleted shard", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp)) - if err := dropShardMetaRef(id, info); err != nil { - // removeShardMetaReference already logged the error. + defer func() { + if rErr != nil && !errors.Is(rErr, tsdb.ErrShardNotFound) { + log.Info("Unblocking new readers for shard after delete failed") + if unblockErr := s.TSDBStore.SetShardNewReadersBlocked(id, false); unblockErr != nil { + log.Error("Error unblocking new readers for shard", zap.Error(unblockErr)) + } + } + }() + + // We should only try to delete shards that are not in-use. + if inUse, err := s.TSDBStore.ShardInUse(id); err != nil { + return fmt.Errorf("error checking if shard is in-use: %w", err) + } else if inUse { + return errors.New("can not delete an in-use shard") + } + + // Now it's time to delete the shard + if err := s.TSDBStore.DeleteShard(id); err != nil { + return fmt.Errorf("error deleting shard from store: %w", err) + } + log.Info("Deleted shard") + return nil + }() + // Check for error deleting the shard from the TSDB. Continue onto DropShardMetaRef if the + // error was tsdb.ErrShardNotFound. We got here because the shard was in the metadata, + // but it wasn't really in the store, so try deleting it out of the metadata. + if err != nil && !errors.Is(err, tsdb.ErrShardNotFound) { + // Logging of error was handled by the lambda in a defer so that it is within + // the operation instead of after the operation. retryNeeded = true continue } + + func() { + log, logEnd := logger.NewOperation(ctx, log, "Dropping shard meta references", "retention_drop_refs", + logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners)) + defer logEnd() + if err := s.DropShardMetaRef(id, info.owners); err != nil { + log.Error("Error dropping shard meta reference", zap.Error(err)) + retryNeeded = true + return + } + }() } } // Check for expired phantom shards that exist in the metadata but not in the store. for id, info := range deletedShardIDs { - log.Error("Expired phantom shard detected during retention check, removing from metadata", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp)) - if err := dropShardMetaRef(id, info); err != nil { - // removeShardMetaReference already logged the error. - retryNeeded = true - continue - } + func() { + log, logEnd := logger.NewOperation(ctx, log, "Drop phantom shard references", "retention_drop_phantom_refs", + logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners)) + defer logEnd() + log.Warn("Expired phantom shard detected during retention check, removing from metadata") + if err := s.DropShardMetaRef(id, info.owners); err != nil { + log.Error("Error dropping shard meta reference for phantom shard", zap.Error(err)) + retryNeeded = true + } + }() } - if err := s.MetaClient.PruneShardGroups(); err != nil { - log.Info("Problem pruning shard groups", zap.Error(err)) - retryNeeded = true - } + func() { + log, logEnd := logger.NewOperation(ctx, log, "Pruning shard groups after retention check", "retention_prune_shard_groups") + defer logEnd() + if err := s.MetaClient.PruneShardGroups(); err != nil { + log.Error("Error pruning shard groups", zap.Error(err)) + retryNeeded = true + } + }() if retryNeeded { log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) diff --git a/v1/services/retention/service_test.go b/v1/services/retention/service_test.go index 8ae1816d669..f02940b5b7e 100644 --- a/v1/services/retention/service_test.go +++ b/v1/services/retention/service_test.go @@ -79,7 +79,8 @@ func TestRetention_DeletionCheck(t *testing.T) { shardDuration := time.Hour * 24 * 14 shardGroupDuration := time.Hour * 24 foreverShard := uint64(1003) // a shard that can't be deleted - phantomShard := uint64(1006) + phantomShard := uint64(1006) // a shard that exists in meta data but not TSDB store + activeShard := uint64(1007) // a shard that is active dataUT := &meta.Data{ Users: []meta.UserInfo{}, Databases: []meta.DatabaseInfo{ @@ -160,6 +161,18 @@ func TestRetention_DeletionCheck(t *testing.T) { }, }, }, + // Shard group 7 is deleted and expired, but its shard is in-use and should not be deleted. + { + ID: 7, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration), + Shards: []meta.ShardInfo{ + { + ID: activeShard, + }, + }, + }, }, }, }, @@ -211,19 +224,47 @@ func TestRetention_DeletionCheck(t *testing.T) { shardIDs := func() []uint64 { return maps.Keys(shards) } + inUseShards := map[uint64]struct{}{ + activeShard: struct{}{}, + } + newReaderBlocks := make(map[uint64]int) // ShsrdID to number of active blocks + setShardNewReadersBlocked := func(shardID uint64, blocked bool) error { + t.Helper() + require.Contains(t, shards, shardID, "SetShardNewReadersBlocked for non-existant shard %d", shardID) + if blocked { + newReaderBlocks[shardID]++ + } else { + require.Greater(t, newReaderBlocks[shardID], 0) + newReaderBlocks[shardID]-- + } + return nil + } deleteShard := func(shardID uint64) error { + t.Helper() if _, ok := shards[shardID]; !ok { return tsdb.ErrShardNotFound } + require.Greater(t, newReaderBlocks[shardID], 0, "DeleteShard called on shard without a reader block (%d)", shardID) + require.NotContains(t, inUseShards, shardID, "DeleteShard called on an active shard (%d)", shardID) if shardID == foreverShard { return fmt.Errorf("unknown error deleting shard files for shard %d", shardID) } delete(shards, shardID) + delete(newReaderBlocks, shardID) return nil } + shardInUse := func(shardID uint64) (bool, error) { + if _, valid := shards[shardID]; !valid { + return false, tsdb.ErrShardNotFound + } + _, inUse := inUseShards[shardID] + return inUse, nil + } store := &internal.TSDBStoreMock{ - DeleteShardFn: deleteShard, - ShardIDsFn: shardIDs, + DeleteShardFn: deleteShard, + ShardIDsFn: shardIDs, + SetShardNewReadersBlockedFn: setShardNewReadersBlocked, + ShardInUseFn: shardInUse, } s := retention.NewService(cfg) @@ -231,7 +272,14 @@ func TestRetention_DeletionCheck(t *testing.T) { s.TSDBStore = store s.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient) require.NoError(t, s.Open(context.Background())) - s.DeletionCheck() + deletionCheck := func() { + t.Helper() + s.DeletionCheck(context.Background()) + for k, v := range newReaderBlocks { + require.Zero(t, v, "shard %d has %d active blocks, should be zero", k, v) + } + } + deletionCheck() // Adjust expData to make it look like we expect. require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 1)) @@ -245,14 +293,14 @@ func TestRetention_DeletionCheck(t *testing.T) { // Check that multiple duplicate calls to DeletionCheck don't make further changes. // This is mostly for our friend foreverShard. for i := 0; i < 10; i++ { - s.DeletionCheck() + deletionCheck() require.Equal(t, expData, dataUT) require.Equal(t, collectShards(expData), shards) } // Our heroic support team hos fixed the issue with foreverShard. foreverShard = math.MaxUint64 - s.DeletionCheck() + deletionCheck() require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 3)) require.Equal(t, expData, dataUT) require.Equal(t, collectShards(expData), shards) @@ -404,6 +452,30 @@ func TestService_CheckShards(t *testing.T) { return nil } + shardBlockCount := map[uint64]int{} + s.TSDBStore.SetShardNewReadersBlockedFn = func(shardID uint64, blocked bool) error { + c := shardBlockCount[shardID] + if blocked { + c++ + } else { + c-- + if c < 0 { + return fmt.Errorf("too many unblocks: %d", shardID) + } + } + shardBlockCount[shardID] = c + return nil + } + + s.TSDBStore.ShardInUseFn = func(shardID uint64) (bool, error) { + c := shardBlockCount[shardID] + if c <= 0 { + return false, fmt.Errorf("ShardInUse called on unblocked shard: %d", shardID) + } + // TestService_DeletionCheck has tests for active shards. We're just checking for proper use. + return false, nil + } + if err := s.Open(context.Background()); err != nil { t.Fatalf("unexpected open error: %s", err) } @@ -620,6 +692,16 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) return nil } + s.TSDBStore.SetShardNewReadersBlockedFn = func(shardID uint64, blocked bool) error { + // This test does not simulate active / in-use shards. This can just be a stub. + return nil + } + + s.TSDBStore.ShardInUseFn = func(shardID uint64) (bool, error) { + // This does not simulate active / in-use shards. This can just be a stub. + return false, nil + } + return s, errC, done }