diff --git a/internal/tsdb_store.go b/internal/tsdb_store.go
index 8e2f13d8b4f..411c5d48073 100644
--- a/internal/tsdb_store.go
+++ b/internal/tsdb_store.go
@@ -14,38 +14,40 @@ import (
 // TSDBStoreMock is a mockable implementation of tsdb.Store.
 type TSDBStoreMock struct {
-    BackupShardFn             func(id uint64, since time.Time, w io.Writer) error
-    BackupSeriesFileFn        func(database string, w io.Writer) error
-    ExportShardFn             func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
-    CloseFn                   func() error
-    CreateShardFn             func(database, policy string, shardID uint64, enabled bool) error
-    CreateShardSnapshotFn     func(id uint64) (string, error)
-    DatabasesFn               func() []string
-    DeleteDatabaseFn          func(name string) error
-    DeleteMeasurementFn       func(ctx context.Context, database, name string) error
-    DeleteRetentionPolicyFn   func(database, name string) error
-    DeleteSeriesFn            func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
-    DeleteShardFn             func(id uint64) error
-    DiskSizeFn                func() (int64, error)
-    ExpandSourcesFn           func(sources influxql.Sources) (influxql.Sources, error)
-    ImportShardFn             func(id uint64, r io.Reader) error
-    MeasurementsCardinalityFn func(database string) (int64, error)
-    MeasurementNamesFn        func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
-    OpenFn                    func() error
-    PathFn                    func() string
-    RestoreShardFn            func(id uint64, r io.Reader) error
-    SeriesCardinalityFn       func(database string) (int64, error)
-    SetShardEnabledFn         func(shardID uint64, enabled bool) error
-    ShardFn                   func(id uint64) *tsdb.Shard
-    ShardGroupFn              func(ids []uint64) tsdb.ShardGroup
-    ShardIDsFn                func() []uint64
-    ShardNFn                  func() int
-    ShardRelativePathFn       func(id uint64) (string, error)
-    ShardsFn                  func(ids []uint64) []*tsdb.Shard
-    TagKeysFn                 func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
-    TagValuesFn               func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
-    WithLoggerFn              func(log *zap.Logger)
-    WriteToShardFn            func(shardID uint64, points []models.Point) error
+    BackupShardFn               func(id uint64, since time.Time, w io.Writer) error
+    BackupSeriesFileFn          func(database string, w io.Writer) error
+    ExportShardFn               func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
+    CloseFn                     func() error
+    CreateShardFn               func(database, policy string, shardID uint64, enabled bool) error
+    CreateShardSnapshotFn       func(id uint64) (string, error)
+    DatabasesFn                 func() []string
+    DeleteDatabaseFn            func(name string) error
+    DeleteMeasurementFn         func(ctx context.Context, database, name string) error
+    DeleteRetentionPolicyFn     func(database, name string) error
+    DeleteSeriesFn              func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
+    DeleteShardFn               func(id uint64) error
+    DiskSizeFn                  func() (int64, error)
+    ExpandSourcesFn             func(sources influxql.Sources) (influxql.Sources, error)
+    ImportShardFn               func(id uint64, r io.Reader) error
+    MeasurementsCardinalityFn   func(database string) (int64, error)
+    MeasurementNamesFn          func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
+    OpenFn                      func() error
+    PathFn                      func() string
+    RestoreShardFn              func(id uint64, r io.Reader) error
+    SeriesCardinalityFn         func(database string) (int64, error)
+    SetShardEnabledFn           func(shardID uint64, enabled bool) error
+    SetShardNewReadersBlockedFn func(shardID uint64, blocked bool) error
+    ShardFn                     func(id uint64) *tsdb.Shard
+    ShardGroupFn                func(ids []uint64) tsdb.ShardGroup
+    ShardIDsFn                  func() []uint64
+    ShardInUseFn                func(shardID uint64) (bool, error)
+    ShardNFn                    func() int
+    ShardRelativePathFn         func(id uint64) (string, error)
+    ShardsFn                    func(ids []uint64) []*tsdb.Shard
+    TagKeysFn                   func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
+    TagValuesFn                 func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
+    WithLoggerFn                func(log *zap.Logger)
+    WriteToShardFn              func(shardID uint64, points []models.Point) error
 }
 
 func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error {
@@ -112,6 +114,9 @@ func (s *TSDBStoreMock) SeriesCardinality(database string) (int64, error) {
 func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error {
     return s.SetShardEnabledFn(shardID, enabled)
 }
+func (s *TSDBStoreMock) SetShardNewReadersBlocked(shardID uint64, blocked bool) error {
+    return s.SetShardNewReadersBlockedFn(shardID, blocked)
+}
 func (s *TSDBStoreMock) Shard(id uint64) *tsdb.Shard {
     return s.ShardFn(id)
 }
@@ -121,6 +126,9 @@ func (s *TSDBStoreMock) ShardGroup(ids []uint64) tsdb.ShardGroup {
 func (s *TSDBStoreMock) ShardIDs() []uint64 {
     return s.ShardIDsFn()
 }
+func (s *TSDBStoreMock) ShardInUse(shardID uint64) (bool, error) {
+    return s.ShardInUseFn(shardID)
+}
 func (s *TSDBStoreMock) ShardN() int {
     return s.ShardNFn()
 }
diff --git a/tsdb/engine.go b/tsdb/engine.go
index 8d87955840b..9c0287641bb 100644
--- a/tsdb/engine.go
+++ b/tsdb/engine.go
@@ -34,6 +34,9 @@ type Engine interface {
     SetCompactionsEnabled(enabled bool)
     ScheduleFullCompaction() error
 
+    SetNewReadersBlocked(blocked bool) error
+    InUse() (bool, error)
+
     WithLogger(*zap.Logger)
 
     LoadMetadataIndex(shardID uint64, index Index) error
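As a rough illustration of how the two new mock hooks are meant to be wired (this snippet is not part of the change; the stub bodies and the other fields shown are assumptions for the example, mirroring how the retention service tests later in this diff use them):

store := &internal.TSDBStoreMock{
    ShardIDsFn:                  func() []uint64 { return []uint64{1} },
    DeleteShardFn:               func(shardID uint64) error { return nil },
    SetShardNewReadersBlockedFn: func(shardID uint64, blocked bool) error { return nil },
    ShardInUseFn:                func(shardID uint64) (bool, error) { return false, nil },
}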
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index a2502ab10bc..9ea5b28e2a3 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -712,7 +712,7 @@ type Compactor struct {
     FileStore interface {
         NextGeneration() int
-        TSMReader(path string) *TSMReader
+        TSMReader(path string) (*TSMReader, error)
     }
 
     // RateLimit is the limit for disk writes for all concurrent compactions.
@@ -943,7 +943,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([
         default:
         }
 
-        tr := c.FileStore.TSMReader(file)
+        tr, err := c.FileStore.TSMReader(file)
+        if err != nil {
+            return nil, errCompactionAborted{fmt.Errorf("error creating reader for %q: %w", file, err)}
+        }
         if tr == nil {
             // This would be a bug if this occurred as tsmFiles passed in should only be
             // assigned to one compaction at any one time. A nil tr would mean the file
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index 4c711fa1f72..39aeb45d652 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -3074,11 +3074,11 @@ func (w *fakeFileStore) BlockCount(path string, idx int) int {
     return w.blockCount
 }
 
-func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
+func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) {
     r := MustOpenTSMReader(path)
     w.readers = append(w.readers, r)
     r.Ref()
-    return r
+    return r, nil
 }
 
 func (w *fakeFileStore) Close() {
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 0191a3b9f7b..633d65c15d7 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -507,6 +507,24 @@ func (e *Engine) disableSnapshotCompactions() {
     }
 }
 
+// SetNewReadersBlocked sets whether new readers can access the shard. If blocked
+// is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of shard access. If blocked is false, the number
+// of reader blocks is decremented. If the number of reader blocks drops to 0,
+// new readers are granted access to the shard again.
+func (e *Engine) SetNewReadersBlocked(blocked bool) error {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    return e.FileStore.SetNewReadersBlocked(blocked)
+}
+
+// InUse returns true if the shard is in-use by readers.
+func (e *Engine) InUse() (bool, error) {
+    e.mu.RLock()
+    defer e.mu.RUnlock()
+    return e.FileStore.InUse()
+}
+
 // ScheduleFullCompaction will force the engine to fully compact all data stored.
 // This will cancel and running compactions and snapshot any data in the cache to
 // TSM files. This is an expensive operation.
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index 148b7c15d10..731db0de955 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -35,6 +35,10 @@ const (
     BadTSMFileExtension = "bad"
 )
 
+var (
+    ErrNewReadersBlocked = errors.New("new readers blocked")
+)
+
 // TSMFile represents an on-disk TSM file.
 type TSMFile interface {
     // Path returns the underlying file path for the TSMFile. If the file
@@ -190,6 +194,10 @@ type FileStore struct {
     obs tsdb.FileStoreObserver
 
     copyFiles bool
+
+    // newReaderBlockCount keeps track of the current new reader block requests.
+    // If non-zero, no new TSMReader objects may be created.
+    newReaderBlockCount int
 }
 
 // FileStat holds information about a TSM file on disk.
@@ -391,6 +399,10 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) error {
         f.mu.RUnlock()
         return nil
     }
+    if f.newReadersBlocked() {
+        f.mu.RUnlock()
+        return fmt.Errorf("WalkKeys: %q: %w", f.dir, ErrNewReadersBlocked)
+    }
 
     // Ensure files are not unmapped while we're iterating over them.
     for _, r := range f.files {
@@ -449,6 +461,10 @@ func (f *FileStore) Apply(ctx context.Context, fn func(r TSMFile) error) error {
     limiter := limiter.NewFixed(runtime.GOMAXPROCS(0))
 
     f.mu.RLock()
+    if f.newReadersBlocked() {
+        f.mu.RUnlock()
+        return fmt.Errorf("Apply: %q: %w", f.dir, ErrNewReadersBlocked)
+    }
     errC := make(chan error, len(f.files))
 
     for _, f := range f.files {
@@ -725,22 +741,85 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
 // Reader returns a TSMReader for path if one is currently managed by the FileStore.
 // Otherwise it returns nil. If it returns a file, you must call Unref on it when
 // you are done, and never use it after that.
-func (f *FileStore) TSMReader(path string) *TSMReader {
+func (f *FileStore) TSMReader(path string) (*TSMReader, error) {
     f.mu.RLock()
     defer f.mu.RUnlock()
+    if f.newReadersBlocked() {
+        return nil, fmt.Errorf("TSMReader: %q (%q): %w", f.dir, path, ErrNewReadersBlocked)
+    }
     for _, r := range f.files {
         if r.Path() == path {
             r.Ref()
-            return r.(*TSMReader)
+            return r.(*TSMReader), nil
         }
     }
+    return nil, nil
+}
+
+// SetNewReadersBlocked sets whether new readers can access the files in this FileStore.
+// If blocked is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of reader access. If blocked is false, the number
+// of reader blocks is decremented. If the number of reader blocks drops to 0,
+// new readers are granted access to the files again.
+func (f *FileStore) SetNewReadersBlocked(block bool) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if block {
+        if f.newReaderBlockCount < 0 {
+            return fmt.Errorf("newReaderBlockCount for %q was %d before block operation, block failed", f.dir, f.newReaderBlockCount)
+        }
+        f.newReaderBlockCount++
+    } else {
+        if f.newReaderBlockCount <= 0 {
+            return fmt.Errorf("newReaderBlockCount for %q was %d before unblock operation, unblock failed", f.dir, f.newReaderBlockCount)
+        }
+        f.newReaderBlockCount--
+    }
     return nil
 }
 
+// newReadersBlocked returns true if new references to TSMReader objects are not allowed.
+// Must be called with f.mu held (reader or writer lock).
+// See SetNewReadersBlocked for the interface that allows and blocks access to TSMReader objects.
+func (f *FileStore) newReadersBlocked() bool {
+    return f.newReaderBlockCount > 0
+}
+
+// InUse returns true if any files in this FileStore are in-use.
+// InUse may only be called after new readers have been blocked using SetNewReadersBlocked.
+// This is to avoid a race condition between calling InUse and attempting an operation
+// that requires no active readers. Calling InUse without a new-reader block results
+// in an error.
+func (f *FileStore) InUse() (bool, error) {
+    f.mu.RLock()
+    defer f.mu.RUnlock()
+    if !f.newReadersBlocked() {
+        return false, fmt.Errorf("InUse called without a new reader block for %q", f.dir)
+    }
+    for _, r := range f.files {
+        if r.InUse() {
+            return true, nil
+        }
+    }
+    return false, nil
+}
+
 // KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
 func (f *FileStore) KeyCursor(ctx context.Context, key []byte, t int64, ascending bool) *KeyCursor {
     f.mu.RLock()
     defer f.mu.RUnlock()
+    if f.newReadersBlocked() {
+        // New readers are blocked for this FileStore because there is a delete attempt in progress.
+        // Return an empty cursor to appease the callers since they generally don't handle
+        // a nil KeyCursor gracefully.
+        return &KeyCursor{
+            key:       key,
+            seeks:     nil,
+            ctx:       ctx,
+            col:       metrics.GroupFromContext(ctx),
+            ascending: ascending,
+        }
+    }
     return newKeyCursor(ctx, f, key, t, ascending)
 }
 
@@ -1216,6 +1295,10 @@ func (f *FileStore) CreateSnapshot() (string, error) {
     f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir))
 
     f.mu.Lock()
+    if f.newReadersBlocked() {
+        f.mu.Unlock()
+        return "", fmt.Errorf("CreateSnapshot: %q: %w", f.dir, ErrNewReadersBlocked)
+    }
     // create a copy of the files slice and ensure they aren't closed out from
     // under us, nor the slice mutated.
     files := make([]TSMFile, len(f.files))
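The FileStore doc comments above describe a block/check/unblock protocol. A minimal sketch of a caller following it is shown below; the helper deleteIfIdle and its deleteFn callback are hypothetical and not part of this change:

// deleteIfIdle is a hypothetical helper showing the intended call pattern:
// block new readers, check InUse, run the destructive operation only if idle,
// and drop the block again on the way out.
func deleteIfIdle(fs *tsm1.FileStore, deleteFn func() error) error {
    if err := fs.SetNewReadersBlocked(true); err != nil {
        return err
    }
    // Unblocking unconditionally keeps the block count balanced; a caller that
    // removes the underlying files may instead skip the unblock, as the retention
    // service does later in this diff.
    defer fs.SetNewReadersBlocked(false)

    inUse, err := fs.InUse() // only legal while a new-reader block is held
    if err != nil {
        return err
    }
    if inUse {
        return errors.New("file store has active readers; retry later")
    }
    return deleteFn()
}

Readers that race with the block see it as a wrapped tsm1.ErrNewReadersBlocked from WalkKeys, Apply, TSMReader, and CreateSnapshot, so errors.Is(err, tsm1.ErrNewReadersBlocked) distinguishes "temporarily blocked" from other failures.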
diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go
index b09bf0b2788..c83446bd2ab 100644
--- a/tsdb/engine/tsm1/file_store_test.go
+++ b/tsdb/engine/tsm1/file_store_test.go
@@ -13,6 +13,7 @@ import (
     "github.com/influxdata/influxdb/v2/tsdb"
     "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+    "github.com/stretchr/testify/require"
     "go.uber.org/zap/zaptest"
 )
 
@@ -2784,6 +2785,153 @@ func newTestFileStore(tb testing.TB, dir string) *tsm1.FileStore {
     return fs
 }
 
+func TestFileStore_ReaderBlocking(t *testing.T) {
+    dir := t.TempDir()
+
+    // Create 3 TSM files...
+    data := []keyValues{
+        keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
+        keyValues{"cpu", []tsm1.Value{tsm1.NewValue(1, 2.0)}},
+        keyValues{"mem", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
+    }
+
+    files, err := newFileDir(t, dir, data...)
+    require.NoError(t, err)
+
+    fs := newTestFileStore(t, dir)
+    require.NoError(t, fs.Open(context.Background()))
+
+    fsInUse := func() bool {
+        t.Helper()
+        require.NoError(t, fs.SetNewReadersBlocked(true))
+        defer func() {
+            t.Helper()
+            require.NoError(t, fs.SetNewReadersBlocked(false))
+        }()
+        inUse, err := fs.InUse()
+        require.NoError(t, err)
+        return inUse
+    }
+
+    checkUnblocked := func() {
+        t.Helper()
+
+        require.False(t, fsInUse())
+
+        var applyCount atomic.Uint32
+        err = fs.Apply(context.Background(), func(r tsm1.TSMFile) error {
+            applyCount.Add(1)
+            return nil
+        })
+        require.NoError(t, err)
+        require.Equal(t, uint32(len(files)), applyCount.Load(), "Apply should be called for all files")
+
+        snap, err := fs.CreateSnapshot()
+        require.NoError(t, err)
+        require.NotEmpty(t, snap)
+
+        buf := make([]tsm1.FloatValue, 1000)
+        c := fs.KeyCursor(context.Background(), []byte("cpu"), 0, true)
+        // closeC exists because we want c.Close() to run in a defer if the test fails,
+        // but we also need to call c.Close() as part of the test. closeC makes sure we
+        // don't double close it.
+        closeC := func() {
+            t.Helper()
+            if c != nil {
+                c.Close() // Close does not return anything
+                c = nil
+            }
+        }
+        defer closeC()
+        require.NotNil(t, c)
+        require.True(t, fsInUse())
+
+        values, err := c.ReadFloatBlock(&buf)
+        require.NoError(t, err)
+        require.Len(t, values, 1)
+        c.Next()
+        values, err = c.ReadFloatBlock(&buf)
+        require.NoError(t, err)
+        require.Len(t, values, 1)
+        c.Next()
+        values, err = c.ReadFloatBlock(&buf)
+        require.NoError(t, err)
+        require.Empty(t, values)
+        closeC()
+        require.False(t, fsInUse())
+
+        r, err := fs.TSMReader(files[0])
+        require.NoError(t, err)
+        require.NotNil(t, r)
+        require.True(t, fsInUse())
+        r.Unref()
+        require.False(t, fsInUse())
+
+        // Keys() will call WalkKeys(), which can be blocked.
+        keys := fs.Keys()
+        require.NotNil(t, keys)
+        require.Len(t, keys, 2)
+
+        require.False(t, fsInUse())
+    }
+
+    checkBlocked := func() {
+        t.Helper()
+
+        require.False(t, fsInUse())
+
+        applyCount := 0
+        err := fs.Apply(context.Background(), func(r tsm1.TSMFile) error {
+            applyCount++
+            return nil
+        })
+        require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked)
+        require.Zero(t, applyCount, "Apply should not be called for any files when new readers are blocked")
+
+        snap, err := fs.CreateSnapshot()
+        require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked)
+        require.Empty(t, snap)
+
+        buf := make([]tsm1.FloatValue, 1000)
+        c := fs.KeyCursor(context.Background(), []byte("cpu"), 0, true)
+        require.NotNil(t, c)
+        defer c.Close()
+        values, err := c.ReadFloatBlock(&buf)
+        require.NoError(t, err)
+        require.Empty(t, values)
+
+        r, err := fs.TSMReader(files[0])
+        require.ErrorIs(t, err, tsm1.ErrNewReadersBlocked)
+        require.Nil(t, r)
+
+        // Keys() will call WalkKeys(), which can be blocked.
+        keys := fs.Keys()
+        require.Nil(t, keys)
+
+        require.False(t, fsInUse())
+    }
+
+    checkUnblocked()
+    require.NoError(t, fs.SetNewReadersBlocked(true))
+    checkBlocked()
+
+    // nested block
+    require.NoError(t, fs.SetNewReadersBlocked(true))
+    checkBlocked()
+
+    // still blocked
+    require.NoError(t, fs.SetNewReadersBlocked(false))
+    checkBlocked()
+
+    // unblocked
+    require.NoError(t, fs.SetNewReadersBlocked(false))
+    checkUnblocked()
+
+    // Too many unblocks, sir, too many.
+    require.Error(t, fs.SetNewReadersBlocked(false))
+    checkUnblocked()
+}
+
 type mockObserver struct {
     fileFinishing func(path string) error
     fileUnlinking func(path string) error
diff --git a/tsdb/shard.go b/tsdb/shard.go
index 363dcc9869a..7059cd9f93c 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -200,6 +200,24 @@ func (s *Shard) setEnabledNoLock(enabled bool) {
     }
 }
 
+// SetNewReadersBlocked sets whether new readers can access the shard. If blocked
+// is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of shard access. If blocked is false, the number
+// of reader blocks is decremented. If the number of reader blocks drops to 0,
+// new readers are granted access to the shard again.
+func (s *Shard) SetNewReadersBlocked(blocked bool) error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    return s._engine.SetNewReadersBlocked(blocked)
+}
+
+// InUse returns true if this shard is in-use.
+func (s *Shard) InUse() (bool, error) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+    return s._engine.InUse()
+}
+
 // ScheduleFullCompaction forces a full compaction to be schedule on the shard.
 func (s *Shard) ScheduleFullCompaction() error {
     engine, err := s.Engine()
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index 9c3bbc90f0e..f03fc7ff749 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -17,6 +17,9 @@ import (
     "testing"
     "time"
 
+    assert2 "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
     "github.com/davecgh/go-spew/spew"
     "github.com/google/go-cmp/cmp"
     "github.com/google/go-cmp/cmp/cmpopts"
@@ -29,7 +32,6 @@ import (
     _ "github.com/influxdata/influxdb/v2/tsdb/engine"
     _ "github.com/influxdata/influxdb/v2/tsdb/index"
     "github.com/influxdata/influxql"
-    assert2 "github.com/stretchr/testify/assert"
 )
 
 func TestShardWriteAndIndex(t *testing.T) {
@@ -1151,6 +1153,87 @@ _reserved,region=uswest value="foo" 0
     }
 }
 
+func TestShard_ReadersBlocked(t *testing.T) {
+    setup := func(index string) Shards {
+        shards := NewShards(t, index, 2)
+        shards.MustOpen()
+
+        shards[0].MustWritePointsString(`cpu,host=serverA,region=uswest a=2.2,b=33.3,value=100 0`)
+
+        shards[1].MustWritePointsString(`
+cpu,host=serverA,region=uswest a=2.2,c=12.3,value=100,z="hello" 0
+disk q=100 0
+`)
+
+        shards[0].Shard.ScheduleFullCompaction()
+        shards[1].Shard.ScheduleFullCompaction()
+
+        return shards
+    }
+
+    shardInUse := func(sh *tsdb.Shard) bool {
+        t.Helper()
+        require.NoError(t, sh.SetNewReadersBlocked(true))
+        defer sh.SetNewReadersBlocked(false)
+        inUse, err := sh.InUse()
+        require.NoError(t, err)
+        return inUse
+    }
+
+    for _, index := range tsdb.RegisteredIndexes() {
+        t.Run(fmt.Sprintf("%s_readers_blocked", index), func(t *testing.T) {
+            shards := setup(index)
+            defer shards.Close()
+
+            s1 := shards[0].Shard
+            m := influxql.Measurement{
+                Database:        "db0",
+                RetentionPolicy: "rp0",
+                Name:            "cpu",
+            }
+            opts := query.IteratorOptions{
+                Aux:        []influxql.VarRef{{Val: "a", Type: influxql.Float}, {Val: "b", Type: influxql.Float}},
+                StartTime:  models.MinNanoTime,
+                EndTime:    models.MaxNanoTime,
+                Ascending:  false,
+                Limit:      5,
+                Ordered:    true,
+                Authorizer: query.OpenAuthorizer,
+            }
+
+            // Block new readers. Due to internal interfaces, CreateIterator won't return an error but
+            // it should return a faux iterator.
+            require.NoError(t, s1.SetNewReadersBlocked(true))
+            require.False(t, shardInUse(s1))
+            it, err := s1.CreateIterator(context.Background(), &m, opts)
+            require.NoError(t, err) // It would be great to get an error, but alas that's major internal replumbing.
+            require.NotNil(t, it)
+            require.False(t, shardInUse(s1)) // Remember, it isn't a real iterator.
+            fit, ok := it.(query.FloatIterator)
+            require.True(t, ok)
+            p, err := fit.Next()
+            require.NoError(t, err)
+            require.Nil(t, p)
+            require.NoError(t, fit.Close())
+            require.NoError(t, s1.SetNewReadersBlocked(false))
+            require.False(t, shardInUse(s1))
+
+            // CreateIterator, and make sure the shard shows as in-use for the life of the iterator.
+            require.False(t, shardInUse(s1))
+            it, err = s1.CreateIterator(context.Background(), &m, opts)
+            require.NoError(t, err)
+            fit, ok = it.(query.FloatIterator)
+            require.True(t, ok)
+            p, err = fit.Next()
+            require.NoError(t, err)
+            require.NotNil(t, p)
+            require.True(t, shardInUse(s1))
+            require.NoError(t, fit.Close())
+            require.False(t, shardInUse(s1))
+        })
+    }
+}
+
 func TestShards_FieldKeysByMeasurement(t *testing.T) {
     var shards Shards
diff --git a/tsdb/store.go b/tsdb/store.go
index 6904d3abab0..14214d02bc4 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -757,6 +757,31 @@ func (s *Store) DeleteShards() error {
     return nil
 }
 
+// SetShardNewReadersBlocked sets whether new readers can access the shard. If blocked
+// is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of shard access. If blocked is false, the number
+// of reader blocks is decremented. If the number of reader blocks drops to 0,
+// new readers are granted access to the shard again.
+func (s *Store) SetShardNewReadersBlocked(shardID uint64, blocked bool) error {
+    sh := s.Shard(shardID)
+    if sh == nil {
+        return fmt.Errorf("SetShardNewReadersBlocked: shardID=%d, blocked=%t: %w", shardID, blocked, ErrShardNotFound)
+    }
+    return sh.SetNewReadersBlocked(blocked)
+}
+
+// ShardInUse returns true if a shard is in-use (e.g. has active readers).
+// SetShardNewReadersBlocked(id, true) should be called before checking
+// ShardInUse to prevent race conditions where a reader could gain
+// access to the shard immediately after ShardInUse is called.
+func (s *Store) ShardInUse(shardID uint64) (bool, error) {
+    sh := s.Shard(shardID)
+    if sh == nil {
+        return false, fmt.Errorf("ShardInUse: shardID=%d: %w", shardID, ErrShardNotFound)
+    }
+    return sh.InUse()
+}
+
 // DeleteShard removes a shard from disk.
 func (s *Store) DeleteShard(shardID uint64) error {
     sh := s.Shard(shardID)
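At the Store level the same pattern applies per shard; the retention service further down in this diff is the real consumer, but a standalone sketch of the intended flow looks roughly like this (deleteShardIfIdle is a hypothetical helper, not part of this change):

func deleteShardIfIdle(store *tsdb.Store, shardID uint64) error {
    if err := store.SetShardNewReadersBlocked(shardID, true); err != nil {
        return err
    }
    // Note: after a successful DeleteShard the shard is gone, so this deferred unblock
    // will report ErrShardNotFound; the retention service sidesteps that by only
    // unblocking when the delete fails. Here we ignore the defer's result for brevity.
    defer store.SetShardNewReadersBlocked(shardID, false)

    inUse, err := store.ShardInUse(shardID) // requires the block taken above
    if err != nil {
        return err
    }
    if inUse {
        return fmt.Errorf("shard %d has active readers", shardID)
    }
    return store.DeleteShard(shardID)
}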
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 89c8e0fc98b..fc40684b734 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -6,7 +6,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "github.com/influxdata/influxdb/v2/predicate"
     "math"
     "math/rand"
     "os"
@@ -19,6 +18,8 @@ import (
     "testing"
     "time"
 
+    "github.com/influxdata/influxdb/v2/predicate"
+
     "github.com/davecgh/go-spew/spew"
     "github.com/influxdata/influxdb/v2/influxql/query"
     "github.com/influxdata/influxdb/v2/internal"
@@ -653,6 +654,82 @@ func TestShards_CreateIterator(t *testing.T) {
     }
 }
 
+// Test new reader blocking.
+func TestStore_NewReadersBlocked(t *testing.T) {
+    //t.Parallel()
+
+    test := func(index string) {
+        t.Helper()
+        s := MustOpenStore(t, index)
+        defer s.Close()
+
+        shardInUse := func(shardID uint64) bool {
+            t.Helper()
+            require.NoError(t, s.SetShardNewReadersBlocked(shardID, true))
+            inUse, err := s.ShardInUse(shardID)
+            require.NoError(t, err)
+            require.NoError(t, s.SetShardNewReadersBlocked(shardID, false))
+            return inUse
+        }
+
+        // Create shard #0 with data.
+        s.MustCreateShardWithData("db0", "rp0", 0,
+            `cpu,host=serverA value=1 0`,
+            `cpu,host=serverA value=2 10`,
+            `cpu,host=serverB value=3 20`,
+        )
+
+        // Flush WAL to TSM files.
+        sh0 := s.Shard(0)
+        require.NotNil(t, sh0)
+        sh0.ScheduleFullCompaction()
+
+        // Retrieve shard group.
+        shards := s.ShardGroup([]uint64{0})
+
+        m := &influxql.Measurement{Name: "cpu"}
+        opts := query.IteratorOptions{
+            Expr:       influxql.MustParseExpr(`value`),
+            Dimensions: []string{"host"},
+            Ascending:  true,
+            StartTime:  influxql.MinTime,
+            EndTime:    influxql.MaxTime,
+        }
+
+        // Block new readers; the iterator we get back will be a faux iterator with no data.
+        require.NoError(t, s.SetShardNewReadersBlocked(0, true))
+        require.False(t, shardInUse(0))
+        itr, err := shards.CreateIterator(context.Background(), m, opts)
+        require.NoError(t, err)
+        require.False(t, shardInUse(0)) // Remember, itr is a faux iterator.
+        fitr, ok := itr.(query.FloatIterator)
+        require.True(t, ok)
+        p, err := fitr.Next()
+        require.NoError(t, err)
+        require.Nil(t, p)
+        require.NoError(t, itr.Close())
+        require.False(t, shardInUse(0))
+        require.NoError(t, s.SetShardNewReadersBlocked(0, false))
+
+        // Create iterator, no blocks present.
+        require.False(t, shardInUse(0))
+        itr, err = shards.CreateIterator(context.Background(), m, opts)
+        require.NoError(t, err)
+        require.True(t, shardInUse(0))
+        fitr, ok = itr.(query.FloatIterator)
+        require.True(t, ok)
+        p, err = fitr.Next()
+        require.NoError(t, err)
+        require.NotNil(t, p)
+        require.NoError(t, itr.Close())
+        require.False(t, shardInUse(0))
+    }
+
+    for _, index := range tsdb.RegisteredIndexes() {
+        t.Run(fmt.Sprintf("TestStore_NewReadersBlocked_%s", index), func(t *testing.T) { test(index) })
+    }
+}
+
 // Ensure the store can backup a shard and another store can restore it.
 func TestStore_BackupRestoreShard(t *testing.T) {
     test := func(t *testing.T, index string) {
diff --git a/v1/services/retention/service.go b/v1/services/retention/service.go
index 92232091d56..c81825c7b68 100644
--- a/v1/services/retention/service.go
+++ b/v1/services/retention/service.go
@@ -28,6 +28,9 @@ type Service struct {
     TSDBStore interface {
         ShardIDs() []uint64
         DeleteShard(shardID uint64) error
+
+        SetShardNewReadersBlocked(shardID uint64, blocked bool) error
+        ShardInUse(shardID uint64) (bool, error)
     }
 
     // DropShardRef is a function that takes a shard ID and removes the
@@ -140,13 +143,13 @@ func (s *Service) run(ctx context.Context) {
             return
 
         case <-ticker.C:
-            s.DeletionCheck()
+            s.DeletionCheck(ctx)
         }
     }
 }
 
-func (s *Service) DeletionCheck() {
-    log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check")
+func (s *Service) DeletionCheck(ctx context.Context) {
+    log, logEnd := logger.NewOperation(ctx, s.logger, "Retention policy deletion check", "retention_delete_check")
     defer logEnd()
 
     type deletionInfo struct {
@@ -163,18 +166,6 @@ func (s *Service) DeletionCheck() {
     }
     deletedShardIDs := make(map[uint64]deletionInfo)
 
-    dropShardMetaRef := func(id uint64, info deletionInfo) error {
-        if err := s.DropShardMetaRef(id, info.owners); err != nil {
-            log.Error("Failed to drop shard meta reference",
-                logger.Database(info.db),
-                logger.Shard(id),
-                logger.RetentionPolicy(info.rp),
-                zap.Error(err))
-            return err
-        }
-        return nil
-    }
-
     // Mark down if an error occurred during this function so we can inform the
     // user that we will try again on the next interval.
     // Without the message, they may see the error message and assume they
@@ -192,25 +183,26 @@
             // Determine all shards that have expired and need to be deleted.
             for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
-                if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
-                    log.Info("Failed to delete shard group",
-                        logger.Database(d.Name),
-                        logger.ShardGroup(g.ID),
-                        logger.RetentionPolicy(r.Name),
-                        zap.Error(err))
-                    retryNeeded = true
-                    continue
-                }
-
-                log.Info("Deleted shard group",
-                    logger.Database(d.Name),
-                    logger.ShardGroup(g.ID),
-                    logger.RetentionPolicy(r.Name))
-
-                // Store all the shard IDs that may possibly need to be removed locally.
-                for _, sh := range g.Shards {
-                    deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
-                }
+                func() {
+                    log, logEnd := logger.NewOperation(ctx, log, "Deleting expired shard group", "retention_delete_expired_shard_group",
+                        logger.Database(d.Name), logger.ShardGroup(g.ID), logger.RetentionPolicy(r.Name))
+                    defer logEnd()
+                    if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
+                        log.Error("Failed to delete shard group", zap.Error(err))
+                        retryNeeded = true
+                        return
+                    }
+
+                    log.Info("Deleted shard group")
+
+                    // Store all the shard IDs that may possibly need to be removed locally.
+                    groupShards := make([]uint64, 0, len(g.Shards))
+                    for _, sh := range g.Shards {
+                        groupShards = append(groupShards, sh.ID)
+                        deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
+                    }
+                    log.Info("Group's shards will be removed from local storage if found", zap.Uint64s("shards", groupShards))
+                }()
             }
         }
     }
@@ -219,58 +211,91 @@
     for _, id := range s.TSDBStore.ShardIDs() {
         if info, ok := deletedShardIDs[id]; ok {
             delete(deletedShardIDs, id)
-            log.Info("Attempting deletion of shard from store",
-                logger.Database(info.db),
-                logger.Shard(id),
-                logger.RetentionPolicy(info.rp))
-            if err := s.TSDBStore.DeleteShard(id); err != nil {
-                log.Error("Failed to delete shard",
-                    logger.Database(info.db),
-                    logger.Shard(id),
-                    logger.RetentionPolicy(info.rp),
-                    zap.Error(err))
-                if errors.Is(err, tsdb.ErrShardNotFound) {
-                    // At first you wouldn't think this could happen, we're iterating over shards
-                    // in the store. However, if this has been a very long running operation the
-                    // shard could have been dropped from the store while we were working on other shards.
-                    log.Warn("Shard does not exist in store, continuing retention removal",
-                        logger.Database(info.db),
-                        logger.Shard(id),
-                        logger.RetentionPolicy(info.rp))
-                } else {
-                    retryNeeded = true
-                    continue
+
+            err := func() (rErr error) {
+                log, logEnd := logger.NewOperation(ctx, log, "Deleting shard from shard group deleted based on retention policy", "retention_delete_shard",
+                    logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp))
+                defer func() {
+                    if rErr != nil {
+                        // Log the error before logEnd().
+                        log.Error("Error deleting shard", zap.Error(rErr))
+                    }
+                    logEnd()
+                }()
+
+                // Block new readers for the shard and check if it is in-use before deleting. This is to prevent
+                // an issue where a shard that is stuck in-use will block the retention service.
+                if err := s.TSDBStore.SetShardNewReadersBlocked(id, true); err != nil {
+                    return fmt.Errorf("error blocking new readers for shard: %w", err)
                 }
-            }
-            log.Info("Deleted shard",
-                logger.Database(info.db),
-                logger.Shard(id),
-                logger.RetentionPolicy(info.rp))
-            if err := dropShardMetaRef(id, info); err != nil {
-                // removeShardMetaReference already logged the error.
+                defer func() {
+                    if rErr != nil && !errors.Is(rErr, tsdb.ErrShardNotFound) {
+                        log.Info("Unblocking new readers for shard after delete failed")
+                        if unblockErr := s.TSDBStore.SetShardNewReadersBlocked(id, false); unblockErr != nil {
+                            log.Error("Error unblocking new readers for shard", zap.Error(unblockErr))
+                        }
+                    }
+                }()
+
+                // We should only try to delete shards that are not in-use.
+                if inUse, err := s.TSDBStore.ShardInUse(id); err != nil {
+                    return fmt.Errorf("error checking if shard is in-use: %w", err)
+                } else if inUse {
+                    return errors.New("can not delete an in-use shard")
+                }
+
+                // Now it's time to delete the shard.
+                if err := s.TSDBStore.DeleteShard(id); err != nil {
+                    return fmt.Errorf("error deleting shard from store: %w", err)
+                }
+                log.Info("Deleted shard")
+                return nil
+            }()
+            // Check for an error deleting the shard from the TSDB. Continue on to DropShardMetaRef if the
+            // error was tsdb.ErrShardNotFound. We got here because the shard was in the metadata,
+            // but it wasn't really in the store, so try deleting it out of the metadata.
+            if err != nil && !errors.Is(err, tsdb.ErrShardNotFound) {
+                // Logging of the error was handled by the closure in a defer so that it is within
+                // the operation instead of after the operation.
                 retryNeeded = true
                 continue
             }
+
+            func() {
+                log, logEnd := logger.NewOperation(ctx, log, "Dropping shard meta references", "retention_drop_refs",
+                    logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
+                defer logEnd()
+                if err := s.DropShardMetaRef(id, info.owners); err != nil {
+                    log.Error("Error dropping shard meta reference", zap.Error(err))
+                    retryNeeded = true
+                    return
+                }
+            }()
         }
     }
 
     // Check for expired phantom shards that exist in the metadata but not in the store.
     for id, info := range deletedShardIDs {
-        log.Error("Expired phantom shard detected during retention check, removing from metadata",
-            logger.Database(info.db),
-            logger.Shard(id),
-            logger.RetentionPolicy(info.rp))
-        if err := dropShardMetaRef(id, info); err != nil {
-            // removeShardMetaReference already logged the error.
-            retryNeeded = true
-            continue
-        }
+        func() {
+            log, logEnd := logger.NewOperation(ctx, log, "Drop phantom shard references", "retention_drop_phantom_refs",
+                logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
+            defer logEnd()
+            log.Warn("Expired phantom shard detected during retention check, removing from metadata")
+            if err := s.DropShardMetaRef(id, info.owners); err != nil {
+                log.Error("Error dropping shard meta reference for phantom shard", zap.Error(err))
+                retryNeeded = true
+            }
+        }()
     }
 
-    if err := s.MetaClient.PruneShardGroups(); err != nil {
-        log.Info("Problem pruning shard groups", zap.Error(err))
-        retryNeeded = true
-    }
+    func() {
+        log, logEnd := logger.NewOperation(ctx, log, "Pruning shard groups after retention check", "retention_prune_shard_groups")
+        defer logEnd()
+        if err := s.MetaClient.PruneShardGroups(); err != nil {
+            log.Error("Error pruning shard groups", zap.Error(err))
+            retryNeeded = true
+        }
+    }()
 
     if retryNeeded {
         log.Info("One or more errors occurred during shard deletion and will be retried on the next check",
             logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
diff --git a/v1/services/retention/service_test.go b/v1/services/retention/service_test.go
index 8ae1816d669..f02940b5b7e 100644
--- a/v1/services/retention/service_test.go
+++ b/v1/services/retention/service_test.go
@@ -79,7 +79,8 @@ func TestRetention_DeletionCheck(t *testing.T) {
     shardDuration := time.Hour * 24 * 14
     shardGroupDuration := time.Hour * 24
     foreverShard := uint64(1003) // a shard that can't be deleted
-    phantomShard := uint64(1006)
+    phantomShard := uint64(1006) // a shard that exists in the metadata but not the TSDB store
+    activeShard := uint64(1007)  // a shard that is active
     dataUT := &meta.Data{
         Users: []meta.UserInfo{},
         Databases: []meta.DatabaseInfo{
@@ -160,6 +161,18 @@ ...
                         },
                     },
                 },
+                // Shard group 7 is deleted and expired, but its shard is in-use and should not be deleted.
+                {
+                    ID:        7,
+                    StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
+                    EndTime:   now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
+                    DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration),
+                    Shards: []meta.ShardInfo{
+                        {
+                            ID: activeShard,
+                        },
+                    },
+                },
             },
         },
     },
@@ -211,19 +224,47 @@ func TestRetention_DeletionCheck(t *testing.T) {
     shardIDs := func() []uint64 {
         return maps.Keys(shards)
     }
+    inUseShards := map[uint64]struct{}{
+        activeShard: struct{}{},
+    }
+    newReaderBlocks := make(map[uint64]int) // shard ID to number of active blocks
+    setShardNewReadersBlocked := func(shardID uint64, blocked bool) error {
+        t.Helper()
+        require.Contains(t, shards, shardID, "SetShardNewReadersBlocked for non-existent shard %d", shardID)
+        if blocked {
+            newReaderBlocks[shardID]++
+        } else {
+            require.Greater(t, newReaderBlocks[shardID], 0)
+            newReaderBlocks[shardID]--
+        }
+        return nil
+    }
     deleteShard := func(shardID uint64) error {
+        t.Helper()
         if _, ok := shards[shardID]; !ok {
             return tsdb.ErrShardNotFound
         }
+        require.Greater(t, newReaderBlocks[shardID], 0, "DeleteShard called on shard without a reader block (%d)", shardID)
+        require.NotContains(t, inUseShards, shardID, "DeleteShard called on an active shard (%d)", shardID)
         if shardID == foreverShard {
            return fmt.Errorf("unknown error deleting shard files for shard %d", shardID)
         }
         delete(shards, shardID)
+        delete(newReaderBlocks, shardID)
         return nil
     }
+    shardInUse := func(shardID uint64) (bool, error) {
+        if _, valid := shards[shardID]; !valid {
+            return false, tsdb.ErrShardNotFound
+        }
+        _, inUse := inUseShards[shardID]
+        return inUse, nil
+    }
     store := &internal.TSDBStoreMock{
-        DeleteShardFn: deleteShard,
-        ShardIDsFn:    shardIDs,
+        DeleteShardFn:               deleteShard,
+        ShardIDsFn:                  shardIDs,
+        SetShardNewReadersBlockedFn: setShardNewReadersBlocked,
+        ShardInUseFn:                shardInUse,
     }
 
     s := retention.NewService(cfg)
@@ -231,7 +272,14 @@ func TestRetention_DeletionCheck(t *testing.T) {
     s.TSDBStore = store
     s.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
     require.NoError(t, s.Open(context.Background()))
-    s.DeletionCheck()
+    deletionCheck := func() {
+        t.Helper()
+        s.DeletionCheck(context.Background())
+        for k, v := range newReaderBlocks {
+            require.Zero(t, v, "shard %d has %d active blocks, should be zero", k, v)
+        }
+    }
+    deletionCheck()
 
     // Adjust expData to make it look like we expect.
     require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 1))
@@ -245,14 +293,14 @@ func TestRetention_DeletionCheck(t *testing.T) {
     // Check that multiple duplicate calls to DeletionCheck don't make further changes.
     // This is mostly for our friend foreverShard.
     for i := 0; i < 10; i++ {
-        s.DeletionCheck()
+        deletionCheck()
         require.Equal(t, expData, dataUT)
         require.Equal(t, collectShards(expData), shards)
     }
 
     // Our heroic support team has fixed the issue with foreverShard.
     foreverShard = math.MaxUint64
-    s.DeletionCheck()
+    deletionCheck()
     require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 3))
     require.Equal(t, expData, dataUT)
     require.Equal(t, collectShards(expData), shards)
@@ -404,6 +452,30 @@ func TestService_CheckShards(t *testing.T) {
         return nil
     }
 
+    shardBlockCount := map[uint64]int{}
+    s.TSDBStore.SetShardNewReadersBlockedFn = func(shardID uint64, blocked bool) error {
+        c := shardBlockCount[shardID]
+        if blocked {
+            c++
+        } else {
+            c--
+            if c < 0 {
+                return fmt.Errorf("too many unblocks: %d", shardID)
+            }
+        }
+        shardBlockCount[shardID] = c
+        return nil
+    }
+
+    s.TSDBStore.ShardInUseFn = func(shardID uint64) (bool, error) {
+        c := shardBlockCount[shardID]
+        if c <= 0 {
+            return false, fmt.Errorf("ShardInUse called on unblocked shard: %d", shardID)
+        }
+        // TestService_DeletionCheck has tests for active shards. We're just checking for proper use.
+        return false, nil
+    }
+
     if err := s.Open(context.Background()); err != nil {
         t.Fatalf("unexpected open error: %s", err)
     }
@@ -620,6 +692,16 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
         return nil
     }
 
+    s.TSDBStore.SetShardNewReadersBlockedFn = func(shardID uint64, blocked bool) error {
+        // This test does not simulate active / in-use shards. This can just be a stub.
+        return nil
+    }
+
+    s.TSDBStore.ShardInUseFn = func(shardID uint64) (bool, error) {
+        // This test does not simulate active / in-use shards. This can just be a stub.
+        return false, nil
+    }
+
     return s, errC, done
 }