-
Notifications
You must be signed in to change notification settings - Fork 2
/
data_flow_graph.py
157 lines (119 loc) · 4.19 KB
/
data_flow_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
Helper functions and classes used to generate data flow graphs
"""
from collections import defaultdict, Counter, OrderedDict
def format_tsv_line(source, edge, target, value=None, metadata=None):
    """
    Build one tab-separated line describing a single data flow edge

    The columns are: source, edge, target, value (rendered with four decimal
    places, left empty when not provided) and metadata (left empty when not
    provided). Trailing tabs and spaces are removed, so optional columns
    that are empty at the end of the line are dropped.

    :type source str
    :type edge str
    :type target str
    :type value float
    :type metadata str
    :rtype: str
    """
    if value is None:
        value_column = ''
    else:
        value_column = '{:.4f}'.format(value)

    columns = {
        'source': source,
        'edge': edge,
        'target': target,
        'value': value_column,
        'metadata': metadata if metadata else '',
    }

    rendered = '{source}\t{edge}\t{target}\t{value}\t{metadata}'.format(**columns)

    # drop the separators left behind by empty trailing columns
    return rendered.rstrip(' \t')
def format_tsv_lines(lines):
    """
    Turn a list of data flow entries into newline-terminated TSV lines

    Each entry is a dict whose items are passed to format_tsv_line() as
    keyword arguments; the order of the entries is preserved.

    :type lines list[dict]
    :rtype: list[str]
    """
    rendered = []

    for entry in lines:
        rendered.append(format_tsv_line(**entry) + '\n')

    return rendered
def escape_graphviz_entry(entry):
    """
    Put a backslash in front of every double quote found in the given text

    :type entry str
    :rtype: str
    """
    # cut the text on each double quote and glue it back using the escaped form
    chunks = entry.split('"')
    return '\\"'.join(chunks)
def format_graphviz_lines(lines):
    """
    Render a .dot file with graph definition from a given set of data

    Every distinct "source" / "target" value becomes a node (named n1, n2, ...
    following the sorted order of the values) and every entry of ``lines``
    becomes an edge, labelled with the entry's optional "metadata".

    A node written as "group:label" gets the group name as the first line of
    its label, is assigned to that graphviz group and is filled with a colour
    picked per group from the "pastel28" colour scheme.

    :type lines list[dict]
    :rtype: str
    """
    # the "pastel28" Brewer scheme defines colours 1..8 only
    palette_size = 8

    # the entries are walked twice (nodes, then edges), so accept any iterable
    lines = list(lines)

    # first, prepare the unique list of all nodes (sources and targets)
    lines_nodes = set()

    for line in lines:
        lines_nodes.add(line['source'])
        lines_nodes.add(line['target'])

    # generate a list of all nodes and their names for graphviz graph
    nodes = OrderedDict()

    for i, node in enumerate(sorted(lines_nodes)):
        nodes[node] = 'n{}'.format(i + 1)

    graph = list()

    # some basic style definition
    # https://graphviz.gitlab.io/_pages/doc/info/lang.html
    graph.append('digraph G {')

    # https://graphviz.gitlab.io/_pages/doc/info/shapes.html#record
    graph.append('\tgraph [ center=true, margin=0.75, nodesep=0.5, ranksep=0.75, rankdir=LR ];')
    graph.append('\tnode [ shape=box, style="rounded,filled" width=0, height=0, '
                 'fontname=Helvetica, fontsize=11 ];')
    graph.append('\tedge [ fontname=Helvetica, fontsize=9 ];')

    # emit nodes definition
    graph.append('\n\t// nodes')

    # https://www.graphviz.org/doc/info/colors.html#brewer
    group_colors = dict()

    for label, name in nodes.items():
        if ':' in label:
            (group, label) = str(label).split(':', 1)

            # register a new group for coloring; wrap around once the palette
            # is exhausted instead of emitting a colour index that is not
            # a part of the scheme
            if group not in group_colors:
                group_colors[group] = len(group_colors) % palette_size + 1

            color = group_colors[group]

            # the group name ends up inside quoted strings too, escape it
            # (after the colour lookup, which is keyed by the raw name)
            group = escape_graphviz_entry(group)
        else:
            group = None
            color = None

        label = escape_graphviz_entry(label)

        graph.append('\t{name} [label="{label}"{group}];'.format(
            name=name,
            label="{}\\n{}".format(group, label) if group is not None else label,
            group=' group="{}" colorscheme=pastel28 color={}'.format(
                group, color) if group is not None else ''
        ))

    # now, connect the nodes
    graph.append('\n\t// edges')

    for line in lines:
        # "metadata" can be missing or explicitly set to None - both mean "no label"
        label = line.get('metadata') or ''

        graph.append('\t{source} -> {target} [{label}];'.format(
            source=nodes[line['source']],
            target=nodes[line['target']],
            label='label="{}"'.format(escape_graphviz_entry(label)) if label else ''
        ))

    graph.append('}')

    return '\n'.join(graph)
def logs_map_and_reduce(logs, _map, _reduce):
    """
    Group log entries by a key and reduce each group into a single item

    ``_map`` is called once per log entry and returns the (hashable) key the
    entry is grouped under. ``_reduce`` is then called once per group with the
    list of entries sharing that key and has to return an item supporting
    item assignment (e.g. a dict).

    Each returned item gets a "value" field set to the size of its group
    divided by the size of the largest group (so the most common group
    gets 1.0). Items are returned in the order their keys were first seen.
    Empty ``logs`` give an empty list.

    :type logs list[str]
    :type _map (str) -> str
    :type _reduce (list[str]) -> dict
    :rtype: list[dict]
    """
    # first map all entries; OrderedDict keeps the keys in first-seen order
    # without a separate list of keys and its linear membership checks
    mapped = OrderedDict()

    for log in logs:
        mapped.setdefault(_map(log), []).append(log)

    # nothing to reduce (and no "most common" group to normalize against)
    if not mapped:
        return []

    # size of the most common mapped group
    top_count = max(len(entries) for entries in mapped.values())

    # now reduce mapped items
    reduced = []

    for entries in mapped.values():
        item = _reduce(entries)

        # add "value" field to each reduced item
        # (1.0 will be assigned to the most "common" item)
        item['value'] = float(len(entries)) / top_count

        reduced.append(item)

    return reduced