fix: prevent retention service from hanging (#25121)
Fix an issue that can cause the retention service to hang waiting on a
`Shard.Close` call. When this occurs, no other shards will be deleted
by the retention service. This is usually noticed as an increase in
disk usage because old shards are not cleaned up.

The fix adds two new methods to `Store`: `SetShardNewReadersBlocked`
and `InUse`. `InUse` can be used to poll whether a shard has active
readers; the retention service uses it to skip over in-use shards so
that it does not hang. `SetShardNewReadersBlocked` controls whether
new read access may be granted to a shard. This is required to prevent
race conditions between the `InUse` check and the deletion of shards.

If the retention service skips over a shard because it is in use, the
shard will be checked again the next time the retention service runs.
It can be deleted on a subsequent check once it is no longer in use. If
a shard is stuck in-use, the retention service will not be able to
delete it, which is reported in the logs so that the shard can be
handled by manual intervention. Other shards can still be deleted by
the retention service even if one shard is stuck with readers.
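
For illustration, a minimal sketch in Go of the skip-if-in-use pattern
described above. This is not the actual retention-service code: the
helper name deleteShardIfUnused is hypothetical, and the method set
follows the TSDBStoreMock fields in the diff below.

// Sketch of the pattern described above; not the actual retention code.
func deleteShardIfUnused(store *tsdb.Store, log *zap.Logger, shardID uint64) error {
    // Block new readers first so the in-use poll cannot race a new query.
    if err := store.SetShardNewReadersBlocked(shardID, true); err != nil {
        return err
    }
    // Re-enable new readers however this function exits.
    defer store.SetShardNewReadersBlocked(shardID, false)

    inUse, err := store.ShardInUse(shardID)
    if err != nil {
        return err
    }
    if inUse {
        // Skip rather than wait; the next retention pass re-checks the shard.
        log.Info("skipping in-use shard", zap.Uint64("shard_id", shardID))
        return nil
    }
    return store.DeleteShard(shardID)
}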

This is a port of ad68ec8 from master-1.x to main-2.x.

closes: #25118
(cherry picked from commit b4bd607)
(cherry picked from commit cb8cfe3)
gwossum authored Jul 1, 2024
1 parent 0b7cd24 commit e9e0f74
Showing 13 changed files with 694 additions and 121 deletions.
72 changes: 40 additions & 32 deletions internal/tsdb_store.go
@@ -14,38 +14,40 @@ import (

// TSDBStoreMock is a mockable implementation of tsdb.Store.
type TSDBStoreMock struct {
-    BackupShardFn             func(id uint64, since time.Time, w io.Writer) error
-    BackupSeriesFileFn        func(database string, w io.Writer) error
-    ExportShardFn             func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
-    CloseFn                   func() error
-    CreateShardFn             func(database, policy string, shardID uint64, enabled bool) error
-    CreateShardSnapshotFn     func(id uint64) (string, error)
-    DatabasesFn               func() []string
-    DeleteDatabaseFn          func(name string) error
-    DeleteMeasurementFn       func(ctx context.Context, database, name string) error
-    DeleteRetentionPolicyFn   func(database, name string) error
-    DeleteSeriesFn            func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
-    DeleteShardFn             func(id uint64) error
-    DiskSizeFn                func() (int64, error)
-    ExpandSourcesFn           func(sources influxql.Sources) (influxql.Sources, error)
-    ImportShardFn             func(id uint64, r io.Reader) error
-    MeasurementsCardinalityFn func(database string) (int64, error)
-    MeasurementNamesFn        func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
-    OpenFn                    func() error
-    PathFn                    func() string
-    RestoreShardFn            func(id uint64, r io.Reader) error
-    SeriesCardinalityFn       func(database string) (int64, error)
-    SetShardEnabledFn         func(shardID uint64, enabled bool) error
-    ShardFn                   func(id uint64) *tsdb.Shard
-    ShardGroupFn              func(ids []uint64) tsdb.ShardGroup
-    ShardIDsFn                func() []uint64
-    ShardNFn                  func() int
-    ShardRelativePathFn       func(id uint64) (string, error)
-    ShardsFn                  func(ids []uint64) []*tsdb.Shard
-    TagKeysFn                 func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
-    TagValuesFn               func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
-    WithLoggerFn              func(log *zap.Logger)
-    WriteToShardFn            func(shardID uint64, points []models.Point) error
+    BackupShardFn               func(id uint64, since time.Time, w io.Writer) error
+    BackupSeriesFileFn          func(database string, w io.Writer) error
+    ExportShardFn               func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
+    CloseFn                     func() error
+    CreateShardFn               func(database, policy string, shardID uint64, enabled bool) error
+    CreateShardSnapshotFn       func(id uint64) (string, error)
+    DatabasesFn                 func() []string
+    DeleteDatabaseFn            func(name string) error
+    DeleteMeasurementFn         func(ctx context.Context, database, name string) error
+    DeleteRetentionPolicyFn     func(database, name string) error
+    DeleteSeriesFn              func(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
+    DeleteShardFn               func(id uint64) error
+    DiskSizeFn                  func() (int64, error)
+    ExpandSourcesFn             func(sources influxql.Sources) (influxql.Sources, error)
+    ImportShardFn               func(id uint64, r io.Reader) error
+    MeasurementsCardinalityFn   func(database string) (int64, error)
+    MeasurementNamesFn          func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
+    OpenFn                      func() error
+    PathFn                      func() string
+    RestoreShardFn              func(id uint64, r io.Reader) error
+    SeriesCardinalityFn         func(database string) (int64, error)
+    SetShardEnabledFn           func(shardID uint64, enabled bool) error
+    SetShardNewReadersBlockedFn func(shardID uint64, blocked bool) error
+    ShardFn                     func(id uint64) *tsdb.Shard
+    ShardGroupFn                func(ids []uint64) tsdb.ShardGroup
+    ShardIDsFn                  func() []uint64
+    ShardInUseFn                func(shardID uint64) (bool, error)
+    ShardNFn                    func() int
+    ShardRelativePathFn         func(id uint64) (string, error)
+    ShardsFn                    func(ids []uint64) []*tsdb.Shard
+    TagKeysFn                   func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
+    TagValuesFn                 func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
+    WithLoggerFn                func(log *zap.Logger)
+    WriteToShardFn              func(shardID uint64, points []models.Point) error
}

func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error {
@@ -112,6 +114,9 @@ func (s *TSDBStoreMock) SeriesCardinality(database string) (int64, error) {
func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error {
    return s.SetShardEnabledFn(shardID, enabled)
}
+func (s *TSDBStoreMock) SetShardNewReadersBlocked(shardID uint64, blocked bool) error {
+    return s.SetShardNewReadersBlockedFn(shardID, blocked)
+}
func (s *TSDBStoreMock) Shard(id uint64) *tsdb.Shard {
    return s.ShardFn(id)
}
@@ -121,6 +126,9 @@ func (s *TSDBStoreMock) ShardGroup(ids []uint64) tsdb.ShardGroup {
func (s *TSDBStoreMock) ShardIDs() []uint64 {
    return s.ShardIDsFn()
}
+func (s *TSDBStoreMock) ShardInUse(shardID uint64) (bool, error) {
+    return s.ShardInUseFn(shardID)
+}
func (s *TSDBStoreMock) ShardN() int {
    return s.ShardNFn()
}
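
Tests can stub the two new hooks on the mock. A minimal sketch (the
enclosing test is omitted, and the internal package alias is assumed):

store := &internal.TSDBStoreMock{
    SetShardNewReadersBlockedFn: func(shardID uint64, blocked bool) error {
        return nil // pretend blocking always succeeds
    },
    ShardInUseFn: func(shardID uint64) (bool, error) {
        return false, nil // report the shard as not in use
    },
}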
3 changes: 3 additions & 0 deletions tsdb/engine.go
@@ -34,6 +34,9 @@ type Engine interface {
    SetCompactionsEnabled(enabled bool)
    ScheduleFullCompaction() error

+    SetNewReadersBlocked(blocked bool) error
+    InUse() (bool, error)
+
    WithLogger(*zap.Logger)

    LoadMetadataIndex(shardID uint64, index Index) error
7 changes: 5 additions & 2 deletions tsdb/engine/tsm1/compact.go
@@ -712,7 +712,7 @@ type Compactor struct {

    FileStore interface {
        NextGeneration() int
-        TSMReader(path string) *TSMReader
+        TSMReader(path string) (*TSMReader, error)
    }

    // RateLimit is the limit for disk writes for all concurrent compactions.
@@ -943,7 +943,10 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
        default:
        }

-        tr := c.FileStore.TSMReader(file)
+        tr, err := c.FileStore.TSMReader(file)
+        if err != nil {
+            return nil, errCompactionAborted{fmt.Errorf("error creating reader for %q: %w", file, err)}
+        }
        if tr == nil {
            // This would be a bug if this occurred as tsmFiles passed in should only be
            // assigned to one compaction at any one time. A nil tr would mean the file
4 changes: 2 additions & 2 deletions tsdb/engine/tsm1/compact_test.go
@@ -3074,11 +3074,11 @@ func (w *fakeFileStore) BlockCount(path string, idx int) int {
    return w.blockCount
}

-func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
+func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) {
    r := MustOpenTSMReader(path)
    w.readers = append(w.readers, r)
    r.Ref()
-    return r
+    return r, nil
}

func (w *fakeFileStore) Close() {
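
A fake that exercises the new failure path can return an error instead.
A sketch (errFileStore is hypothetical and not part of this commit):

// Hypothetical fake satisfying the Compactor's FileStore interface.
type errFileStore struct{}

func (e *errFileStore) NextGeneration() int { return 1 }
func (e *errFileStore) TSMReader(path string) (*tsm1.TSMReader, error) {
    return nil, fmt.Errorf("simulated reader failure for %q", path)
}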
18 changes: 18 additions & 0 deletions tsdb/engine/tsm1/engine.go
@@ -507,6 +507,24 @@ func (e *Engine) disableSnapshotCompactions() {
    }
}

+// SetNewReadersBlocked sets whether new readers can access the shard. If blocked
+// is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of shard access. If blocked is false, the number
+// of reader blocks is decremented. If the reader block count drops to 0, then
+// new readers will be granted access to the shard.
+func (e *Engine) SetNewReadersBlocked(blocked bool) error {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    return e.FileStore.SetNewReadersBlocked(blocked)
+}
+
+// InUse returns true if the shard is in use by readers.
+func (e *Engine) InUse() (bool, error) {
+    e.mu.RLock()
+    defer e.mu.RUnlock()
+    return e.FileStore.InUse()
+}
+
// ScheduleFullCompaction will force the engine to fully compact all data stored.
// This will cancel any running compactions and snapshot any data in the cache to
// TSM files. This is an expensive operation.
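
The block is a counter rather than a flag, so nested block/unblock pairs
compose. An illustration (demoBlockCounting is hypothetical; eng is
assumed to be an open *tsm1.Engine):

func demoBlockCounting(eng *tsm1.Engine) (bool, error) {
    _ = eng.SetNewReadersBlocked(true)  // count 1: new readers are refused
    _ = eng.SetNewReadersBlocked(true)  // count 2: blocks nest
    _ = eng.SetNewReadersBlocked(false) // count 1: still refused
    busy, err := eng.InUse()            // valid only while a block is held
    _ = eng.SetNewReadersBlocked(false) // count 0: new readers allowed again
    return busy, err
}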
87 changes: 85 additions & 2 deletions tsdb/engine/tsm1/file_store.go
@@ -35,6 +35,10 @@ const (
    BadTSMFileExtension = "bad"
)

+var (
+    ErrNewReadersBlocked = errors.New("new readers blocked")
+)
+
// TSMFile represents an on-disk TSM file.
type TSMFile interface {
    // Path returns the underlying file path for the TSMFile. If the file
@@ -190,6 +194,10 @@ type FileStore struct {
    obs tsdb.FileStoreObserver

    copyFiles bool
+
+    // newReaderBlockCount keeps track of the current new reader block requests.
+    // If non-zero, no new TSMReader objects may be created.
+    newReaderBlockCount int
}

// FileStat holds information about a TSM file on disk.
@@ -391,6 +399,10 @@ func (f *FileStore) WalkKeys(seek []byte, fn func(key []byte, typ byte) error) error {
        f.mu.RUnlock()
        return nil
    }
+    if f.newReadersBlocked() {
+        f.mu.RUnlock()
+        return fmt.Errorf("WalkKeys: %q: %w", f.dir, ErrNewReadersBlocked)
+    }

    // Ensure files are not unmapped while we're iterating over them.
    for _, r := range f.files {
@@ -449,6 +461,10 @@ func (f *FileStore) Apply(ctx context.Context, fn func(r TSMFile) error) error {
    limiter := limiter.NewFixed(runtime.GOMAXPROCS(0))

    f.mu.RLock()
+    if f.newReadersBlocked() {
+        f.mu.RUnlock()
+        return fmt.Errorf("Apply: %q: %w", f.dir, ErrNewReadersBlocked)
+    }
    errC := make(chan error, len(f.files))

    for _, f := range f.files {
@@ -725,22 +741,85 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
// TSMReader returns a TSMReader for path if one is currently managed by the FileStore.
// Otherwise it returns nil. If it returns a file, you must call Unref on it when
// you are done, and never use it after that.
-func (f *FileStore) TSMReader(path string) *TSMReader {
+func (f *FileStore) TSMReader(path string) (*TSMReader, error) {
    f.mu.RLock()
    defer f.mu.RUnlock()
+    if f.newReadersBlocked() {
+        return nil, fmt.Errorf("TSMReader: %q (%q): %w", f.dir, path, ErrNewReadersBlocked)
+    }
    for _, r := range f.files {
        if r.Path() == path {
            r.Ref()
-            return r.(*TSMReader)
+            return r.(*TSMReader), nil
        }
    }
-    return nil
+    return nil, nil
}

+// SetNewReadersBlocked sets whether new readers can access the files in this FileStore.
+// If blocked is true, the number of reader blocks is incremented and new readers will
+// receive an error instead of reader access. If blocked is false, the number
+// of reader blocks is decremented. If the reader block count drops to 0, then
+// new readers will be granted access to the files.
+func (f *FileStore) SetNewReadersBlocked(block bool) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if block {
+        if f.newReaderBlockCount < 0 {
+            return fmt.Errorf("newReaderBlockCount for %q was %d before block operation, block failed", f.dir, f.newReaderBlockCount)
+        }
+        f.newReaderBlockCount++
+    } else {
+        if f.newReaderBlockCount <= 0 {
+            return fmt.Errorf("newReaderBlockCount for %q was %d before unblock operation, unblock failed", f.dir, f.newReaderBlockCount)
+        }
+        f.newReaderBlockCount--
+    }
+    return nil
+}
+
+// newReadersBlocked returns true if new references to TSMReader objects are not allowed.
+// Must be called with f.mu held (reader or writer lock).
+// See SetNewReadersBlocked for the interface to allow and block access to TSMReader objects.
+func (f *FileStore) newReadersBlocked() bool {
+    return f.newReaderBlockCount > 0
+}
+
+// InUse returns true if any files in this FileStore are in use.
+// InUse may only be called after new readers have been blocked using SetNewReadersBlocked.
+// This avoids a race condition between calling InUse and attempting an operation
+// that requires no active readers. Calling InUse without a new-readers block results
+// in an error.
+func (f *FileStore) InUse() (bool, error) {
+    f.mu.RLock()
+    defer f.mu.RUnlock()
+    if !f.newReadersBlocked() {
+        return false, fmt.Errorf("InUse called without a new reader block for %q", f.dir)
+    }
+    for _, r := range f.files {
+        if r.InUse() {
+            return true, nil
+        }
+    }
+    return false, nil
+}
+
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
func (f *FileStore) KeyCursor(ctx context.Context, key []byte, t int64, ascending bool) *KeyCursor {
    f.mu.RLock()
    defer f.mu.RUnlock()
+    if f.newReadersBlocked() {
+        // New readers are blocked for this FileStore because there is a delete attempt in progress.
+        // Return an empty cursor to appease the callers since they generally don't handle
+        // a nil KeyCursor gracefully.
+        return &KeyCursor{
+            key:       key,
+            seeks:     nil,
+            ctx:       ctx,
+            col:       metrics.GroupFromContext(ctx),
+            ascending: ascending,
+        }
+    }
    return newKeyCursor(ctx, f, key, t, ascending)
}

@@ -1216,6 +1295,10 @@ func (f *FileStore) CreateSnapshot() (string, error) {
    f.traceLogger.Info("Creating snapshot", zap.String("dir", f.dir))

    f.mu.Lock()
+    if f.newReadersBlocked() {
+        f.mu.Unlock()
+        return "", fmt.Errorf("CreateSnapshot: %q: %w", f.dir, ErrNewReadersBlocked)
+    }
    // create a copy of the files slice and ensure they aren't closed out from
    // under us, nor the slice mutated.
    files := make([]TSMFile, len(f.files))
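
At the FileStore level, the intended call pattern pairs the block with
the in-use poll. A sketch (deleteFilesIfIdle is a hypothetical name and
error handling is condensed):

func deleteFilesIfIdle(fs *tsm1.FileStore) (bool, error) {
    // Block new readers so the InUse poll cannot race a new query.
    if err := fs.SetNewReadersBlocked(true); err != nil {
        return false, err
    }
    // Always pair the block with an unblock.
    defer fs.SetNewReadersBlocked(false)

    busy, err := fs.InUse() // only valid while a block is held
    if err != nil || busy {
        return false, err // skip: active readers still hold references
    }
    // Safe window: TSMReader, Apply, WalkKeys, KeyCursor, and
    // CreateSnapshot all refuse new access until the unblock above.
    return true, nil
}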
(Diffs for the remaining 7 changed files are not shown.)
