diff --git a/probe_src/probe_py/analysis.py b/probe_src/probe_py/analysis.py index 24afa813..04a4bb3c 100644 --- a/probe_src/probe_py/analysis.py +++ b/probe_src/probe_py/analysis.py @@ -82,6 +82,7 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph: proc_to_ops = dict[tuple[int, int, int], list[Node]]() last_exec_epoch = dict[int, int]() for pid, process in process_tree_prov_log.processes.items(): + for exec_epoch_no, exec_epoch in process.exec_epochs.items(): # to find the last executing epoch of the process last_exec_epoch[pid] = max(last_exec_epoch.get(pid, 0), exec_epoch_no) @@ -125,17 +126,43 @@ def last(pid: int, exid: int, tid: int) -> Node: return exit_node else: return proc_to_ops[(pid, exid, tid)][-1] + + def get_first_pthread(pid, exid, target_pthread_id): + for kthread_id, thread in process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items(): + op_index = 0 + for op in thread.ops: + if op.pthread_id == target_pthread_id: + return (pid, exid, kthread_id, op_index) + op_index+=1 + return (pid, -1, target_pthread_id, -1) + + def get_last_pthread(pid, exid, target_pthread_id): + for kthread_id, thread in process_tree_prov_log.processes[pid].exec_epochs[exid].threads.items(): + op_index = len(thread.ops) - 1 + ops = thread.ops + while op_index >= 0: + op = ops[op_index] + if op.pthread_id == target_pthread_id: + return (pid, exid, kthread_id, op_index) + op_index-=1 + return (pid, -1, target_pthread_id, -1) # Hook up forks/joins for node in list(nodes): pid, exid, tid, op_index = node op = process_tree_prov_log.processes[pid].exec_epochs[exid].threads[tid].ops[op_index].data + global target if False: pass elif isinstance(op, CloneOp) and op.data.ferrno == 0: - if op.flags & CLONE_THREAD: + if op.task_type == TaskType.TASK_PID: # Spawning a thread links to the current PID and exec epoch target = (pid, exid, op.task_id) + elif op.task_type == TaskType.TASK_PTHREAD: + target_pthread_id = op.task_id + dest = get_first_pthread(pid, exid, target_pthread_id) + fork_join_edges.append((node, dest)) + continue else: # New process always links to exec epoch 0 and main thread # THe TID of the main thread is the same as the PID @@ -143,9 +170,11 @@ def last(pid: int, exid: int, tid: int) -> Node: exec_edges.append((node, first(*target))) elif isinstance(op, WaitOp) and op.ferrno == 0 and op.task_id > 0: # Always wait for main thread of the last exec epoch - if op.ferrno == 0: + if op.ferrno == 0 and (op.task_type == TaskType.TASK_PID or op.task_type == TaskType.TASK_TID): target = (op.task_id, last_exec_epoch.get(op.task_id, 0), op.task_id) fork_join_edges.append((last(*target), node)) + elif op.ferrno == 0 and op.task_type == TaskType.TASK_PTHREAD: + fork_join_edges.append((get_last_pthread(pid, exid, op.task_id), node)) elif isinstance(op, ExecOp): # Exec brings same pid, incremented exid, and main thread target = pid, exid + 1, pid @@ -155,8 +184,8 @@ def last(pid: int, exid: int, tid: int) -> Node: for pid, process in process_tree_prov_log.processes.items(): for exec_epoch_no, exec_epoch in process.exec_epochs.items(): for tid, thread in exec_epoch.threads.items(): - if tid != 0: - fork_join_edges.append((last(pid, exec_epoch_no, tid), last(pid, exec_epoch_no, 0))) + if tid != pid: + fork_join_edges.append((last(pid, exec_epoch_no, tid), last(pid, exec_epoch_no, pid))) process_graph = nx.DiGraph() for node in nodes: diff --git a/probe_src/probe_py/parse_probe_log.py b/probe_src/probe_py/parse_probe_log.py index 035fa865..d062372a 100644 --- a/probe_src/probe_py/parse_probe_log.py +++ b/probe_src/probe_py/parse_probe_log.py @@ -58,6 +58,7 @@ OpCode: enum.EnumType = py_types[("enum", "OpCode")] TaskType: enum.EnumType = py_types[("enum", "TaskType")] + @dataclasses.dataclass class ThreadProvLog: tid: int diff --git a/probe_src/probe_py/test_probe.py b/probe_src/probe_py/test_probe.py index e0d49558..54dd5357 100644 --- a/probe_src/probe_py/test_probe.py +++ b/probe_src/probe_py/test_probe.py @@ -200,10 +200,10 @@ def check_pthread_graph( for edge in dfs_edges: curr_pid, curr_epoch_idx, curr_tid, curr_op_idx = edge[0] curr_node_op = get_op_from_provlog(process_tree_prov_log, curr_pid, curr_epoch_idx, curr_tid, curr_op_idx) - print(curr_node_op.data) if(isinstance(curr_node_op.data,parse_probe_log.CloneOp)): next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]) if edge[1][2] != curr_tid: + assert next_op!=None assert curr_node_op.data.task_id == next_op.pthread_id continue check_wait.append(curr_node_op.data.task_id) @@ -219,19 +219,12 @@ def check_pthread_graph( elif(isinstance(curr_node_op.data,parse_probe_log.OpenOp)): file_descriptors.append(curr_node_op.data.fd) path = curr_node_op.data.path.path - # print(curr_node_op.data) - # print(edge) - # next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3]) - # print(next_op.data) - # print(file_descriptors) - # print(">>>>>>>>>>>>>>>>>>>>>") if path in paths: if len(process_file_map.keys())!=0 and parent_pthread_id!=curr_node_op.pthread_id: # ensure the right cloned process has OpenOp for the path assert process_file_map[path] == curr_node_op.pthread_id elif(isinstance(curr_node_op.data, parse_probe_log.CloseOp)): fd = curr_node_op.data.low_fd - print(curr_node_op.data) if fd in reserved_file_descriptors: continue if curr_node_op.data.ferrno != 0: @@ -239,9 +232,6 @@ def check_pthread_graph( assert fd in file_descriptors if fd in file_descriptors: file_descriptors.remove(fd) - - print(file_descriptors) - print("after close") # check number of cloneOps assert current_child_process == total_pthreads @@ -250,6 +240,7 @@ def check_pthread_graph( # for every file there is a pthread assert len(process_file_map.items()) == len(paths) assert len(file_descriptors) == 0 + # add a check for when the pthread id is not found in the threads def get_op_from_provlog( process_tree_prov_log: parse_probe_log.ProvLog,