-
Notifications
You must be signed in to change notification settings - Fork 2
/
UpdateFromGithub.py
106 lines (98 loc) · 4.21 KB
/
UpdateFromGithub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import requests
import json
from hashlib import sha1
from pathlib import Path
class Repository:
    """Mirror one branch of a GitHub repository into a local directory.

    Lists the branch's tree via the GitHub REST API, compares each blob's
    git SHA-1 against the local copy, and re-downloads files that are
    missing or stale.  The last synced HEAD commit SHA is persisted in a
    small JSON state file inside ``local_dir`` so that
    :meth:`check_for_updates` can cheaply detect new commits.
    """

    def __init__(self, repo_owner, repo_name, branch='main', local_dir='.', data_file='github_data.json'):
        """Build the API URLs and load (or create) the local state file.

        :param repo_owner: GitHub account/organization that owns the repo.
        :param repo_name: repository name.
        :param branch: branch to track (default ``'main'``).
        :param local_dir: existing directory to mirror the repo into.
        :param data_file: name of the JSON state file kept in ``local_dir``.
        :raises FileNotFoundError: if ``local_dir`` does not exist.
        """
        self.local_dir = Path(local_dir)
        # Validate the directory BEFORE touching the state file; otherwise
        # we would try to create the state file inside a missing directory.
        if not self.local_dir.exists():
            raise FileNotFoundError("{} does not exist".format(local_dir))
        self.data_file = self.local_dir / data_file
        if not self.data_file.exists():
            self.data_file.write_text("{}")  # start with an empty JSON object
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.branch = branch
        # Canonical GitHub REST API host (the scraped source showed a
        # mangled "api.github.com"; the endpoint paths are GitHub's).
        self.urls = {
            'tree': f'https://api.github.com/repos/{repo_owner}/{repo_name}/git/trees/{branch}?recursive=1',
            'download': f'https://raw.githubusercontent.com/{repo_owner}/{repo_name}/{branch}/',
            'last_commit_sha': f"https://api.github.com/repos/{repo_owner}/{repo_name}/commits/HEAD/branches-where-head",
        }
        with self.data_file.open(mode='r') as f:
            self.saved_data = json.load(f)

    def get_tree_data(self):
        """Fetch the branch's full recursive tree from the GitHub API.

        :returns: the API payload with its ``'tree'`` list replaced by a
            dict mapping file path -> tree entry, blobs (files) only.
        """
        response = requests.get(self.urls['tree'])
        tree_data = json.loads(response.content)
        blobs = {}
        for item in tree_data['tree']:
            if item['type'] == 'blob':  # skip sub-trees (directories)
                blobs[item['path']] = item
        tree_data['tree'] = blobs
        return tree_data

    @staticmethod
    def _format_size(size_bytes):
        """Render a byte count as ``Bytes``/``KB``/``MB`` with 2 decimals."""
        size = float(size_bytes)
        for unit in ("Bytes", "KB", "MB"):
            # Strictly greater than 1024 promotes to the next unit,
            # so exactly 1024 still prints as "1024.00 Bytes".
            if size <= 1024 or unit == "MB":
                return f"{size:.2f} {unit}"
            size /= 1024

    def download_files(self, paths, max_download_size=float('inf')):
        """Download the given repo-relative ``paths`` into ``local_dir``.

        Files larger than ``max_download_size`` (bytes) are skipped.
        Parent directories are created as needed; progress is printed.
        """
        for path in paths:
            download_url = self.urls['download'] + path
            # Content-Length may be absent from the HEAD response; treat
            # that as size 0 (limit cannot be enforced) instead of crashing.
            size_bytes = int(requests.head(download_url).headers.get('Content-Length', 0))
            size_str = self._format_size(size_bytes)
            if size_bytes > max_download_size:
                print("Skipping {} ({})".format(path, size_str))
                continue
            print("Downloading {} ({})...".format(path, size_str), end="\t")
            target = self.local_dir / path
            target.parent.mkdir(parents=True, exist_ok=True)
            # Check the status BEFORE opening the target so a failed
            # request does not truncate an existing local copy, and use
            # the response as a context manager so it is always closed.
            with requests.get(download_url, stream=True) as response:
                if response.status_code != 200:
                    print("[ERROR]")
                    continue
                with target.open(mode='wb') as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)
            print("[OK]")

    def get_head_sha(self, timeout=5):
        """Return the branch's HEAD *commit* SHA, or ``None`` on failure.

        Uses the ``branches-where-head`` endpoint and picks the entry
        matching ``self.branch``.
        """
        response = requests.get(self.urls['last_commit_sha'], timeout=timeout)
        if response.status_code == 200:
            branches = json.loads(response.content)
            for branch in branches:
                if branch['name'] == self.branch:
                    return branch['commit']['sha']
        return None

    @staticmethod
    def calc_sha(data):
        """Compute the git blob SHA-1 of ``data``.

        ``data`` may be raw bytes, a :class:`~pathlib.Path`, or a string
        path (read as bytes).  Matches ``git hash-object`` by hashing the
        ``b"blob <len>\\0"`` header followed by the content.
        """
        if isinstance(data, str):
            data = Path(data)
        if isinstance(data, Path):
            data = data.read_bytes()
        return sha1(f"blob {len(data)}\0".encode() + data).hexdigest()

    def auto_update(self, max_download_size=float('inf')):
        """Download every file that is missing locally or differs from HEAD."""
        tree_data = self.get_tree_data()
        to_download = []
        for path, entry in tree_data['tree'].items():
            local_path = self.local_dir / path
            # Missing file, or local content hashes to a different blob SHA.
            if not local_path.exists() or self.calc_sha(local_path) != entry['sha']:
                to_download.append(path)
        if to_download:
            self.download_files(to_download, max_download_size)
        else:
            print("Everything is up to date")
        # Persist the HEAD *commit* SHA so check_for_updates() compares
        # like with like (get_head_sha also returns a commit SHA); the
        # tree SHA is a different git object and would never match it.
        head_sha = self.get_head_sha()
        self.saved_data['sha'] = head_sha if head_sha is not None else tree_data['sha']
        self.data_file.write_text(json.dumps(self.saved_data))
        print("Finished updating")

    def check_for_updates(self, timeout=5):
        """Return True when the remote HEAD differs from the saved SHA.

        Also returns True when no SHA has been saved yet (never synced).
        """
        if "sha" not in self.saved_data:
            return True
        return self.get_head_sha(timeout) != self.saved_data['sha']