Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added: continuing from the last read position #4

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 25 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,55 +34,47 @@ import (
)

func main() {
// Initialize the WAL options
opts := &wal.WALOptions{
LogDir: "data/", // Log files will be stored in the "data" directory
MaxLogSize: 1024 * 1024, // 1 MB maximum log size per file
MaxSegments: 10, // Maximum number of log segments
log: zap.NewExample(), // Replace with your logger instance
log, err := zap.NewProduction()
if err != nil {
panic(err)
}

// Create a new Write-Ahead Log
wal, err := wal.NewWriteAheadLog(opts)
wal, err := NewWriteAheadLog(&WALOptions{
LogDir: "data/",
MaxLogSize: 40 * 1024 * 1024, // 400 MB (log rotation size)
MaxSegments: 2,
Log: log,
MaxWaitBeforeSync: 1 * time.Second,
SyncMaxBytes: 1000,
})
if err != nil {
fmt.Println("Error creating Write-Ahead Log:", err)
return
}
defer wal.Close()

// Simulate some database changes and write them to the WAL
data1 := []byte("Change 1")
offset1, err := wal.Write(data1)
if err != nil {
fmt.Println("Error writing to WAL:", err)
return
for i := 0; i < 10000000; i++ {
_, err := wal.Write([]byte("Simulate some database changes and write them to the WAL"))
if err != nil {
panic(err)
}
}

data2 := []byte("Change 2")
offset2, err := wal.Write(data2)
// Sync all the logs to the file at the end so that we dont loose any data
err = wal.Sync()
if err != nil {
fmt.Println("Error writing to WAL:", err)
return
fmt.Println("Error in final sync", err)
}

// You can add more database changes here...

// Database changes recorded in the Write-Ahead Log.
fmt.Println("Current log offset:", offset2)

// Simulate crash recovery by replaying the log from a specific offset
replayFromOffset := offset1 // Change this offset to replay from a different point

err = wal.Replay(replayFromOffset, func(data []byte) error {
// Apply the changes to the database
// For this example, we're just printing the data.
fmt.Println("Replaying log entry:", string(data))
startTime := time.Now()
var numLines int
wal.Replay(0, func(b []byte) error {
numLines++
return nil
})

if err != nil {
fmt.Println("Error replaying log:", err)
}
fmt.Println("Total lines replayed:", numLines)
fmt.Println("time taken:", time.Since(startTime))
}
```

Expand Down
12 changes: 9 additions & 3 deletions example.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {

wal, err := NewWriteAheadLog(&WALOptions{
LogDir: "data/",
MaxLogSize: 40 * 1024 * 1024, // 400 MB (log rotation size)
MaxLogSize: 40 * 1024 * 1024, // 40 MB (log rotation size)
MaxSegments: 2,
Log: log,
MaxWaitBeforeSync: 1 * time.Second,
Expand All @@ -34,13 +34,19 @@ func main() {
}
}

// Sync all the logs to the file at the end so that we dont loose any data
err = wal.Sync()
if err != nil {
fmt.Println("Error in final sync", err)
}

startTime := time.Now()
var numLines int
wal.Replay(0, func(b []byte) error {
numLines++
return nil
})

fmt.Println("Total lines", numLines)
fmt.Println("time taken", time.Since(startTime))
fmt.Println("Total lines replayed:", numLines)
fmt.Println("time taken:", time.Since(startTime))
}
150 changes: 126 additions & 24 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -70,39 +71,130 @@ type WriteAheadLog struct {
func NewWriteAheadLog(opts *WALOptions) (*WriteAheadLog, error) {
walLogFilePrefix := opts.LogDir + "wal"

firstLogFileName := walLogFilePrefix + ".0.0" // prefix + . {segmentID} + . {starting_offset}
file, err := os.OpenFile(firstLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
wal := &WriteAheadLog{
logFileName: walLogFilePrefix,
maxLogSize: opts.MaxLogSize,
maxSegments: opts.MaxSegments,
log: opts.Log,
syncMaxBytes: opts.SyncMaxBytes,
syncTimer: time.NewTicker(opts.MaxWaitBeforeSync),
maxWaitBeforeSync: opts.MaxWaitBeforeSync,
}
err := wal.openExistingOrCreateNew(opts.LogDir)
if err != nil {
return nil, err
}

fi, err := file.Stat()
go wal.keepSyncing()

return wal, nil
}

func isDirectoryEmpty(dirPath string) (bool, error) {
// Open the directory
dir, err := os.Open(dirPath)
if err != nil {
return nil, err
return false, err
}
defer dir.Close()

wal := &WriteAheadLog{
logFileName: walLogFilePrefix,
file: file,
// Read the directory contents
fileList, err := dir.ReadDir(1) // Read the first entry
if err != nil && err != io.EOF {
return false, err
}

maxLogSize: opts.MaxLogSize,
logSize: fi.Size(),
// If the list of files is empty, the directory is empty
return len(fileList) == 0, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per documentation Incase of empty fileList, readdir would return non nil error also, therefore we should check for io.eof error as well.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the exception for EOF in the check above. Here, if the error is EOF, the fileList will be empty by default. So this logic should work. Let me know if you find otherwise.

}

segmentCount: 0,
func (wal *WriteAheadLog) openExistingOrCreateNew(dirPath string) error {
empty, err := isDirectoryEmpty(dirPath)
if err != nil {
return err
}

if empty {
// Create the first log file
firstLogFileName := wal.logFileName + ".0.0" // prefix + . {segmentID} + . {starting_offset}
file, err := os.OpenFile(firstLogFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}

maxSegments: opts.MaxSegments,
currentSegmentID: 0,
curOffset: -1,
log: opts.Log,
fi, err := file.Stat()
if err != nil {
return err
}

bufWriter: bufio.NewWriter(file),
// Set default values since this is the first log we are opening
wal.file = file
wal.bufWriter = bufio.NewWriter(file)
wal.logSize = fi.Size()
aarthikrao marked this conversation as resolved.
Show resolved Hide resolved
wal.segmentCount = 0
wal.currentSegmentID = 0
wal.curOffset = -1

} else {
// Fetch all the file names in the path and sort them
logFiles, err := filepath.Glob(wal.logFileName + "*")
if err != nil {
return err
}
sort.Strings(logFiles)

// open the last file
fileName := logFiles[len(logFiles)-1]
file, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
}

fi, err := file.Stat()
if err != nil {
return err
}

// Find the current segment count and the latest offset from file name
s := strings.Split(fileName, ".")
lastSegment, err := strconv.Atoi(s[1])
if err != nil {
return err
}

latestOffset, err := strconv.Atoi(s[2])
if err != nil {
return err
}
offset := int64(latestOffset)

// Go to the end of file and calculate the offset
file.Seek(0, io.SeekStart)
bufReader := bufio.NewReader(file)
n, err := wal.seekOffset(math.MaxInt, offset, *bufReader)
if err != nil && err != io.EOF {
return err
}
offset += n

wal.file = file
file.Seek(0, io.SeekEnd)
wal.bufWriter = bufio.NewWriter(file)
wal.logSize = fi.Size()
wal.currentSegmentID = lastSegment
wal.curOffset = offset
wal.segmentCount = len(logFiles) - 1

wal.log.Info("appending wal",
zap.String("file", fileName),
zap.Int64("latestOffset", wal.curOffset),
zap.Int("latestSegment", lastSegment),
zap.Int("segmentCount", wal.segmentCount),
)

syncTimer: time.NewTicker(opts.MaxWaitBeforeSync),
syncMaxBytes: opts.SyncMaxBytes,
}
go wal.keepSyncing()

return wal, nil
return nil
}

func (wal *WriteAheadLog) keepSyncing() {
Expand All @@ -128,15 +220,18 @@ func (wal *WriteAheadLog) Write(data []byte) (int64, error) {
defer wal.mu.Unlock()

entrySize := 4 + len(data) // 4 bytes for the size prefix

if wal.logSize+int64(entrySize) > wal.maxLogSize {
// Flushing all the in-memory changes to disk
// Flushing all the in-memory changes to disk, and rotating the log
if err := wal.Sync(); err != nil {
return 0, err
}

if err := wal.rotateLog(); err != nil {
return 0, err
}

wal.resetTimer()
}

_, err := wal.file.Seek(0, io.SeekEnd)
Expand Down Expand Up @@ -169,13 +264,16 @@ func (wal *WriteAheadLog) Close() error {
return wal.file.Close()
}

// GetOffset returns the current log offset.
func (wal *WriteAheadLog) GetOffset() int64 {
wal.mu.Lock()
defer wal.mu.Unlock()

return wal.curOffset
}

// Sync writes all the data to the disk.
// Since Sync is a expensive call, calling this often will slow down the throughput.
func (wal *WriteAheadLog) Sync() error {
err := wal.bufWriter.Flush()
if err != nil {
Expand All @@ -184,6 +282,8 @@ func (wal *WriteAheadLog) Sync() error {
return wal.file.Sync()
}

// rotateLog closes the current file, opens a new one.
// It also cleans up the oldest log files if the number of log files are greater than maxSegments
func (wal *WriteAheadLog) rotateLog() error {
if err := wal.file.Close(); err != nil {
return err
Expand Down Expand Up @@ -211,6 +311,7 @@ func (wal *WriteAheadLog) rotateLog() error {
return nil
}

// deleteOldestSegment removes the oldest log file from the logs
func (wal *WriteAheadLog) deleteOldestSegment() error {
oldestSegment := fmt.Sprintf("%s.%d.*", wal.logFileName, wal.currentSegmentID-wal.maxSegments)

Expand Down Expand Up @@ -250,7 +351,7 @@ func (wal *WriteAheadLog) findStartingLogFile(offset int64, files []string) (i i
return -1, -1, errors.New("offset doesn't exsists")
}

func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bufio.Reader) (err error) {
func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bufio.Reader) (n int64, err error) {
var readBytes []byte
for startingOffset < offset {
readBytes, err = file.Peek(lengthBufferSize)
Expand All @@ -262,9 +363,10 @@ func (wal *WriteAheadLog) seekOffset(offset int64, startingOffset int64, file bu
if err != nil {
break
}
startingOffset++
startingOffset++ // Check logic
n++
}
return err
return n, err
}

func (wal *WriteAheadLog) Replay(offset int64, f func([]byte) error) error {
Expand All @@ -285,7 +387,7 @@ func (wal *WriteAheadLog) Replay(offset int64, f func([]byte) error) error {
}
bufReader.Reset(file)
if i == 0 {
if err = wal.seekOffset(offset, startingOffset, bufReader); err != nil {
if _, err = wal.seekOffset(offset, startingOffset, bufReader); err != nil {
return err
}
}
Expand Down