diff --git a/README.md b/README.md index 6bf64f3..590765b 100644 --- a/README.md +++ b/README.md @@ -34,55 +34,47 @@ import ( ) func main() { - // Initialize the WAL options - opts := &wal.WALOptions{ - LogDir: "data/", // Log files will be stored in the "data" directory - MaxLogSize: 1024 * 1024, // 1 MB maximum log size per file - MaxSegments: 10, // Maximum number of log segments - log: zap.NewExample(), // Replace with your logger instance + log, err := zap.NewProduction() + if err != nil { + panic(err) } - // Create a new Write-Ahead Log - wal, err := wal.NewWriteAheadLog(opts) + wal, err := NewWriteAheadLog(&WALOptions{ + LogDir: "data/", + MaxLogSize: 40 * 1024 * 1024, // 40 MB (log rotation size) + MaxSegments: 2, + Log: log, + MaxWaitBeforeSync: 1 * time.Second, + SyncMaxBytes: 1000, + }) if err != nil { fmt.Println("Error creating Write-Ahead Log:", err) return } defer wal.Close() - // Simulate some database changes and write them to the WAL - data1 := []byte("Change 1") - offset1, err := wal.Write(data1) - if err != nil { - fmt.Println("Error writing to WAL:", err) - return + for i := 0; i < 10000000; i++ { + _, err := wal.Write([]byte("Simulate some database changes and write them to the WAL")) + if err != nil { + panic(err) + } } - data2 := []byte("Change 2") - offset2, err := wal.Write(data2) + // Sync all the logs to the file at the end so that we don't lose any data + err = wal.Sync() if err != nil { - fmt.Println("Error writing to WAL:", err) - return + fmt.Println("Error in final sync", err) } - // You can add more database changes here... - - // Database changes recorded in the Write-Ahead Log. 
- fmt.Println("Current log offset:", offset2) - - // Simulate crash recovery by replaying the log from a specific offset - replayFromOffset := offset1 // Change this offset to replay from a different point - - err = wal.Replay(replayFromOffset, func(data []byte) error { - // Apply the changes to the database - // For this example, we're just printing the data. - fmt.Println("Replaying log entry:", string(data)) + startTime := time.Now() + var numLines int + wal.Replay(0, func(b []byte) error { + numLines++ return nil }) - if err != nil { - fmt.Println("Error replaying log:", err) - } + fmt.Println("Total lines replayed:", numLines) + fmt.Println("time taken:", time.Since(startTime)) } ``` diff --git a/example.go b/example.go index 35e2e5c..c5dd09f 100644 --- a/example.go +++ b/example.go @@ -15,7 +15,7 @@ func main() { wal, err := NewWriteAheadLog(&WALOptions{ LogDir: "data/", - MaxLogSize: 40 * 1024 * 1024, // 400 MB (log rotation size) + MaxLogSize: 40 * 1024 * 1024, // 40 MB (log rotation size) MaxSegments: 2, Log: log, MaxWaitBeforeSync: 1 * time.Second, @@ -34,6 +34,12 @@ func main() { } } + // Sync all the logs to the file at the end so that we don't lose any data + err = wal.Sync() + if err != nil { + fmt.Println("Error in final sync", err) + } + startTime := time.Now() var numLines int wal.Replay(0, func(b []byte) error { @@ -41,6 +47,6 @@ func main() { return nil }) - fmt.Println("Total lines", numLines) - fmt.Println("time taken", time.Since(startTime)) + fmt.Println("Total lines replayed:", numLines) + fmt.Println("time taken:", time.Since(startTime)) } diff --git a/wal.go b/wal.go index 1fe49cd..1116198 100644 --- a/wal.go +++ b/wal.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "os" "path/filepath" "sort" @@ -70,39 +71,125 @@ type WriteAheadLog struct { func NewWriteAheadLog(opts *WALOptions) (*WriteAheadLog, error) { walLogFilePrefix := opts.LogDir + "wal" - firstLogFileName := walLogFilePrefix + ".0.0" // prefix + . {segmentID} + . 
{starting_offset} - file, err := os.OpenFile(firstLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + wal := &WriteAheadLog{ + logFileName: walLogFilePrefix, + maxLogSize: opts.MaxLogSize, + maxSegments: opts.MaxSegments, + log: opts.Log, + syncMaxBytes: opts.SyncMaxBytes, + syncTimer: time.NewTicker(opts.MaxWaitBeforeSync), + maxWaitBeforeSync: opts.MaxWaitBeforeSync, + } + err := wal.openExistingOrCreateNew(opts.LogDir) if err != nil { return nil, err } - fi, err := file.Stat() + go wal.keepSyncing() + + return wal, nil +} + +func isDirectoryEmpty(dirPath string) (bool, error) { + // Open the directory + dir, err := os.Open(dirPath) if err != nil { - return nil, err + return false, err } + defer dir.Close() - wal := &WriteAheadLog{ - logFileName: walLogFilePrefix, - file: file, + // Read the directory contents + fileList, err := dir.ReadDir(1) // Read the first entry + if err != nil && err != io.EOF { + return false, err + } - maxLogSize: opts.MaxLogSize, - logSize: fi.Size(), + // If the list of files is empty, the directory is empty + return len(fileList) == 0, nil +} - segmentCount: 0, +func (wal *WriteAheadLog) openExistingOrCreateNew(dirPath string) error { + empty, err := isDirectoryEmpty(dirPath) + if err != nil { + return err + } - maxSegments: opts.MaxSegments, - currentSegmentID: 0, - curOffset: -1, - log: opts.Log, + if empty { + // Create the first log file + firstLogFileName := wal.logFileName + ".0.0" // prefix + . {segmentID} + . 
{starting_offset} + file, err := os.OpenFile(firstLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return err + } - bufWriter: bufio.NewWriter(file), + // Set default values since this is the first log we are opening + wal.file = file + wal.bufWriter = bufio.NewWriter(file) + wal.logSize = 0 // since fi.Size() is 0 for newly created file + wal.segmentCount = 0 + wal.currentSegmentID = 0 + wal.curOffset = -1 + + } else { + // Fetch all the file names in the path and sort them + logFiles, err := filepath.Glob(wal.logFileName + "*") + if err != nil { + return err + } + sort.Strings(logFiles) + + // open the last file + fileName := logFiles[len(logFiles)-1] + file, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return err + } + + fi, err := file.Stat() + if err != nil { + return err + } + + // Find the current segment ID and the file's starting offset from the file name + s := strings.Split(fileName, ".") + lastSegment, err := strconv.Atoi(s[1]) + if err != nil { + return err + } + + latestOffset, err := strconv.Atoi(s[2]) + if err != nil { + return err + } + offset := int64(latestOffset) + + // Scan the file from the start and count its entries to calculate the latest offset + file.Seek(0, io.SeekStart) + bufReader := bufio.NewReader(file) + n, err := wal.seekOffset(math.MaxInt, offset, *bufReader) + if err != nil && err != io.EOF { + return err + } + offset += n + + wal.file = file + file.Seek(0, io.SeekEnd) + wal.bufWriter = bufio.NewWriter(file) + wal.logSize = fi.Size() + wal.currentSegmentID = lastSegment + wal.curOffset = offset + wal.segmentCount = len(logFiles) - 1 + + wal.log.Info("appending wal", + zap.String("file", fileName), + zap.Int64("latestOffset", wal.curOffset), + zap.Int("latestSegment", lastSegment), + zap.Int("segmentCount", wal.segmentCount), + ) - syncTimer: time.NewTicker(opts.MaxWaitBeforeSync), - syncMaxBytes: opts.SyncMaxBytes, } - go wal.keepSyncing() - return wal, nil + return nil } func (wal *WriteAheadLog) 
keepSyncing() { @@ -128,8 +215,9 @@ func (wal *WriteAheadLog) Write(data []byte) (int64, error) { defer wal.mu.Unlock() entrySize := 4 + len(data) // 4 bytes for the size prefix + if wal.logSize+int64(entrySize) > wal.maxLogSize { - // Flushing all the in-memory changes to disk + // Flushing all the in-memory changes to disk, and rotating the log if err := wal.Sync(); err != nil { return 0, err } @@ -137,6 +225,8 @@ func (wal *WriteAheadLog) Write(data []byte) (int64, error) { if err := wal.rotateLog(); err != nil { return 0, err } + + wal.resetTimer() } _, err := wal.file.Seek(0, io.SeekEnd) @@ -169,6 +259,7 @@ func (wal *WriteAheadLog) Close() error { return wal.file.Close() } +// GetOffset returns the current log offset. func (wal *WriteAheadLog) GetOffset() int64 { wal.mu.Lock() defer wal.mu.Unlock() @@ -176,6 +267,8 @@ func (wal *WriteAheadLog) GetOffset() int64 { return wal.curOffset } +// Sync writes all the data to the disk. +// Since Sync is an expensive call, calling this often will slow down the throughput. func (wal *WriteAheadLog) Sync() error { err := wal.bufWriter.Flush() if err != nil { @@ -184,6 +277,8 @@ func (wal *WriteAheadLog) Sync() error { return wal.file.Sync() } +// rotateLog closes the current file and opens a new one. 
+// It also cleans up the oldest log files if the number of log files is greater than maxSegments. func (wal *WriteAheadLog) rotateLog() error { if err := wal.file.Close(); err != nil { return err @@ -211,6 +306,7 @@ func (wal *WriteAheadLog) rotateLog() error { return nil } +// deleteOldestSegment removes the oldest log file from the logs. func (wal *WriteAheadLog) deleteOldestSegment() error { oldestSegment := fmt.Sprintf("%s.%d.*", wal.logFileName, wal.currentSegmentID-wal.maxSegments) @@ -250,7 +346,7 @@ func (wal *WriteAheadLog) findStartingLogFile(offset int64, files []string) (i i return -1, -1, errors.New("offset doesn't exsists") } -func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bufio.Reader) (err error) { +func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bufio.Reader) (n int64, err error) { var readBytes []byte for startingOffset < offset { readBytes, err = file.Peek(lengthBufferSize) @@ -262,9 +358,10 @@ func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bu if err != nil { break } - startingOffset++ + startingOffset++ // Check logic + n++ } - return err + return n, err } func (wal *WriteAheadLog) Replay(offset int64, f func([]byte) error) error { @@ -285,7 +382,7 @@ func (wal *WriteAheadLog) Replay(offset int64, f func([]byte) error) error { } bufReader.Reset(file) if i == 0 { - if err = wal.seekOffset(offset, startingOffset, bufReader); err != nil { + if _, err = wal.seekOffset(offset, startingOffset, bufReader); err != nil { return err } }