diff --git a/tools/sim/tests/test_metadrive_bridge.py b/tools/sim/tests/test_metadrive_bridge.py index 2c534656bb4c24..2d738bedf4bb15 100755 --- a/tools/sim/tests/test_metadrive_bridge.py +++ b/tools/sim/tests/test_metadrive_bridge.py @@ -1,15 +1,16 @@ #!/usr/bin/env python3 import unittest +import os from openpilot.tools.sim.run_bridge import parse_args from openpilot.tools.sim.bridge.metadrive.metadrive_bridge import MetaDriveBridge from openpilot.tools.sim.tests.test_sim_bridge import TestSimBridgeBase - class TestMetaDriveBridge(TestSimBridgeBase): + drive_time = 60 def create_bridge(self): return MetaDriveBridge(parse_args([])) - if __name__ == "__main__": + TestMetaDriveBridge.drive_time = int(os.getenv("DRIVE_TIME", TestMetaDriveBridge.drive_time)) unittest.main() diff --git a/tools/sim/tests/test_sim_bridge.py b/tools/sim/tests/test_sim_bridge.py index 850fd83baca73a..2d37fbef7cad6d 100644 --- a/tools/sim/tests/test_sim_bridge.py +++ b/tools/sim/tests/test_sim_bridge.py @@ -10,11 +10,13 @@ SIM_DIR = os.path.join(BASEDIR, "tools/sim") + class TestSimBridgeBase(unittest.TestCase): @classmethod def setUpClass(cls): if cls is TestSimBridgeBase: raise unittest.SkipTest("Don't run this base class, run test_metadrive_bridge.py instead") + TestSimBridgeBase.drive_time = cls.drive_time if cls.drive_time else 60 def setUp(self): self.processes = [] @@ -22,18 +24,18 @@ def setUp(self): def test_engage(self): # Startup manager and bridge.py. Check processes are running, then engage and verify. p_manager = subprocess.Popen("./launch_openpilot.sh", cwd=SIM_DIR) + sm = messaging.SubMaster(['controlsState', 'onroadEvents', 'managerState']) self.processes.append(p_manager) - sm = messaging.SubMaster(['controlsState', 'onroadEvents', 'managerState']) q = Queue() bridge = self.create_bridge() bridge.started = Value('b', False) p_bridge = bridge.run(q, retries=10) self.processes.append(p_bridge) + # Wait for bridge to startup max_time_per_step = 60 - # Wait for bridge to startup start_waiting = time.monotonic() while not bridge.started and time.monotonic() < start_waiting + max_time_per_step: time.sleep(0.1) @@ -43,16 +45,14 @@ def test_engage(self): no_car_events_issues_once = False car_event_issues = [] not_running = [] + while time.monotonic() < start_time + max_time_per_step: sm.update() - not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning] car_event_issues = [event.name for event in sm['onroadEvents'] if any([event.noEntry, event.softDisable, event.immediateDisable])] - if sm.all_alive() and len(car_event_issues) == 0 and len(not_running) == 0: no_car_events_issues_once = True break - self.assertTrue(no_car_events_issues_once, f"Failed because no messages received, or CarEvents '{car_event_issues}' or processes not running '{not_running}'") @@ -62,17 +62,27 @@ def test_engage(self): while time.monotonic() < start_time + max_time_per_step: sm.update() - q.put("cruise_down") # Try engaging - if sm.all_alive() and sm['controlsState'].active: control_active += 1 - if control_active == min_counts_control_active: break - self.assertEqual(min_counts_control_active, control_active, f"Simulator did not engage a minimal of {min_counts_control_active} steps was {control_active}") + # Drive course + q.put("reset") + start_time = time.monotonic() + user_disengage_once = False + disengage_events = ('stockAeb', 'fcw', 'ldw') + + while time.monotonic() < start_time + TestSimBridgeBase.drive_time: + sm.update() + onroadEventNames = [e.name for e in sm['onroadEvents']] + if any(e in onroadEventNames for e in disengage_events): + user_disengage_once = True + break + self.assertFalse(user_disengage_once, "Failed because user has to disengage") + def tearDown(self): print("Test shutting down. CommIssues are acceptable") for p in reversed(self.processes): @@ -84,6 +94,5 @@ def tearDown(self): else: p.join(15) - if __name__ == "__main__": unittest.main()