Skip to content

Commit

Permalink
feat(wal): move wal file to a temporary file when WAL replay failed (#…
Browse files Browse the repository at this point in the history
…508)

* feat(wal): move corrupted wal files to a temporary file
  • Loading branch information
dakimura authored Sep 19, 2021
1 parent 8d561eb commit a015238
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 122 deletions.
5 changes: 4 additions & 1 deletion contrib/xignitefeeder/feed/interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ func (c *IntervalMarketTimeChecker) IsOpen(t time.Time) bool {
func (c *IntervalMarketTimeChecker) intervalElapsed(t time.Time) bool {
elapsed := t.Sub(c.LastTime) >= c.Interval
if elapsed {
// log if this is not the first time
if !c.LastTime.IsZero() {
log.Debug("[Xignite Feeder] interval elapsed since last time: " + t.String())
}
c.LastTime = t
log.Debug("[Xignite Feeder] interval elapsed since last time: " + t.String())
}
return elapsed
}
14 changes: 0 additions & 14 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package executor

import (
"fmt"
"strconv"

"github.com/alpacahq/marketstore/v4/utils/io"
"github.com/alpacahq/marketstore/v4/utils/log"
)
Expand Down Expand Up @@ -63,18 +61,6 @@ func (msg WALWriteError) Error() string {
return errReport("%s: Error Writing to WAL", string(msg))
}

// WALReplayError is used when the WALfile Replay process fails.
// If skipReplay:true, it will attempt to give up the Replay process,
// move the walfile to a temporary file, and continue with other marketstore processing.
type WALReplayError struct {
msg string
skipReplay bool
}

func (e WALReplayError) Error() string {
return errReport("%s: Error Replaying WAL. skipReplay="+strconv.FormatBool(e.skipReplay), e.msg)
}

func errReport(base string, msg string) string {
base = io.GetCallerFileContext(2) + ":" + base
log.Error(base, msg)
Expand Down
23 changes: 19 additions & 4 deletions executor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package executor

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"

"github.com/alpacahq/marketstore/v4/executor/wal"

"github.com/pkg/errors"

"github.com/alpacahq/marketstore/v4/catalog"
Expand Down Expand Up @@ -91,16 +94,28 @@ func NewInstanceSetup(relRootDir string, rs ReplicationSender, tm []*trigger.Tri
WALBypass, &shutdownPend, walWG, tpd, txnPipe,
)
if err != nil {
log.Error("Unable to create WAL. err="+ err.Error())
log.Error("Unable to create WAL. err=" + err.Error())
return nil, nil, nil, fmt.Errorf("unable to create WAL: %w", err)
}

// Allocate a new WALFile and cache
if !WALBypass {
err = ThisInstance.WALFile.cleanupOldWALFiles(rootDir)
//ignoreFile := filepath.Base(ThisInstance.WALFile.FilePtr.Name())
ignoreFile := ThisInstance.WALFile.FilePtr.Name()
myInstanceID := ThisInstance.WALFile.OwningInstanceID

finder := wal.NewFinder(ioutil.ReadDir)
walFileAbsPaths, err := finder.Find(filepath.Clean(rootDir))
if err != nil {
walFileAbsPaths = []string{}
log.Error("failed to find wal files under %s: %w", filepath.Clean(rootDir), err)
}

c := NewWALCleaner(ignoreFile, myInstanceID)
err = c.CleanupOldWALFiles(walFileAbsPaths)
// err = ThisInstance.WALFile.cleanupOldWALFiles(rootDir)
if err != nil {
// TODO: error handling to move walfile to a temporary file and create a new one when walfile is corrupted
log.Fatal("Unable to startup Cache and WAL")
log.Fatal("Unable to startup Cache and WAL:" + err.Error())
}
}
if backgroundSync {
Expand Down
98 changes: 9 additions & 89 deletions executor/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"bytes"
"io/ioutil"
"path/filepath"

"github.com/alpacahq/marketstore/v4/executor/buffile"
Expand Down Expand Up @@ -80,10 +79,10 @@ func NewWALFile(rootDir string, owningInstanceID int64, rs ReplicationSender,
}

// TakeOverWALFile opens an existing wal file and returns WALFileType for it.
func TakeOverWALFile(rootDir, fileName string) (wf *WALFileType, err error) {
func TakeOverWALFile(filePath string) (wf *WALFileType, err error) {
wf = new(WALFileType)
wf.lastCommittedTGID = 0
filePath := filepath.Join(rootDir, fileName)
//filePath := filepath.Join(rootDir, fileName)

err = wf.open(filePath)
if err != nil {
Expand Down Expand Up @@ -131,21 +130,19 @@ func (wf *WALFileType) close(ReplayStatus wal.ReplayStateEnum) {
wf.FilePtr.Close()
}
func (wf *WALFileType) Delete(callersInstanceID int64) (err error) {
canDeleteSafely, err := wf.canDeleteSafely(callersInstanceID)
err = wf.syncStatusRead()
if err != nil {
return fmt.Errorf("check if wal file can be deleted safely. WALfile=%s: %w", wf.FilePtr.Name(), err)
}
if !canDeleteSafely {
return errors.New("BUG: cannot delete the current instance's WALfile:" + wf.FilePtr.Name())
return fmt.Errorf("cannot delete wal because failed to read wal sync status. callersInstanceID=%d:%w",
callersInstanceID, err)
}

if !wf.IsOpen() {
log.Warn(io.GetCallerFileContext(0) + ": Can not delete open WALFile")
return fmt.Errorf("WAL File is open")
return errors.New("WAL File is open")
}
if wf.isActive(callersInstanceID) {
log.Warn(io.GetCallerFileContext(0) + ": Can not delete active WALFile")
return fmt.Errorf("WAL File is active")
return errors.New("WAL File is active")
}

needsReplay, err := wf.NeedsReplay()
Expand All @@ -154,7 +151,7 @@ func (wf *WALFileType) Delete(callersInstanceID int64) (err error) {
}
if needsReplay {
log.Warn(io.GetCallerFileContext(0) + ": WALFile needs replay, can not delete")
return fmt.Errorf("WAL File needs replay")
return errors.New("WAL File needs replay, can not delete")
}

wf.close(wal.REPLAYED)
Expand Down Expand Up @@ -541,7 +538,7 @@ func (wf *WALFileType) syncStatusRead() error {
}

func readStatus(filePtr *os.File) (fileStatus wal.FileStatusEnum, replayStatus wal.ReplayStateEnum, owningInstanceID int64) {
// Read from beginning of file +1 to skip over the MID
// Read from beginning of file +1 to skip over the MID
filePtr.Seek(1, goio.SeekStart)
var err error
fileStatus, replayStatus, owningInstanceID, err = wal.ReadStatus(filePtr)
Expand Down Expand Up @@ -585,90 +582,13 @@ func (wf *WALFileType) CanWrite(msg string, callersInstanceID int64) (bool, erro
}
return true, nil
}
func (wf *WALFileType) canDeleteSafely(callersInstanceID int64) (bool, error) {
err := wf.syncStatusRead()
if err != nil {
return false, fmt.Errorf("failed to read wal sync status. callersInstanceID=%d:%w",
callersInstanceID, err)
}

if wf.isActive(callersInstanceID) {
log.Warn(io.GetCallerFileContext(0) + ": WALFile is active, can not delete")
return false, nil
}
needsReplay, err := wf.NeedsReplay()
if err != nil {
return false, fmt.Errorf("failed to check if wal needs replay: %w", err)
}
if needsReplay {
log.Warn(io.GetCallerFileContext(0) + ": WALFile needs replay, can not delete")
return false, nil
}

return true, nil
}
// sanityCheckValue reports whether value is strictly less than 1000 times the
// current size of the file behind fp. It is meant as a guard so that a length
// read from the file (e.g. TGLen) is checked for plausibility before a buffer
// of that size is allocated.
//
// If the file cannot be stat'ed (for example fp is nil or already closed), no
// bound can be established and the value is reported as not sane.
func sanityCheckValue(fp *os.File, value int64) (isSane bool) {
	fstat, err := fp.Stat()
	if err != nil {
		// Without this check the nil FileInfo returned alongside the error
		// would be dereferenced below and panic.
		return false
	}
	return value < 1000*fstat.Size()
}
func (wf *WALFileType) cleanupOldWALFiles(rootDir string) error {
rootDir = filepath.Clean(rootDir)
files, err := ioutil.ReadDir(rootDir)
if err != nil {
return fmt.Errorf("unable to read root directory %s: %w", rootDir, err)
}
myFileBase := filepath.Base(wf.FilePtr.Name())
log.Info("My WALFILE: %s", myFileBase)
for _, file := range files {
// ignore directories
if file.IsDir() {
// ignore
continue
}

// ignore files except wal
filename := file.Name()
if filepath.Ext(filename) != ".walfile" {
continue
}

// ignore the newest wal file
if filename == myFileBase {
continue
}

log.Info("Found a WALFILE: %s, entering replay...", filename)
filePath := filepath.Join(rootDir, filename)
fi, err := os.Stat(filePath)
if err != nil {
log.Error("failed to get fileStat of " + filePath)
continue
}
if fi.Size() < 11 {
log.Info("WALFILE: %s is empty, removing it...", filename)
err = os.Remove(filePath)
if err != nil {
log.Error("failed to remove an empty WALfile", filename)
}
continue
}

w, err := TakeOverWALFile(rootDir, filename)
if err != nil {
return fmt.Errorf("opening %s: %w", filename, err)
}
if err = w.Replay(false); err != nil {
return fmt.Errorf("unable to replay %s: %w", filename, err)
}

if err = w.Delete(wf.OwningInstanceID); err != nil {
return err
}
}
return nil
}

var haveWALWriter = false

Expand Down
17 changes: 15 additions & 2 deletions executor/wal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wal

import (
"fmt"
"strconv"

"github.com/alpacahq/marketstore/v4/utils/io"
"github.com/alpacahq/marketstore/v4/utils/log"
Expand All @@ -10,11 +11,23 @@ import (
type ShortReadError string

func (msg ShortReadError) Error() string {
return errReport("%s: Unexpectedly short read", string(msg))
return errReport("Unexpectedly short read:%s", string(msg))
}

// ReplayError reports a failure of the WAL file replay process.
//
// Cont tells the caller how to react: when it is true, the replay is given up,
// the WAL file is moved to a temporary file, and the rest of marketstore
// processing continues.
type ReplayError struct {
	Msg  string
	Cont bool
}

// Error implements the error interface. The message is built by errReport,
// which also logs it; errReport is called directly from here because it
// resolves the caller's file context by stack depth.
func (e ReplayError) Error() string {
	format := "Error Replaying WAL. Cont=" + strconv.FormatBool(e.Cont) + ":%s"
	return errReport(format, e.Msg)
}

func errReport(base string, msg string) string {
base = io.GetCallerFileContext(2) + ":" + base
log.Error(base, msg)
log.Warn(base, msg)
return fmt.Sprintf(base, msg)
}
44 changes: 44 additions & 0 deletions executor/wal/find.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package wal

import (
"fmt"
"io/fs"
"path/filepath"

"github.com/alpacahq/marketstore/v4/utils/log"
)

// Finder locates WAL files in a directory.
// The directory listing function is injected through NewFinder, so the way a
// directory is read can be swapped out.
type Finder struct {
// dirRead returns the entries found in dir, or an error when dir cannot be read.
dirRead func(dir string) ([]fs.FileInfo, error)
}

// NewFinder builds a Finder that lists directory entries with dirRead.
// Any function with this signature can be supplied, e.g. ioutil.ReadDir.
func NewFinder(dirRead func(dir string) ([]fs.FileInfo, error)) *Finder {
	finder := new(Finder)
	finder.dirRead = dirRead
	return finder
}

// Find lists the "*.walfile" files located directly under dir; sub-directories
// are not descended into.
//
// Each returned path is dir joined with the file name, so the paths are
// absolute only when dir itself is absolute. A nil slice is returned when no
// WAL file is found. An error is returned when dir cannot be read.
func (f *Finder) Find(dir string) ([]string, error) {
	entries, err := f.dirRead(dir)
	if err != nil {
		return nil, fmt.Errorf("unable to read the directory %s: %w", dir, err)
	}

	var walPaths []string
	for _, entry := range entries {
		// directories are never WAL files
		if entry.IsDir() {
			continue
		}

		// keep only entries carrying the WAL file extension
		name := entry.Name()
		if filepath.Ext(name) == ".walfile" {
			log.Debug("found a WALFILE: %s", name)
			walPaths = append(walPaths, filepath.Join(dir, name))
		}
	}
	return walPaths, nil
}
17 changes: 17 additions & 0 deletions executor/wal/move.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wal

import (
"fmt"
"os"

"github.com/alpacahq/marketstore/v4/utils/log"
)

// Move renames the file at oldFP to newFP using os.Rename and logs the move
// at debug level on success.
//
// It returns an error wrapping the os.Rename failure when the file cannot be
// moved; in that case nothing is logged here.
func Move(oldFP, newFP string) error {
	if err := os.Rename(oldFP, newFP); err != nil {
		return fmt.Errorf("failed to move %s to %s:%w", oldFP, newFP, err)
	}
	// Pass the paths as arguments rather than pre-formatting them: the logger
	// takes a format string, so a '%' inside a path must not be interpreted
	// as a formatting verb.
	log.Debug("moved %s to %s", oldFP, newFP)
	return nil
}
4 changes: 2 additions & 2 deletions executor/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestBrokenWAL(t *testing.T) {
fp.Close()

// Take over the broken WALFile and replay it
WALFile, err := executor.TakeOverWALFile(rootDir, BrokenWALFileName)
WALFile, err := executor.TakeOverWALFile(filepath.Join(rootDir, BrokenWALFileName))
assert.Nil(t, err)
newTGC := executor.NewTransactionPipe()
assert.NotNil(t, newTGC)
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestWALReplay(t *testing.T) {
io.Syncfs()

// Take over the new WALFile and replay it into a new TG cache
WALFile, err := executor.TakeOverWALFile(rootDir, newWALFileName)
WALFile, err := executor.TakeOverWALFile(filepath.Join(rootDir, newWALFileName))
assert.Nil(t, err)
data, _ := ioutil.ReadFile(newWALFilePath)
ioutil.WriteFile("/tmp/wal", data, 0644)
Expand Down
Loading

0 comments on commit a015238

Please sign in to comment.