-
Notifications
You must be signed in to change notification settings - Fork 5
/
utils.py
293 lines (258 loc) · 9.38 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
try:
import configparser
except ImportError:
import ConfigParser as configparser
import errno
import hashlib
import json
import os
import sys
import platform
from subprocess import Popen, PIPE, STDOUT, check_output, CalledProcessError
import time
# File object of the ini-style workflow log; stays None until start_log()
# opens it, and the logging helpers silently do nothing while it is None.
_logfile = None
# Set of section names already written to the workflow log; log_section()
# consults it to give repeated sections unique names. Created by start_log().
_logfile_sections = None
# File object of the screen-output log; stays None until start_log_screen()
# opens it.
_screen_logfile = None
# To make testing easier, programs found in the directory of these scripts
# take precedence over programs in standard locations.
# So it's possible to just copy here, say, refmac5 binary and test it.
_dimple_dir = os.path.abspath(os.path.dirname(__file__))
# Start a log in an ini-like format that is readable for humans
# and easily parsed in Python:
#   import configparser
#   log = configparser.RawConfigParser()
#   log.read('dimple.log')
# and then, for example:
#   ini_free_r = log.getfloat('refmac5 restr', 'ini_free_r')
#   free_r = log.getfloat('refmac5 restr', 'free_r')
# The first section is [workflow], the next ones correspond to jobs:
#   start_time = log.get(log.sections()[1], 'start_time')  # first job start
#   end_time = log.get(log.sections()[-1], 'end_time')  # last job end
def start_log(filename, output_dir):
    """Open the workflow log and write its leading [workflow] section.

    The file is opened for writing (truncating any previous content) and
    kept open in the module-level ``_logfile`` for the other log_*()
    helpers. The section records host, platform, working directory, the
    program name and its command-line arguments, ``output_dir`` and the
    CCP4 / CCP4_SCR environment variables.
    """
    global _logfile  # pylint: disable=global-statement
    global _logfile_sections  # pylint: disable=global-statement
    _logfile = open(filename, 'w')
    _logfile.write('# workflow log (compatible with Python configparser)\n')
    _logfile_sections = set()
    log_section('workflow')
    log_value('host', platform.node())
    log_value('platform', platform.platform())
    log_value('cwd', os.getcwd())
    log_value('prog', sys.argv[0])
    cmdline_args = sys.argv[1:]
    if cmdline_args:
        # One argument per continuation line of the multi-line "args" value.
        _logfile.write('args:\n')
        _logfile.writelines(' %s\n' % arg for arg in cmdline_args)
    log_value('output_dir', output_dir)
    for env_name in ('CCP4', 'CCP4_SCR'):
        log_value(env_name, os.getenv(env_name, ''))
    _logfile.flush()
def _log_comment(text):
    """Write ``text`` to the workflow log as '#'-prefixed comment lines.

    Trailing newlines are dropped and every remaining line gets its own
    '# ' prefix. Nothing happens when the log has not been started.
    """
    if not _logfile:
        return
    commented = text.rstrip('\n').replace('\n', '\n# ')
    _logfile.write('# ' + commented + '\n')
def log_section(name):
    """Start a new ``[name]`` section in the workflow log.

    Section names must be unique, so a repeated name gets the first free
    numeric suffix: 'name 2', 'name 3', ... Nothing happens when the log
    has not been started.
    """
    if not _logfile:
        return
    if name in _logfile_sections:
        suffix = 2
        candidate = '%s %d' % (name, suffix)
        while candidate in _logfile_sections:
            suffix += 1
            candidate = '%s %d' % (name, suffix)
        name = candidate
    _logfile_sections.add(name)
    _logfile.write('\n[%s]\n' % name)
def log_flush():
    """Flush buffered workflow-log output to disk.

    Does nothing when the log has not been started, in line with the
    other log_*() helpers, instead of failing on the ``None`` placeholder.
    """
    if _logfile:
        _logfile.flush()
def log_value(key, value):
    """Write a ``key: value`` entry to the current workflow-log section.

    Lists are serialized as JSON; anything else is converted with str()
    and stripped of trailing whitespace. Newlines inside the value are
    followed by indentation so that the extra lines are continuation
    lines in the ini format (the replacement is applied twice, so they
    end up indented by two spaces). Nothing happens when the log has not
    been started.
    """
    if not _logfile:
        return
    text = json.dumps(value) if isinstance(value, list) else str(value)
    text = text.rstrip()
    for _ in range(2):
        text = text.replace('\n', '\n ')
    _logfile.write('%s: %s\n' % (key, text))
def log_time(key, timestamp):
    """Log ``timestamp`` (seconds since the epoch) under ``key``.

    The value is written as local time in 'YYYY-MM-DD HH:MM:SS' form.
    """
    local_time = time.localtime(timestamp)
    formatted = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
    log_value(key, formatted)
def _parse_log_value(text):
    """Decode one raw option value read back from a workflow log."""
    if text[:1] != '[':
        return text
    # Lists are written as JSON, so try the text unchanged first: swapping
    # quotes up front would corrupt strings that contain an apostrophe.
    # The quote-swapped form is kept as a fallback for list values written
    # with single quotes.
    for candidate in (text, text.replace("'", '"')):
        try:
            return json.loads(candidate)
        except ValueError:
            continue
    # Not a list after all (it merely starts with '['): keep the raw text.
    return text


def read_section_from_log(logfile, section):
    """Read one section of an ini-style workflow log into a dict.

    Args:
        logfile: path of the log file to parse.
        section: name of the section to read.

    Returns:
        A dict mapping option names to values, or None when the file has
        no section headers or does not contain ``section``. Values that
        start with '[' are decoded as lists when they can be parsed; all
        other values are returned as strings.
    """
    conf = configparser.RawConfigParser()
    try:
        conf.read(logfile)
    except configparser.MissingSectionHeaderError:
        return None
    if not conf.has_section(section):
        return None
    return {key: _parse_log_value(raw) for key, raw in conf.items(section)}
def start_log_screen(filename):
    """Open ``filename`` in append mode as the screen-output log.

    A blank line is written first, separating this run from whatever the
    file already contains. The handle is kept in ``_screen_logfile``.
    """
    global _screen_logfile  # pylint: disable=global-statement
    _screen_logfile = open(filename, mode='a')
    _screen_logfile.write('\n')
def _log_screen(text):
    """Append ``text`` to the screen log and flush it immediately.

    Nothing happens when the screen log has not been started.
    """
    if not _screen_logfile:
        return
    _screen_logfile.write(text)
    _screen_logfile.flush()
def put(text, ansi_code=None):
    """Write ``text`` to the screen log and to stdout.

    When ``ansi_code`` is given, the platform is not Windows and stdout is
    a terminal, the stdout copy is wrapped in that ANSI SGR code followed
    by a reset. The screen log always receives the plain text.
    """
    _log_screen(text)
    use_color = (ansi_code is not None and os.name != 'nt' and
                 hasattr(sys.stdout, 'isatty') and sys.stdout.isatty())
    if use_color:
        text = '\033[%dm%s\033[0m' % (ansi_code, text)
    sys.stdout.write(text)
def put_temporarily(text):
    """Show ``text`` on stdout so that later output overwrites it.

    The text is written and flushed, then one backspace per character is
    written to move the cursor back to where the text started.
    """
    out = sys.stdout
    out.write(text)
    out.flush()
    out.write(len(text) * '\b')
def put_green(text):
    """Print ``text`` through put() using ANSI code 92 (bright green)."""
    put(text, 92)
def comment(text):
    """Print ``text`` on screen and record it as a workflow-log comment.

    stdout is flushed right after printing, before the log is written.
    """
    put(text)
    sys.stdout.flush()
    _log_comment(text)
def reset_color():
    """Emit the ANSI reset sequence on stdout.

    Only done when stdout is a terminal and the platform is not Windows.
    """
    stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
    if not stdout_is_tty:
        return
    if os.name == 'nt':
        return
    sys.stdout.write('\033[0m')
def put_error(err, comment=None):  # pylint: disable=redefined-outer-name
    """Report an error on stderr, in the workflow log and the screen log.

    Args:
        err: the error message (formatted as 'Error: <err>.').
        comment: optional additional text, written after the error to the
            same three destinations.

    On a non-Windows terminal the message part on stderr is coloured with
    ANSI code 91; the log copies are always plain.
    """
    _log_comment('Error: %s.' % err)
    _log_screen('\nError: %s.\n' % err)
    stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty()
    if stderr_is_tty and os.name != 'nt':
        err = '\033[91m%s\033[0m' % err  # ANSI code 91: bright red
    # Flush pending stdout output first so the error appears after it.
    sys.stdout.flush()
    sys.stderr.write('\nError: %s.\n' % err)
    if comment is None:
        return
    _log_comment(comment)
    comment_line = comment + '\n'
    _log_screen(comment_line)
    sys.stderr.write(comment_line)
def check_prog(dirname, prog):
    """Return the path of prog in dirname (without extension), if present.

    On Windows the file looked for is prog.exe, elsewhere just prog.
    Returns None when that file does not exist.
    """
    suffix = '.exe' if os.name == 'nt' else ''
    candidate = os.path.join(dirname, prog)
    return candidate if os.path.exists(candidate + suffix) else None
def cbin(prog):
    """Return the path of a CCP4 program, without file extension.

    A copy of prog (or prog.exe) found in the dimple directory takes
    precedence; otherwise the path points into $CCP4/bin.
    """
    assert os.environ.get('CCP4')
    local_copy = check_prog(_dimple_dir, prog)
    if local_copy:
        return local_copy
    return os.path.join(os.environ['CCP4'], 'bin', prog)
def syspath(prog):
    """Find prog(.exe) in the dimple directory or on the system $PATH.

    The dimple directory is searched first. Returns the path with the
    filename without extension; when nothing is found, an error is
    reported through put_error() and None is returned.
    """
    search_dirs = [_dimple_dir]
    search_dirs.extend(os.environ['PATH'].split(os.pathsep))
    hits = (check_prog(d, prog) for d in search_dirs)
    found = next((hit for hit in hits if hit), None)
    if found is None:
        put_error('Program not found: %s' % prog)
    return found
# Use relpath if possible, absolute paths clutter commands and make
# moving directory harder.
def adjust_path(path, relative_to):
    """Express ``path`` relative to ``relative_to`` when that is safe.

    The absolute form of ``path`` is returned when either argument is
    absolute, or when ``relative_to`` resolves differently through
    symlinks (a relative path could then point to the wrong place).
    """
    any_absolute = os.path.isabs(path) or os.path.isabs(relative_to)
    if any_absolute or (os.path.realpath(relative_to) !=
                        os.path.abspath(relative_to)):
        return os.path.abspath(path)
    return os.path.relpath(path, relative_to)
def _find_mount_point(path):
    """Return the mount point of the filesystem that contains ``path``.

    Walks up from the absolute form of ``path`` to the nearest ancestor
    (or the path itself) that os.path.ismount() accepts.
    """
    current = os.path.abspath(path)
    while True:
        if os.path.ismount(current):
            return current
        current = os.path.dirname(current)
def _quota_output(cmd):
    """Run a quota command; return its output as text, or None on failure."""
    try:
        raw = check_output(cmd, stderr=STDOUT)
    except (CalledProcessError, OSError):
        # Quota reporting is best-effort: a program that fails or cannot
        # be launched just means there is nothing to report.
        return None
    # check_output() returns bytes on Python 3; the parsing below and the
    # log helpers work on text.
    if not isinstance(raw, str):
        raw = raw.decode('utf-8', 'replace')
    return raw


def _report_quota(quota_prog, mount_point):
    """Report the disk quota used on ``mount_point``, if it can be found.

    Args:
        quota_prog: path of the ``quota`` program to run.
        mount_point: filesystem to query (passed with ``-w -p -f``).

    The three-line output form is parsed when the 2nd and 3rd header
    columns are 'blocks' and 'quota'; usage is then printed via comment().
    When the output does not have exactly three lines, the output of
    plain ``quota`` is stored in the workflow log as a comment instead.
    Failures of the quota program are ignored silently.
    """
    out = _quota_output([quota_prog, '-w', '-p', '-f', mount_point])
    if out is None:
        return
    lines = out.splitlines()
    if len(lines) != 3:
        out = _quota_output([quota_prog])
        if out:
            _log_comment(out)
        return
    if lines[1].split()[1:3] != ['blocks', 'quota']:
        return
    usage = lines[2].split()[1:3]
    if len(usage) != 2:
        # The data row is shorter than the header promised.
        return
    blocks, quota = usage
    try:
        percent = '%.0f%%' % (100. * int(blocks) / int(quota))
    except (ValueError, ZeroDivisionError):
        # Non-numeric fields, or a quota of 0 (the division would fail).
        percent = '???'
    comment('\nUsed quota on %s: %s / %s kB (%s)' %
            (mount_point, blocks, quota, percent))
def report_disk_space(paths):
    """Report free space (and quota, if available) for given paths.

    Each distinct mount point among ``paths`` is reported once: the free
    space via comment(), then the quota via the first ``quota`` program
    found on $PATH. Does nothing on Windows, and stops as soon as
    os.statvfs turns out to be unavailable.
    """
    if os.name == 'nt':
        return
    seen_mounts = []
    for path in paths:
        mount = _find_mount_point(path)
        if mount in seen_mounts:
            continue
        seen_mounts.append(mount)
        try:
            stats = os.statvfs(mount)
        except AttributeError:
            return
        free_bytes = stats.f_frsize * stats.f_bavail
        free_mb = free_bytes / (1024. * 1024)
        comment('\nFree space on %s: %.0f MB' % (mount, free_mb))
        candidates = (os.path.join(d, 'quota')
                      for d in os.environ['PATH'].split(os.pathsep))
        quota_prog = next((c for c in candidates if os.path.exists(c)), None)
        if quota_prog is not None:
            _report_quota(quota_prog, mount)
def _get_sum(filename):
    """Return the hexadecimal MD5 digest of the content of ``filename``.

    The file is read in 64 kB chunks, so large files are never loaded
    into memory in one piece.
    """
    digest = hashlib.md5()
    with open(filename, 'rb') as stream:
        for chunk in iter(lambda: stream.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
def filter_out_duplicate_files(filenames, relto=''):
    """Return ``filenames`` without empty files and content duplicates.

    Files are compared by the MD5 sum of their content (paths are taken
    relative to ``relto``); the first file with a given content is kept
    and each checksum is noted in the workflow log. A file that cannot be
    read is kept, to be on the safe side.
    """
    # Start with the MD5 of empty content so that empty files are excluded.
    seen = {'d41d8cd98f00b204e9800998ecf8427e'}
    unique = []
    for filename in filenames:
        try:
            digest = _get_sum(os.path.join(relto, filename))
            _log_comment('%s %s' % (digest, filename))
            keep = digest not in seen
            if keep:
                seen.add(digest)
        except OSError:
            keep = True  # to be on the safe side
        if keep:
            unique.append(filename)
    return unique
def silently_run(args, stdin_text='', cwd=None):
    """Run a program with all of its output captured.

    Args:
        args: the command as a list; args[0] is the program.
        stdin_text: text encoded and fed to the program's standard input.
        cwd: working directory for the program (None = current one).

    Returns:
        A tuple (exit code, captured stdout, captured stderr).

    Raises:
        RuntimeError: if the program is not found, or if the run is
            interrupted with Ctrl-C.
        OSError: for any other failure to start the program.
    """
    program = args[0] if False else None  # placeholder, replaced below
    del program
    try:
        process = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        raise RuntimeError('Program not found: %s\n' % args[0])
    stdin_bytes = stdin_text.encode()
    try:
        out, err = process.communicate(input=stdin_bytes)
    except KeyboardInterrupt:
        raise RuntimeError('Interrupted silent running of %s' % args[0])
    return process.poll(), out, err
# Manual check: report disk space for the paths given on the command line,
# or for the current directory when none are given.
if __name__ == '__main__':
    print('--- testing report_disk_space() ---')
    report_disk_space(sys.argv[1:] or ['.'])
    print('')