From a01523817c17b0a6ca65f842d91a98ce78a05979 Mon Sep 17 00:00:00 2001 From: dakimura <34202807+dakimura@users.noreply.github.com> Date: Sun, 19 Sep 2021 11:15:16 +0900 Subject: [PATCH] feat(wal): move wal file to a temporary file when WAL replay failed (#508) * feat(wal): move corrupted wal files to a temporary file --- contrib/xignitefeeder/feed/interval.go | 5 +- executor/errors.go | 14 ---- executor/instance.go | 23 ++++-- executor/wal.go | 98 +++----------------------- executor/wal/errors.go | 17 ++++- executor/wal/find.go | 44 ++++++++++++ executor/wal/move.go | 17 +++++ executor/wal_test.go | 4 +- executor/walclean.go | 75 ++++++++++++++++++++ executor/walreplay.go | 26 ++++--- 10 files changed, 201 insertions(+), 122 deletions(-) create mode 100644 executor/wal/find.go create mode 100644 executor/wal/move.go create mode 100644 executor/walclean.go diff --git a/contrib/xignitefeeder/feed/interval.go b/contrib/xignitefeeder/feed/interval.go index 7e41cda7..03d65af2 100644 --- a/contrib/xignitefeeder/feed/interval.go +++ b/contrib/xignitefeeder/feed/interval.go @@ -33,8 +33,11 @@ func (c *IntervalMarketTimeChecker) IsOpen(t time.Time) bool { func (c *IntervalMarketTimeChecker) intervalElapsed(t time.Time) bool { elapsed := t.Sub(c.LastTime) >= c.Interval if elapsed { + // log if this is not the first time + if !c.LastTime.IsZero() { + log.Debug("[Xignite Feeder] interval elapsed since last time: " + t.String()) + } c.LastTime = t - log.Debug("[Xignite Feeder] interval elapsed since last time: " + t.String()) } return elapsed } diff --git a/executor/errors.go b/executor/errors.go index 023a44cd..f62a9e66 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -2,8 +2,6 @@ package executor import ( "fmt" - "strconv" - "github.com/alpacahq/marketstore/v4/utils/io" "github.com/alpacahq/marketstore/v4/utils/log" ) @@ -63,18 +61,6 @@ func (msg WALWriteError) Error() string { return errReport("%s: Error Writing to WAL", string(msg)) } -// WALReplayError is used when the WALfile Replay process fails. -// If skipReplay:true, it will attempt to give up the Replay process, -// move the walfile to a temporary file, and continue with other marketstore processing. -type WALReplayError struct { - msg string - skipReplay bool -} - -func (e WALReplayError) Error() string { - return errReport("%s: Error Replaying WAL. skipReplay="+strconv.FormatBool(e.skipReplay), e.msg) -} - func errReport(base string, msg string) string { base = io.GetCallerFileContext(2) + ":" + base log.Error(base, msg) diff --git a/executor/instance.go b/executor/instance.go index 40e1d50a..aca5d429 100644 --- a/executor/instance.go +++ b/executor/instance.go @@ -2,11 +2,14 @@ package executor import ( "fmt" + "io/ioutil" "os" "path/filepath" "sync" "time" + "github.com/alpacahq/marketstore/v4/executor/wal" + "github.com/pkg/errors" "github.com/alpacahq/marketstore/v4/catalog" @@ -91,16 +94,28 @@ func NewInstanceSetup(relRootDir string, rs ReplicationSender, tm []*trigger.Tri WALBypass, &shutdownPend, walWG, tpd, txnPipe, ) if err != nil { - log.Error("Unable to create WAL. err="+ err.Error()) + log.Error("Unable to create WAL. err=" + err.Error()) return nil, nil, nil, fmt.Errorf("unable to create WAL: %w", err) } // Allocate a new WALFile and cache if !WALBypass { - err = ThisInstance.WALFile.cleanupOldWALFiles(rootDir) + //ignoreFile := filepath.Base(ThisInstance.WALFile.FilePtr.Name()) + ignoreFile := ThisInstance.WALFile.FilePtr.Name() + myInstanceID := ThisInstance.WALFile.OwningInstanceID + + finder := wal.NewFinder(ioutil.ReadDir) + walFileAbsPaths, err := finder.Find(filepath.Clean(rootDir)) + if err != nil { + walFileAbsPaths = []string{} + log.Error("failed to find wal files under %s: %w", filepath.Clean(rootDir), err) + } + + c := NewWALCleaner(ignoreFile, myInstanceID) + err = c.CleanupOldWALFiles(walFileAbsPaths) + // err = ThisInstance.WALFile.cleanupOldWALFiles(rootDir) if err != nil { - // TODO: error handling to move walfile to a temporary file and create a new one when walfile is corrupted - log.Fatal("Unable to startup Cache and WAL") + log.Fatal("Unable to startup Cache and WAL:" + err.Error()) } } if backgroundSync { diff --git a/executor/wal.go b/executor/wal.go index 0851a393..4aba6f9d 100644 --- a/executor/wal.go +++ b/executor/wal.go @@ -10,7 +10,6 @@ import ( "time" "bytes" - "io/ioutil" "path/filepath" "github.com/alpacahq/marketstore/v4/executor/buffile" @@ -80,10 +79,10 @@ func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender, } // TakeOverWALFile opens an existing wal file and returns WALFileType for it. -func TakeOverWALFile(rootDir, fileName string) (wf *WALFileType, err error) { +func TakeOverWALFile(filePath string) (wf *WALFileType, err error) { wf = new(WALFileType) wf.lastCommittedTGID = 0 - filePath := filepath.Join(rootDir, fileName) + //filePath := filepath.Join(rootDir, fileName) err = wf.open(filePath) if err != nil { @@ -131,21 +130,19 @@ func (wf *WALFileType) close(ReplayStatus wal.ReplayStateEnum) { wf.FilePtr.Close() } func (wf *WALFileType) Delete(callersInstanceID int64) (err error) { - canDeleteSafely, err := wf.canDeleteSafely(callersInstanceID) + err = wf.syncStatusRead() if err != nil { - return fmt.Errorf("check if wal file can be deleted safely. WALfile=%s: %w", wf.FilePtr.Name(), err) - } - if !canDeleteSafely { - return errors.New("BUG: cannot delete the current instance's WALfile:" + wf.FilePtr.Name()) + return fmt.Errorf("cannot delete wal because failed to read wal sync status. callersInstanceID=%d:%w", + callersInstanceID, err) } if !wf.IsOpen() { log.Warn(io.GetCallerFileContext(0) + ": Can not delete open WALFile") - return fmt.Errorf("WAL File is open") + return errors.New("WAL File is open") } if wf.isActive(callersInstanceID) { log.Warn(io.GetCallerFileContext(0) + ": Can not delete active WALFile") - return fmt.Errorf("WAL File is active") + return errors.New("WAL File is active") } needsReplay, err := wf.NeedsReplay() @@ -154,7 +151,7 @@ func (wf *WALFileType) Delete(callersInstanceID int64) (err error) { } if needsReplay { log.Warn(io.GetCallerFileContext(0) + ": WALFile needs replay, can not delete") - return fmt.Errorf("WAL File needs replay") + return errors.New("WAL File needs replay, can not delete") } wf.close(wal.REPLAYED) @@ -541,7 +538,7 @@ func (wf *WALFileType) syncStatusRead() error { } func readStatus(filePtr *os.File) (fileStatus wal.FileStatusEnum, replayStatus wal.ReplayStateEnum, owningInstanceID int64) { - // Read from beginning of file +1 to skip over the MID + // Read from beginning of file +1 to cont over the MID filePtr.Seek(1, goio.SeekStart) var err error fileStatus, replayStatus, owningInstanceID, err = wal.ReadStatus(filePtr) @@ -585,90 +582,13 @@ func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) (bool, erro } return true, nil } -func (wf *WALFileType) canDeleteSafely(callersInstanceID int64) (bool, error) { - err := wf.syncStatusRead() - if err != nil { - return false, fmt.Errorf("failed to read wal sync status. callersInstanceID=%d:%w", - callersInstanceID, err) - } - - if wf.isActive(callersInstanceID) { - log.Warn(io.GetCallerFileContext(0) + ": WALFile is active, can not delete") - return false, nil - } - needsReplay, err := wf.NeedsReplay() - if err != nil { - return false, fmt.Errorf("failed to check if wal needs replay: %w", err) - } - if needsReplay { - log.Warn(io.GetCallerFileContext(0) + ": WALFile needs replay, can not delete") - return false, nil - } - return true, nil -} func sanityCheckValue(fp *os.File, value int64) (isSane bool) { // As a sanity check, get the file size to ensure that TGLen is reasonable prior to buffer allocations fstat, _ := fp.Stat() sanityLen := 1000 * fstat.Size() return value < sanityLen } -func (wf *WALFileType) cleanupOldWALFiles(rootDir string) error { - rootDir = filepath.Clean(rootDir) - files, err := ioutil.ReadDir(rootDir) - if err != nil { - return fmt.Errorf("unable to read root directory %s: %w", rootDir, err) - } - myFileBase := filepath.Base(wf.FilePtr.Name()) - log.Info("My WALFILE: %s", myFileBase) - for _, file := range files { - // ignore directories - if file.IsDir() { - // ignore - continue - } - - // ignore files except wal - filename := file.Name() - if filepath.Ext(filename) != ".walfile" { - continue - } - - // ignore the newest wal file - if filename == myFileBase { - continue - } - - log.Info("Found a WALFILE: %s, entering replay...", filename) - filePath := filepath.Join(rootDir, filename) - fi, err := os.Stat(filePath) - if err != nil { - log.Error("failed to get fileStat of " + filePath) - continue - } - if fi.Size() < 11 { - log.Info("WALFILE: %s is empty, removing it...", filename) - err = os.Remove(filePath) - if err != nil { - log.Error("failed to remove an empty WALfile", filename) - } - continue - } - - w, err := TakeOverWALFile(rootDir, filename) - if err != nil { - return fmt.Errorf("opening %s: %w", filename, err) - } - if err = w.Replay(false); err != nil { - return fmt.Errorf("unable to replay %s: %w", filename, err) - } - - if err = w.Delete(wf.OwningInstanceID); err != nil { - return err - } - } - return nil -} var haveWALWriter = false diff --git a/executor/wal/errors.go b/executor/wal/errors.go index ecb14c72..c7fd15da 100644 --- a/executor/wal/errors.go +++ b/executor/wal/errors.go @@ -2,6 +2,7 @@ package wal import ( "fmt" + "strconv" "github.com/alpacahq/marketstore/v4/utils/io" "github.com/alpacahq/marketstore/v4/utils/log" @@ -10,11 +11,23 @@ import ( type ShortReadError string func (msg ShortReadError) Error() string { - return errReport("%s: Unexpectedly short read", string(msg)) + return errReport("Unexpectedly short read:%s", string(msg)) +} + +// ReplayError is used when the WALfile Replay process fails. +// If Cont:true, it will give up the Replay process, +// move the walfile to a temporary file, and continue with other marketstore processing. +type ReplayError struct { + Msg string + Cont bool +} + +func (e ReplayError) Error() string { + return errReport("Error Replaying WAL. Cont="+strconv.FormatBool(e.Cont)+":%s", e.Msg) } func errReport(base string, msg string) string { base = io.GetCallerFileContext(2) + ":" + base - log.Error(base, msg) + log.Warn(base, msg) return fmt.Sprintf(base, msg) } diff --git a/executor/wal/find.go b/executor/wal/find.go new file mode 100644 index 00000000..ed60f79e --- /dev/null +++ b/executor/wal/find.go @@ -0,0 +1,44 @@ +package wal + +import ( + "fmt" + "io/fs" + "path/filepath" + + "github.com/alpacahq/marketstore/v4/utils/log" +) + +type Finder struct { + dirRead func(dir string) ([]fs.FileInfo, error) +} + +func NewFinder(dirRead func(dir string) ([]fs.FileInfo, error)) *Finder { + return &Finder{dirRead: dirRead} +} + +// Find returns all absolute paths to "*.walfile" files directly under the directory. +func (f *Finder) Find(dir string) ([]string, error) { + var ret []string + //files, err := ioutil.ReadDir(rootDir) + files, err := f.dirRead(dir) + if err != nil { + return nil, fmt.Errorf("unable to read the directory %s: %w", dir, err) + } + for _, file := range files { + // ignore directories + if file.IsDir() { + // ignore + continue + } + + // ignore files except wal + filename := file.Name() + if filepath.Ext(filename) != ".walfile" { + continue + } + + log.Debug("found a WALFILE: %s", filename) + ret = append(ret, filepath.Join(dir, filename)) + } + return ret, nil +} diff --git a/executor/wal/move.go b/executor/wal/move.go new file mode 100644 index 00000000..f6a78f78 --- /dev/null +++ b/executor/wal/move.go @@ -0,0 +1,17 @@ +package wal + +import ( + "fmt" + "os" + + "github.com/alpacahq/marketstore/v4/utils/log" +) + +func Move(oldFP, newFP string) error { + err := os.Rename(oldFP, newFP) + if err != nil { + return fmt.Errorf("failed to move %s to %s:%w", oldFP, newFP, err) + } + log.Debug(fmt.Sprintf("moved %s to %s", oldFP, newFP)) + return nil +} diff --git a/executor/wal_test.go b/executor/wal_test.go index 9118b0a3..fb1addf0 100644 --- a/executor/wal_test.go +++ b/executor/wal_test.go @@ -144,7 +144,7 @@ func TestBrokenWAL(t *testing.T) { fp.Close() // Take over the broken WALFile and replay it - WALFile, err := executor.TakeOverWALFile(rootDir, BrokenWALFileName) + WALFile, err := executor.TakeOverWALFile(filepath.Join(rootDir, BrokenWALFileName)) assert.Nil(t, err) newTGC := executor.NewTransactionPipe() assert.NotNil(t, newTGC) @@ -233,7 +233,7 @@ func TestWALReplay(t *testing.T) { io.Syncfs() // Take over the new WALFile and replay it into a new TG cache - WALFile, err := executor.TakeOverWALFile(rootDir, newWALFileName) + WALFile, err := executor.TakeOverWALFile(filepath.Join(rootDir, newWALFileName)) assert.Nil(t, err) data, _ := ioutil.ReadFile(newWALFilePath) ioutil.WriteFile("/tmp/wal", data, 0644) diff --git a/executor/walclean.go b/executor/walclean.go new file mode 100644 index 00000000..fd7822dc --- /dev/null +++ b/executor/walclean.go @@ -0,0 +1,75 @@ +package executor + +import ( + "errors" + "fmt" + "github.com/alpacahq/marketstore/v4/executor/wal" + "os" + + "github.com/alpacahq/marketstore/v4/utils/log" +) + +type WALCleaner struct { + ignoreFile string + myInstanceID int64 +} + +func NewWALCleaner(ignoreFile string, myInstanceID int64) *WALCleaner { + return &WALCleaner{ + ignoreFile: ignoreFile, + myInstanceID: myInstanceID, + } +} + +func (c *WALCleaner) CleanupOldWALFiles(walfileAbsPaths []string) error { + for _, fp := range walfileAbsPaths { + if fp == c.ignoreFile { + continue + } + + log.Info("Found a WALFILE: %s, entering replay...", fp) + fi, err := os.Stat(fp) + if err != nil { + log.Error("failed to get fileStat of " + fp) + continue + } + if fi.Size() < 11 { + log.Info("WALFILE: %s is empty, removing it...", fp) + err = os.Remove(fp) + if err != nil { + log.Error("failed to remove an empty WALfile", fp) + } + continue + } + + w, err := TakeOverWALFile(fp) + if err != nil { + return fmt.Errorf("opening %s: %w", fp, err) + } + if err = w.Replay(false); err != nil { + // --- move walfile to a temporary file and skip replay to continue other marketstore process + var walReplayErr wal.ReplayError + if !errors.As(err, &walReplayErr) { + return fmt.Errorf("unable to replay %s: %w", fp, err) + } + if walReplayErr.Cont { + tmpFP := fp + ".tmp" + if err := wal.Move(fp, tmpFP); err != nil { + return fmt.Errorf("failed to move old wal file %s to a tmp file:%w", fp, err) + } + log.Info(fmt.Sprintf("Unable to replay. moved an old WAL file %s to a temporary file %s", + fp, tmpFP)) + } + + continue + } + + // delete if replay succeeds + //if err = w.Delete(wf.OwningInstanceID); err != nil { + if err = w.Delete(c.myInstanceID); err != nil { + return fmt.Errorf("failed to delete wal file after replay:%w", err) + } + + } + return nil +} diff --git a/executor/walreplay.go b/executor/walreplay.go index d0d022fa..1345610c 100644 --- a/executor/walreplay.go +++ b/executor/walreplay.go @@ -2,12 +2,13 @@ package executor import ( "fmt" - "github.com/alpacahq/marketstore/v4/executor/wal" - "github.com/alpacahq/marketstore/v4/utils/io" - "github.com/alpacahq/marketstore/v4/utils/log" goio "io" "path/filepath" "sort" + + "github.com/alpacahq/marketstore/v4/executor/wal" + "github.com/alpacahq/marketstore/v4/utils/io" + "github.com/alpacahq/marketstore/v4/utils/log" ) // Replay loads this WAL File's unwritten transactions to primary store and mark it completely processed. @@ -29,9 +30,9 @@ func (wf *WALFileType) Replay(dryRun bool) error { } if !needsReplay { log.Info("No WAL Replay needed.") - return WALReplayError{ - msg: "WALFileType.NeedsReplay No Replay Needed", - skipReplay: true, + return wal.ReplayError{ + Msg: "WALFileType.NeedsReplay No Replay Needed", + Cont: true, } } @@ -77,9 +78,9 @@ func (wf *WALFileType) Replay(dryRun bool) error { // give up Replay if there is already a TG data location in this WAL if _, ok := offsetTGDataInWAL[tgID]; ok { log.Error(io.GetCallerFileContext(0) + ": Duplicate TG Data in WAL") - return WALReplayError{ - msg: fmt.Sprintf("Duplicate TG Data in WAL. tgID=%d", tgID), - skipReplay: true, + return wal.ReplayError{ + Msg: fmt.Sprintf("Duplicate TG Data in WAL. tgID=%d", tgID), + Cont: true, } } // log.Info("Successfully read past TG data for tgID: %v", tgID) @@ -177,7 +178,12 @@ func (wf *WALFileType) replayTGData(tgID int64, wtSets []wal.WTSet) (err error) for _, wtSet := range wtSets { fp, err := cfp.GetFP(wtSet.FilePath) if err != nil { - return err + return wal.ReplayError{ + Msg: fmt.Sprintf("failed to open a filepath %s in write transaction set:%v", + wtSet.FilePath, err.Error(), + ), + Cont: true, + } } switch wtSet.RecordType { case io.FIXED: