Skip to content
This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Commit

Permalink
Fixes bug when aggregating delete events that have overlapping filename
Browse files Browse the repository at this point in the history
  • Loading branch information
Zillode committed Jul 31, 2015
1 parent 185a992 commit ea4a7c0
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 31 deletions.
53 changes: 36 additions & 17 deletions syncwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ var (
)

const (
usage = "syncthing-inotify [options]"
extraUsage = `
pathSeparator = string(os.PathSeparator)
usage = "syncthing-inotify [options]"
extraUsage = `
The -logflags value is a sum of the following:
1 Date
Expand Down Expand Up @@ -678,7 +679,7 @@ func accumulateChanges(debounceTimeout time.Duration,
}
} else {
// Do not track more than maxFiles changes, inform syncthing to rescan entire folder
err = aggregateChanges(folder, folderPath, dirVsFiles, callback, []string{folderPath})
err = callback(folder, []string{""})
if err == nil {
for path, progress := range inProgress {
if progress.fsEvent {
Expand Down Expand Up @@ -706,26 +707,36 @@ func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback
if len(paths) == 0 {
return errors.New("No changes to aggregate")
}
trackedPaths := make(map[string]int) // Map directories to scores; if score == -1 the path is a filename
trackedPaths := make(map[string]int) // Map paths to scores; if score == -1 the path is a filename
trackedDirs := make(map[string]bool) // Map of directories
sort.Strings(paths) // Make sure parent paths are processed first
previousPath := "" // Filter duplicates
for i := range paths {
path := filepath.Clean(paths[i])
if path == "." {
path = ""
}
if path == previousPath {
continue
}
previousPath = path
fi, _ := os.Stat(path)
path = strings.TrimPrefix(path, folderPath)
path = strings.TrimPrefix(path, string(os.PathSeparator))
path = strings.TrimPrefix(path, pathSeparator)
var dir string
if fi == nil || fi.IsDir() {
// Definitely inform if:
// - If the path does not exist anymore
// - It is a directory
if fi == nil {
// Definitely inform if the path does not exist anymore
dir = path
trackedPaths[path] = dirVsFiles
Debug.Println("[AG] Not found:", path)
} else if fi.IsDir() {
// Definitely inform if a directory changed
dir = path
trackedPaths[path] = dirVsFiles
trackedDirs[dir] = true
Debug.Println("[AG] Is a dir:", dir)
} else {
Debug.Println("[AG] Is file:", path)
// Files are linked to -1 scores
// Also increment the parent path with 1
dir = filepath.Dir(path)
Expand All @@ -734,12 +745,14 @@ func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback
}
trackedPaths[path] = -1
trackedPaths[dir] += 1
trackedDirs[dir] = true
}
// Search for existing parent directory relations in the map
for trackedPath, _ := range trackedPaths {
if strings.HasPrefix(dir, trackedPath) {
// Increment score of tracked current/parent directory
if trackedDirs[trackedPath] && strings.HasPrefix(dir, trackedPath+pathSeparator) {
// Increment score of tracked parent directory
trackedPaths[trackedPath] += 1 // for each file
Debug.Println("[AG] Increment:", trackedPath, trackedPaths, trackedPaths[trackedPath])
}
}
}
Expand All @@ -748,20 +761,26 @@ func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback
keys = append(keys, k)
}
sort.Strings(keys) // Sort directories before their own files
previousDone, previousPath := false, ""
previousPath = ""
var scans []string
for i := range keys {
trackedPath := keys[i]
trackedPathScore, _ := trackedPaths[trackedPath]
if previousDone && strings.HasPrefix(trackedPath, previousPath) {
if strings.HasPrefix(trackedPath, previousPath+pathSeparator) {
// Already informed parent directory change
continue
} // Already informed parent directory change
}
if trackedPathScore < dirVsFiles && trackedPathScore != -1 {
// Not enough files for this directory or it is a file
continue
} // Not enough files for this directory or it is a file
previousDone = trackedPathScore != -1
}
previousPath = trackedPath
Debug.Println("[AG] Appending path:", trackedPath, previousPath)
scans = append(scans, trackedPath)
if trackedPath == "" {
// If we need to scan everything, skip the rest
break
}
}
return callback(folder, scans)
}
Expand Down Expand Up @@ -923,7 +942,7 @@ func expandTilde(p string) string {
return getHomeDir()
}
p = filepath.FromSlash(p)
if !strings.HasPrefix(p, fmt.Sprintf("~%c", os.PathSeparator)) {
if !strings.HasPrefix(p, fmt.Sprintf("~%c", pathSeparator)) {
return p
}
return filepath.Join(getHomeDir(), p[2:])
Expand Down
180 changes: 166 additions & 14 deletions syncwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestDebouncedFileWatch(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if !testOK {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestDebouncedDirectoryWatch(t *testing.T) {
return nil
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
fsChan <- testDirectory + slash + testFile
fsChan <- testDirectory + testFile
time.Sleep(testDebounceTimeout * 10)
if !testOK {
t.Error("Callback not triggered")
Expand All @@ -123,7 +123,7 @@ func TestDebouncedParentDirectoryWatch(t *testing.T) {
testChangeDir+"file3.ogg")
defer clearTestDir()
testDebounceTimeout := 20 * time.Millisecond
testDirVsFiles := 3
testDirVsFiles := 2
stChan := make(chan STEvent, 10)
fsChan := make(chan string, 10)
fileChange := func(repo string, sub []string) error {
Expand All @@ -138,7 +138,7 @@ func TestDebouncedParentDirectoryWatch(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if !testOK {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestDebouncedParentDirectoryWatch2(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if testOK != 2 {
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestDebouncedParentDirectoryWatch3(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if testOK != 3 {
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestDebouncedParentDirectoryWatch4(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if testOK != 2 {
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestDebouncedParentDirectoryWatch5(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if !testOK {
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestDebouncedParentDirectoryWatch6(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if testOK != 1 {
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestDebouncedParentDirectoryRemovedWatch(t *testing.T) {
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for i := range testFiles {
fsChan <- testDirectory + slash + testFiles[i]
fsChan <- testDirectory + testFiles[i]
}
time.Sleep(testDebounceTimeout * 10)
if testOK != 1 {
Expand Down Expand Up @@ -389,12 +389,164 @@ func TestSTEvents(t *testing.T) {
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
stChan <- STEvent{Path: ""}
for i := range testFiles {
stChan <- STEvent{Path: testDirectory + slash + testFiles[i], Finished: false}
fsChan <- testDirectory + slash + testFiles[i]
stChan <- STEvent{Path: testDirectory + slash + testFiles[i], Finished: true}
stChan <- STEvent{Path: testDirectory + testFiles[i], Finished: false}
fsChan <- testDirectory + testFiles[i]
stChan <- STEvent{Path: testDirectory + testFiles[i], Finished: true}
}
time.Sleep(testDebounceTimeout * 10)
time.Sleep(testDebounceTimeout * 50)
if !testOK {
t.Error("Callback not correctly triggered")
}
}

func TestFilesAggregation(t *testing.T) {
nrFiles := 50
testOK := false
testRepo := "test1"
testFiles := make([]string, nrFiles)
for i := 0; i < nrFiles; i++ {
testFiles[i] = createTestPath(t, "a"+slash+strconv.Itoa(i))
}
defer clearTestDir()
testDebounceTimeout := 20 * time.Millisecond
testDirVsFiles := nrFiles + 1
stop := make(chan int, 1)
stChan := make(chan STEvent, nrFiles)
fsChan := make(chan string, nrFiles)
fileChange := func(repo string, sub []string) error {
if len(sub) == 1 && sub[0] == ".stfolder" {
return nil
}
if repo != testRepo || len(sub) != 50 || sub[0] != "a/0" {
t.Error("Invalid result for directory change: "+repo, sub)
}
if testOK {
t.Error("Callback triggered multiple times")
}
testOK = true
close(stop)
return nil
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for _, testFile := range testFiles {
fsChan <- testDirectory + testFile
}
<-stop
time.Sleep(250 * time.Millisecond)
if !testOK {
t.Error("Callback not triggered")
}
}
func TestManyFilesAggregation(t *testing.T) {
nrFiles := 5000
testOK := false
testRepo := "test1"
testFiles := make([]string, nrFiles)
for i := 0; i < nrFiles; i++ {
testFiles[i] = createTestPath(t, "a"+slash+strconv.Itoa(i))
}
defer clearTestDir()
testDebounceTimeout := 20 * time.Millisecond
testDirVsFiles := 10
stop := make(chan int, 1)
stChan := make(chan STEvent, nrFiles)
fsChan := make(chan string, nrFiles)
fileChange := func(repo string, sub []string) error {
if len(sub) == 1 && sub[0] == ".stfolder" {
return nil
}
if repo != testRepo || len(sub) != 1 || sub[0] != "" {
t.Error("Invalid result for directory change: "+repo, sub)
}
if testOK {
t.Error("Callback triggered multiple times")
}
testOK = true
close(stop)
return nil
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for _, testFile := range testFiles {
fsChan <- testDirectory + testFile
}
<-stop
time.Sleep(250 * time.Millisecond)
if !testOK {
t.Error("Callback not triggered")
}
}

func TestDeletesAggregation(t *testing.T) {
nrFiles := 50
testOK := false
testRepo := "test1"
testFiles := make([]string, nrFiles)
for i := 0; i < nrFiles; i++ {
testFiles[i] = "a" + slash + strconv.Itoa(i)
}
testDebounceTimeout := 20 * time.Millisecond
testDirVsFiles := 10
stop := make(chan int, 1)
stChan := make(chan STEvent, nrFiles)
fsChan := make(chan string, nrFiles)
fileChange := func(repo string, sub []string) error {
if len(sub) == 1 && sub[0] == ".stfolder" {
return nil
}
if repo != testRepo || len(sub) != 50 || sub[0] != "a/0" {
t.Error("Invalid result for directory change: "+repo, sub)
}
if testOK {
t.Error("Callback triggered multiple times")
}
testOK = true
close(stop)
return nil
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for _, testFile := range testFiles {
fsChan <- testDirectory + testFile
}
<-stop
time.Sleep(250 * time.Millisecond)
if !testOK {
t.Error("Callback not triggered")
}
}
func TestManyDeletesAggregation(t *testing.T) {
nrFiles := 5000
testOK := false
testRepo := "test1"
testFiles := make([]string, nrFiles)
for i := 0; i < nrFiles; i++ {
testFiles[i] = "a" + slash + strconv.Itoa(i)
}
testDebounceTimeout := 20 * time.Millisecond
testDirVsFiles := 10
stop := make(chan int, 1)
stChan := make(chan STEvent, nrFiles)
fsChan := make(chan string, nrFiles)
fileChange := func(repo string, sub []string) error {
if len(sub) == 1 && sub[0] == ".stfolder" {
return nil
}
if repo != testRepo || len(sub) != 1 || sub[0] != "" {
t.Error("Invalid result for directory change: "+repo, sub)
}
if testOK {
t.Error("Callback triggered multiple times")
}
testOK = true
close(stop)
return nil
}
go accumulateChanges(testDebounceTimeout, testRepo, testDirectory, testDirVsFiles, stChan, fsChan, fileChange)
for _, testFile := range testFiles {
fsChan <- testDirectory + testFile
}
<-stop
time.Sleep(250 * time.Millisecond)
if !testOK {
t.Error("Callback not triggered")
}
}

0 comments on commit ea4a7c0

Please sign in to comment.