From f60b0c170a8ab575df03a626721405fa9d8665cf Mon Sep 17 00:00:00 2001 From: "igor udot (horw)" Date: Wed, 26 Jul 2023 16:23:55 +0800 Subject: [PATCH 1/2] refactor: change logic of threading control in CaseTester for MultiDev --- .../pytest_embedded_idf/unity_tester.py | 374 +++++++++++++----- pytest-embedded-idf/tests/test_idf.py | 4 +- 2 files changed, 268 insertions(+), 110 deletions(-) diff --git a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py index 9783aae1..56830333 100644 --- a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py +++ b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py @@ -3,12 +3,13 @@ import functools import logging import re +import threading import time import typing as t import warnings from collections.abc import Iterable from dataclasses import dataclass -from threading import Semaphore, Thread +from threading import Thread import pexpect from pexpect.exceptions import TIMEOUT @@ -23,7 +24,6 @@ if t.TYPE_CHECKING: from .dut import IdfDut - DEFAULT_START_RETRY = 3 DEFAULT_TIMEOUT = 90 @@ -416,100 +416,189 @@ def run_all_single_board_cases( self._run_multi_stage_case(case, reset=reset, timeout=timeout) -class MultiDevResource: +class ResourcesProxy: """ - Resources of multi_dev dut + Interface for share data between main thread and LoopedDevWorker thread Attributes: dut (IdfDut): Object of the Device under test - sem (Semaphore): Semaphore of monitoring whether the case finished + group (List[MultiDevResource]): List of MultiDevResource, same instance for all ResourcesProxy recv_sig (t.List[str]): The list of received signals from other dut - thread (Thread): The thread of monitoring the signals """ - def __init__(self, dut: 'IdfDut') -> None: - self.dut = dut - self.sem = Semaphore() + def __init__(self, **kwargs): + self.dut = kwargs['dut'] + self.group = kwargs['group'] self.recv_sig: t.List[str] = [] - self.thread: Thread = None # type: ignore + self.case = None + self.sub_case_index = None + self.start_retry = None + self.start_time = None + self.returned_value = None + self.raw_data_to_report = None + self.timeout = 0 + self.wait_for_menu_timeout = 0 -class CaseTester: + +class LoopedDevWorker: """ - The Generic tester of all the types + Worker for processing received task from main thread through ResourcesProxy Attributes: - group (t.List[MultiDevResource]): The group of the devices' resources - dut (IdfDut): The first dut if there is more than one - test_menu (t.List[UnittestMenuCase]): The list of the cases + worker (Thread): Thread for running loop task + wait_for_task_event (Event): Event for lock worker before receive task + task_complete_event (Event): Event for lock result processing before completed/failed task + rp (ResourcesProxy): Interface for storing shared variables between main thread and worker """ - # The signal pattens come from 'test_utils.c' SEND_SIGNAL_PREFIX = 'Send signal: ' WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' - def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore - """ - Create the object for every dut and put them into the group - """ - if isinstance(dut, Iterable): - self.is_multi_dut = True - self.dut = list(dut) - self.first_dut = self.dut[0] - self.test_menu = self.first_dut.test_menu - else: - self.is_multi_dut = False - self.dut = dut - self.first_dut = dut - self.test_menu = self.dut.test_menu + def __init__( + self, + worker: type(Thread), + event: type(threading.Event), + resource_proxy: ResourcesProxy, + ): + self.wait_for_task_event = event() + self.task_complete_event = event() + self.rp = resource_proxy - if self.is_multi_dut: - self.group: t.List[MultiDevResource] = [] - if isinstance(dut, list): - for item in dut: - dev_res = MultiDevResource(item) - self.group.append(dev_res) + def loop(): + while True: + self.wait_for_task_event.wait() + self.wait_for_task_event.clear() + self.rp.dut.serial.hard_reset() - def _wait_multi_dev_case_finish(self, timeout: float = DEFAULT_TIMEOUT) -> None: - """ - Wait until all the sub-cases of this multi_device case finished - """ - for d in self.group: - if d.sem.acquire(timeout=timeout): - d.sem.release() + self.rp.returned_value = None + + try: + self.rp.returned_value = self.run_case() + except Exception as e: + self.rp.returned_value = e + + self.task_complete_event.set() + + self.worker = worker(target=loop, daemon=True) + self.worker.start() + + def setup_task(self, case, sub_case_index, start_retry, timeout) -> None: + self.rp.case = case + self.rp.sub_case_index = sub_case_index + self.rp.start_retry = start_retry + self.rp.start_time = time.perf_counter() + self.rp.wait_for_menu_timeout = timeout + self.rp.timeout = timeout + + def start_task(self) -> None: + self.wait_for_task_event.set() + + def wait_for_result(self) -> None: + self.task_complete_event.wait() + res: t.Tuple[str, t.Dict] | Exception = self.rp.returned_value + self.task_complete_event.clear() + self.rp.raw_data_to_report = res + + def process_raw_data_to_report(self) -> t.Dict: + additional_attrs = {} + if isinstance(self.rp.raw_data_to_report, tuple) and len(self.rp.raw_data_to_report) == 2: + log = str(self.rp.raw_data_to_report[0]) + additional_attrs = self.rp.raw_data_to_report[1] + else: + log = str(self.rp.raw_data_to_report) + + if log: + # check format + check = UNITY_FIXTURE_REGEX.search(log) + if check: + regex = UNITY_FIXTURE_REGEX else: - raise TimeoutError('Wait case to finish timeout') + regex = UNITY_BASIC_REGEX - def _start_sub_case_thread( - self, - dev_res: MultiDevResource, - case: UnittestMenuCase, - sub_case_index: int, - case_start_time: float, - start_retry: int = DEFAULT_START_RETRY, - ) -> None: - """ - Start the thread monitoring on the corresponding dut of the sub-case - """ - # Allocate the kwargs that pass to '_run' - _kwargs = { - 'dut': dev_res.dut, - 'dev_res': dev_res, - 'case': case, - 'sub_case_index': sub_case_index, - 'start_retry': start_retry, - 'start_time': case_start_time, - } - - # Create the thread of the sub-case - dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True) - dev_res.thread.start() - # Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish' - dev_res.sem.acquire() - - def _run(self, **kwargs) -> None: # type: ignore + res = list(regex.finditer(log)) + else: + res = [] + + # real parsing + if len(res) == 0: + logging.warning(f'unity test case not found, use case {self.rp.case.name} instead') + attrs = { + 'name': self.rp.case.name, + 'result': 'FAIL', + 'message': self.rp.dut.pexpect_proc.buffer_debug_str, + 'time': self.rp.timeout, + } + elif len(res) == 1: + attrs = {k: v for k, v in res[0].groupdict().items() if v is not None} + else: + warnings.warn('This function is for recording single unity test case only. Use the last matched one') + attrs = {k: v for k, v in res[-1].groupdict().items() if v is not None} + + if additional_attrs: + attrs.update(additional_attrs) + + if log: + attrs.update({'stdout': log}) + + return attrs + + @staticmethod + def merge_result(test_cases_attr: t.List[t.Dict]) -> t.Dict: + output = {} + results = set() + time_attr = 0.0 + name_attr = set() + for ind, attr in enumerate(test_cases_attr): + for k, val in attr.items(): + if k == 'result': + results.add(val) + continue + if k == 'name': + name_attr.add(val) + continue + if k == 'time': + time_attr = max(time_attr, float(val)) + continue + + if k not in output: + output[k] = [f'[group dev-{ind}]: {val}'] + else: + output[k].append(f'[group dev-{ind}]: {val}') + + for k, val in output.items(): + output[k] = '<------------------->\n'.join(output[k]) + + output['time'] = time_attr + output['name'] = ' <---> '.join(list(name_attr)) + + if 'FAIL' in results: + output['result'] = 'FAIL' + elif 'IGNORE' in results: + output['result'] = 'IGNORE' + else: + output['result'] = (results - {'FAIL', 'IGNORE'}).pop() + + return output + + def add_to_report(self, attrs): + testcase = TestCase(**attrs) + self.rp.dut.testsuite.testcases.append(testcase) + + if testcase.result == 'FAIL': + self.rp.dut.testsuite.attrs['failures'] += 1 + elif testcase.result == 'IGNORE': + self.rp.dut.testsuite.attrs['skipped'] += 1 + else: + self.rp.dut.testsuite.attrs['tests'] += 1 + + def clear_var_values(self) -> None: + self.rp.recv_sig.clear() + self.rp.returned_value = None + + def run_case(self, **kwargs): """ The thread target function Will run for each case on each dut @@ -526,62 +615,115 @@ def _run(self, **kwargs) -> None: # type: ignore self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal UNITY_SUMMARY_LINE_REGEX, # Means the case finished ] - dut = kwargs['dut'] - dev_res = kwargs['dev_res'] - case = kwargs['case'] - sub_case_index = kwargs['sub_case_index'] - start_retry = kwargs['start_retry'] - start_time = kwargs['start_time'] + # Start the case - dut.expect_exact(READY_PATTERN_LIST) + self.rp.dut.expect_exact(READY_PATTERN_LIST, timeout=self.rp.wait_for_menu_timeout) + _start_time = time.perf_counter() + # Retry at defined number of times if not write successfully - for retry in range(start_retry): - dut.write(str(case.index)) + for retry in range(self.rp.start_retry): + self.rp.dut.write(str(self.rp.case.index)) try: - dut.expect_exact(case.name, timeout=1) + self.rp.dut.expect_exact(self.rp.case.name, timeout=1) break except TIMEOUT as e: - if retry >= start_retry - 1: - dev_res.sem.release() + if retry >= self.rp.start_retry - 1: raise e - dut.write(str(sub_case_index)) + self.rp.dut.write(str(self.rp.sub_case_index)) # Wait for the specific patterns, only exist when the sub-case finished while True: - pat = dut.expect(signal_pattern_list, timeout=60) + _current = time.perf_counter() + _timeout = _start_time + self.rp.timeout - _current + if _timeout < 0: + raise TIMEOUT('Tasks timed out, without other exception') + + pat = self.rp.dut.expect(signal_pattern_list, timeout=_timeout) if pat is not None: match_str = pat.group().decode('utf-8') # Send a signal if self.SEND_SIGNAL_PREFIX in match_str: send_sig = pat.group(1).decode('utf-8') - for d in self.group: - d.recv_sig.append(send_sig) + for d in self.rp.group: + d.rp.recv_sig.append(send_sig) # Waiting for a signal elif self.WAIT_SIGNAL_PREFIX in match_str: wait_sig = pat.group(1).decode('utf-8') while True: - if wait_sig in dev_res.recv_sig: - dev_res.recv_sig.remove(wait_sig) - dut.write('') + if wait_sig in self.rp.recv_sig: + self.rp.recv_sig.remove(wait_sig) + self.rp.dut.write('') break # Keep waiting the signal else: time.sleep(0.1) + if _start_time + self.rp.timeout < time.perf_counter(): + raise TIMEOUT('Not receive signal %r' % wait_sig) # Case finished elif 'Tests' in match_str: case_end_time = time.perf_counter() - case_duration = case_end_time - start_time + case_duration = case_end_time - self.rp.start_time additional_attrs = {'time': round(case_duration, 3)} - log = remove_asci_color_code(dut.pexpect_proc.before) - dut.testsuite.add_unity_test_cases(log, additional_attrs=additional_attrs) - break + log = remove_asci_color_code(self.rp.dut.pexpect_proc.before) + return log, additional_attrs + + +class MultiDevResource: + """ + Resources of multi_dev dut + + Attributes: + rp (ResourcesProxy): Interface for storing shared variables between main thread and "worker" + worker (LoopedDevWorker): Worker for processing received task from main thread through ResourcesProxy + """ + + def __init__(self, dut: 'IdfDut', group: t.List['MultiDevResource']) -> None: + self.rp = ResourcesProxy(dut=dut, group=group) + + self.worker = LoopedDevWorker(worker=Thread, event=threading.Event, resource_proxy=self.rp) + + +class CaseTester: + """ + The Generic tester of all the types - # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish' - dev_res.sem.release() + Attributes: + group (t.List[MultiDevResource]): The group of the devices' resources + dut (IdfDut): The first dut if there is more than one + test_menu (t.List[UnittestMenuCase]): The list of the cases + """ + + # The signal pattens come from 'test_utils.c' + SEND_SIGNAL_PREFIX = 'Send signal: ' + WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' + UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' + UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' + + def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore + """ + Create the object for every dut and put them into the group + """ + if isinstance(dut, Iterable): + self.is_multi_dut = True + self.dut = list(dut) + self.first_dut = self.dut[0] + self.test_menu = self.first_dut.test_menu + else: + self.is_multi_dut = False + self.dut = dut + self.first_dut = dut + self.test_menu = self.dut.test_menu + + if self.is_multi_dut: + self.group: t.List[MultiDevResource] = [] + if isinstance(dut, list): + for item in dut: + dev_res = MultiDevResource(item, self.group) + self.group.append(dev_res) def run_multi_dev_case( self, @@ -616,23 +758,30 @@ def run_multi_dev_case( if reset: for dev_res in self.group: - dev_res.dut.serial.hard_reset() + dev_res.rp.dut.serial.hard_reset() - start_time = time.perf_counter() + tasks = [] for sub_case in case.subcases: if isinstance(sub_case['index'], str): index = int(sub_case['index'], 10) else: index = sub_case['index'] - self._start_sub_case_thread( - dev_res=self.group[index - 1], - case=case, - sub_case_index=index, - case_start_time=start_time, - start_retry=start_retry, - ) - # Waiting all the devices to finish their test cases - self._wait_multi_dev_case_finish(timeout=timeout) + tasks.append(index) + + for i, task in enumerate(tasks): + self.group[i].worker.setup_task(case=case, sub_case_index=task, start_retry=start_retry, timeout=timeout) + + for i, task in enumerate(tasks): + self.group[i].worker.start_task() + for i, task in enumerate(tasks): + self.group[i].worker.wait_for_result() + + data_to_report_list = [self.group[i].worker.process_raw_data_to_report() for i, task in enumerate(tasks)] + merged_data = self.group[0].worker.merge_result(data_to_report_list) + self.group[0].worker.add_to_report(merged_data) + + for i, task in enumerate(tasks): + self.group[i].worker.clear_var_values() def run_all_multi_dev_cases( self, @@ -685,6 +834,15 @@ def run_case( timeout: timeout in second start_retry (int): number of retries for a single case when it is failed to start """ + warnings.warn( + 'Current timeout logic not clear: currently it is a sum of wait board menu time and runtest time.' + 'Will be renamed and split into two time in future', + DeprecationWarning, + ) + + if case.attributes.get('timeout'): + timeout = int(case.attributes['timeout']) + if case.type == 'normal': self.first_dut._run_normal_case(case, reset=reset, timeout=timeout) elif case.type == 'multi_stage': diff --git a/pytest-embedded-idf/tests/test_idf.py b/pytest-embedded-idf/tests/test_idf.py index b98ac53a..ad7bb964 100644 --- a/pytest-embedded-idf/tests/test_idf.py +++ b/pytest-embedded-idf/tests/test_idf.py @@ -544,7 +544,7 @@ def test_unity_test_case_runner(unity_tester): assert junit_report.attrib['errors'] == '0' assert junit_report.attrib['failures'] == '1' assert junit_report.attrib['skipped'] == '0' - assert junit_report.attrib['tests'] == '4' + assert junit_report.attrib['tests'] == '3' case_names_one_dev = [ 'normal_case1', @@ -559,7 +559,7 @@ def test_unity_test_case_runner(unity_tester): multi_dev_duts = ['dut-0', 'dut-1'] required_names_one_dev = [f'{case_name} [{dut}]' for dut in one_dev_dut for case_name in case_names_one_dev] - required_names_multi_dev = [f'{case_name} [{dut}]' for dut in multi_dev_duts for case_name in case_names_multi_dev] + required_names_multi_dev = [f'{case_name} [{multi_dev_duts[0]}]' for case_name in case_names_multi_dev] junit_case_names = [item.attrib['name'] for item in junit_report] From 3529f27703b704fbd8a15dc5e9acdbf135eb1026 Mon Sep 17 00:00:00 2001 From: "igor udot (horw)" Date: Fri, 25 Aug 2023 14:48:55 +0800 Subject: [PATCH 2/2] refactor: change threads to generator functions --- .../pytest_embedded_idf/unity_tester.py | 497 ++++++++++-------- 1 file changed, 270 insertions(+), 227 deletions(-) diff --git a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py index 56830333..9c675789 100644 --- a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py +++ b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py @@ -3,13 +3,12 @@ import functools import logging import re -import threading import time import typing as t import warnings +from collections import namedtuple from collections.abc import Iterable from dataclasses import dataclass -from threading import Thread import pexpect from pexpect.exceptions import TIMEOUT @@ -26,6 +25,7 @@ DEFAULT_START_RETRY = 3 DEFAULT_TIMEOUT = 90 +WAIT_FOR_MENU_TIMEOUT = 10 READY_PATTERN_LIST = [ 'Press ENTER to see the list of tests', @@ -247,10 +247,14 @@ def wrapper(self, *args, **kwargs): _timeout = kwargs.get('timeout', 30) _case = args[0] + if _case.type not in func.__name__: + logging.warning('The %s case can\'t be executed with %s function.', _case.type, func.__name__) + return + try: # do it here since the first hard reset before test case shouldn't be counted in duration time if 'reset' in kwargs: - if kwargs.pop('reset') and self._hard_reset_func: + if kwargs.get('reset') and self._hard_reset_func: try: self._hard_reset_func() except NotImplementedError: @@ -322,6 +326,7 @@ def _add_single_unity_test_case( def _run_normal_case( self, case: UnittestMenuCase, + *, reset: bool = False, # noqa timeout: float = 30, ) -> None: @@ -347,6 +352,7 @@ def _run_normal_case( def _run_multi_stage_case( self, case: UnittestMenuCase, + *, reset: bool = False, # noqa timeout: float = 30, ) -> None: @@ -416,99 +422,150 @@ def run_all_single_board_cases( self._run_multi_stage_case(case, reset=reset, timeout=timeout) -class ResourcesProxy: - """ - Interface for share data between main thread and LoopedDevWorker thread - - Attributes: - dut (IdfDut): Object of the Device under test - group (List[MultiDevResource]): List of MultiDevResource, same instance for all ResourcesProxy - recv_sig (t.List[str]): The list of received signals from other dut +class _MultiDevTestDut: """ - - def __init__(self, **kwargs): - self.dut = kwargs['dut'] - self.group = kwargs['group'] - self.recv_sig: t.List[str] = [] - - self.case = None - self.sub_case_index = None - self.start_retry = None - self.start_time = None - self.returned_value = None - self.raw_data_to_report = None - self.timeout = 0 - self.wait_for_menu_timeout = 0 - - -class LoopedDevWorker: - """ - Worker for processing received task from main thread through ResourcesProxy - - Attributes: - worker (Thread): Thread for running loop task - wait_for_task_event (Event): Event for lock worker before receive task - task_complete_event (Event): Event for lock result processing before completed/failed task - rp (ResourcesProxy): Interface for storing shared variables between main thread and worker + Dut control for multidevice test case """ + # The signal pattens come from 'test_utils.c' SEND_SIGNAL_PREFIX = 'Send signal: ' WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' + signal_pattern_list = [ + UNITY_SEND_SIGNAL_REGEX, # The dut send a signal + UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal + UNITY_SUMMARY_LINE_REGEX, # Means the case finished + ] + + DevResponse = namedtuple('DevResponse', ['completed', 'data']) def __init__( self, - worker: type(Thread), - event: type(threading.Event), - resource_proxy: ResourcesProxy, + dut, + case, + sub_case_index, + shared_message_query, + start_retry, + wait_for_menu_timeout=WAIT_FOR_MENU_TIMEOUT, + runtest_timeout=DEFAULT_TIMEOUT, ): - self.wait_for_task_event = event() - self.task_complete_event = event() - self.rp = resource_proxy + self.dut = dut + self.case = case + self.sub_case_index = sub_case_index + self.shared_message_query = shared_message_query + self.dut_index = self.sub_case_index - 1 + self.start_retry = start_retry + self.wait_for_menu_timeout = wait_for_menu_timeout + self.runtest_timeout = runtest_timeout + + self.work = self.run_case() + self.init_time = time.perf_counter() + self.response: _MultiDevTestDut.DevResponse = _MultiDevTestDut.DevResponse(False, None) + + def __iter__(self): + return self.work + + def __next__(self): + if self.response.completed: + return self.response + else: + try: + next(self.__iter__()) + except StopIteration as e: + self.response = _MultiDevTestDut.DevResponse(True, e.value) + self.work.close() + except TIMEOUT as e: + self.response = _MultiDevTestDut.DevResponse(True, e) + self.work.close() + return self.response - def loop(): - while True: - self.wait_for_task_event.wait() - self.wait_for_task_event.clear() - self.rp.dut.serial.hard_reset() + def close(self): + self.work.close() - self.rp.returned_value = None + def run_case(self): + yield from self._expect_exact(READY_PATTERN_LIST, self.wait_for_menu_timeout) - try: - self.rp.returned_value = self.run_case() - except Exception as e: - self.rp.returned_value = e + for retry in range(self.start_retry): + self.dut.write(str(self.case.index)) + try: + yield from self._expect_exact(self.case.name, 1) + break + except TIMEOUT as e: + if retry >= self.start_retry - 1: + raise e + self.dut.write(str(self.sub_case_index)) + + _start_time = time.perf_counter() + while True: + _current = time.perf_counter() + _timeout = _start_time + self.runtest_timeout - _current + + if _timeout < 0: + raise TIMEOUT('Tasks timed out, without other exception') + + pat = yield from self._expect(self.signal_pattern_list, _timeout) + + if pat is not None: + match_str = pat.group().decode('utf-8') - self.task_complete_event.set() + # Send a signal + if self.SEND_SIGNAL_PREFIX in match_str: + send_sig = pat.group(1).decode('utf-8') + for i, q in enumerate(self.shared_message_query): + if i != self.dut_index: + q.append(send_sig) - self.worker = worker(target=loop, daemon=True) - self.worker.start() + # Waiting for a signal + elif self.WAIT_SIGNAL_PREFIX in match_str: + wait_sig = pat.group(1).decode('utf-8') + while True: + yield + if wait_sig in self.shared_message_query[self.dut_index]: + self.shared_message_query[self.dut_index].remove(wait_sig) + self.dut.write('') + break + # Keep waiting the signal + else: + if _start_time + self.runtest_timeout < time.perf_counter(): + raise TIMEOUT('Not receive signal %r' % wait_sig) - def setup_task(self, case, sub_case_index, start_retry, timeout) -> None: - self.rp.case = case - self.rp.sub_case_index = sub_case_index - self.rp.start_retry = start_retry - self.rp.start_time = time.perf_counter() - self.rp.wait_for_menu_timeout = timeout - self.rp.timeout = timeout + # Case finished + elif 'Tests' in match_str: + case_duration = time.perf_counter() - _start_time + additional_attrs = {'time': round(case_duration, 3)} + log = remove_asci_color_code(self.dut.pexpect_proc.before) + return log, additional_attrs - def start_task(self) -> None: - self.wait_for_task_event.set() + def _expect_exact(self, pattern, timeout): + start = time.perf_counter() + while True: + try: + yield 'Await for result' + res = self.dut.expect_exact(pattern, timeout=0.01) + return res + except TIMEOUT as e: + if time.perf_counter() - start > timeout: + raise e - def wait_for_result(self) -> None: - self.task_complete_event.wait() - res: t.Tuple[str, t.Dict] | Exception = self.rp.returned_value - self.task_complete_event.clear() - self.rp.raw_data_to_report = res + def _expect(self, pattern, timeout): + start = time.perf_counter() + while True: + try: + yield 'Await for result' + res = self.dut.expect(pattern, timeout=0.01) + return res + except TIMEOUT as e: + if time.perf_counter() - start > timeout: + raise e - def process_raw_data_to_report(self) -> t.Dict: + def process_raw_report_data(self, raw_data_to_report) -> t.Dict: additional_attrs = {} - if isinstance(self.rp.raw_data_to_report, tuple) and len(self.rp.raw_data_to_report) == 2: - log = str(self.rp.raw_data_to_report[0]) - additional_attrs = self.rp.raw_data_to_report[1] + if isinstance(raw_data_to_report, tuple) and len(raw_data_to_report) == 2: + log = str(raw_data_to_report[0]) + additional_attrs = raw_data_to_report[1] else: - log = str(self.rp.raw_data_to_report) + log = str(raw_data_to_report) if log: # check format @@ -524,12 +581,12 @@ def process_raw_data_to_report(self) -> t.Dict: # real parsing if len(res) == 0: - logging.warning(f'unity test case not found, use case {self.rp.case.name} instead') + logging.warning(f'unity test case not found, use case {self.case.name} instead') attrs = { - 'name': self.rp.case.name, + 'name': self.case.name, 'result': 'FAIL', - 'message': self.rp.dut.pexpect_proc.buffer_debug_str, - 'time': self.rp.timeout, + 'message': self.dut.pexpect_proc.buffer_debug_str, + 'time': time.perf_counter() - self.init_time, } elif len(res) == 1: attrs = {k: v for k, v in res[0].groupdict().items() if v is not None} @@ -545,8 +602,75 @@ def process_raw_data_to_report(self) -> t.Dict: return attrs + def add_to_report(self, attrs): + testcase = TestCase(**attrs) + self.dut.testsuite.testcases.append(testcase) + + if testcase.result == 'FAIL': + self.dut.testsuite.attrs['failures'] += 1 + elif testcase.result == 'IGNORE': + self.dut.testsuite.attrs['skipped'] += 1 + else: + self.dut.testsuite.attrs['tests'] += 1 + + +class MultiDevRunTestManager: + """ + Manager for control dut generator function + """ + + def __init__(self, duts, case, start_retry, wait_for_menu_timeout, runtest_timeout): + self.case = case + self.workers: t.List[_MultiDevTestDut] = [] + shared_query = [[] for _ in case.subcases] + for sub_case in case.subcases: + index: int + if isinstance(sub_case['index'], str): + index = int(sub_case['index'], 10) + else: + index = sub_case['index'] + + self.workers.append( + _MultiDevTestDut( + dut=duts[index - 1], + case=case, + sub_case_index=index, + shared_message_query=shared_query, + start_retry=start_retry, + wait_for_menu_timeout=wait_for_menu_timeout, + runtest_timeout=runtest_timeout, + ) + ) + + def next_for_all(self): + res = [] + err = [] + for i, it in enumerate(self.workers): + try: + r = next(it) + if r.completed: + res.append(r.data) + except Exception as e: + err.append(e) + return res, err + + def gather(self): + try: + while True: + res, er = self.next_for_all() + if er: + raise Exception('There are Exception: ', er) + if len(res) == len(self.workers): + return res + finally: + for _t in self.workers: + _t.close() + + def get_processed_report_data(self, res: t.List[t.Any]) -> t.List[t.Dict]: + return [self.workers[i].process_raw_report_data(res[i]) for i in range(len(res))] + @staticmethod - def merge_result(test_cases_attr: t.List[t.Dict]) -> t.Dict: + def get_merge_data(test_cases_attr: t.List[t.Dict]) -> t.Dict: output = {} results = set() time_attr = 0.0 @@ -564,12 +688,15 @@ def merge_result(test_cases_attr: t.List[t.Dict]) -> t.Dict: continue if k not in output: - output[k] = [f'[group dev-{ind}]: {val}'] + output[k] = [f'[dut-{ind}]: {val}'] else: - output[k].append(f'[group dev-{ind}]: {val}') + output[k].append(f'[dut-{ind}]: {val}') for k, val in output.items(): - output[k] = '<------------------->\n'.join(output[k]) + if k in ('file', 'line'): + output[k] = val[0] + else: + output[k] = '<------------------->\n'.join(val) output['time'] = time_attr output['name'] = ' <---> '.join(list(name_attr)) @@ -583,108 +710,8 @@ def merge_result(test_cases_attr: t.List[t.Dict]) -> t.Dict: return output - def add_to_report(self, attrs): - testcase = TestCase(**attrs) - self.rp.dut.testsuite.testcases.append(testcase) - - if testcase.result == 'FAIL': - self.rp.dut.testsuite.attrs['failures'] += 1 - elif testcase.result == 'IGNORE': - self.rp.dut.testsuite.attrs['skipped'] += 1 - else: - self.rp.dut.testsuite.attrs['tests'] += 1 - - def clear_var_values(self) -> None: - self.rp.recv_sig.clear() - self.rp.returned_value = None - - def run_case(self, **kwargs): - """ - The thread target function - Will run for each case on each dut - - Call the wrapped function to trigger the case - Then keep listening on the dut for the signal - - - If the dut send a signal, it will be put into others' recv_sig - - If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires - - If the dut finished running the case, it will quite the loop and terminate the thread - """ - signal_pattern_list = [ - self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal - self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal - UNITY_SUMMARY_LINE_REGEX, # Means the case finished - ] - - # Start the case - self.rp.dut.expect_exact(READY_PATTERN_LIST, timeout=self.rp.wait_for_menu_timeout) - _start_time = time.perf_counter() - - # Retry at defined number of times if not write successfully - for retry in range(self.rp.start_retry): - self.rp.dut.write(str(self.rp.case.index)) - try: - self.rp.dut.expect_exact(self.rp.case.name, timeout=1) - break - except TIMEOUT as e: - if retry >= self.rp.start_retry - 1: - raise e - - self.rp.dut.write(str(self.rp.sub_case_index)) - - # Wait for the specific patterns, only exist when the sub-case finished - while True: - _current = time.perf_counter() - _timeout = _start_time + self.rp.timeout - _current - if _timeout < 0: - raise TIMEOUT('Tasks timed out, without other exception') - - pat = self.rp.dut.expect(signal_pattern_list, timeout=_timeout) - if pat is not None: - match_str = pat.group().decode('utf-8') - - # Send a signal - if self.SEND_SIGNAL_PREFIX in match_str: - send_sig = pat.group(1).decode('utf-8') - for d in self.rp.group: - d.rp.recv_sig.append(send_sig) - - # Waiting for a signal - elif self.WAIT_SIGNAL_PREFIX in match_str: - wait_sig = pat.group(1).decode('utf-8') - while True: - if wait_sig in self.rp.recv_sig: - self.rp.recv_sig.remove(wait_sig) - self.rp.dut.write('') - break - # Keep waiting the signal - else: - time.sleep(0.1) - if _start_time + self.rp.timeout < time.perf_counter(): - raise TIMEOUT('Not receive signal %r' % wait_sig) - - # Case finished - elif 'Tests' in match_str: - case_end_time = time.perf_counter() - case_duration = case_end_time - self.rp.start_time - additional_attrs = {'time': round(case_duration, 3)} - log = remove_asci_color_code(self.rp.dut.pexpect_proc.before) - return log, additional_attrs - - -class MultiDevResource: - """ - Resources of multi_dev dut - - Attributes: - rp (ResourcesProxy): Interface for storing shared variables between main thread and "worker" - worker (LoopedDevWorker): Worker for processing received task from main thread through ResourcesProxy - """ - - def __init__(self, dut: 'IdfDut', group: t.List['MultiDevResource']) -> None: - self.rp = ResourcesProxy(dut=dut, group=group) - - self.worker = LoopedDevWorker(worker=Thread, event=threading.Event, resource_proxy=self.rp) + def add_report_to_first_dut(self, attrs): + self.workers[0].add_to_report(attrs) class CaseTester: @@ -692,38 +719,24 @@ class CaseTester: The Generic tester of all the types Attributes: - group (t.List[MultiDevResource]): The group of the devices' resources dut (IdfDut): The first dut if there is more than one test_menu (t.List[UnittestMenuCase]): The list of the cases """ - # The signal pattens come from 'test_utils.c' - SEND_SIGNAL_PREFIX = 'Send signal: ' - WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' - UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' - UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' - def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore """ Create the object for every dut and put them into the group """ if isinstance(dut, Iterable): self.is_multi_dut = True - self.dut = list(dut) + self.dut: t.List['IdfDut'] = list(dut) self.first_dut = self.dut[0] self.test_menu = self.first_dut.test_menu else: self.is_multi_dut = False - self.dut = dut + self.dut = [dut] self.first_dut = dut - self.test_menu = self.dut.test_menu - - if self.is_multi_dut: - self.group: t.List[MultiDevResource] = [] - if isinstance(dut, list): - for item in dut: - dev_res = MultiDevResource(item, self.group) - self.group.append(dev_res) + self.test_menu = self.dut[0].test_menu def run_multi_dev_case( self, @@ -757,31 +770,66 @@ def run_multi_dev_case( return if reset: - for dev_res in self.group: - dev_res.rp.dut.serial.hard_reset() + for dut in self.dut: + dut.serial.hard_reset() - tasks = [] - for sub_case in case.subcases: - if isinstance(sub_case['index'], str): - index = int(sub_case['index'], 10) - else: - index = sub_case['index'] - tasks.append(index) + mdm = MultiDevRunTestManager( + duts=self.dut, case=case, start_retry=start_retry, wait_for_menu_timeout=timeout, runtest_timeout=timeout + ) + res = mdm.gather() + data_to_report = mdm.get_processed_report_data(res) + merged_data = mdm.get_merge_data(data_to_report) + mdm.add_report_to_first_dut(merged_data) - for i, task in enumerate(tasks): - self.group[i].worker.setup_task(case=case, sub_case_index=task, start_retry=start_retry, timeout=timeout) + def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None: + """ + Run a specific normal case - for i, task in enumerate(tasks): - self.group[i].worker.start_task() - for i, task in enumerate(tasks): - self.group[i].worker.wait_for_result() + Notes: + Will skip if the case type is not normal - data_to_report_list = [self.group[i].worker.process_raw_data_to_report() for i, task in enumerate(tasks)] - merged_data = self.group[0].worker.merge_result(data_to_report_list) - self.group[0].worker.add_to_report(merged_data) + Args: + case: the specific case that parsed in test menu + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + self.first_dut._run_normal_case(case, reset=reset, timeout=timeout) - for i, task in enumerate(tasks): - self.group[i].worker.clear_var_values() + def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None: + """ + Run a specific multi_stage case + + Notes: + Will skip if the case type is not multi_stage + + Args: + case: the specific case that parsed in test menu + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + self.first_dut._run_multi_stage_case(case, reset=reset, timeout=timeout) + + def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None: + """ + Run all normal cases + + Args: + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + for case in self.test_menu: + self.run_normal_case(case, reset, timeout=timeout) + + def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None: + """ + Run all multi_stage cases + + Args: + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + for case in self.test_menu: + self.run_multi_stage_case(case, reset=reset, timeout=timeout) def run_all_multi_dev_cases( self, @@ -831,14 +879,9 @@ def run_case( Args: case: the specific case that parsed in test menu reset: whether to perform a hardware reset before running a case - timeout: timeout in second + timeout: timeout in second, setup time excluded start_retry (int): number of retries for a single case when it is failed to start """ - warnings.warn( - 'Current timeout logic not clear: currently it is a sum of wait board menu time and runtest time.' - 'Will be renamed and split into two time in future', - DeprecationWarning, - ) if case.attributes.get('timeout'): timeout = int(case.attributes['timeout'])