Skip to content

Commit

Permalink
Merge pull request #8 from quarkslab/fix-zmq-multithreading
Browse files Browse the repository at this point in the history
Fix zmq multithreading
  • Loading branch information
cnheitman authored Jul 17, 2023
2 parents 58706a7 + 2053532 commit 1fd6c18
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions pastisbroker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import Counter
import datetime
import random
import queue

# Third-party imports
import psutil
Expand Down Expand Up @@ -115,6 +116,8 @@ def __init__(self, workspace: PathLike,
# Proxy feature
self._proxy = None
self._proxy_cli = None
self._proxy_start_signal = False
self._proxy_seed_queue = queue.Queue()

# Coverage + filtering feature
self._coverage_manager = None
Expand Down Expand Up @@ -623,6 +626,20 @@ def run(self, timeout: int = None):
for item in self._coverage_manager.iter_granted_inputs():
self.seed_granted(item.fuzzer_id, item.seed_status, item.content)

# Check if we received the start signal from the proxy-master
if self._proxy_start_signal:
self._proxy_start_signal = False
self.start_pending_clients()

# Check if there are seed coming from the proxy-master to forward to clients
if not self._proxy_seed_queue.empty():
try:
while True:
origin, typ, seed = self._proxy_seed_queue.get_nowait()
self.send_seed_to_all_others(origin, typ, seed)
except queue.Empty:
pass

if self._stop:
logging.info("broker terminate")
break
Expand Down Expand Up @@ -759,8 +776,8 @@ def _proxy_start_received(self, fname: str, binary: bytes, engine: FuzzingEngine
# FIXME: Use parameters received
logging.info("[PROXY] start received !")
self._running = True
if self._running:
self.start_pending_clients()
# if self._running:
# self.start_pending_clients()

def _proxy_seed_received(self, typ: SeedType, seed: bytes):
# Forward the seed to underlying clients
Expand All @@ -772,7 +789,8 @@ def _proxy_seed_received(self, typ: SeedType, seed: bytes):
self._init_seed_pool[seed] = typ # also consider it as initial corpus

# Forward it to all clients
self.send_seed_to_all_others(b"PROXY", typ, seed)
self._proxy_seed_queue.put((b"PROXY", typ, seed))
# self.send_seed_to_all_others(b"PROXY", typ, seed)

def _proxy_stop_received(self):
logging.info(f"[PROXY] stop received!")
Expand Down

0 comments on commit 1fd6c18

Please sign in to comment.