
Fix analysis (#34)
* Fix analysis

* Run Just pre-commit
charmoniumQ authored Jul 22, 2024
1 parent 8a98683 commit 67d5562
Showing 6 changed files with 143 additions and 416 deletions.
2 changes: 1 addition & 1 deletion probe_src/libprobe/generated/libc_hooks.c
@@ -1928,7 +1928,7 @@ pid_t waitpid(pid_t pid, int *status_ptr, int options)
{
status_ptr = &status;
}
struct Op op = {wait_op_code, {.wait = {.task_type = TASK_TID, .task_id = 0, .options = options, .status = 0, .ferrno = 0}}, {0}, 0, 0};
struct Op op = {wait_op_code, {.wait = {.task_type = TASK_PID, .task_id = 0, .options = options, .status = 0, .ferrno = 0}}, {0}, 0, 0};
prov_log_try(op);
pid_t ret = unwrapped_waitpid(pid, status_ptr, options);
int saved_errno = errno;
2 changes: 1 addition & 1 deletion probe_src/libprobe/generator/libc_hooks_source.c
@@ -2122,7 +2122,7 @@ pid_t waitpid (pid_t pid, int *status_ptr, int options) {
struct Op op = {
wait_op_code,
{.wait = {
.task_type = TASK_TID,
.task_type = TASK_PID,
.task_id = 0,
.options = options,
.status = 0,
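
Context for the TASK_TID to TASK_PID fix in both waitpid hooks above: waitpid(2) waits on a child process, so the recorded wait op must carry a process id, not a thread id. A minimal sketch of that semantics, relying only on POSIX os.fork/os.waitpid (toy values, not PROBE code):

import os

pid = os.fork()
if pid == 0:
    os._exit(0)                  # child: exit immediately
else:
    waited, status = os.waitpid(pid, 0)
    assert waited == pid         # waitpid reports a PID, never a TID
    assert os.waitstatus_to_exitcode(status) == 0
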
234 changes: 127 additions & 107 deletions probe_src/probe_py/analysis.py
@@ -1,6 +1,6 @@
import typing
import networkx as nx # type: ignore
from .parse_probe_log import Op, ProvLog, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, TaskType, InitProcessOp, InitExecEpochOp, InitThreadOp, CLONE_THREAD
from .parse_probe_log import Op, ProvLog, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, TaskType, InitProcessOp, InitExecEpochOp, InitThreadOp
from enum import IntEnum


@@ -40,6 +40,9 @@ def validate_provlog(
waited_processes.add((op.data.task_type, op.data.task_id))
elif isinstance(op.data, CloneOp) and op.data.ferrno == 0:
cloned_processes.add((op.data.task_type, op.data.task_id))
if op.data.task_type == TaskType.TASK_PID:
# New process implicitly also creates a new thread
cloned_processes.add((TaskType.TASK_TID, op.data.task_id))
elif isinstance(op.data, OpenOp) and op.data.ferrno == 0:
opened_fds.add(op.data.fd)
elif isinstance(op.data, CloseOp) and op.data.ferrno == 0:
@@ -59,19 +62,22 @@
elif isinstance(op.data, InitProcessOp):
if exec_epoch_no != 0:
ret.append(f"InitProcessOp happened, but exec_epoch was not zero, was {exec_epoch_no}")
expected_epochs = set(range(0, max(epochs)))
if epochs - expected_epochs:
expected_epochs = set(range(0, max(epochs) + 1))
if expected_epochs - epochs:
ret.append(f"Missing epochs for pid={pid}: {sorted(epochs - expected_epochs)}")
reserved_fds = {0, 1, 2}
if False:
pass
elif closed_fds - opened_fds - reserved_fds:
ret.append(f"Closed more fds than we opened: {closed_fds - opened_fds - reserved_fds}")
ret.append(f"Closed more fds than we opened: {closed_fds - reserved_fds=} {opened_fds=}")
elif waited_processes - cloned_processes:
ret.append(f"Waited on more processes than we cloned: {waited_processes - cloned_processes}")
ret.append(f"Waited on more processes than we cloned: {waited_processes=} {cloned_processes=}")
return ret
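
The epoch check above fixes an off-by-one and a reversed set difference: the expected set must include max(epochs) itself, and the validator needs the expected epochs that were never observed. A toy illustration with made-up values:

epochs = {0, 1, 3}                                # observed exec epochs (hypothetical)
expected_epochs = set(range(0, max(epochs) + 1))  # {0, 1, 2, 3}
missing = expected_epochs - epochs                # {2}: exec epoch 2 never appeared
assert sorted(missing) == [2]
# The old code used range(0, max(epochs)) and subtracted in the other direction,
# so it could never report a missing epoch.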


# TODO: Rename "digraph" to "hb_graph" in the entire project.
# Digraph (aka "directed graph") is too vague a term; the proper name is "happens-before graph".
# Later on, we will have a function that transforms an hb graph into a file graph (both of which are digraphs)
def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
# [pid, exec_epoch_no, tid, op_index]
Node: typing.TypeAlias = tuple[int, int, int, int]
@@ -91,101 +97,84 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
for tid, thread in exec_epoch.threads.items():
context = (pid, exec_epoch_no, tid)
ops = list[Node]()

# Filter just the ops we are interested in
op_index = 0
for op in thread.ops:
if isinstance(op.data, CloneOp | ExecOp | WaitOp | OpenOp | CloseOp):
ops.append((*context, op_index))
op_index+=1
for op_index, op in enumerate(thread.ops):
ops.append((*context, op_index))

# Add just those ops to the graph
nodes.extend(ops)
program_order_edges.extend(zip(ops[:-1], ops[1:]))

# Store these so we can hook up forks/joins between threads
proc_to_ops[context] = ops
if len(ops) != 0:
# to mark the end of the thread, edge from last op to (pid, -1, tid, -1)
program_order_edges.append((proc_to_ops[(pid, exec_epoch_no, tid)][-1], (pid, -1, tid, -1)))

# Define helper functions
def first(pid: int, exid: int, tid: int) -> Node:
if not proc_to_ops.get((pid, exid, tid)):
entry_node = (pid, exid, tid, -1)
# as Op object is not available, op_index will be -1
proc_to_ops[(pid, exid, tid)] = [entry_node]
nodes.append(entry_node)
return entry_node
else:
return proc_to_ops[(pid, exid, tid)][0]
return proc_to_ops[(pid, exid, tid)][0]

def last(pid: int, exid: int, tid: int) -> Node:
if not proc_to_ops.get((pid, exid, tid)):
# as Op object is not available, op_index will be -1
exit_node = (pid, exid, tid, -1)
proc_to_ops[(pid, exid, tid)] = [exit_node]
nodes.append(exit_node)
return exit_node
else:
return proc_to_ops[(pid, exid, tid)][-1]

def get_first_pthread(pid, exid, target_pthread_id):
for kthread_id, thread in process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items():
op_index = 0
for op in thread.ops:
if op.pthread_id == target_pthread_id:
return (pid, exid, kthread_id, op_index)
op_index+=1
return (pid, -1, target_pthread_id, -1)

def get_last_pthread(pid, exid, target_pthread_id):
for kthread_id, thread in process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items():
op_index = len(thread.ops) - 1
ops = thread.ops
while op_index >= 0:
op = ops[op_index]
if op.pthread_id == target_pthread_id:
return (pid, exid, kthread_id, op_index)
op_index-=1
return (pid, -1, target_pthread_id, -1)
return proc_to_ops[(pid, exid, tid)][-1]

def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
ret = list[Node]()
for pid, process in process_tree_prov_log.processes.items():
for exid, exec_epoch in process.exec_epochs.items():
for tid, thread in exec_epoch.threads.items():
for op_index, op in enumerate(thread.ops):
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
return ret

def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]:
ret = list[Node]()
for pid, process in process_tree_prov_log.processes.items():
for exid, exec_epoch in process.exec_epochs.items():
for tid, thread in exec_epoch.threads.items():
for op_index, op in list(enumerate(thread.ops))[::-1]:
if op.pthread_id == target_pthread_id:
ret.append((pid, exid, tid, op_index))
return ret

# Hook up forks/joins
for node in list(nodes):
pid, exid, tid, op_index = node
op = process_tree_prov_log.processes[pid].exec_epochs[exid].threads[tid].ops[op_index].data
global target
target: tuple[int, int, int]
if False:
pass
elif isinstance(op, CloneOp) and op.data.ferrno == 0:
if op.task_type == TaskType.TASK_PID:
elif isinstance(op, CloneOp) and op.ferrno == 0:
if False:
pass
elif op.task_type == TaskType.TASK_PID:
# Spawning a process links to exec epoch 0 of the new PID; the main thread's TID equals the PID
target = (op.task_id, 0, op.task_id)
fork_join_edges.append((node, first(*target)))
elif op.task_type == TaskType.TASK_TID:
target = (pid, exid, op.task_id)
fork_join_edges.append((node, first(*target)))
elif op.task_type == TaskType.TASK_PTHREAD:
target_pthread_id = op.task_id
dest = get_first_pthread(pid, exid, target_pthread_id)
fork_join_edges.append((node, dest))
continue
for dest in get_first_pthread(pid, exid, op.task_id):
fork_join_edges.append((node, dest))
else:
# New process always links to exec epoch 0 and main thread
# The TID of the main thread is the same as the PID
target = (op.task_id, 0, op.task_id)
exec_edges.append((node, first(*target)))
raise RuntimeError(f"Task type {op.task_type} supported")
elif isinstance(op, WaitOp) and op.ferrno == 0 and op.task_id > 0:
# Always wait for main thread of the last exec epoch
if op.ferrno == 0 and (op.task_type == TaskType.TASK_PID or op.task_type == TaskType.TASK_TID):
if False:
pass
elif op.task_type == TaskType.TASK_PID:
target = (op.task_id, last_exec_epoch.get(op.task_id, 0), op.task_id)
fork_join_edges.append((last(*target), node))
elif op.task_type == TaskType.TASK_TID:
target = (pid, exid, op.task_id)
fork_join_edges.append((last(*target), node))
elif op.ferrno == 0 and op.task_type == TaskType.TASK_PTHREAD:
fork_join_edges.append((get_last_pthread(pid, exid, op.task_id), node))
for dest in get_last_pthread(pid, exid, op.task_id):
fork_join_edges.append((dest, node))
elif isinstance(op, ExecOp):
# Exec brings same pid, incremented exid, and main thread
target = pid, exid + 1, pid
fork_join_edges.append((node, first(*target)))

# Make the main thread exit at the same time as each thread
for pid, process in process_tree_prov_log.processes.items():
for exec_epoch_no, exec_epoch in process.exec_epochs.items():
for tid, thread in exec_epoch.threads.items():
if tid != pid:
fork_join_edges.append((last(pid, exec_epoch_no, tid), last(pid, exec_epoch_no, pid)))
exec_edges.append((node, first(*target)))

process_graph = nx.DiGraph()
for node in nodes:
@@ -210,25 +199,49 @@ def validate_hb_graph(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
provlog_reverse = process_graph.reverse()
found_entry = False
found_exit = False
reserved_fds = {0, 1, 2}
for node in process_graph.nodes:
op = prov_log_get_node(provlog, *node) if node[-1] != -1 else None
if op is None:
op = prov_log_get_node(provlog, *node)
if False:
pass
elif isinstance(op.data, CloseOp) and op.data.ferrno == 0:
for closed_fd in range(op.data.low_fd, op.data.high_fd + 1):
for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node):
pred_op = prov_log_get_node(provlog, *pred_node) if pred_node[-1] != -1 else None
if isinstance(pred_op.data, OpenOp) and pred_op.data.fd == closed_fd and op.data.ferrno == 0:
break
else:
ret.append(f"Close of {closed_fd} is not preceeded by corresponding open")
if closed_fd not in reserved_fds:
for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node):
pred_op = prov_log_get_node(provlog, *pred_node)
if isinstance(pred_op.data, OpenOp) and pred_op.data.fd == closed_fd and op.data.ferrno == 0:
break
else:
ret.append(f"Close of {closed_fd} is not preceeded by corresponding open")
elif isinstance(op.data, WaitOp) and op.data.ferrno == 0:
for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node):
pred_op = prov_log_get_node(provlog, *pred_node) if pred_node[-1] != -1 else None
pred_op = prov_log_get_node(provlog, *pred_node)
pid1, eid1, tid1, opid1 = pred_node
if isinstance(pred_op.data, CloneOp) and pred_op.data.task_type == op.data.task_type and pred_op.data.task_id == op.data.task_id and op.data.ferrno == 0:
break
else:
ret.append(f"Close of {closed_fd} is not preceeded by corresponding open")
ret.append(f"Close of {closed_fd} in {node} is not preceeded by corresponding open")
elif isinstance(op.data, CloneOp) and op.data.ferrno == 0:
for node1 in process_graph.successors(node):
pid1, eid1, tid1, opid1 = node1
op1 = prov_log_get_node(provlog, *node1)
if False:
pass
elif op.data.task_type == TaskType.TASK_PID:
if isinstance(op1.data, InitProcessOp):
if op.data.task_id != pid1:
ret.append(f"CloneOp {node} returns {op.data.task_id} but the next op has pid {pid1}")
break
elif op.data.task_type == TaskType.TASK_TID:
if isinstance(op1.data, InitThreadOp):
if op.data.task_id != tid1:
ret.append(f"CloneOp {node} returns {op.data.task_id} but the next op has tid {tid1}")
break
elif op.data.task_type == TaskType.TASK_PTHREAD and op.data.task_id == op1.pthread_id:
break
elif op.data.task_type == TaskType.TASK_ISO_C_THREAD and op.data.task_id == op1.iso_c_thread_id:
break
else:
ret.append(f"Could not find a successor for CloneOp {node} {op.data.task_type} in the target thread")
if process_graph.in_degree(node) == 0:
if not found_entry:
found_entry = True
@@ -246,34 +259,15 @@ def validate_hb_graph(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
for (node0, node1) in process_graph.edges:
pid0, eid0, tid0, op0 = node0
pid1, eid1, tid1, op1 = node1
op0 = prov_log_get_node(provlog, *node0) if node0[-1] != -1 else None
op1 = prov_log_get_node(provlog, *node1) if node1[-1] != -1 else None
if op0 is None:
op0 = prov_log_get_node(provlog, *node0)
op1 = prov_log_get_node(provlog, *node1)
if False:
pass
elif isinstance(op0.data, ExecOp):
if eid0 + 1 != eid1:
ret.append(f"ExecOp {node0} is followed by {node1}, whose exec epoch id should be {eid0 + 1}")
if op1 is None or not isinstance(op1.data, InitExecEpochOp):
ret.append(f"ExecOp {node0} is followed by {op1}, which is not InitExecEpoch")
elif isinstance(op0.data, CloneOp) and op0.data.ferrno == 0:
if False:
pass
elif op0.data.task_type == TaskType.TASK_PID:
if op1 is None or not isinstance(op1.data, InitProcessOp):
ret.append(f"CloneOp {node0} with TASK_PID is followed by {node1} which is not InitProcessOp")
if op0.data.task_id != pid1:
ret.append(f"CloneOp {node0} returns {op0.data.task_id} but the next op has pid {pid1}")
elif op0.data.task_type == TaskType.TASK_TID:
if op1 is None or not isinstance(op1.data, InitThreadOp):
ret.append(f"CloneOp {node0} with TASK_TID is followed by {node1} which is not InitThreadOp")
if op1 is None or not isinstance(op1.data, InitThreadOp):
ret.append(f"CloneOp {node0} returns {op0.data.task_id} but the next op has tid {tid1}")
elif op0.data.task_type == TaskType.PTHREAD_PID:
if op1 is None or not op1.pthread_id != op0.data.task_id:
ret.append(f"CloneOp {node0} with TASK_PTHREAD is followed by {node1} which has a different pthread_id")
elif op0.data.task_type == TaskType.ISO_C_THREAD_PID:
if op1 is None or not op1.iso_c_thread_id != op0.data.task_id:
ret.append(f"CloneOp {node0} with TASK_ISO_C_THREAD is followed by {node1} which has a different pthread_id")
if not isinstance(op1.data, InitExecEpochOp):
ret.append(f"ExecOp {node0} is followed by {node1}, which is not InitExecEpoch")
try:
cycle = nx.find_cycle(process_graph)
except nx.NetworkXNoCycle:
@@ -283,7 +277,18 @@ def validate_hb_graph(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]:
return ret
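
The cycle check near the end of validate_hb_graph relies on networkx's find_cycle raising NetworkXNoCycle on a DAG; a self-contained sketch of that pattern on a toy graph:

import networkx as nx

g = nx.DiGraph([(1, 2), (2, 3), (3, 1)])  # toy graph that contains a cycle
try:
    cycle = nx.find_cycle(g)
    print(f"Found a cycle: {cycle}")       # [(1, 2), (2, 3), (3, 1)]
except nx.NetworkXNoCycle:
    print("No cycle: required for a valid happens-before graph")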


def digraph_to_pydot_string(process_graph: nx.DiGraph) -> str:
def relax_node(graph: nx.DiGraph, node: typing.Any) -> list[tuple[typing.Any, typing.Any]]:
"""Remove node from graph and attach its predecessors to its successors"""
ret = list[tuple[typing.Any, typing.Any]]()
for predecessor in graph.predecessors(node):
for successor in graph.successors(node):
ret.append((predecessor, successor))
graph.add_edge(predecessor, successor)
graph.remove_node(node)
return ret
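
A toy usage of relax_node, assuming the corrected predecessors(node)/successors(node) calls above: removing the middle node of a chain splices its neighbors together.

import networkx as nx

g = nx.DiGraph([("a", "b"), ("b", "c")])
new_edges = relax_node(g, "b")
assert new_edges == [("a", "c")]
assert set(g.edges) == {("a", "c")}
assert "b" not in g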


def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str:

label_color_map = {
EdgeLabels.EXEC: 'yellow',
@@ -292,8 +297,23 @@ def digraph_to_pydot_string(process_graph: nx.DiGraph) -> str:
}

for node0, node1, attrs in process_graph.edges(data=True):
label:EdgeLabels = attrs['label']
label: EdgeLabels = attrs['label']
process_graph[node0][node1]['color'] = label_color_map[label]
del attrs['label']

for node, data in process_graph.nodes(data=True):
pid, exid, tid, op_no = node
op = prov_log_get_node(prov_log, *node)
typ = type(op.data).__name__
data["label"] = f"{pid}.{exid}.{tid}.{op_no}\n{typ}"
if False:
pass
elif isinstance(op.data, OpenOp):
data["label"] += f"\nopen{op.data.path.path} (fd={op.data.fd})"
elif isinstance(op.data, CloseOp):
fds = list(range(op.data.low_fd, op.data.high_fd + 1))
data["label"] += "\nclose" + " ".join(map(str, fds))

pydot_graph = nx.drawing.nx_pydot.to_pydot(process_graph)
dot_string = typing.cast(str, pydot_graph.to_string())
return dot_string
@@ -307,4 +327,4 @@ def construct_process_graph(process_tree_prov_log: ProvLog) -> str:
"""

process_graph = provlog_to_digraph(process_tree_prov_log)
return digraph_to_pydot_string(process_graph)
return digraph_to_pydot_string(process_tree_prov_log, process_graph)
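
Taken together, the intended call sequence after this commit is roughly the following sketch (the loader name parse_probe_log is assumed here; it is not part of this diff):

# prov_log = parse_probe_log("probe_log")             # hypothetical loader
# errors = validate_provlog(prov_log)
# hb_graph = provlog_to_digraph(prov_log)
# errors += validate_hb_graph(prov_log, hb_graph)
# print(digraph_to_pydot_string(prov_log, hb_graph))  # note the new first argument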

