Skip to content

Commit

Permalink
changed process graph to accommodate pthreads (#30)
Browse files Browse the repository at this point in the history
* feat: changed process graph to accommodate pthreads
For every CloneOp that spawns a pthread, the first op with the matching pthread_id is found and an edge is created from the CloneOp to it.
For every WaitOp on a pthread, the last op with the corresponding pthread_id is found and an edge is created with that last op as the source.

* fix: changed check for main thread to use the feature tid=pid instead of tid=0

* fix: Added support to ensure tests check whether there exists an operation with pthread_id the same as cloneOp

---------

Co-authored-by: Sam Grayson <sam@samgrayson.me>
  • Loading branch information
Shofiya2003 and charmoniumQ authored Jul 22, 2024
1 parent da17dc4 commit 8a98683
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
37 changes: 33 additions & 4 deletions probe_src/probe_py/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def provlog_to_digraph(process_tree_prov_log: ProvLog) -> nx.DiGraph:
proc_to_ops = dict[tuple[int, int, int], list[Node]]()
last_exec_epoch = dict[int, int]()
for pid, process in process_tree_prov_log.processes.items():

for exec_epoch_no, exec_epoch in process.exec_epochs.items():
# to find the last executing epoch of the process
last_exec_epoch[pid] = max(last_exec_epoch.get(pid, 0), exec_epoch_no)
Expand Down Expand Up @@ -125,27 +126,55 @@ def last(pid: int, exid: int, tid: int) -> Node:
return exit_node
else:
return proc_to_ops[(pid, exid, tid)][-1]

def get_first_pthread(pid: int, exid: int, target_pthread_id: int) -> tuple[int, int, int, int]:
    """Locate the first recorded op that carries a given pthread id.

    Walks the threads of exec epoch ``exid`` of process ``pid`` (read from the
    enclosing ``process_tree_prov_log``) in the mapping's iteration order, and
    each thread's ops from first to last. The search stops at the first op
    whose ``pthread_id`` equals ``target_pthread_id``.

    Returns:
        ``(pid, exid, kthread_id, op_index)`` identifying the matching op, or
        the sentinel ``(pid, -1, target_pthread_id, -1)`` when no op in that
        exec epoch carries the requested pthread id.
    """
    threads = process_tree_prov_log.processes[pid].exec_epochs[exid].threads
    for kthread_id, thread in threads.items():
        for op_index, op in enumerate(thread.ops):
            if op.pthread_id == target_pthread_id:
                return (pid, exid, kthread_id, op_index)
    # Nothing matched: signal "not found" with exec epoch and op index of -1.
    return (pid, -1, target_pthread_id, -1)

def get_last_pthread(pid: int, exid: int, target_pthread_id: int) -> tuple[int, int, int, int]:
    """Locate the last op of a thread that carries a given pthread id.

    Walks the threads of exec epoch ``exid`` of process ``pid`` (read from the
    enclosing ``process_tree_prov_log``) in the mapping's iteration order, and
    scans each thread's ops from the end towards the start. The search stops
    at the first match, so the result is the highest-index matching op of the
    first thread that contains one.

    Returns:
        ``(pid, exid, kthread_id, op_index)`` identifying the matching op, or
        the sentinel ``(pid, -1, target_pthread_id, -1)`` when no op in that
        exec epoch carries the requested pthread id.
    """
    threads = process_tree_prov_log.processes[pid].exec_epochs[exid].threads
    for kthread_id, thread in threads.items():
        ops = thread.ops
        # Scan backwards so the first hit is the last matching op in this thread.
        for op_index in range(len(ops) - 1, -1, -1):
            if ops[op_index].pthread_id == target_pthread_id:
                return (pid, exid, kthread_id, op_index)
    # Nothing matched: signal "not found" with exec epoch and op index of -1.
    return (pid, -1, target_pthread_id, -1)

# Hook up forks/joins
for node in list(nodes):
pid, exid, tid, op_index = node
op = process_tree_prov_log.processes[pid].exec_epochs[exid].threads[tid].ops[op_index].data
global target
if False:
pass
elif isinstance(op, CloneOp) and op.data.ferrno == 0:
if op.flags & CLONE_THREAD:
if op.task_type == TaskType.TASK_PID:
# Spawning a thread links to the current PID and exec epoch
target = (pid, exid, op.task_id)
elif op.task_type == TaskType.TASK_PTHREAD:
target_pthread_id = op.task_id
dest = get_first_pthread(pid, exid, target_pthread_id)
fork_join_edges.append((node, dest))
continue
else:
# New process always links to exec epoch 0 and main thread
# The TID of the main thread is the same as the PID
target = (op.task_id, 0, op.task_id)
exec_edges.append((node, first(*target)))
elif isinstance(op, WaitOp) and op.ferrno == 0 and op.task_id > 0:
# Always wait for main thread of the last exec epoch
if op.ferrno == 0:
if op.ferrno == 0 and (op.task_type == TaskType.TASK_PID or op.task_type == TaskType.TASK_TID):
target = (op.task_id, last_exec_epoch.get(op.task_id, 0), op.task_id)
fork_join_edges.append((last(*target), node))
elif op.ferrno == 0 and op.task_type == TaskType.TASK_PTHREAD:
fork_join_edges.append((get_last_pthread(pid, exid, op.task_id), node))
elif isinstance(op, ExecOp):
# Exec brings same pid, incremented exid, and main thread
target = pid, exid + 1, pid
Expand All @@ -155,8 +184,8 @@ def last(pid: int, exid: int, tid: int) -> Node:
for pid, process in process_tree_prov_log.processes.items():
for exec_epoch_no, exec_epoch in process.exec_epochs.items():
for tid, thread in exec_epoch.threads.items():
if tid != 0:
fork_join_edges.append((last(pid, exec_epoch_no, tid), last(pid, exec_epoch_no, 0)))
if tid != pid:
fork_join_edges.append((last(pid, exec_epoch_no, tid), last(pid, exec_epoch_no, pid)))

process_graph = nx.DiGraph()
for node in nodes:
Expand Down
1 change: 1 addition & 0 deletions probe_src/probe_py/parse_probe_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
OpCode: enum.EnumType = py_types[("enum", "OpCode")]
TaskType: enum.EnumType = py_types[("enum", "TaskType")]


@dataclasses.dataclass
class ThreadProvLog:
tid: int
Expand Down
13 changes: 2 additions & 11 deletions probe_src/probe_py/test_probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ def check_pthread_graph(
for edge in dfs_edges:
curr_pid, curr_epoch_idx, curr_tid, curr_op_idx = edge[0]
curr_node_op = get_op_from_provlog(process_tree_prov_log, curr_pid, curr_epoch_idx, curr_tid, curr_op_idx)
print(curr_node_op.data)
if(isinstance(curr_node_op.data,parse_probe_log.CloneOp)):
next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3])
if edge[1][2] != curr_tid:
assert next_op!=None
assert curr_node_op.data.task_id == next_op.pthread_id
continue
check_wait.append(curr_node_op.data.task_id)
Expand All @@ -219,29 +219,19 @@ def check_pthread_graph(
elif(isinstance(curr_node_op.data,parse_probe_log.OpenOp)):
file_descriptors.append(curr_node_op.data.fd)
path = curr_node_op.data.path.path
# print(curr_node_op.data)
# print(edge)
# next_op = get_op_from_provlog(process_tree_prov_log, edge[1][0], edge[1][1], edge[1][2], edge[1][3])
# print(next_op.data)
# print(file_descriptors)
# print(">>>>>>>>>>>>>>>>>>>>>")
if path in paths:
if len(process_file_map.keys())!=0 and parent_pthread_id!=curr_node_op.pthread_id:
# ensure the right cloned process has OpenOp for the path
assert process_file_map[path] == curr_node_op.pthread_id
elif(isinstance(curr_node_op.data, parse_probe_log.CloseOp)):
fd = curr_node_op.data.low_fd
print(curr_node_op.data)
if fd in reserved_file_descriptors:
continue
if curr_node_op.data.ferrno != 0:
continue
assert fd in file_descriptors
if fd in file_descriptors:
file_descriptors.remove(fd)

print(file_descriptors)
print("after close")

# check number of cloneOps
assert current_child_process == total_pthreads
Expand All @@ -250,6 +240,7 @@ def check_pthread_graph(
# for every file there is a pthread
assert len(process_file_map.items()) == len(paths)
assert len(file_descriptors) == 0
# add a check for when the pthread id is not found in the threads

def get_op_from_provlog(
process_tree_prov_log: parse_probe_log.ProvLog,
Expand Down

0 comments on commit 8a98683

Please sign in to comment.