Skip to content
This repository has been archived by the owner on Jan 27, 2020. It is now read-only.

Commit

Permalink
Merge branch 'rjeczalik-notify'
Browse files Browse the repository at this point in the history
  • Loading branch information
Zillode committed Apr 22, 2015
2 parents 2ed1d91 + 8745958 commit 0447914
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 1,097 deletions.
147 changes: 84 additions & 63 deletions syncwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"errors"
"flag"
"fmt"
"github.com/cenkalti/backoff"
"io"
"io/ioutil"
"log"
Expand All @@ -26,6 +25,9 @@ import (
"strings"
"text/tabwriter"
"time"

"github.com/cenkalti/backoff"
"github.com/rjeczalik/notify"
)

type Configuration struct {
Expand Down Expand Up @@ -94,15 +96,15 @@ var (

// HTTP Timeouts
var (
requestTimeout = 30 * time.Second
requestTimeout = 180 * time.Second
)

// HTTP Debounce
var (
debounceTimeout = 500 * time.Millisecond
remoteIndexTimeout = 600 * time.Millisecond
configSyncTimeout = 5 * time.Second
dirVsFiles = 100
dirVsFiles = 256
maxFiles = 5000
)

Expand Down Expand Up @@ -202,7 +204,7 @@ func init() {
authPass, _ = stdin.ReadString('\n')
}
if len(watchFolders) != 0 && len(skipFolders) != 0 {
log.Fatalln("Either provide a list of folders to be watched or to be ignored. Not both.")
log.Fatalln("Either provide a list of folders to be watched or to be ignored, not both.")
}
}

Expand Down Expand Up @@ -289,26 +291,26 @@ func filterFolders(folders []FolderConfiguration) []FolderConfiguration {

func getIgnorePatterns(folder string) []Pattern {
for {
Trace.Println("Getting Ignore Patterns: " + folder)
r, err := http.NewRequest("GET", target+"/rest/ignores?folder="+folder, nil)
Trace.Println("Getting ignore patterns for " + folder)
r, err := http.NewRequest("GET", target+"/rest/db/ignores?folder="+folder, nil)
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
res.Body.Close()
}
}()
if err != nil {
Warning.Println("Failed to perform request /rest/ignores: ", err)
Warning.Println("Failed to perform request /rest/db/ignores: ", err)
time.Sleep(configSyncTimeout)
continue
}
if res.StatusCode == 500 {
Warning.Println("Syncthing not ready in " + folder + " for /rest/ignores")
Warning.Println("Syncthing not ready in " + folder + " for /rest/db/ignores")
time.Sleep(configSyncTimeout)
continue
}
if res.StatusCode != 200 {
log.Fatalf("Status %d != 200 for GET /rest/ignores: ", res.StatusCode, res)
log.Fatalf("Status %d != 200 for GET /rest/db/ignores: ", res.StatusCode, res)
}
bs, err := ioutil.ReadAll(res.Body)
if err != nil {
Expand All @@ -334,18 +336,18 @@ func getIgnorePatterns(folder string) []Pattern {

func getFolders() []FolderConfiguration {
Trace.Println("Getting Folders")
r, err := http.NewRequest("GET", target+"/rest/config", nil)
r, err := http.NewRequest("GET", target+"/rest/system/config", nil)
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
res.Body.Close()
}
}()
if err != nil {
log.Fatalln("Failed to perform request /rest/config: ", err)
log.Fatalln("Failed to perform request /rest/system/config: ", err)
}
if res.StatusCode != 200 {
log.Fatalf("Status %d != 200 for GET /rest/config: ", res.StatusCode)
log.Fatalf("Status %d != 200 for GET /rest/system/config: ", res.StatusCode)
}
bs, err := ioutil.ReadAll(res.Body)
if err != nil {
Expand All @@ -363,29 +365,20 @@ func watchFolder(folder FolderConfiguration, stInput chan STEvent) {
folderPath := expandTilde(folder.Path)
ignorePatterns := getIgnorePatterns(folder.ID)
fsInput := make(chan string)
sw, err := NewSyncWatcher(folderPath, ignorePaths, ignorePatterns)
if sw == nil || err != nil {
Warning.Println(err)
c := make(chan notify.EventInfo, maxFiles)
if err := notify.Watch(filepath.Join(folderPath, "..."), c, notify.All); err != nil {
Warning.Println("Failed to install inotify handlers", err)
informError("Failed to install inotify handler for " + folder.ID)
return
}
defer sw.Close()
err = sw.Watch(folderPath)
if err != nil {
Warning.Println("Failed to watch", folderPath)
if strings.Contains(err.Error(), "no space left on device") {
Warning.Println("Please use the following workaround:")
Warning.Println("(OSX) sudo sh -c 'echo kern.maxfiles=20480\\nkern.maxfilesperproc=18000 >> /etc/sysctl.conf'")
Warning.Println("(Linux) sudo sh -c 'echo fs.inotify.max_user_watches=20480\\n >> /etc/sysctl.conf'")
}
log.Fatalln(err)
}
defer notify.Stop(c)
go accumulateChanges(debounceTimeout, folder.ID, folderPath, dirVsFiles, stInput, fsInput, informChange)
OK.Println("Watching " + folder.ID + ": " + folderPath)
if folder.RescanIntervalS < 1800 {
OK.Printf("The rescan interval of folder %s can be increased to 3600 (an hour) or even 86400 (a day) as changes should be observed immediately while syncthing-inotify is running.", folder.ID)
}
for {
evPath := waitForEvent(sw)
evPath := waitForEvent(c)
Debug.Println("Change detected in: " + evPath + " (could still be ignored)")
ev := relativePath(evPath, folderPath)
if shouldIgnore(ignorePaths, ignorePatterns, ev) {
Expand All @@ -407,15 +400,13 @@ func relativePath(path string, folderPath string) string {
return path
}

func waitForEvent(sw *SyncWatcher) string {
func waitForEvent(c chan notify.EventInfo) string {
select {
case ev, ok := <-sw.Event:
case ev, ok := <-c:
if !ok {
Warning.Println("Error: channel closed")
}
return ev.Name
case err, eok := <-sw.Error:
Warning.Println(err, eok)
return ev.Path()
}
return ""
}
Expand Down Expand Up @@ -476,7 +467,7 @@ func performRequest(r *http.Request) (*http.Response, error) {

func testWebGuiPost() error {
Trace.Println("Testing WebGUI")
r, err := http.NewRequest("POST", target+"/rest/404", nil)
r, err := http.NewRequest("GET", target+"/rest/404", nil)
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
Expand All @@ -487,19 +478,43 @@ func testWebGuiPost() error {
Warning.Println("Cannot connect to Syncthing:", err)
return err
}
body, _ := ioutil.ReadAll(res.Body)
if res.StatusCode != 404 {
Warning.Printf("Cannot connect to Syncthing, Status %d != 404 for POST\n", res.StatusCode)
Warning.Printf("Cannot connect to Syncthing, Status %d != 404 for POST\n", res.StatusCode, string(body))
return errors.New("Invalid HTTP status code")
}
return nil
}

func informChange(folder string, sub string) error {
func informError(msg string) error {
Trace.Printf("Informing ST about inotify error: %v", msg)
r, _ := http.NewRequest("POST", target+"/rest/system/error", strings.NewReader("[Inotify] "+msg))
r.Header.Set("Content-Type", "plain/text")
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
res.Body.Close()
}
}()
if err != nil {
Warning.Println("Failed to inform Syncthing about", msg, err)
return err
}
if res.StatusCode != 200 {
Warning.Printf("Error: Status %d != 200 for POST.\n%v: %v %v", msg, res.StatusCode)
return errors.New("Invalid HTTP status code")
}
return err
}

func informChange(folder string, subs []string) error {
data := url.Values{}
data.Set("folder", folder)
data.Set("sub", sub)
Trace.Println("Informing ST: " + folder + " :" + sub)
r, _ := http.NewRequest("POST", target+"/rest/scan?"+data.Encode(), nil)
for _, sub := range subs {
data.Add("sub", sub)
}
Trace.Printf("Informing ST: %v: %v", folder, subs)
r, _ := http.NewRequest("POST", target+"/rest/db/scan?"+data.Encode(), nil)
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
Expand All @@ -511,10 +526,10 @@ func informChange(folder string, sub string) error {
return err
}
if res.StatusCode != 200 {
Warning.Printf("Error: Status %d != 200 for POST.\n"+folder+": "+sub, res.StatusCode)
Warning.Printf("Error: Status %d != 200 for POST.\n%v: %v %v", folder, res.StatusCode)
return errors.New("Invalid HTTP status code")
} else {
OK.Println("Syncthing is indexing change in " + folder + ": " + sub)
OK.Printf("Syncthing is indexing change in %v: %v", folder, subs)
}
// Wait until scan finishes
_, err = ioutil.ReadAll(res.Body)
Expand All @@ -524,41 +539,43 @@ func informChange(folder string, sub string) error {
func accumulateChanges(interval time.Duration,
folder string, folderPath string, dirVsFiles int,
stInput chan STEvent, fsInput chan string,
callback func(folder string, sub string) error) func(string) {
callback func(folder string, subs []string) error) func(string) {
inProgress := make(map[string]bool) // [Path string, InProgress bool]
currInterval := interval
for {
select {
case item := <-stInput:
Debug.Println("STInput")
if item.Path == "" {
// Prepare for incoming changes
currInterval = remoteIndexTimeout
Debug.Println("Incoming Changes")
Debug.Println("[ST] Incoming Changes for " + folder + ", increasing inotify timeout parameters")
continue
}
if item.Finished {
// Ensure path is cleared when receiving itemFinished
delete(inProgress, item.Path)
Debug.Println("Remove Tracking ST: " + item.Path)
Debug.Println("[ST] Removed tracking for " + item.Path)
continue
}
if len(inProgress) > maxFiles {
Debug.Println("[ST] Tracking too many files, aggregating STEvent: " + item.Path)
continue
}
Debug.Println("[ST] Incoming: " + item.Path)
inProgress[item.Path] = true
case item := <-fsInput:
Debug.Println("FSInput")
p, ok := inProgress[item]
if p && ok {
// Change originated from ST
delete(inProgress, item)
Debug.Println("Remove Tracking FS: " + item)
Debug.Println("[FS] Removed tracking for " + item)
continue
}
if len(inProgress) > maxFiles {
Debug.Println("[FS] Tracking too many files, aggregating FSEvent: " + item)
continue
}
Debug.Println("[FS] Tracking: " + item)
inProgress[item] = false
case <-time.After(currInterval):
currInterval = interval
Expand All @@ -569,20 +586,22 @@ func accumulateChanges(interval time.Duration,
var err error
var paths []string
if len(inProgress) < maxFiles {
paths = make([]string, len(inProgress))
i := 0
for path, progress := range inProgress {
if path == "" {
continue
}
if !progress {
paths[i] = path
i++
paths = append(paths, path)
Debug.Println("Informing about " + path)
} else {
Debug.Println("Waiting for: " + path)
Debug.Println("Waiting for " + path)
}
}
if len(paths) == 0 {
Debug.Println("Empty paths")
continue
}

// Try to inform changes to syncthing and if succeeded, clean up
err = aggregateChanges(folder, folderPath, dirVsFiles, callback, paths)
} else {
Expand All @@ -592,14 +611,14 @@ func accumulateChanges(interval time.Duration,
if err == nil {
for _, path := range paths {
delete(inProgress, path)
Debug.Println("Remove Tracking Informed: " + path)
Debug.Println("[INFORMED] Removed tracking for " + path)
}
}
}
}
}

func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback func(folder string, folderPath string) error, paths []string) error {
func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback func(folder string, folderPaths []string) error, paths []string) error {
// This function optimises tracking in two ways:
// - If there are more than `dirVsFiles` changes in a directory, we inform Syncthing to scan the entire directory
// - Directories with parent directory changes are aggregated. If A/B has 3 changes and A/C has 8, A will have 11 changes and if this is bigger than dirVsFiles we will scan A.
Expand Down Expand Up @@ -647,6 +666,7 @@ func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback
}
sort.Strings(keys) // Sort directories before their own files
previousDone, previousPath := false, ""
var scans []string
for i := range keys {
trackedPath := keys[i]
trackedPathScore, _ := trackedPaths[trackedPath]
Expand All @@ -658,22 +678,23 @@ func aggregateChanges(folder string, folderPath string, dirVsFiles int, callback
} // Not enough files for this directory or it is a file
previousDone = trackedPathScore != -1
previousPath = trackedPath
sub := strings.TrimPrefix(trackedPath, folderPath)
sub = strings.TrimPrefix(sub, string(os.PathSeparator))
err := callback(folder, sub)
if err != nil {
return err
}
scans = append(scans, trackedPath)
}
return nil
return callback(folder, scans)
}

func watchSTEvents(stChans map[string]chan STEvent, folders []FolderConfiguration) {
lastSeenID := 0
for {
events, err := getSTEvents(lastSeenID)
if err != nil {
// Probably Syncthing restarted
// Work-around for Go <1.5 (https://github.com/golang/go/issues/9405)
if strings.Contains(err.Error(), "use of closed network connection") {
continue
}

// Syncthing probably restarted
Debug.Println("Resetting STEvents", err)
lastSeenID = 0
time.Sleep(configSyncTimeout)
continue
Expand Down Expand Up @@ -769,15 +790,15 @@ func waitForSyncAndExitIfNeeded(folders []FolderConfiguration) {
func waitForSync() {
for {
Trace.Println("Waiting for Sync")
r, err := http.NewRequest("GET", target+"/rest/config/sync", nil)
r, err := http.NewRequest("GET", target+"/rest/system/config/insync", nil)
res, err := performRequest(r)
defer func() {
if res != nil && res.Body != nil {
res.Body.Close()
}
}()
if err != nil {
Warning.Println("Failed to perform request /rest/config/sync", err)
Warning.Println("Failed to perform request /rest/system/config/insync", err)
time.Sleep(configSyncTimeout)
continue
}
Expand Down
Loading

0 comments on commit 0447914

Please sign in to comment.