From 008bbcb789852df54c22b5aa2b8cc1126e43953a Mon Sep 17 00:00:00 2001 From: "igor udot (horw)" Date: Wed, 26 Jul 2023 16:23:55 +0800 Subject: [PATCH] refactor: change logic of threading control in CaseTester for MultiDev --- .../pytest_embedded_idf/unity_tester.py | 374 +++++++++++++----- pytest-embedded-idf/tests/test_idf.py | 5 +- 2 files changed, 269 insertions(+), 110 deletions(-) diff --git a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py index 9783aae1..1b1ffa6f 100644 --- a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py +++ b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py @@ -3,12 +3,13 @@ import functools import logging import re +import threading import time import typing as t import warnings from collections.abc import Iterable from dataclasses import dataclass -from threading import Semaphore, Thread +from threading import Thread import pexpect from pexpect.exceptions import TIMEOUT @@ -23,7 +24,6 @@ if t.TYPE_CHECKING: from .dut import IdfDut - DEFAULT_START_RETRY = 3 DEFAULT_TIMEOUT = 90 @@ -416,100 +416,189 @@ def run_all_single_board_cases( self._run_multi_stage_case(case, reset=reset, timeout=timeout) -class MultiDevResource: +class ResourcesProxy: """ - Resources of multi_dev dut + Interface for share data between main thread and LoopedDevWorker thread Attributes: dut (IdfDut): Object of the Device under test - sem (Semaphore): Semaphore of monitoring whether the case finished + group (List[MultiDevResource]): List of MultiDevResource, same instance for all ResourcesProxy recv_sig (t.List[str]): The list of received signals from other dut - thread (Thread): The thread of monitoring the signals """ - def __init__(self, dut: 'IdfDut') -> None: - self.dut = dut - self.sem = Semaphore() + def __init__(self, **kwargs): + self.dut = kwargs['dut'] + self.group = kwargs['group'] self.recv_sig: t.List[str] = [] - self.thread: Thread = None # type: ignore + self.case = None + self.sub_case_index = None + self.start_retry = None + self.start_time = None + self.returned_value = None + self.raw_data_to_report = None + self.timeout = 0 + self.wait_for_menu_timeout = 0 -class CaseTester: + +class LoopedDevWorker: """ - The Generic tester of all the types + Worker for processing received task from main thread through ResourcesProxy Attributes: - group (t.List[MultiDevResource]): The group of the devices' resources - dut (IdfDut): The first dut if there is more than one - test_menu (t.List[UnittestMenuCase]): The list of the cases + worker (Thread): Thread for running loop task + wait_for_task_event (Event): Event for lock worker before receive task + task_complete_event (Event): Event for lock result processing before completed/failed task + rp (ResourcesProxy): Interface for storing shared variables between main thread and worker """ - # The signal pattens come from 'test_utils.c' SEND_SIGNAL_PREFIX = 'Send signal: ' WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' - def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore - """ - Create the object for every dut and put them into the group - """ - if isinstance(dut, Iterable): - self.is_multi_dut = True - self.dut = list(dut) - self.first_dut = self.dut[0] - self.test_menu = self.first_dut.test_menu - else: - self.is_multi_dut = False - self.dut = dut - self.first_dut = dut - self.test_menu = self.dut.test_menu + def __init__( + self, + worker: type(Thread), + event: type(threading.Event), + resource_proxy: ResourcesProxy, + ): + self.wait_for_task_event = event() + self.task_complete_event = event() + self.rp = resource_proxy - if self.is_multi_dut: - self.group: t.List[MultiDevResource] = [] - if isinstance(dut, list): - for item in dut: - dev_res = MultiDevResource(item) - self.group.append(dev_res) + def loop(): + while True: + self.wait_for_task_event.wait() + self.wait_for_task_event.clear() + self.rp.dut.serial.hard_reset() - def _wait_multi_dev_case_finish(self, timeout: float = DEFAULT_TIMEOUT) -> None: - """ - Wait until all the sub-cases of this multi_device case finished - """ - for d in self.group: - if d.sem.acquire(timeout=timeout): - d.sem.release() + self.rp.returned_value = None + + try: + self.rp.returned_value = self.run_case() + except Exception as e: + self.rp.returned_value = e + + self.task_complete_event.set() + + self.worker = worker(target=loop, daemon=True) + self.worker.start() + + def setup_task(self, case, sub_case_index, start_retry, timeout) -> None: + self.rp.case = case + self.rp.sub_case_index = sub_case_index + self.rp.start_retry = start_retry + self.rp.start_time = time.perf_counter() + self.rp.wait_for_menu_timeout = timeout + self.rp.timeout = timeout + + def start_task(self) -> None: + self.wait_for_task_event.set() + + def wait_for_result(self) -> None: + self.task_complete_event.wait() + res: t.Tuple[str, dict] | Exception = self.rp.returned_value + self.task_complete_event.clear() + self.rp.raw_data_to_report = res + + def process_raw_data_to_report(self) -> dict: + additional_attrs = {} + if isinstance(self.rp.raw_data_to_report, tuple) and len(self.rp.raw_data_to_report) == 2: + log = str(self.rp.raw_data_to_report[0]) + additional_attrs = self.rp.raw_data_to_report[1] + else: + log = str(self.rp.raw_data_to_report) + + if log: + # check format + check = UNITY_FIXTURE_REGEX.search(log) + if check: + regex = UNITY_FIXTURE_REGEX else: - raise TimeoutError('Wait case to finish timeout') + regex = UNITY_BASIC_REGEX - def _start_sub_case_thread( - self, - dev_res: MultiDevResource, - case: UnittestMenuCase, - sub_case_index: int, - case_start_time: float, - start_retry: int = DEFAULT_START_RETRY, - ) -> None: - """ - Start the thread monitoring on the corresponding dut of the sub-case - """ - # Allocate the kwargs that pass to '_run' - _kwargs = { - 'dut': dev_res.dut, - 'dev_res': dev_res, - 'case': case, - 'sub_case_index': sub_case_index, - 'start_retry': start_retry, - 'start_time': case_start_time, - } - - # Create the thread of the sub-case - dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True) - dev_res.thread.start() - # Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish' - dev_res.sem.acquire() - - def _run(self, **kwargs) -> None: # type: ignore + res = list(regex.finditer(log)) + else: + res = [] + + # real parsing + if len(res) == 0: + logging.warning(f'unity test case not found, use case {self.rp.case.name} instead') + attrs = { + 'name': self.rp.case.name, + 'result': 'FAIL', + 'message': self.rp.dut.pexpect_proc.buffer_debug_str, + 'time': self.rp.timeout, + } + elif len(res) == 1: + attrs = {k: v for k, v in res[0].groupdict().items() if v is not None} + else: + warnings.warn('This function is for recording single unity test case only. Use the last matched one') + attrs = {k: v for k, v in res[-1].groupdict().items() if v is not None} + + if additional_attrs: + attrs.update(additional_attrs) + + if log: + attrs.update({'stdout': log}) + + return attrs + + @staticmethod + def merge_result(test_cases_attr: list[dict]) -> dict: + output = {} + results = set() + time_attr = 0.0 + name_attr = set() + for ind, attr in enumerate(test_cases_attr): + for k, val in attr.items(): + if k == 'result': + results.add(val) + continue + if k == 'name': + name_attr.add(val) + continue + if k == 'time': + time_attr = max(time_attr, float(val)) + continue + + if k not in output: + output[k] = [f'[group dev-{ind}]: {val}'] + else: + output[k].append(f'[group dev-{ind}]: {val}') + + for k, val in output.items(): + output[k] = '<------------------->\n'.join(output[k]) + + output['time'] = time_attr + output['name'] = ' <---> '.join(list(name_attr)) + + if 'FAIL' in results: + output['result'] = 'FAIL' + elif 'IGNORE' in results: + output['result'] = 'IGNORE' + else: + output['result'] = (results - {'FAIL', 'IGNORE'}).pop() + + return output + + def add_to_report(self, attrs): + testcase = TestCase(**attrs) + self.rp.dut.testsuite.testcases.append(testcase) + + if testcase.result == 'FAIL': + self.rp.dut.testsuite.attrs['failures'] += 1 + elif testcase.result == 'IGNORE': + self.rp.dut.testsuite.attrs['skipped'] += 1 + else: + self.rp.dut.testsuite.attrs['tests'] += 1 + + def clear_var_values(self): + self.rp.recv_sig.clear() + self.rp.returned_value = None + + def run_case(self, **kwargs): """ The thread target function Will run for each case on each dut @@ -526,62 +615,115 @@ def _run(self, **kwargs) -> None: # type: ignore self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal UNITY_SUMMARY_LINE_REGEX, # Means the case finished ] - dut = kwargs['dut'] - dev_res = kwargs['dev_res'] - case = kwargs['case'] - sub_case_index = kwargs['sub_case_index'] - start_retry = kwargs['start_retry'] - start_time = kwargs['start_time'] + # Start the case - dut.expect_exact(READY_PATTERN_LIST) + self.rp.dut.expect_exact(READY_PATTERN_LIST, timeout=self.rp.wait_for_menu_timeout) + _start_time = time.perf_counter() + # Retry at defined number of times if not write successfully - for retry in range(start_retry): - dut.write(str(case.index)) + for retry in range(self.rp.start_retry): + self.rp.dut.write(str(self.rp.case.index)) try: - dut.expect_exact(case.name, timeout=1) + self.rp.dut.expect_exact(self.rp.case.name, timeout=1) break except TIMEOUT as e: - if retry >= start_retry - 1: - dev_res.sem.release() + if retry >= self.rp.start_retry - 1: raise e - dut.write(str(sub_case_index)) + self.rp.dut.write(str(self.rp.sub_case_index)) # Wait for the specific patterns, only exist when the sub-case finished while True: - pat = dut.expect(signal_pattern_list, timeout=60) + _current = time.perf_counter() + _timeout = _start_time + self.rp.timeout - _current + if _timeout < 0: + raise TIMEOUT('Tasks timed out, without other exception') + + pat = self.rp.dut.expect(signal_pattern_list, timeout=_timeout) if pat is not None: match_str = pat.group().decode('utf-8') # Send a signal if self.SEND_SIGNAL_PREFIX in match_str: send_sig = pat.group(1).decode('utf-8') - for d in self.group: - d.recv_sig.append(send_sig) + for d in self.rp.group: + d.rp.recv_sig.append(send_sig) # Waiting for a signal elif self.WAIT_SIGNAL_PREFIX in match_str: wait_sig = pat.group(1).decode('utf-8') while True: - if wait_sig in dev_res.recv_sig: - dev_res.recv_sig.remove(wait_sig) - dut.write('') + if wait_sig in self.rp.recv_sig: + self.rp.recv_sig.remove(wait_sig) + self.rp.dut.write('') break # Keep waiting the signal else: time.sleep(0.1) + if _start_time + self.rp.timeout < time.perf_counter(): + raise TIMEOUT('Not receive signal %r' % wait_sig) # Case finished elif 'Tests' in match_str: case_end_time = time.perf_counter() - case_duration = case_end_time - start_time + case_duration = case_end_time - self.rp.start_time additional_attrs = {'time': round(case_duration, 3)} - log = remove_asci_color_code(dut.pexpect_proc.before) - dut.testsuite.add_unity_test_cases(log, additional_attrs=additional_attrs) - break + log = remove_asci_color_code(self.rp.dut.pexpect_proc.before) + return log, additional_attrs + + +class MultiDevResource: + """ + Resources of multi_dev dut + + Attributes: + rp (ResourcesProxy): Interface for storing shared variables between main thread and "worker" + worker (LoopedDevWorker): Worker for processing received task from main thread through ResourcesProxy + """ + + def __init__(self, dut: 'IdfDut', group: t.List['MultiDevResource']) -> None: + self.rp = ResourcesProxy(dut=dut, group=group) + + self.worker = LoopedDevWorker(worker=Thread, event=threading.Event, resource_proxy=self.rp) + + +class CaseTester: + """ + The Generic tester of all the types - # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish' - dev_res.sem.release() + Attributes: + group (t.List[MultiDevResource]): The group of the devices' resources + dut (IdfDut): The first dut if there is more than one + test_menu (t.List[UnittestMenuCase]): The list of the cases + """ + + # The signal pattens come from 'test_utils.c' + SEND_SIGNAL_PREFIX = 'Send signal: ' + WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' + UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' + UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' + + def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore + """ + Create the object for every dut and put them into the group + """ + if isinstance(dut, Iterable): + self.is_multi_dut = True + self.dut = list(dut) + self.first_dut = self.dut[0] + self.test_menu = self.first_dut.test_menu + else: + self.is_multi_dut = False + self.dut = dut + self.first_dut = dut + self.test_menu = self.dut.test_menu + + if self.is_multi_dut: + self.group: t.List[MultiDevResource] = [] + if isinstance(dut, list): + for item in dut: + dev_res = MultiDevResource(item, self.group) + self.group.append(dev_res) def run_multi_dev_case( self, @@ -616,23 +758,30 @@ def run_multi_dev_case( if reset: for dev_res in self.group: - dev_res.dut.serial.hard_reset() + dev_res.rp.dut.serial.hard_reset() - start_time = time.perf_counter() + tasks = [] for sub_case in case.subcases: if isinstance(sub_case['index'], str): index = int(sub_case['index'], 10) else: index = sub_case['index'] - self._start_sub_case_thread( - dev_res=self.group[index - 1], - case=case, - sub_case_index=index, - case_start_time=start_time, - start_retry=start_retry, - ) - # Waiting all the devices to finish their test cases - self._wait_multi_dev_case_finish(timeout=timeout) + tasks.append(index) + + for i, task in enumerate(tasks): + self.group[i].worker.setup_task(case=case, sub_case_index=task, start_retry=start_retry, timeout=timeout) + + for i, task in enumerate(tasks): + self.group[i].worker.start_task() + for i, task in enumerate(tasks): + self.group[i].worker.wait_for_result() + + data_to_report_list = [self.group[i].worker.process_raw_data_to_report() for i, task in enumerate(tasks)] + merged_data = self.group[0].worker.merge_result(data_to_report_list) + self.group[0].worker.add_to_report(merged_data) + + for i, task in enumerate(tasks): + self.group[i].worker.clear_var_values() def run_all_multi_dev_cases( self, @@ -685,6 +834,15 @@ def run_case( timeout: timeout in second start_retry (int): number of retries for a single case when it is failed to start """ + warnings.warn( + 'Current timeout logic not clear: currently it is a sum of wait board menu time and runtest time.' + 'Will be renamed and split into two time in future', + DeprecationWarning, + ) + + if case.attributes.get('timeout'): + timeout = int(case.attributes['timeout']) + if case.type == 'normal': self.first_dut._run_normal_case(case, reset=reset, timeout=timeout) elif case.type == 'multi_stage': diff --git a/pytest-embedded-idf/tests/test_idf.py b/pytest-embedded-idf/tests/test_idf.py index b98ac53a..a1cf114d 100644 --- a/pytest-embedded-idf/tests/test_idf.py +++ b/pytest-embedded-idf/tests/test_idf.py @@ -5,6 +5,7 @@ import xml.etree.ElementTree as ET import pytest + from pytest_embedded_idf.dut import IdfDut toolchain_required = pytest.mark.skipif( @@ -544,7 +545,7 @@ def test_unity_test_case_runner(unity_tester): assert junit_report.attrib['errors'] == '0' assert junit_report.attrib['failures'] == '1' assert junit_report.attrib['skipped'] == '0' - assert junit_report.attrib['tests'] == '4' + assert junit_report.attrib['tests'] == '3' case_names_one_dev = [ 'normal_case1', @@ -559,7 +560,7 @@ def test_unity_test_case_runner(unity_tester): multi_dev_duts = ['dut-0', 'dut-1'] required_names_one_dev = [f'{case_name} [{dut}]' for dut in one_dev_dut for case_name in case_names_one_dev] - required_names_multi_dev = [f'{case_name} [{dut}]' for dut in multi_dev_duts for case_name in case_names_multi_dev] + required_names_multi_dev = [f'{case_name} [{multi_dev_duts[0]}]' for case_name in case_names_multi_dev] junit_case_names = [item.attrib['name'] for item in junit_report]