diff --git a/probe_src/libprobe/generated/libc_hooks.c b/probe_src/libprobe/generated/libc_hooks.c index af606494..38fc2716 100644 --- a/probe_src/libprobe/generated/libc_hooks.c +++ b/probe_src/libprobe/generated/libc_hooks.c @@ -1928,7 +1928,7 @@ pid_t waitpid(pid_t pid, int *status_ptr, int options) { status_ptr = &status; } - struct Op op = {wait_op_code, {.wait = {.task_type = TASK_TID, .task_id = 0, .options = options, .status = 0, .ferrno = 0}}, {0}, 0, 0}; + struct Op op = {wait_op_code, {.wait = {.task_type = TASK_PID, .task_id = 0, .options = options, .status = 0, .ferrno = 0}}, {0}, 0, 0}; prov_log_try(op); pid_t ret = unwrapped_waitpid(pid, status_ptr, options); int saved_errno = errno; diff --git a/probe_src/libprobe/generator/libc_hooks_source.c b/probe_src/libprobe/generator/libc_hooks_source.c index d35304f6..890a7247 100644 --- a/probe_src/libprobe/generator/libc_hooks_source.c +++ b/probe_src/libprobe/generator/libc_hooks_source.c @@ -2122,7 +2122,7 @@ pid_t waitpid (pid_t pid, int *status_ptr, int options) { struct Op op = { wait_op_code, {.wait = { - .task_type = TASK_TID, + .task_type = TASK_PID, .task_id = 0, .options = options, .status = 0, diff --git a/probe_src/probe_py/analysis.py b/probe_src/probe_py/analysis.py index 04a4bb3c..207efb7d 100644 --- a/probe_src/probe_py/analysis.py +++ b/probe_src/probe_py/analysis.py @@ -1,6 +1,6 @@ import typing import networkx as nx # type: ignore -from .parse_probe_log import Op, ProvLog, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, TaskType, InitProcessOp, InitExecEpochOp, InitThreadOp, CLONE_THREAD +from .parse_probe_log import Op, ProvLog, CloneOp, ExecOp, WaitOp, OpenOp, CloseOp, TaskType, InitProcessOp, InitExecEpochOp, InitThreadOp from enum import IntEnum @@ -40,6 +40,9 @@ def validate_provlog( waited_processes.add((op.data.task_type, op.data.task_id)) elif isinstance(op.data, CloneOp) and op.data.ferrno == 0: cloned_processes.add((op.data.task_type, op.data.task_id)) + if op.data.task_type == 
TaskType.TASK_PID: + # New process implicitly also creates a new thread + cloned_processes.add((TaskType.TASK_TID, op.data.task_id)) elif isinstance(op.data, OpenOp) and op.data.ferrno == 0: opened_fds.add(op.data.fd) elif isinstance(op.data, CloseOp) and op.data.ferrno == 0: @@ -59,19 +62,22 @@ elif isinstance(op.data, InitProcessOp): if exec_epoch_no != 0: ret.append(f"InitProcessOp happened, but exec_epoch was not zero, was {exec_epoch_no}") - expected_epochs = set(range(0, max(epochs))) - if epochs - expected_epochs: - ret.append(f"Missing epochs for pid={pid}: {sorted(epochs - expected_epochs)}") + expected_epochs = set(range(0, max(epochs) + 1)) + if expected_epochs - epochs: + ret.append(f"Missing epochs for pid={pid}: {sorted(expected_epochs - epochs)}") reserved_fds = {0, 1, 2} if False: pass elif closed_fds - opened_fds - reserved_fds: - ret.append(f"Closed more fds than we opened: {closed_fds - opened_fds - reserved_fds}") + ret.append(f"Closed more fds than we opened: {closed_fds - reserved_fds=} {opened_fds=}") elif waited_processes - cloned_processes: - ret.append(f"Waited on more processes than we cloned: {waited_processes - cloned_processes}") + ret.append(f"Waited on more processes than we cloned: {waited_processes=} {cloned_processes=}") return ret +# TODO: Rename "digraph" to "hb_graph" in the entire project. +# Digraph (aka "directed graph") is too vague a term; the proper name is "happens-before graph". 
+# Later on, we will have a function that transforms an hb graph to file graph (both of which are digraphs) def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph: # [pid, exec_epoch_no, tid, op_index] Node: typing.TypeAlias = tuple[int, int, int, int] @@ -91,101 +97,84 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph: for tid, thread in exec_epoch.threads.items(): context = (pid, exec_epoch_no, tid) ops = list[Node]() + # Filter just the ops we are interested in op_index = 0 - for op in thread.ops: - if isinstance(op.data, CloneOp | ExecOp | WaitOp | OpenOp | CloseOp): - ops.append((*context, op_index)) - op_index+=1 + for op_index, op in enumerate(thread.ops): + ops.append((*context, op_index)) + # Add just those ops to the graph nodes.extend(ops) program_order_edges.extend(zip(ops[:-1], ops[1:])) # Store these so we can hook up forks/joins between threads proc_to_ops[context] = ops - if len(ops) != 0: - # to mark the end of the thread, edge from last op to (pid, -1, tid, -1) - program_order_edges.append((proc_to_ops[(pid, exec_epoch_no, tid)][-1], (pid, -1, tid, -1))) + # Define helper functions def first(pid: int, exid: int, tid: int) -> Node: - if not proc_to_ops.get((pid, exid, tid)): - entry_node = (pid, exid, tid, -1) - # as Op object is not available, op_index will be -1 - proc_to_ops[(pid, exid, tid)] = [entry_node] - nodes.append(entry_node) - return entry_node - else: - return proc_to_ops[(pid, exid, tid)][0] + return proc_to_ops[(pid, exid, tid)][0] def last(pid: int, exid: int, tid: int) -> Node: - if not proc_to_ops.get((pid, exid, tid)): - # as Op object is not availaible, op_index will be -1 - exit_node = (pid, exid, tid, -1) - proc_to_ops[(pid, exid, tid)] = [exit_node] - nodes.append(exit_node) - return exit_node - else: - return proc_to_ops[(pid, exid, tid)][-1] - - def get_first_pthread(pid, exid, target_pthread_id): - for kthread_id, thread in 
process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items(): - op_index = 0 - for op in thread.ops: - if op.pthread_id == target_pthread_id: - return (pid, exid, kthread_id, op_index) - op_index+=1 - return (pid, -1, target_pthread_id, -1) - - def get_last_pthread(pid, exid, target_pthread_id): - for kthread_id, thread in process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items(): - op_index = len(thread.ops) - 1 - ops = thread.ops - while op_index >= 0: - op = ops[op_index] - if op.pthread_id == target_pthread_id: - return (pid, exid, kthread_id, op_index) - op_index-=1 - return (pid, -1, target_pthread_id, -1) + return proc_to_ops[(pid, exid, tid)][-1] + + def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]: + ret = list[Node]() + for pid, process in process_tree_prov_log.processes.items(): + for exid, exec_epoch in process.exec_epochs.items(): + for tid, thread in exec_epoch.threads.items(): + for op_index, op in enumerate(thread.ops): + if op.pthread_id == target_pthread_id: + ret.append((pid, exid, tid, op_index)) + return ret + + def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> list[Node]: + ret = list[Node]() + for pid, process in process_tree_prov_log.processes.items(): + for exid, exec_epoch in process.exec_epochs.items(): + for tid, thread in exec_epoch.threads.items(): + for op_index, op in list(enumerate(thread.ops))[::-1]: + if op.pthread_id == target_pthread_id: + ret.append((pid, exid, tid, op_index)) + return ret # Hook up forks/joins for node in list(nodes): pid, exid, tid, op_index = node op = process_tree_prov_log.processes[pid].exec_epochs[exid].threads[tid].ops[op_index].data - global target + target: tuple[int, int, int] if False: pass - elif isinstance(op, CloneOp) and op.data.ferrno == 0: - if op.task_type == TaskType.TASK_PID: + elif isinstance(op, CloneOp) and op.ferrno == 0: + if False: + pass + elif op.task_type == TaskType.TASK_PID: # Spawning a thread links to the 
current PID and exec epoch + target = (op.task_id, 0, op.task_id) + fork_join_edges.append((node, first(*target))) + elif op.task_type == TaskType.TASK_TID: target = (pid, exid, op.task_id) + fork_join_edges.append((node, first(*target))) elif op.task_type == TaskType.TASK_PTHREAD: - target_pthread_id = op.task_id - dest = get_first_pthread(pid, exid, target_pthread_id) - fork_join_edges.append((node, dest)) - continue + for dest in get_first_pthread(pid, exid, op.task_id): + fork_join_edges.append((node, dest)) else: - # New process always links to exec epoch 0 and main thread - # THe TID of the main thread is the same as the PID - target = (op.task_id, 0, op.task_id) - exec_edges.append((node, first(*target))) + raise RuntimeError(f"Task type {op.task_type} not supported") elif isinstance(op, WaitOp) and op.ferrno == 0 and op.task_id > 0: - # Always wait for main thread of the last exec epoch - if op.ferrno == 0 and (op.task_type == TaskType.TASK_PID or op.task_type == TaskType.TASK_TID): + if False: + pass + elif op.task_type == TaskType.TASK_PID: target = (op.task_id, last_exec_epoch.get(op.task_id, 0), op.task_id) fork_join_edges.append((last(*target), node)) + elif op.task_type == TaskType.TASK_TID: + target = (pid, exid, op.task_id) + fork_join_edges.append((last(*target), node)) elif op.ferrno == 0 and op.task_type == TaskType.TASK_PTHREAD: - fork_join_edges.append((get_last_pthread(pid, exid, op.task_id), node)) + for dest in get_last_pthread(pid, exid, op.task_id): + fork_join_edges.append((dest, node)) elif isinstance(op, ExecOp): # Exec brings same pid, incremented exid, and main thread target = pid, exid + 1, pid - fork_join_edges.append((node, first(*target))) - - # Make the main thread exit at the same time as each thread - for pid, process in process_tree_prov_log.processes.items(): - for exec_epoch_no, exec_epoch in process.exec_epochs.items(): - for tid, thread in exec_epoch.threads.items(): - if tid != pid: - fork_join_edges.append((last(pid, 
exec_epoch_no, tid), last(pid, exec_epoch_no, pid))) + exec_edges.append((node, first(*target))) process_graph = nx.DiGraph() for node in nodes: @@ -210,25 +199,49 @@ def validate_hb_graph(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: provlog_reverse = process_graph.reverse() found_entry = False found_exit = False + reserved_fds = {0, 1, 2} for node in process_graph.nodes: - op = prov_log_get_node(provlog, *node) if node[-1] != -1 else None - if op is None: + op = prov_log_get_node(provlog, *node) + if False: pass elif isinstance(op.data, CloseOp) and op.data.ferrno == 0: for closed_fd in range(op.data.low_fd, op.data.high_fd + 1): - for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node): - pred_op = prov_log_get_node(provlog, *pred_node) if pred_node[-1] != -1 else None - if isinstance(pred_op.data, OpenOp) and pred_op.data.fd == closed_fd and op.data.ferrno == 0: - break - else: - ret.append(f"Close of {closed_fd} is not preceeded by corresponding open") + if closed_fd not in reserved_fds: + for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node): + pred_op = prov_log_get_node(provlog, *pred_node) + if isinstance(pred_op.data, OpenOp) and pred_op.data.fd == closed_fd and op.data.ferrno == 0: + break + else: + ret.append(f"Close of {closed_fd} is not preceeded by corresponding open") elif isinstance(op.data, WaitOp) and op.data.ferrno == 0: for pred_node in nx.dfs_preorder_nodes(provlog_reverse, node): - pred_op = prov_log_get_node(provlog, *pred_node) if pred_node[-1] != -1 else None - if isinstance(pred_op.data, CloneOp) and pred_op.data.task_type == op.data.task_type and pred_op.data.task_id == op.data.task_type and op.data.ferrno == 0: + pred_op = prov_log_get_node(provlog, *pred_node) + pid1, eid1, tid1, opid1 = pred_node + if isinstance(pred_op.data, CloneOp) and pred_op.data.task_type == op.data.task_type and pred_op.data.task_id == op.data.task_id and op.data.ferrno == 0: break else: - ret.append(f"Close of {closed_fd} is not preceeded by corresponding open") + ret.append(f"Wait of {op.data.task_id} in {node} is not preceeded by corresponding clone") + elif isinstance(op.data, CloneOp) and 
op.data.ferrno == 0: + for node1 in process_graph.successors(node): + op1 = prov_log_get_node(provlog, *node1) + if False: + pass + elif op.data.task_type == TaskType.TASK_PID: + if isinstance(op1.data, InitProcessOp): + if op.data.task_id != node1[0]: + ret.append(f"CloneOp {node} returns {op.data.task_id} but the next op has pid {node1[0]}") + break + elif op.data.task_type == TaskType.TASK_TID: + if isinstance(op1.data, InitThreadOp): + if op.data.task_id != node1[2]: + ret.append(f"CloneOp {node} returns {op.data.task_id} but the next op has tid {node1[2]}") + break + elif op.data.task_type == TaskType.TASK_PTHREAD and op.data.task_id == op1.pthread_id: + break + elif op.data.task_type == TaskType.TASK_ISO_C_THREAD and op.data.task_id == op1.iso_c_thread_id: + break + else: + ret.append(f"Could not find a successor for CloneOp {node} {op.data.task_type} in the target thread") if process_graph.in_degree(node) == 0: if not found_entry: found_entry = True @@ -246,34 +259,15 @@ for (node0, node1) in process_graph.edges: pid0, eid0, tid0, op0 = node0 pid1, eid1, tid1, op1 = node1 - op0 = prov_log_get_node(provlog, *node0) if node0[-1] != -1 else None - op1 = prov_log_get_node(provlog, *node1) if node1[-1] != -1 else None - if op0 is None: + op0 = prov_log_get_node(provlog, *node0) + op1 = prov_log_get_node(provlog, *node1) + if False: pass elif isinstance(op0.data, ExecOp): if eid0 + 1 != eid1: ret.append(f"ExecOp {node0} is followed by {node1}, whose exec epoch id should be {eid0 + 1}") - if op1 is None or not isinstance(op1.data, InitExecEpochOp): - ret.append(f"ExecOp {node0} is followed by {op1}, which is not InitExecEpoch") - elif isinstance(op0.data, CloneOp) and op0.data.ferrno == 0: - if False: - pass - elif op0.data.task_type == TaskType.TASK_PID: - if op1 is None or not isinstance(op1.data, InitProcessOp): - ret.append(f"CloneOp {node0} with TASK_PID is followed by {node1} which is not 
InitProcessOp") - if op0.data.task_id != pid1: - ret.append(f"CloneOp {node0} returns {op0.data.task_id} but the next op has pid {pid1}") - elif op0.data.task_type == TaskType.TASK_TID: - if op1 is None or not isinstance(op1.data, InitThreadOp): - ret.append(f"CloneOp {node0} with TASK_TID is followed by {node1} which is not InitThreadOp") - if op1 is None or not isinstance(op1.data, InitThreadOp): - ret.append(f"CloneOp {node0} returns {op0.data.task_id} but the next op has tid {tid1}") - elif op0.data.task_type == TaskType.PTHREAD_PID: - if op1 is None or not op1.pthread_id != op0.data.task_id: - ret.append(f"CloneOp {node0} with TASK_PTHREAD is followed by {node1} which has a different pthread_id") - elif op0.data.task_type == TaskType.ISO_C_THREAD_PID: - if op1 is None or not op1.iso_c_thread_id != op0.data.task_id: - ret.append(f"CloneOp {node0} with TASK_ISO_C_THREAD is followed by {node1} which has a different pthread_id") + if not isinstance(op1.data, InitExecEpochOp): + ret.append(f"ExecOp {node0} is followed by {node1}, which is not InitExecEpoch") try: cycle = nx.find_cycle(process_graph) except nx.NetworkXNoCycle: @@ -283,7 +277,18 @@ def validate_hb_graph(provlog: ProvLog, process_graph: nx.DiGraph) -> list[str]: return ret -def digraph_to_pydot_string(process_graph: nx.DiGraph) -> str: +def relax_node(graph: nx.DiGraph, node: typing.Any) -> list[tuple[typing.Any, typing.Any]]: + """Remove node from graph and attach its predecessors to its successors""" + ret = list[tuple[typing.Any, typing.Any]]() + for predecessor in graph.predecessors: + for successor in graph.successors: + ret.append((predecessor, successor)) + graph.add_edge(predecessor, successor) + graph.remove_node(node) + return ret + + +def digraph_to_pydot_string(prov_log: ProvLog, process_graph: nx.DiGraph) -> str: label_color_map = { EdgeLabels.EXEC: 'yellow', @@ -292,8 +297,23 @@ def digraph_to_pydot_string(process_graph: nx.DiGraph) -> str: } for node0, node1, attrs in 
process_graph.edges(data=True): - label:EdgeLabels = attrs['label'] + label: EdgeLabels = attrs['label'] process_graph[node0][node1]['color'] = label_color_map[label] + del attrs['label'] + + for node, data in process_graph.nodes(data=True): + pid, exid, tid, op_no = node + op = prov_log_get_node(prov_log, *node) + typ = type(op.data).__name__ + data["label"] = f"{pid}.{exid}.{tid}.{op_no}\n{typ}" + if False: + pass + elif isinstance(op.data, OpenOp): + data["label"] += f"\nopen {op.data.path.path} (fd={op.data.fd})" + elif isinstance(op.data, CloseOp): + fds = list(range(op.data.low_fd, op.data.high_fd + 1)) + data["label"] += "\nclose " + " ".join(map(str, fds)) + pydot_graph = nx.drawing.nx_pydot.to_pydot(process_graph) dot_string = typing.cast(str, pydot_graph.to_string()) return dot_string @@ -307,4 +327,4 @@ def construct_process_graph(process_tree_prov_log: ProvLog) -> str: """ process_graph = provlog_to_digraph(process_tree_prov_log) - return digraph_to_pydot_string(process_graph) + return digraph_to_pydot_string(process_tree_prov_log, process_graph) diff --git a/probe_src/probe_py/cli.py b/probe_src/probe_py/cli.py index 0a4b3d30..d35c5e9a 100644 --- a/probe_src/probe_py/cli.py +++ b/probe_src/probe_py/cli.py @@ -13,6 +13,9 @@ from . import analysis from . 
import util +rich.traceback.install(show_locals=False) + + project_root = pathlib.Path(__file__).resolve().parent.parent A = typing_extensions.Annotated @@ -116,10 +119,11 @@ def process_graph( console = rich.console.Console(file=sys.stderr) for warning in analysis.validate_provlog(prov_log): console.print(warning, style="red") + rich.traceback.install(show_locals=False) process_graph = analysis.provlog_to_digraph(prov_log) for warning in analysis.validate_hb_graph(prov_log, process_graph): console.print(warning, style="red") - print(analysis.digraph_to_pydot_string(process_graph)) + print(analysis.digraph_to_pydot_string(prov_log, process_graph)) @app.command() @@ -135,11 +139,14 @@ def dump( probe_log_tar_obj = tarfile.open(input, "r") processes_prov_log = parse_probe_log.parse_probe_log_tar(probe_log_tar_obj) probe_log_tar_obj.close() - for process in processes_prov_log.processes.values(): - for exec_epoch in process.exec_epochs.values(): - for thread in exec_epoch.threads.values(): - for op in thread.ops: - print(op.data) + for pid, process in processes_prov_log.processes.items(): + print(pid) + for exid, exec_epoch in process.exec_epochs.items(): + print(pid, exid) + for tid, thread in exec_epoch.threads.items(): + print(pid, exid, tid) + for op_no, op in enumerate(thread.ops): + print(pid, exid, tid, op_no, op.data) print() if __name__ == "__main__": diff --git a/probe_src/probe_py/test_probe.py b/probe_src/probe_py/test_probe.py index 54dd5357..10bcb038 100644 --- a/probe_src/probe_py/test_probe.py +++ b/probe_src/probe_py/test_probe.py @@ -203,7 +203,7 @@ def check_pthread_graph( if(isinstance(curr_node_op.data,parse_probe_log.CloneOp)): next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]) if edge[1][2] != curr_tid: - assert next_op!=None + assert next_op is not None assert curr_node_op.data.task_id == next_op.pthread_id continue check_wait.append(curr_node_op.data.task_id) diff --git 
a/probe_src/tests/test_suite.py b/probe_src/tests/test_suite.py deleted file mode 100644 index 104913b0..00000000 --- a/probe_src/tests/test_suite.py +++ /dev/null @@ -1,300 +0,0 @@ -from __future__ import annotations -import dataclasses -import shlex -import os -import pathlib -import typing -import tempfile -import subprocess -import shutil - - -pwd = pathlib.Path().resolve() -AT_FDCWD = -100 -_head = shutil.which("head") -assert _head -head = pathlib.Path(_head) -NoneType = type(None) - - -class Sentinel: - def __repr__(self) -> str: - return self.__class__.__name__ + "()" - - -class MatchAny(Sentinel): - pass - - -class MatchPositive(Sentinel): - pass - - -@dataclasses.dataclass -class Op: - op_code: str # not worth making an enum yet - fd: int | None - dirfd: int | None - mode: int | None - inode: int | None - device_major: int | None - device_minor: int | None - path: pathlib.Path | None - - @staticmethod - def parse_prov_log_dir(prov_log_path: pathlib.Path, verbose: bool = False) -> tuple[Op, ...]: - output = list[Op]() - for child in sorted(prov_log_path.iterdir()): - if verbose: - print(child) - output.extend(Op.parse_prov_log_file(child, verbose)) - return tuple(output) - - @staticmethod - def parse_prov_log_file(prov_log_file: pathlib.Path, verbose: bool) -> tuple[Op, ...]: - output = list[Op]() - for line in prov_log_file.read_text().split("\0"): - if line.startswith("\n"): - line = line[1:] - if line: - op_code, fd, dirfd, mode, inode, device_major, device_minor, path = line.split(" ") - op = Op( - op_code, - int(fd) if fd != "-20" else None, - int(dirfd) if dirfd != "-20" else None, - int(mode) if mode != "-20" else None, - int(inode) if inode != "-20" else None, - int(device_major) if device_major != "-20" else None, - int(device_minor) if device_minor != "-20" else None, - pathlib.Path(path) if path else None, - ) - if verbose: - print(op.op_code, end=" ") - if op.fd: - print(op.fd, end=" ") - if op.path: - print(op.path, end=" ") - print() - 
output.append(op) - return tuple(output) - - -@dataclasses.dataclass -class OpTemplate: - op_code: str # not worth making an enum yet - fd: int | None | MatchPositive - dirfd: int | None - mode: int | None - inode: int | None | MatchPositive - device_major: int | None | MatchAny - device_minor: int | None | MatchAny - path: pathlib.Path | None - optional: bool = False - - @staticmethod - def match(actual_op: Op, expected_op: OpTemplate) -> tuple[None, None, None] | tuple[str, typing.Any, typing.Any]: - if actual_op.op_code != expected_op.op_code: - return ("op_code", actual_op.op_code, expected_op.op_code) - elif (isinstance(expected_op.fd, (int, NoneType)) and actual_op.fd != expected_op.fd) or (isinstance(expected_op.fd, MatchPositive) and (not isinstance(actual_op.fd, int) or actual_op.fd <= 0)): - return ("fd", actual_op.fd, expected_op.fd) - elif actual_op.dirfd != expected_op.dirfd: - return ("dirfd", actual_op.dirfd, expected_op.dirfd) - elif (isinstance(expected_op.inode, (int, NoneType)) and actual_op.inode != expected_op.inode) or (isinstance(expected_op.inode, MatchPositive) and (not isinstance(actual_op.inode, int) or actual_op.inode <= 0)): - return ("inode", actual_op.inode, expected_op.inode) - elif (isinstance(expected_op.device_major, (int, NoneType)) and actual_op.device_major != expected_op.device_major): - return ("device_major", actual_op.device_major, expected_op.device_major) - elif (isinstance(expected_op.device_minor, (int, NoneType)) and actual_op.device_minor != expected_op.device_minor): - return ("device_minor", actual_op.device_minor, expected_op.device_minor) - elif actual_op.path != expected_op.path: - return ("path", actual_op.path, expected_op.path) - else: - return (None, None, None) - - @staticmethod - def match_list( - actual_ops: tuple[Op, ...], - expected_ops: tuple[OpTemplate | MatchAny, ...], - verbose: bool = False, - ) -> tuple[int, int, str, typing.Any, typing.Any] | tuple[None, None, None, None, None]: - actual_op_index 
= 0 - expected_op_index = 0 - while actual_op_index < len(actual_ops): - actual_op = actual_ops[actual_op_index] - if expected_op_index >= len(expected_ops): - return (actual_op_index, expected_op_index, "exhausted expected ops", None, None) - expected_op = expected_ops[expected_op_index] - if isinstance(expected_op, MatchAny): - expected_op_index += 1 - actual_op_index += 1 - if verbose: - print(f"expected_ops[{expected_op_index}] ~ actual_ops[{actual_op_index}] (MatchAny)") - elif isinstance(expected_op, OpTemplate): - prop, actual_val, expected_val = OpTemplate.match(actual_op, expected_op) - if prop is None: - # Match - if verbose: - print(f"expected_ops[{expected_op_index}] ~ actual_ops[{actual_op_index}] ({actual_op.op_code})") - expected_op_index += 1 - actual_op_index += 1 - elif expected_op.optional: - # No match, but expected was optional - if verbose: - print(f"expected_ops[{expected_op_index}] (optional {expected_op.op_code}) ~ actual_ops[{actual_op_index}] ({actual_op.op_code})") - expected_op_index += 1 - else: - # No match, not optional; must fail - if verbose: - print(f"expected_ops[{expected_op_index}] ({expected_op.op_code}) !~ actual_ops[{actual_op_index}] ({actual_op.op_code})") - return (actual_op_index, expected_op_index, prop, actual_val, expected_val) - else: - raise TypeError() - return (None, None, None, None, None) - - @staticmethod - def assert_match_list( - actual_ops: tuple[Op, ...], - expected_ops: tuple[OpTemplate | MatchAny, ...], - ) -> None: - actual_op_index, expected_op_index, prop, actual, expected = OpTemplate.match_list(actual_ops, expected_ops) - if prop: - for i, op in enumerate(actual_ops): - print(i, op) - for i, op2 in enumerate(expected_ops): - print(i, op2) - assert prop is None, (actual_op_index, expected_op_index, prop, actual, expected) - -def run_command_with_prov( - cmd: tuple[str, ...], - verbose: bool = False, -) -> tuple[Op, ...]: - with tempfile.TemporaryDirectory() as _prov_log_dir: - prov_log_dir = 
pathlib.Path(_prov_log_dir) - print(f"\n$ LD_PRELOAD={pwd}/libprov.so " + shlex.join(cmd)) - proc = subprocess.run( - cmd, - env={ - **os.environ, - "LD_PRELOAD": f"{pwd}/libprov.so", - "PROV_LOG_DIR": str(prov_log_dir), - "PROV_LOG_VERBOSE": "1" if verbose else "", - }, - check=True, - capture_output=False, - ) - print(proc.returncode) - print() - return Op.parse_prov_log_dir(prov_log_dir) - - -def close_op(fd: int | MatchPositive) -> OpTemplate: - return OpTemplate( - op_code='Close', - fd=fd, - dirfd=None, - inode=None, - mode=None, - device_major=None, - device_minor=None, - path=None, - ) - -def open_ops(op_code: str, dirfd: int, fd: int | MatchPositive, path: pathlib.Path) -> tuple[OpTemplate, ...]: - return ( - OpTemplate( - op_code='MetadataRead', - fd=None, - dirfd=dirfd, - inode=MatchPositive(), - mode=None, - device_major=MatchAny(), - device_minor=MatchAny(), - path=path, - ), - OpTemplate( - op_code=op_code, - fd=fd, - dirfd=dirfd, - inode=MatchPositive(), - mode=None, - device_major=MatchAny(), - device_minor=MatchAny(), - path=path, - ), - ) - - -def optional_op(op: OpTemplate) -> OpTemplate: - return dataclasses.replace(op, optional=True) - - -def initial_ops() -> tuple[OpTemplate, ...]: - return ( - # isatty - *open_ops("OpenReadWrite", AT_FDCWD, MatchPositive(), pathlib.Path("/dev/tty")), - close_op(MatchPositive()), - *map(optional_op, open_ops("OpenRead", AT_FDCWD, MatchPositive(), pathlib.Path("/lib/terminfo/x/xterm"))), - optional_op(close_op(MatchPositive())), - ) - - -STDOUT_FILENO = 1 -STDERR_FILENO = 2 - -def closing_ops() -> tuple[OpTemplate, ...]: - return ( - optional_op(close_op(STDOUT_FILENO)), - optional_op(close_op(STDERR_FILENO)), - ) - - -def test_head() -> None: - OpTemplate.assert_match_list( - run_command_with_prov(("head", "--bytes=5", "flake.nix")), - ( - *open_ops("OpenRead", AT_FDCWD, MatchPositive(), pathlib.Path('flake.nix')), - close_op(MatchPositive()), - *closing_ops(), - ), - ) - - -def test_shell() -> None: - 
OpTemplate.assert_match_list( - run_command_with_prov(("bash", "-c", "head --bytes=5 flake.nix")), - ( - *initial_ops(), - OpTemplate(op_code='Execute', fd=None, dirfd=AT_FDCWD, inode=MatchPositive(), mode=None, device_major=MatchAny(), device_minor=MatchAny(), path=head), - *open_ops("OpenRead", AT_FDCWD, MatchPositive(), pathlib.Path('flake.nix')), - close_op(MatchPositive()), - *closing_ops(), - ), - ) - - -def test_chdir() -> None: - with tempfile.TemporaryDirectory() as _tmpdir: - tmpdir = pathlib.Path(_tmpdir) - file = tmpdir / "flake.nix" - file.write_text("hello\n") - OpTemplate.assert_match_list( - run_command_with_prov(("bash", "-c", f"head --bytes=5 flake.nix; cd {tmpdir!s}; head --bytes=5 flake.nix")), - ( - *initial_ops(), - OpTemplate(op_code='Execute', fd=None, dirfd=AT_FDCWD, inode=MatchPositive(), mode=None, device_major=MatchAny(), device_minor=MatchAny(), path=head), - *open_ops("OpenRead", AT_FDCWD, MatchPositive(), pathlib.Path('flake.nix')), - close_op(MatchPositive()), - *closing_ops(), - OpTemplate(op_code='Chdir', fd=None, dirfd=AT_FDCWD, inode=MatchPositive(), mode=None, device_major=MatchAny(), device_minor=MatchAny(), path=tmpdir), - OpTemplate(op_code='Execute', fd=None, dirfd=AT_FDCWD, inode=MatchPositive(), mode=None, device_major=MatchAny(), device_minor=MatchAny(), path=head), - *open_ops("OpenRead", AT_FDCWD, MatchPositive(), pathlib.Path('flake.nix')), - close_op(MatchPositive()), - *closing_ops(), - ), - ) - - -def test_shell2() -> None: - run_command_with_prov(("bash", "-c", "python -c 'print(4)'; head --bytes=5 flake.nix")) - -# TODO: test command not found -# TODO: test empty path