
add option for replay timeout +fix proxy issues
RobinDavid committed Jul 28, 2023
1 parent d790def commit fa263a0
Showing 4 changed files with 75 additions and 40 deletions.
6 changes: 4 additions & 2 deletions bin/pastis-benchmark
@@ -163,12 +163,13 @@ def showmap(bins: str):
 @click.option('--filter-inputs', type=bool, is_flag=True, default=False, help="Filter inputs that do not generate coverage", show_default=True)
 @click.option('--stream', type=bool, is_flag=True, default=False, help="Stream input and coverage info in the given file", show_default=True)
 @click.option('--replay-threads', type=int, default=4, help="number of threads to use for input replay", show_default=True)
+@click.option('--replay-timeout', type=int, default=60, help="Timeout for seed replay", show_default=True)
 @click.option('--proxy', type=str, default="", help="Run the broker as a proxy to another broker: pymodule@ip:port")
 @click.argument('pargs', nargs=-1)
 def run(workspace: str, bins: str, seeds: str, mode: str, injloc: str, aflpp: bool, hfuzz: bool, triton: bool,
         debug: bool, timeout: Optional[int], port: int, hfuzz_path: str, hfuzz_threads: int, spawn: bool,
         allow_remote: bool, probe: Tuple[str], skip_cpufreq: bool, mem_threshold: int, start_quorum: int, proxy: str,
-        filter_inputs: bool, stream: bool, replay_threads: int, pargs: Tuple[str]):
+        filter_inputs: bool, stream: bool, replay_threads: int, replay_timeout: int, pargs: Tuple[str]):
 
     configure_logging(logging.DEBUG if debug else logging.INFO, "%(asctime)s %(name)s [%(levelname)s] %(message)s")
@@ -183,7 +184,8 @@ def run(workspace: str, bins: str, seeds: str, mode: str, injloc: str, aflpp: bo
                           start_quorum,
                           filter_inputs,
                           stream,
-                          replay_threads)
+                          replay_threads,
+                          replay_timeout)
 
     if proxy:  # proxy format should be: IP:port@py_module
         try:
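The new --replay-timeout flag (default: 60 seconds) is parsed by click and handed straight through to the broker constructor. Below is a minimal, self-contained sketch of the same pattern; the Broker class is a hypothetical stand-in, not the real PastisBroker:

import click

class Broker:  # hypothetical stand-in for PastisBroker
    def __init__(self, replay_threads: int = 4, replay_timeout: int = 60):
        self.replay_threads = replay_threads
        self.replay_timeout = replay_timeout  # per-seed replay budget, in seconds

@click.command()
@click.option('--replay-threads', type=int, default=4, show_default=True)
@click.option('--replay-timeout', type=int, default=60, show_default=True)
def run(replay_threads: int, replay_timeout: int):
    broker = Broker(replay_threads, replay_timeout)
    click.echo(f"replay timeout: {broker.replay_timeout}s")

if __name__ == '__main__':
    run()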
28 changes: 20 additions & 8 deletions engines/pastis-triton/pastisdse/pastisdse.py
@@ -3,6 +3,7 @@
 import os
 import time
 import logging
+from hashlib import md5
 from pathlib import Path
 import threading
 import platform
@@ -24,6 +25,17 @@
 from tritondse.trace import QBDITrace, TraceException
 from tritondse.worklist import FreshSeedPrioritizerWorklist, WorklistAddressToSet
 
+
+def to_h(seed: Seed) -> str:
+    if seed.is_composite():
+        if PastisDSE.INPUT_FILE_NAME in seed.content.files:
+            return md5(seed.content.files[PastisDSE.INPUT_FILE_NAME]).hexdigest()
+        elif "stdin" in seed.content.files:
+            return md5(seed.content.files["stdin"]).hexdigest()
+        else:
+            raise NameError("can't find main payload in Seed")
+    else:
+        return md5(seed.content).hexdigest()
+
 
 class PastisDSE(object):
 
@@ -492,11 +504,11 @@ def seed_received(self, typ: SeedType, seed: bytes):
         seed = self._get_seed(seed)
 
         if seed in self._seed_received:
-            logging.warning(f"receiving seed already known: {seed.hash} (dropped)")
+            logging.warning(f"receiving seed already known: {to_h(seed)} (dropped)")
             return
         else:
             self._seed_queue.put((seed, typ))
-            logging.info(f"seed received {seed.hash} (pool: {self._seed_queue.qsize()})")
+            logging.info(f"seed received {to_h(seed)} (pool: {self._seed_queue.qsize()})")
 
 
     def _process_seed_received(self, typ: SeedType, seed: Seed):
@@ -515,7 +527,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed):
         else:  # Try running the seed to know whether to keep it
             # NOTE: re-run the seed regardless of its status
             coverage = None
-            logging.info(f"process seed received {seed.hash} (pool: {self._seed_queue.qsize()})")
+            logging.info(f"process seed received {to_h(seed)} (pool: {self._seed_queue.qsize()})")
 
             data = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.bytes()
             self.replay_seed_file.write_bytes(data)
@@ -555,7 +567,7 @@ def _process_seed_received(self, typ: SeedType, seed: Seed):
                 logging.warning('There was an error while trying to re-run the seed')
 
             if not coverage:
-                logging.warning(f"coverage not found after replaying: {seed.hash} [{typ.name}] (add it anyway)")
+                logging.warning(f"coverage not found after replaying: {to_h(seed)} [{typ.name}] (add it anyway)")
                 # Add the seed anyway, if it was not possible to re-run the seed.
                 # TODO Set seed.coverage_objectives as "empty" (use ellipsis
                 # object). Modify WorklistAddressToSet to support it.
@@ -564,15 +576,15 @@ def _process_seed_received(self, typ: SeedType, seed: Seed):
             else:
                 # Check whether the seed improves the current coverage.
                 if self.dse.coverage.improve_coverage(coverage):
-                    logging.info(f"seed added {seed.hash} [{typ.name}] (coverage merged)")
+                    logging.info(f"seed added {to_h(seed)} [{typ.name}] (coverage merged)")
                     self.seeds_merged += 1
                     self.dse.coverage.merge(coverage)
                     self.dse.seeds_manager.worklist.update_worklist(coverage)
 
                     seed.coverage_objectives = self.dse.coverage.new_items_to_cover(coverage)
                     self.dse.add_input_seed(seed)
                 else:
-                    logging.info(f"seed archived {seed.hash} [{typ.name}] (NOT merging coverage)")
+                    logging.info(f"seed archived {to_h(seed)} [{typ.name}] (NOT merging coverage)")
                     self.seeds_rejected += 1
                     #self.dse.seeds_manager.archive_seed(seed)
                     # logging.info(f"seed archived {seed.hash} [{typ.name}]")
@@ -596,7 +608,7 @@ def stop_received(self):
         if self.dse:
             self.dse.stop_exploration()
 
-        self.save_stats() # Save stats
+        self.save_stats()  # Save stats
 
         self._stop = True
         # self.agent.stop()  # Can't call it here as this function executed from within agent thread
@@ -635,7 +647,7 @@ def dual_log(self, level: LogLevel, message: str) -> None:
     def send_seed_to_broker(self, se: SymbolicExecutor, state: ProcessState, seed: Seed):
         if seed not in self._seed_received:  # Do not send back a seed that already came from broker
             self._sending_count += 1
-            logging.info(f"Sending new: {seed.hash} [{self._sending_count}]")
+            logging.info(f"Sending new: {to_h(seed)} [{self._sending_count}]")
             bytes = seed.content.files[self.INPUT_FILE_NAME] if seed.is_composite() else seed.content
             self.agent.send_seed(SeedType.INPUT, bytes)
 
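Every logging call in this file that previously read seed.hash now goes through to_h(), so composite seeds (file payload or stdin) and raw seeds are hashed uniformly as the md5 of the main payload. Here is a self-contained sketch of the same logic with stand-in types; the real Seed comes from tritondse, and the INPUT_FILE_NAME value used here is an assumption:

from hashlib import md5

INPUT_FILE_NAME = "input_file"  # assumed value of PastisDSE.INPUT_FILE_NAME

class FakeContent:  # stand-in for a composite seed's content
    def __init__(self, files: dict):
        self.files = files

class FakeSeed:  # stand-in for tritondse.Seed
    def __init__(self, content):
        self.content = content
    def is_composite(self) -> bool:
        return not isinstance(self.content, bytes)

def to_h(seed) -> str:
    if seed.is_composite():
        files = seed.content.files
        if INPUT_FILE_NAME in files:
            return md5(files[INPUT_FILE_NAME]).hexdigest()
        if "stdin" in files:
            return md5(files["stdin"]).hexdigest()
        raise NameError("can't find main payload in Seed")
    return md5(seed.content).hexdigest()

# Raw and composite seeds carrying the same payload hash identically.
print(to_h(FakeSeed(b"AAAA")))
print(to_h(FakeSeed(FakeContent({"stdin": b"AAAA"}))))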
52 changes: 35 additions & 17 deletions pastisbroker/broker.py
@@ -51,7 +51,8 @@ def __init__(self, workspace: PathLike,
                  start_quorum: int = 0,
                  filter_inputs: bool = False,
                  stream: bool = False,
-                 replay_threads: int = 4):
+                 replay_threads: int = 4,
+                 replay_timeout: int = 60):
         super(PastisBroker, self).__init__()
 
         # Initialize workspace
@@ -126,7 +127,7 @@ def __init__(self, workspace: PathLike,
         if (path := self.find_vanilla_binary()) is not None:  # Find an executable suitable for coverage
             logging.info(f"Coverage binary: {path}")
             stream_file = self.workspace.coverage_history if stream else ""
-            self._coverage_manager = CoverageManager(replay_threads, filter_inputs, path, self.argv, self.inject, stream_file)
+            self._coverage_manager = CoverageManager(replay_threads, replay_timeout, filter_inputs, path, self.argv, self.inject, stream_file)
         else:
             logging.warning("filtering or stream enabled but cannot find vanilla binary")
@@ -585,7 +586,21 @@ def start(self, running: bool = True):
         for seed in self._init_seed_pool.keys():  # Push initial corpus to set baseline coverage
             fname = self.mk_input_name("INITIAL", seed)
             sp = fname.split("_")
-            covi = ClientInput(seed, "", f"{sp[0]}_{sp[1]}", sp[2], sp[4], fname, SeedType.INPUT, b"INITIAL", "INITIAL", "GRANTED", "", -1, [])
+            hash = sp[4].split(".")[0]
+            covi = ClientInput(
+                content=seed,
+                log_time="",
+                recv_time=f"{sp[0]}_{sp[1]}",
+                elapsed=sp[2],
+                hash=hash,
+                path=fname,
+                seed_status=SeedType.INPUT,
+                fuzzer_id=b"INITIAL",
+                fuzzer_name="INITIAL",
+                broker_status="GRANTED",  # Unless rejected (later)
+                replay_status="",
+                replay_time=-1,
+                new_coverage=[])
             self._coverage_manager.push_input(covi)
 
         if self.is_proxied and self._proxy_cli:
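The rewritten ClientInput construction is keyword-based and, unlike the old positional call, strips the file extension off sp[4] before using it as the hash. A hedged sketch of the parse follows; the exact filename format produced by mk_input_name() is not shown in this diff, so the example name below is an assumption of the form date_time_elapsed_fuzzer_hash.ext:

# Hypothetical input file name: date_time_elapsed_fuzzer_hash.ext
fname = "2023-07-28_10-42-13_0:00:01_INITIAL_0cc175b9c0f1b6a831c399e269772661.cov"
sp = fname.split("_")
recv_time = f"{sp[0]}_{sp[1]}"   # "2023-07-28_10-42-13"
elapsed = sp[2]                  # "0:00:01"
seed_hash = sp[4].split(".")[0]  # md5 digest without the ".cov" extension
print(recv_time, elapsed, seed_hash)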
@@ -621,24 +636,26 @@ def run(self, timeout: int = None):
                 if cli.engine.SHORT_NAME == "TT":  # is triton
                     self.kick_client(cli.netid)
 
-            # if inputs are filtered. Get granted inputs and forward them to appropriate clients
-            if self.filter_inputs:
-                for item in self._coverage_manager.iter_granted_inputs():
-                    self.seed_granted(item.fuzzer_id, item.seed_status, item.content)
+            # Check if we received the start signal from the proxy-master
+            if self._proxy_start_signal:
+                logging.info("signal received start clients !")
+                self._proxy_start_signal = False
+                self.start_pending_clients()
 
-            # Check if there are seed coming from the proxy-master to forward to clients
-            if not self._proxy_seed_queue.empty():
-                try:
-                    while True:
-                        origin, typ, seed = self._proxy_seed_queue.get_nowait()
-                        self.send_seed_to_all_others(origin, typ, seed)
-                except queue.Empty:
-                    pass
+            if self._running:  # Perform following checks only if running
+                # if inputs are filtered. Get granted inputs and forward them to appropriate clients
+                if self.filter_inputs:
+                    for item in self._coverage_manager.iter_granted_inputs():
+                        self.seed_granted(item.fuzzer_id, item.seed_status, item.content)
+
+                # Check if there are seed coming from the proxy-master to forward to clients
+                if not self._proxy_seed_queue.empty():
+                    try:
+                        while True:
+                            origin, typ, seed = self._proxy_seed_queue.get_nowait()
+                            self.send_seed_to_all_others(origin, typ, seed)
+                    except queue.Empty:
+                        pass
 
             if self._stop:
                 logging.info("broker terminate")
Expand Down Expand Up @@ -776,12 +793,13 @@ def _proxy_start_received(self, fname: str, binary: bytes, engine: FuzzingEngine
# FIXME: Use parameters received
logging.info("[PROXY] start received !")
self._running = True
self._proxy_start_signal = True
# if self._running:
# self.start_pending_clients()

def _proxy_seed_received(self, typ: SeedType, seed: bytes):
# Forward the seed to underlying clients
logging.info(f"[PROXY] seed {typ.name} received forward to agents")
logging.info(f"[PROXY] receive {md5(seed).hexdigest()} [{typ.name}] (forward it)")

# Save the seed locally
self.write_seed(typ, "PROXY", seed)
29 changes: 16 additions & 13 deletions pastisbroker/coverage.py
@@ -44,9 +44,10 @@ class CoverageManager(object):
     ARGV_PLACEHOLDER = "@@"
     STRATEGY = CoverageStrategy.EDGE
 
-    def __init__(self, pool_size: int, filter: bool, program: str, args: list[str], inj_loc: SeedInjectLoc, stream_file: str = ""):
+    def __init__(self, pool_size: int, replay_timeout: int, filter: bool, program: str, args: list[str], inj_loc: SeedInjectLoc, stream_file: str = ""):
         # Base info for replay
         self.pool_size = pool_size
+        self.replay_timeout = replay_timeout
         self.filter_enabled = filter
         self.program = str(program)
         self.args = args
@@ -86,7 +87,7 @@ def start(self) -> None:
         logging.info("Starting coverage manager")
 
         for work_id in range(self.pool_size):
-            self.pool.apply_async(self.worker, (self.input_queue, self.cov_queue, self.program, self.args, self.inj_loc))
+            self.pool.apply_async(self.worker, (self.input_queue, self.cov_queue, self.program, self.args, self.inj_loc, self.replay_timeout))
 
     def stop(self) -> None:
         self._running = False
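start() fans the new replay_timeout out to every replay worker through apply_async. A minimal sketch of the same fan-out, assuming a thread pool draining a shared queue.Queue (the queue.Empty handling elsewhere in this file suggests thread-based workers):

import queue
from multiprocessing.pool import ThreadPool

def worker(inq: queue.Queue, timeout: int) -> None:
    while True:
        item = inq.get()
        if item is None:  # poison pill: shut the worker down
            return
        print(f"replaying input {item} with timeout={timeout}s")

POOL_SIZE = 4
inq: queue.Queue = queue.Queue()
pool = ThreadPool(POOL_SIZE)
for _ in range(POOL_SIZE):
    pool.apply_async(worker, (inq, 60))  # each worker receives the shared timeout

for i in range(3):
    inq.put(i)
for _ in range(POOL_SIZE):
    inq.put(None)
pool.close()
pool.join()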
@@ -174,14 +175,17 @@ def coverage_worker(self):
                     os.unlink(cov_file)
                 except FileNotFoundError:
                     if item.seed_status == SeedType.INPUT:
-                        logging.warning(f"seed {item.hash}({item.seed_status}) can't load coverage file (maybe had crashed?)")
+                        pass
+                        # logging.warning(f"seed {item.hash}({item.seed_status}) can't load coverage file (maybe had crashed?)")
                     else:
-                        logging.info(f"seed {item.hash}({item.seed_status}) cannot get coverage (normal..)")
+                        pass
+                        # logging.info(f"seed {item.hash}({item.seed_status}) cannot get coverage (normal..)")
                 # Grant input
                 self.seeds_accepted += 1
-                self.granted_queue.put(item)
+                if item.fuzzer_name != "INITIAL":  # if not initial corpus add it
+                    self.granted_queue.put(item)
 
-                logging.info(f"seed {item.hash} ({item.fuzzer_name}) [replay:{self.mk_rpl_status(item.replay_status)}][status:{self.mk_broker_status(item.broker_status, bool(new_items))}] ({len(new_items)} new edges)")
+                logging.info(f"seed {item.hash} ({item.fuzzer_name}) [replay:{self.mk_rpl_status(item.replay_status)}][{self.mk_broker_status(item.broker_status, bool(new_items))}][{int(item.replay_time):}s] ({len(new_items)} new edges) (pool:{self.input_queue.qsize()})")
                 # Regardless if it was a success or not log it
                 self.add_item_coverage_stream(item)
             except queue.Empty:
Expand Down Expand Up @@ -209,7 +213,7 @@ def mk_broker_status(status: str, new_items: bool) -> str:
return mk_color(status, Bcolors.FAIL)

@staticmethod
def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str], seed_inj: SeedInjectLoc) -> None:
def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str], seed_inj: SeedInjectLoc, timeout) -> None:
"""
worker thread that unstack inputs and replay them.
"""
@@ -236,26 +240,25 @@ def worker(input_queue: Queue, cov_queue: Queue, program: str, argv: list[str],
                         logging.error(f"seed injection {seed_inj.name} but can't find '@@' on program argv: {argv}: {e}")
                         continue
 
+                t0 = time.time()
                 try:
                     # Run the seed
-                    t0 = time.time()
                     if QBDITrace.run(CoverageManager.STRATEGY,
                                      program,
                                      cur_argv,  # argv[1:] if len(argv) > 1 else [],
                                      output_path=str(cov_file),
                                      stdin_file=str(tmpfile) if seed_inj == SeedInjectLoc.STDIN else None,
                                      cwd=Path(program).parent,
-                                     timeout=60):
-                        item.replay_time = time.time() - t0
+                                     timeout=timeout):
                         item.replay_status = "SUCCESS"
                         # logging.info(f"[worker-{pid}] replaying {item.hash} sucessful")
                     else:
                         item.replay_status = "FAIL_NO_COV"
-                        logging.warning("Cannot load the coverage file generated (maybe had crashed?)")
+                        # logging.warning("Cannot load the coverage file generated (maybe had crashed?)")
                 except TraceException:
                     item.replay_status = "FAIL_TIMEOUT"
-                    logging.warning('Timeout hit, while trying to re-run the seed')
-
+                    # logging.warning('Timeout hit, while trying to re-run the seed')
+                item.replay_time = time.time() - t0
                 # Add it to the coverage queue (even if it failed
                 cov_queue.put((item, cov_file))
             except KeyboardInterrupt:
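Two behavioral changes land in worker(): the hard-coded timeout=60 becomes the caller-supplied timeout, and t0 moves in front of the try block so item.replay_time is recorded even when the replay times out or produces no coverage. A simplified sketch of that timing pattern, using subprocess as a stand-in for QBDITrace.run (assumes a POSIX sleep binary):

import subprocess
import time

def replay(cmd: list, timeout: int) -> tuple:
    t0 = time.time()  # started before the try, so elapsed time survives a timeout
    try:
        subprocess.run(cmd, timeout=timeout, check=False)
        status = "SUCCESS"
    except subprocess.TimeoutExpired:
        status = "FAIL_TIMEOUT"
    return status, time.time() - t0

print(replay(["sleep", "2"], timeout=1))  # -> ('FAIL_TIMEOUT', ~1.0)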
