-
Notifications
You must be signed in to change notification settings - Fork 0
/
file_tracker.py
149 lines (115 loc) · 4.11 KB
/
file_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os
from abc import ABC
from contextlib import contextmanager, suppress
from time import time
from typing import Generic, TypeVar, TextIO
import inject
import jsons
import yaml
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer as DirectoryObserver
from watchdog.observers.api import DEFAULT_OBSERVER_TIMEOUT
from _meta import IndirectDependency, APP_DIR
from utils import app_abs_path
# Type of the data object that a DataFileSyncer keeps in sync with its file.
T = TypeVar('T')
class DataFileSyncer(Generic[T]):
    """Keep an in-memory data object in sync with a YAML file.

    The object is converted with ``jsons`` and stored as YAML. Once
    ``start()`` has been called, the file is loaded and a directory observer
    reloads it whenever a modification of the file is reported.
    """

    # Extra keyword arguments forwarded by ``_dump`` to ``jsons.dump`` and
    # ``yaml.dump`` respectively; both are empty here.
    JSON_DUMPER_KWARGS = {}
    YAML_DUMPER_KWARGS = {}

    def __init__(self,
                 filename: str,
                 data: T,
                 data_type=None,
                 extension="yaml"):
        """Create a syncer for ``<filename>.<extension>``.

        :param filename: file name without the extension.
        :param data: initial data object; also what gets written when the
            file is missing or cannot be parsed.
        :param data_type: class used to deserialize the file; falls back to
            ``type(data)`` when not given (or falsy).
        :param extension: file extension, without the leading dot.
        """
        self.data = data
        self._class = data_type or type(data)
        self.filename = f'{filename}.{extension}'
        self.dir_observer = LazyDirectoryObserver()

    def start(self):
        """Load the file once, then start watching it for modifications."""
        self.load_file()
        self._watch_file()

    @inject.params(carryon=IndirectDependency.CARRYON_BEFORE_UPDATE)
    def preserve_on_update(self, carryon: list[str]):
        """Append this syncer's file name to the injected ``carryon`` list."""
        carryon.append(self.filename)

    def _watch_file(self):
        """Schedule a handler that reloads the file when it is modified."""
        handler = PatternMatchingEventHandler(
            patterns=[app_abs_path(self.filename)],
            case_sensitive=True,
        )

        def on_modified(event):
            # Skip notifications that arrive while the observer is asleep or
            # still inside its post-wakeup grace period.
            if not self.dir_observer.is_completely_awake():
                return
            self.load_file()

        handler.on_modified = on_modified
        self.dir_observer.schedule(handler, APP_DIR)
        self.dir_observer.start()

    def load_file(self):
        """Replace ``self.data`` with the file's content if it differs.

        When the file does not exist, or its content cannot be loaded, the
        current in-memory data is written to the file instead. When the loaded
        data differs from ``self.data`` it is adopted and
        ``on_file_reloaded()`` is called.
        """
        # Resolve the path once so the existence check looks at exactly the
        # file that is opened below (and that save_file() writes).
        path = app_abs_path(self.filename)
        if not os.path.exists(path):
            self.save_file()
            return

        with self.dir_observer.overlook():
            with open(path, encoding="utf-8-sig") as f:
                new_data: T = self._load(f)

        if new_data is None:
            # Unparsable content: overwrite the file with the current data.
            self.save_file()
            return

        if new_data != self.data:
            self.data = new_data
            self.on_file_reloaded()

    def _load(self, stream: TextIO):
        """Parse YAML from ``stream`` and build an instance of the data class.

        An empty (falsy) document is treated as an empty mapping.

        :return: the deserialized object, or ``None`` when YAML parsing or
            ``jsons`` deserialization fails.
        """
        with suppress(jsons.DeserializationError, yaml.YAMLError):
            return jsons.load(yaml.load(
                stream, yaml.CSafeLoader
            ) or {}, self._class)
        return None

    def save_file(self):
        """Write the current data to the file, with the observer muted."""
        with self.dir_observer.overlook():
            with open(app_abs_path(self.filename), "w", encoding="utf-8") as f:
                self._dump(f)

    def _dump(self, stream: TextIO):
        """Serialize ``self.data`` with ``jsons`` and write it as YAML."""
        yaml.dump(jsons.dump(self.data, **self.JSON_DUMPER_KWARGS),
                  stream, yaml.CSafeDumper, **self.YAML_DUMPER_KWARGS)

    def on_file_reloaded(self):
        """Hook called after ``load_file`` adopts new data; no-op here."""
        pass
class Syncable(ABC):
    """Base class for objects persisted through a ``DataFileSyncer``."""

    def __init__(self, syncer: DataFileSyncer):
        """Remember the syncer and expose its file name as ``filename``."""
        self.filename = syncer.filename
        self._syncer = syncer

    def load(self):
        """Delegate to the syncer's ``load_file()``."""
        syncer = self._syncer
        syncer.load_file()

    def save(self):
        """Delegate to the syncer's ``save_file()``."""
        syncer = self._syncer
        syncer.save_file()
class LazyDirectoryObserver(DirectoryObserver):
    """Directory observer that can be told to ignore events for a while.

    Between ``sleep()`` and ``wakeup()`` no events are dispatched. After
    ``wakeup()`` the observer stays "sleepy" for another ``timeout`` seconds;
    once that grace period is over, everything still queued is discarded and
    normal dispatching resumes.
    """

    def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
        """Create the observer.

        :param timeout: observer timeout in seconds; also used as the length
            of the grace period that follows ``wakeup()``.
        """
        # Passed by keyword so this also works with base classes that declare
        # ``timeout`` as keyword-only.
        super().__init__(timeout=timeout)
        self._is_sleeping = False
        self._still_sleepy = False
        self._awake_complete_time = 0  # timestamp

    def dispatch_events(self, *args, **kwargs):
        """Dispatch queued events unless the observer is asleep or sleepy."""
        # NOTE(review): the early returns below do not block; the caller loop
        # presumably calls again right away - confirm this is acceptable.
        if self._is_sleeping:
            return
        if self._still_sleepy:
            # The grace period is checked lazily, on the next dispatch attempt.
            if not self.is_sleepy():
                self._finish_awakening()
            return
        super().dispatch_events(*args, **kwargs)

    @contextmanager
    def overlook(self):
        """Context manager: sleep on entry, wake up on exit (even on error)."""
        self.sleep()
        try:
            yield
        finally:
            self.wakeup()

    def sleep(self):
        """Stop dispatching events; does nothing if the thread is not alive."""
        if not self.is_alive():
            return
        self._is_sleeping = True
        self._still_sleepy = True

    def wakeup(self):
        """Leave the sleeping state and start the grace period.

        Does nothing when the observer is not sleeping.
        """
        if not self._is_sleeping:
            return
        # Set the deadline before clearing the flag, so that a concurrent
        # dispatch_events() never pairs "not sleeping" with a stale deadline
        # and ends the grace period immediately.
        self._awake_complete_time = time() + self.timeout
        self._is_sleeping = False

    def _finish_awakening(self):
        """Drop every queued event and resume normal dispatching."""
        self.event_queue.queue.clear()
        self._still_sleepy = False

    def is_completely_awake(self):
        """Return True when neither sleeping nor in the grace period."""
        return not self._is_sleeping and not self._still_sleepy

    def is_sleepy(self):
        """Return True while the post-wakeup deadline has not passed."""
        return time() <= self._awake_complete_time