-
Notifications
You must be signed in to change notification settings - Fork 72
/
make_command_o2.py
341 lines (300 loc) · 12.4 KB
/
make_command_o2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
#!/usr/bin/env python3
"""
Generates full O2 command based on a YAML database of workflows and options.

The database must be a dictionary with a "workflows" dictionary (one entry per
workflow) and an "options" dictionary containing "global" and "local" keys.
The composed command is printed to stdout; all diagnostics go to stderr.

Author: Vít Kučera <vit.kucera@cern.ch>
"""
import argparse
import os
import sys
from typing import List
import yaml # pylint: disable=import-error
def eprint(*args, **kwargs):
    """Behave like ``print`` but write to standard error instead of standard output."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def msg_err(message: str):
    """Report *message* on stderr as a bold red "Error:" line."""
    template = "\x1b[1;31mError: %s\x1b[0m"
    eprint(template % message)
def msg_fatal(message: str):
    """Report *message* as an error, then terminate the program with exit status 1."""
    msg_err(message)
    # Equivalent to sys.exit(1): both raise SystemExit with code 1.
    raise SystemExit(1)
def msg_warn(message: str):
    """Report *message* on stderr after a bold cyan "Warning:" label."""
    template = "\x1b[1;36mWarning:\x1b[0m %s"
    eprint(template % message)
def msg_bold(message: str):
    """Write *message* to stderr in boldface."""
    template = "\x1b[1m%s\x1b[0m"
    eprint(template % message)
def join_strings(obj) -> str:
    """Return *obj* as one string.

    A string is returned unchanged; a list of strings is joined with single
    spaces. Any other type is a fatal error.
    """
    if isinstance(obj, list):
        return " ".join(obj)
    if isinstance(obj, str):
        return obj
    msg_fatal("Cannot convert %s into a string" % type(obj))
    return ""  # not reached: msg_fatal exits
def join_to_list(obj, list_out: list):
    """Append a string, or the items of a list, to *list_out* in place.

    A string is appended as a single item; a list is appended item by item.
    Any other type is a fatal error.

    :param obj: string or list to append
    :param list_out: list that is extended in place
    """
    if isinstance(obj, str):
        list_out.append(obj)
    elif isinstance(obj, list):
        list_out.extend(obj)
    else:
        # The target here is a list, not a string.
        msg_fatal("Cannot convert %s into a list" % type(obj))
def make_table_output(spec: str) -> str:
    """Expand a table name into the full output descriptor "ORIGIN/NAME/VERSION".

    - "NAME" becomes "AOD/NAME/0".
    - "AOD/NAME", "AOD1/NAME" and "DYN/NAME" get the version "/0" appended.
    - "NAME/N" with a numeric N gets the origin "AOD/" prepended.
    - Anything else (including specs with three or more parts) is returned unchanged.
    """
    parts = spec.split("/")
    n_parts = len(parts)
    if n_parts == 1:
        return "AOD/" + spec + "/0"
    if n_parts == 2:
        origin, second = parts
        if origin in {"AOD", "AOD1", "DYN"}:
            return spec + "/0"
        if second.isdigit():
            return "AOD/" + spec
    return spec
def healthy_structure(dic_full: dict) -> bool:
    """Validate the layout of the loaded database and report every problem found.

    Requirements:
    - the database is a dict with "workflows" and "options" keys;
    - "options" is a dict with "global" and "local" keys;
    - "workflows" is a dict whose values are dicts, and any "activate" value is a bool.

    All missing mandatory keys at one level are reported before giving up.

    :return: True if the structure is valid, False otherwise
    """
    if not isinstance(dic_full, dict):
        msg_err("No dictionary found.")
        return False

    # Mandatory top-level keys.
    missing = [key for key in ("workflows", "options") if key not in dic_full]
    for key in missing:
        msg_err('Key "%s" not found in the database.' % key)
    if missing:
        return False

    # The options section and its mandatory keys.
    options = dic_full["options"]
    if not isinstance(options, dict):
        msg_err('"options" is not a dictionary.')
        return False
    missing = [key for key in ("global", "local") if key not in options]
    for key in missing:
        msg_err('Key "%s" not found in the option database.' % key)
    if missing:
        return False

    # The workflows section and each workflow entry.
    workflows = dic_full["workflows"]
    if not isinstance(workflows, dict):
        msg_err('"workflows" is not a dictionary.')
        return False
    for name, props in workflows.items():
        if not isinstance(props, dict):
            msg_err("%s is not a dictionary." % name)
            return False
        if "activate" in props and not isinstance(props["activate"], bool):
            msg_err('"activate" in workflow %s is not a boolean.' % name)
            return False
    return True
def activate_workflow(wf: str, dic_wf: dict, mc=False, level=0, debug=False, visited=None):
    """Activate a workflow and, recursively, all of its dependencies.

    The workflow database is modified in place. The "activate" flag of every
    reached workflow is set, and workflows missing from the database are added
    as {"activate": True}.

    :param wf: name of the workflow to activate
    :param dic_wf: workflow database (name -> properties dict), modified in place
    :param mc: True if the input is Monte Carlo. A workflow with a truthy
        "requires_mc" is deactivated for non-MC input; reaching such a workflow
        as a dependency (level > 0) is a fatal error.
    :param level: recursion depth, 0 for a primary workflow
    :param debug: print the traversed workflows as an indented tree on stderr
    :param visited: names already processed in the current traversal. Each
        workflow is expanded only once, so shared or circular dependencies do
        not cause repeated work or infinite recursion. Callers normally leave
        it as None.
    """
    if visited is None:
        visited = set()
    if wf in visited:
        return
    visited.add(wf)
    if debug:
        eprint((level + 1) * " " + wf)
    if wf not in dic_wf:
        # Add in the dictionary if not present.
        msg_warn("Adding an unknown workflow %s" % wf)
        dic_wf[wf] = {"activate": True}
        return
    dic_wf_single = dic_wf[wf]
    # Deactivate workflow if it needs MC and input is not MC.
    if dic_wf_single.get("requires_mc") and not mc:
        msg_warn("Deactivated %s because of non-MC input" % wf)
        # Throw error if this is a dependency.
        if level > 0:
            msg_fatal("Workflows requiring this dependency would fail!")
        dic_wf_single["activate"] = False
        return
    dic_wf_single["activate"] = True
    # Activate dependencies recursively. This also runs for workflows already
    # activated in the database, because their dependencies may not be.
    if "dependencies" in dic_wf_single:
        list_dep: List[str] = []
        join_to_list(dic_wf_single["dependencies"], list_dep)
        for wf_dep in list_dep:
            activate_workflow(wf_dep, dic_wf, mc, level + 1, debug, visited)
def _activated(dic_wf: dict) -> list:
    """Return (name, properties) pairs of the workflows whose "activate" flag is set."""
    return [(wf, dic_wf_single) for wf, dic_wf_single in dic_wf.items() if dic_wf_single.get("activate")]


def _select_by_mode(spec, mc_mode: bool, what: str) -> list:
    """Return the parts of a mode-dependent setting that apply to the current input.

    A str or list applies to both modes and is returned as a single part.
    A dict may contain "default" (always used), "real" (non-MC input only) and
    "mc" (MC input only); the present parts are returned in that order.
    Any other type is a fatal error; *what* names the setting in the message.
    """
    if isinstance(spec, (str, list)):
        return [spec]
    if isinstance(spec, dict):
        keys = ("default", "mc" if mc_mode else "real")
        return [spec[key] for key in keys if key in spec]
    msg_fatal("%s must be str, list or dict, is %s" % (what, type(spec)))
    return []  # not reached: msg_fatal exits


def _write_graph(dic_wf: dict, path_file_database: str):
    """Write a Graphviz file of the activated workflows and their dependencies.

    The file is named after the database with the extension ".gv". The command
    that renders it to PDF is printed to stderr.
    """
    basename, _ = os.path.splitext(path_file_database)
    ext_graph = "pdf"
    path_file_dot = basename + ".gv"
    path_file_graph = basename + "." + ext_graph
    eprint("Making diagram in: %s" % path_file_dot)
    lines = [
        "digraph {\n",
        " edge [dir=back] // inverted arrow direction\n",
        " rankdir=BT // bottom to top drawing\n",
        " ranksep=2 // vertical node separation\n",
        ' node [shape=box, style="filled,rounded", fillcolor=papayawhip, fontname=Courier, fontsize=20]\n',
    ]
    for wf, dic_wf_single in _activated(dic_wf):
        # Hyphens are not allowed in unquoted node names.
        node_wf = wf.replace("-", "_")
        # Replace hyphens with line breaks to save horizontal space.
        label_wf = wf.replace("o2-analysis-", "").replace("-", "\\n")
        lines.append(' %s [label="%s"]\n' % (node_wf, label_wf))
        if "dependencies" in dic_wf_single:
            nodes_dep = join_strings(dic_wf_single["dependencies"]).replace("-", "_")
            lines.append(" %s -> {%s}\n" % (node_wf, nodes_dep))
    lines.append("}\n")
    try:
        with open(path_file_dot, "w", encoding="utf-8") as file_dot:
            file_dot.write("".join(lines))
    except IOError:
        msg_fatal("Failed to open file " + path_file_dot)
    eprint("Produce graph with Graphviz: dot -T%s %s -o %s" % (ext_graph, path_file_dot, path_file_graph))


def main():
    """Main function.

    Parse the command line, load and validate the YAML database, activate the
    requested workflows with their dependencies, and print the composed O2
    command to stdout. Optionally write a Graphviz file of the topology.
    """
    parser = argparse.ArgumentParser(
        description="Generates full O2 command based on a YAML database of workflows and options."
    )
    parser.add_argument("database", help="database with workflows and options")
    parser.add_argument("-w", "--workflows", type=str, help="explicitly requested workflows")
    parser.add_argument("--mc", action="store_true", help="Monte Carlo mode")
    parser.add_argument("-t", "--tables", action="store_true", help="save table into trees")
    parser.add_argument("-g", "--graph", action="store_true", help="make topology graph")
    parser.add_argument("-d", "--debug", action="store_true", help="print debugging info")
    parser.add_argument("-p", "--perf", action="store_true", help="produce performance profiling stats")
    args = parser.parse_args()
    path_file_database = args.database
    debug = args.debug
    # Whitespace-separated names of workflows requested on the command line.
    workflows_add = args.workflows.split() if args.workflows else []
    mc_mode = args.mc
    save_tables = args.tables
    make_graph = args.graph
    perf = args.perf

    # Open and parse the database input file.
    if debug:
        eprint("Input database: " + path_file_database)
    try:
        with open(path_file_database, "r", encoding="utf-8") as file_database:
            dic_in = yaml.safe_load(file_database)
    except IOError:
        msg_fatal("Failed to open file " + path_file_database)
    except (yaml.YAMLError, UnicodeDecodeError) as err:
        msg_fatal("Failed to parse file %s: %s" % (path_file_database, err))
    # Check valid structure of the input database.
    if not healthy_structure(dic_in):
        msg_fatal("Bad structure!")
    if mc_mode:
        msg_warn("MC mode is on.")
    if save_tables:
        msg_warn("Tables will be saved in trees.")
    if perf:
        msg_warn(
            "Performance profiling stats will be saved in perf.data files.\n"
            " Convert them with: perf script --demangle -i perf.data --no-inline |"
            " c++filt -r -t > profile.linux-perf.txt\n"
            " and upload the output to https://www.speedscope.app/."
        )

    # Get workflow-independent options.
    dic_opt = dic_in["options"]
    # options that appear only once
    opt_global = join_strings(dic_opt["global"])
    # options that appear for each workflow
    opt_local = join_strings(dic_opt["local"])
    # Get the workflow database.
    dic_wf = dic_in["workflows"]

    # Get list of primary workflows to run.
    # already activated in the database
    list_wf_activated = [wf for wf, _ in _activated(dic_wf)]
    if debug and list_wf_activated:
        eprint("\nWorkflows activated in the database:")
        eprint("\n".join(" " + wf for wf in list_wf_activated))
    # requested on command line
    if workflows_add:
        if debug:
            eprint("\nWorkflows specified on command line:")
            eprint("\n".join(" " + wf for wf in workflows_add))
        list_wf_activated += workflows_add
    # Remove duplicities (keeping the first occurrence order).
    list_wf_activated = list(dict.fromkeys(list_wf_activated))
    if debug:
        eprint("\nPrimary workflows to run:")
        eprint("\n".join(" " + wf for wf in list_wf_activated))

    # Activate all needed workflows recursively.
    if debug:
        eprint("\nActivating workflows")
    for wf in list_wf_activated:
        activate_workflow(wf, dic_wf, mc_mode, 0, debug)

    # Get the list of tables and add the option to the local options.
    if save_tables:
        tables: List[str] = []  # list of all tables of activated workflows
        for wf, dic_wf_single in _activated(dic_wf):
            if "tables" not in dic_wf_single:
                continue
            for part in _select_by_mode(dic_wf_single["tables"], mc_mode, '"tables" in %s' % wf):
                join_to_list(part, tables)
        # Several workflows may request the same table; keep each descriptor once.
        descriptors = list(dict.fromkeys(make_table_output(t) for t in tables))
        string_tables = ",".join(descriptors)
        if string_tables:
            opt_local += " --aod-writer-keep " + string_tables

    # Compose one pipeline stage per activated workflow.
    stages: List[str] = []
    executables_seen = set()
    eprint("\nActivated workflows:")
    for wf, dic_wf_single in _activated(dic_wf):
        msg_bold(" " + wf)
        # Determine the workflow executable.
        if "executable" in dic_wf_single:
            string_wf = dic_wf_single["executable"]
            if not isinstance(string_wf, str):
                msg_fatal('"executable" in %s must be str, is %s' % (wf, type(string_wf)))
        else:
            string_wf = wf
        # Detect duplicate workflows by executable name (exact match).
        if string_wf in executables_seen:
            msg_warn("Workflow %s is already present." % string_wf)
        executables_seen.add(string_wf)
        # Process options.
        if "options" in dic_wf_single:
            for part in _select_by_mode(dic_wf_single["options"], mc_mode, '"options" in %s' % wf):
                string_wf += " " + join_strings(part)
        if opt_local:
            string_wf += " " + opt_local
        stages.append(string_wf)
    if not stages:
        msg_fatal("Nothing to do!")

    # Prepend the profiler to the last stage of the pipeline.
    # NOTE(review): as before, only the last workflow is profiled; confirm whether all should be.
    if perf:
        opt_perf = "perf record -F 99 -g --call-graph dwarf --user-callchains"
        stages[-1] = f"{opt_perf} {stages[-1]}"

    # Join the stages into a shell pipeline, one stage per line.
    command = " | \\\n".join(stages) + " "
    # Append global options.
    if opt_global:
        command += " " + opt_global
    # Print out the command.
    print(command)

    # Produce topology graph.
    if make_graph:
        _write_graph(dic_wf, path_file_database)
# Run the command generator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()