From 9e6bf0b10f8302cfa9c945df4ee22d31e842d5a3 Mon Sep 17 00:00:00 2001 From: tangkong Date: Tue, 17 Sep 2024 10:38:40 -0700 Subject: [PATCH 1/2] PERF: refactor atef.bin cli entrypoint to defer importing functionality until subcommand is invoked --- atef/bin/check.py | 475 +----------------------------- atef/bin/check_main.py | 460 +++++++++++++++++++++++++++++ atef/bin/config.py | 32 +- atef/bin/config_main.py | 31 ++ atef/bin/scripts.py | 2 + atef/scripts/converter_v0.py | 280 +----------------- atef/scripts/converter_v0_main.py | 286 ++++++++++++++++++ atef/scripts/pmgr_check.py | 140 +-------- atef/scripts/pmgr_check_main.py | 149 ++++++++++ atef/tests/test_commandline.py | 2 +- 10 files changed, 956 insertions(+), 901 deletions(-) create mode 100644 atef/bin/check_main.py create mode 100644 atef/bin/config_main.py create mode 100644 atef/scripts/converter_v0_main.py create mode 100644 atef/scripts/pmgr_check_main.py diff --git a/atef/bin/check.py b/atef/bin/check.py index d49d1693..a9758b14 100644 --- a/atef/bin/check.py +++ b/atef/bin/check.py @@ -1,82 +1,17 @@ """ `atef check` runs passive checkouts of devices given a configuration file. """ -from __future__ import annotations - import argparse -import asyncio -import enum -import itertools -import logging -from typing import Dict, Optional, Sequence, Tuple, Union - -import happi -import rich -import rich.console -import rich.tree - -from ..cache import DataCache, _SignalCache, get_signal_cache -from ..check import Comparison, Severity -from ..config import (AnyConfiguration, AnyPreparedConfiguration, - ConfigurationFile, FailedConfiguration, - PreparedComparison, PreparedFile, PreparedGroup) -from ..report import PassiveAtefReport -from ..result import Result -from ..util import ophyd_cleanup - -logger = logging.getLogger(__name__) DESCRIPTION = __doc__ - -class VerbositySetting(enum.Flag): - show_severity_emoji = enum.auto() - show_severity_description = enum.auto() - show_config_description = enum.auto() - show_tags = enum.auto() - show_passed_tests = enum.auto() - default = show_severity_emoji | show_severity_description - - @classmethod - def from_kwargs( - cls, start: Optional[VerbositySetting] = None, **kwargs - ) -> VerbositySetting: - """ - Get a VerbositySetting from the provided kwargs. - - Parameters - ---------- - start : VerbositySetting, optional - The starting VerbositySetting. - - **kwargs : str to bool - Keyword arguments that match VerbositySetting flags, with the - value set to False (clear) or True (set). - - Returns - ------- - VerbositySetting - The adjusted VerbositySetting. - """ - def set_or_clear(verbosity: cls, name: str, value: bool) -> cls: - flag = getattr(cls, name) - if value: - return verbosity | flag - return verbosity & ~flag - - if start is None: - verbosity = cls.default - else: - verbosity = start - - for setting in cls: - if setting.name is None: - continue - - setting_value = kwargs.get(setting.name, None) - if setting_value is not None: - verbosity = set_or_clear(verbosity, setting.name, setting_value) - return verbosity +_VERBOSITY_SETTINGS = { + "show-severity-emoji": True, + "show-severity-description": True, + "show-config-description": False, + "show-tags": False, + "show-passed-tests": False, +} def build_arg_parser(argparser=None): @@ -92,19 +27,15 @@ def build_arg_parser(argparser=None): help="Configuration filename", ) - for setting in VerbositySetting: - flag_name = setting.name.replace("_", "-") - if setting == VerbositySetting.default: - continue - - help_text = setting.name.replace("_", " ").capitalize() + for flag_name, default in _VERBOSITY_SETTINGS.items(): + help_text = flag_name.replace("-", " ").capitalize() argparser.add_argument( f"--{flag_name}", - dest=setting.name, + dest=flag_name.replace("-", "_"), help=help_text, action="store_true", - default=setting in VerbositySetting.default, + default=default, ) if flag_name.startswith("show-"): @@ -112,7 +43,7 @@ def build_arg_parser(argparser=None): help_text = help_text.replace("Show ", "Hide ") argparser.add_argument( f"--{hide_flag_name}", - dest=setting.name, + dest=flag_name.replace("-", "_"), help=help_text, action="store_false", ) @@ -139,383 +70,7 @@ def build_arg_parser(argparser=None): return argparser -default_severity_to_rich = { - Severity.success: "[bold green]:heavy_check_mark:", - Severity.warning: "[bold yellow]:heavy_check_mark:", - Severity.error: "[bold red]:x:", - Severity.internal_error: "[bold red]:x:", -} - -default_severity_to_log_level = { - Severity.success: logging.DEBUG, - Severity.warning: logging.WARNING, - Severity.error: logging.ERROR, - Severity.internal_error: logging.ERROR, -} - - -def get_result_from_comparison( - item: Union[PreparedComparison, Exception, FailedConfiguration, None] -) -> Tuple[Optional[PreparedComparison], Result]: - """ - Get a Result, if available, from the provided arguments. - - In the case of an exception (or None/internal error), create one. - - Parameters - ---------- - item : Union[PreparedComparison, Exception, None] - The item to grab a result from. - - Returns - ------- - PreparedComparison or None - The prepared comparison, if available - Result - The result instance. - """ - if item is None: - return None, Result( - severity=Severity.internal_error, - reason="no result available (comparison not run?)" - ) - if isinstance(item, Exception): - # An error that was transformed into a Result with a severity - return None, Result.from_exception(item) - if isinstance(item, FailedConfiguration): - # An error that was transformed into a Result with a severity - return None, item.result - - if item.result is None: - return item, Result( - severity=Severity.internal_error, - reason="no result available (comparison not run?)" - ) - - return item, item.result - - -def get_comparison_text_for_tree( - item: Union[PreparedComparison, Exception], - *, - severity_to_rich: Optional[Dict[Severity, str]] = None, - verbosity: VerbositySetting = VerbositySetting.default, -) -> str: - """ - Get a description for the rich Tree, given a comparison or a failed result. - - Parameters - ---------- - item : Union[PreparedComparison, Exception] - The item to add to the tree. - severity_to_rich : Dict[Severity, str], optional - A mapping of severity values to rich colors. - verbosity : VerbositySetting, optional - The verbosity settings. - - Returns - ------- - str or None - Returns a description to add to the tree. - """ - severity_to_rich = severity_to_rich or default_severity_to_rich - - prepared, result = get_result_from_comparison(item) - if result.severity > Severity.success: - return ( - f"{severity_to_rich[result.severity]}[default]: {result.reason}" - ) - - if VerbositySetting.show_passed_tests in verbosity and prepared is not None: - if prepared.comparison is not None: - description = prepared.comparison.describe() - else: - description = "no comparison configured" - - return ( - f"{severity_to_rich[result.severity]}[default]: " - f"{prepared.identifier} {description}" - ) - - # According to the severity and verbosity settings, this message should - # not be displayed. - return None - - -def get_name_for_tree( - obj: Union[Comparison, AnyConfiguration], - verbosity: VerbositySetting -) -> str: - """ - Get a combined name and description for a given item. - - Parameters - ---------- - obj : Union[Comparison, AnyConfiguration] - The comparison or configuration. - - Returns - ------- - str - The displayable name. - """ - if VerbositySetting.show_config_description in verbosity: - if obj.description: - if obj.name: - return f"{obj.name}: {obj.description}" - return obj.description - - if obj.name: - return obj.name - return "" - +async def main(*args, **kwargs): + from atef.bin.check_main import main -def get_tree_heading( - obj: AnyPreparedConfiguration, - verbosity: VerbositySetting, - severity_to_rich: Dict[Severity, str], -) -> str: - """ - Get severity, name, and description (per verbosity settings) for a tree. - - Parameters - ---------- - obj : Comparison or AnyConfiguration - The comparison or configuration. - - Returns - ------- - str - The displayable name. - """ - severity: Severity = getattr(obj.result, "severity", Severity.error) - - severity_text = [] - if VerbositySetting.show_severity_emoji in verbosity: - severity_text.append(severity_to_rich[severity]) - if VerbositySetting.show_severity_description in verbosity: - severity_text.append(severity.name.replace("_", " ").capitalize()) - severity_text.append(": ") - - severity_text = "".join(severity_text) - name_and_desc = get_name_for_tree(obj.config, verbosity) - return f"{severity_text}{name_and_desc}" - - -def should_show_in_tree( - item: Union[PreparedComparison, Exception, FailedConfiguration, None], - verbosity: VerbositySetting = VerbositySetting.default -) -> bool: - """ - Should ``item`` be shown in the tree, based on the verbosity settings? - - Parameters - ---------- - item : Union[PreparedComparison, Exception, FailedConfiguration, None] - The item to check. - verbosity : VerbositySetting, optional - The verbosity settings. - - Returns - ------- - bool - True to show it in the tree, False to not show it. - """ - _, result = get_result_from_comparison(item) - if result is None: - # Error - always show it - return True - - if result.severity == Severity.success: - return VerbositySetting.show_passed_tests in verbosity - return True - - -def group_to_rich_tree( - group: PreparedGroup, - verbosity: VerbositySetting = VerbositySetting.default, - severity_to_rich: Optional[Dict[Severity, str]] = None, -) -> rich.tree.Tree: - """ - Convert a `PreparedGroup` into a `rich.tree.Tree`. - - Parameters - ---------- - group : PreparedGroup - The group to convert. Comparisons must be complete to generate the - tree effectively. - verbosity : VerbositySetting, optional - The verbosity settings. - severity_to_rich : Dict[Severity, str], optional - A mapping of severity values to rich colors. - - Returns - ------- - rich.tree.Tree - """ - severity_to_rich = severity_to_rich or default_severity_to_rich - - tree = rich.tree.Tree( - get_tree_heading(group, severity_to_rich=severity_to_rich, verbosity=verbosity) - ) - for failure in group.prepare_failures: - if should_show_in_tree(failure, verbosity): - tree.add( - get_comparison_text_for_tree(failure, verbosity=verbosity) - ) - - for config in group.configs: - if isinstance(config, PreparedGroup): - tree.add( - group_to_rich_tree( - config, - verbosity=verbosity, - severity_to_rich=severity_to_rich - ) - ) - else: - subtree = rich.tree.Tree( - get_tree_heading( - config, severity_to_rich=severity_to_rich, verbosity=verbosity - ) - ) - severity = getattr(config.result, "severity", Severity.error) - if config.result is not None and severity == Severity.success: - if VerbositySetting.show_passed_tests in verbosity: - tree.add(subtree) - else: - tree.add(subtree) - - for comparison in itertools.chain( - config.comparisons, config.prepare_failures - ): - if should_show_in_tree(comparison, verbosity): - subtree.add( - get_comparison_text_for_tree(comparison, verbosity=verbosity) - ) - - return tree - - -async def check_and_log( - config: ConfigurationFile, - console: rich.console.Console, - verbosity: VerbositySetting = VerbositySetting.default, - client: Optional[happi.Client] = None, - name_filter: Optional[Sequence[str]] = None, - parallel: bool = True, - cache: Optional[DataCache] = None, - filename: Optional[str] = None, -) -> PreparedFile: - """ - Check a configuration and log the results. - - Parameters - ---------- - config : ConfigurationFile - The configuration to check. - console : rich.console.Console - The rich console to write output to. - verbosity : VerbositySetting, optional - The verbosity settings. - client : happi.Client, optional - The happi client, if available. - name_filter : Sequence[str], optional - A filter for names. - parallel : bool, optional - Pre-fill cache in parallel when possible. - cache : DataCache - The data cache instance. - - Returns - ------- - PreparedFile - The completed checkout file, with results recorded - """ - name_filter = list(name_filter or []) - - if cache is None: - cache = DataCache() - - prepared_file = PreparedFile.from_config(config, cache=cache, client=client) - - cache_fill_tasks = [] - if parallel: - try: - cache_fill_tasks = await prepared_file.fill_cache() - except asyncio.CancelledError: - console.print("Tests interrupted; no results available.") - return - - try: - await prepared_file.compare() - except asyncio.CancelledError: - console.print("Tests interrupted; showing partial results.") - for task in cache_fill_tasks or []: - task.cancel() - - root_tree = rich.tree.Tree(str(filename)) - tree = group_to_rich_tree(prepared_file.root, verbosity=verbosity) - root_tree.add(tree) - - if filename is not None: - console.print(root_tree) - else: - console.print(tree) - - return prepared_file - - -def save_report(prep_file: PreparedFile, report_path: str): - # Normalize report path - from pathlib import Path - save_path = Path(report_path).resolve() - - doc = PassiveAtefReport(str(save_path), config=prep_file) - doc.create_report() - - -async def main( - filename: str, - name_filter: Optional[Sequence[str]] = None, - parallel: bool = False, - *, - cleanup: bool = True, - signal_cache: Optional[_SignalCache] = None, - show_severity_emoji: bool = True, - show_severity_description: bool = True, - show_config_description: bool = False, - show_tags: bool = False, - show_passed_tests: bool = False, - report_path: Optional[str] = None, -): - - verbosity = VerbositySetting.from_kwargs( - show_severity_emoji=show_severity_emoji, - show_severity_description=show_severity_description, - show_config_description=show_config_description, - show_tags=show_tags, - show_passed_tests=show_passed_tests, - ) - - config_file = ConfigurationFile.from_filename(filename) - - console = rich.console.Console() - cache = DataCache(signals=signal_cache or get_signal_cache()) - try: - with console.status("[bold green] Performing checks..."): - prep_file = await check_and_log( - config_file, - console=console, - name_filter=name_filter, - parallel=parallel, - cache=cache, - filename=filename, - verbosity=verbosity, - ) - if report_path is not None: - with console.status("[bold green] Saving report..."): - save_report(prep_file, report_path) - finally: - if cleanup: - ophyd_cleanup() + await main(*args, **kwargs) diff --git a/atef/bin/check_main.py b/atef/bin/check_main.py new file mode 100644 index 00000000..f917cffa --- /dev/null +++ b/atef/bin/check_main.py @@ -0,0 +1,460 @@ +""" +`atef check` runs passive checkouts of devices given a configuration file. +""" +from __future__ import annotations + +import asyncio +import enum +import itertools +import logging +from typing import Dict, Optional, Sequence, Tuple, Union + +import happi +import rich +import rich.console +import rich.tree + +from ..cache import DataCache, _SignalCache, get_signal_cache +from ..check import Comparison, Severity +from ..config import (AnyConfiguration, AnyPreparedConfiguration, + ConfigurationFile, FailedConfiguration, + PreparedComparison, PreparedFile, PreparedGroup) +from ..report import PassiveAtefReport +from ..result import Result +from ..util import ophyd_cleanup + +logger = logging.getLogger(__name__) + +DESCRIPTION = __doc__ + + +class VerbositySetting(enum.Flag): + show_severity_emoji = enum.auto() + show_severity_description = enum.auto() + show_config_description = enum.auto() + show_tags = enum.auto() + show_passed_tests = enum.auto() + default = show_severity_emoji | show_severity_description + + @classmethod + def from_kwargs( + cls, start: Optional[VerbositySetting] = None, **kwargs + ) -> VerbositySetting: + """ + Get a VerbositySetting from the provided kwargs. + + Parameters + ---------- + start : VerbositySetting, optional + The starting VerbositySetting. + + **kwargs : str to bool + Keyword arguments that match VerbositySetting flags, with the + value set to False (clear) or True (set). + + Returns + ------- + VerbositySetting + The adjusted VerbositySetting. + """ + def set_or_clear(verbosity: cls, name: str, value: bool) -> cls: + flag = getattr(cls, name) + if value: + return verbosity | flag + return verbosity & ~flag + + if start is None: + verbosity = cls.default + else: + verbosity = start + + for setting in cls: + if setting.name is None: + continue + + setting_value = kwargs.get(setting.name, None) + if setting_value is not None: + verbosity = set_or_clear(verbosity, setting.name, setting_value) + return verbosity + + +default_severity_to_rich = { + Severity.success: "[bold green]:heavy_check_mark:", + Severity.warning: "[bold yellow]:heavy_check_mark:", + Severity.error: "[bold red]:x:", + Severity.internal_error: "[bold red]:x:", +} + +default_severity_to_log_level = { + Severity.success: logging.DEBUG, + Severity.warning: logging.WARNING, + Severity.error: logging.ERROR, + Severity.internal_error: logging.ERROR, +} + + +def get_result_from_comparison( + item: Union[PreparedComparison, Exception, FailedConfiguration, None] +) -> Tuple[Optional[PreparedComparison], Result]: + """ + Get a Result, if available, from the provided arguments. + + In the case of an exception (or None/internal error), create one. + + Parameters + ---------- + item : Union[PreparedComparison, Exception, None] + The item to grab a result from. + + Returns + ------- + PreparedComparison or None + The prepared comparison, if available + Result + The result instance. + """ + if item is None: + return None, Result( + severity=Severity.internal_error, + reason="no result available (comparison not run?)" + ) + if isinstance(item, Exception): + # An error that was transformed into a Result with a severity + return None, Result.from_exception(item) + if isinstance(item, FailedConfiguration): + # An error that was transformed into a Result with a severity + return None, item.result + + if item.result is None: + return item, Result( + severity=Severity.internal_error, + reason="no result available (comparison not run?)" + ) + + return item, item.result + + +def get_comparison_text_for_tree( + item: Union[PreparedComparison, Exception], + *, + severity_to_rich: Optional[Dict[Severity, str]] = None, + verbosity: VerbositySetting = VerbositySetting.default, +) -> str: + """ + Get a description for the rich Tree, given a comparison or a failed result. + + Parameters + ---------- + item : Union[PreparedComparison, Exception] + The item to add to the tree. + severity_to_rich : Dict[Severity, str], optional + A mapping of severity values to rich colors. + verbosity : VerbositySetting, optional + The verbosity settings. + + Returns + ------- + str or None + Returns a description to add to the tree. + """ + severity_to_rich = severity_to_rich or default_severity_to_rich + + prepared, result = get_result_from_comparison(item) + if result.severity > Severity.success: + return ( + f"{severity_to_rich[result.severity]}[default]: {result.reason}" + ) + + if VerbositySetting.show_passed_tests in verbosity and prepared is not None: + if prepared.comparison is not None: + description = prepared.comparison.describe() + else: + description = "no comparison configured" + + return ( + f"{severity_to_rich[result.severity]}[default]: " + f"{prepared.identifier} {description}" + ) + + # According to the severity and verbosity settings, this message should + # not be displayed. + return None + + +def get_name_for_tree( + obj: Union[Comparison, AnyConfiguration], + verbosity: VerbositySetting +) -> str: + """ + Get a combined name and description for a given item. + + Parameters + ---------- + obj : Union[Comparison, AnyConfiguration] + The comparison or configuration. + + Returns + ------- + str + The displayable name. + """ + if VerbositySetting.show_config_description in verbosity: + if obj.description: + if obj.name: + return f"{obj.name}: {obj.description}" + return obj.description + + if obj.name: + return obj.name + return "" + + +def get_tree_heading( + obj: AnyPreparedConfiguration, + verbosity: VerbositySetting, + severity_to_rich: Dict[Severity, str], +) -> str: + """ + Get severity, name, and description (per verbosity settings) for a tree. + + Parameters + ---------- + obj : Comparison or AnyConfiguration + The comparison or configuration. + + Returns + ------- + str + The displayable name. + """ + severity: Severity = getattr(obj.result, "severity", Severity.error) + + severity_text = [] + if VerbositySetting.show_severity_emoji in verbosity: + severity_text.append(severity_to_rich[severity]) + if VerbositySetting.show_severity_description in verbosity: + severity_text.append(severity.name.replace("_", " ").capitalize()) + severity_text.append(": ") + + severity_text = "".join(severity_text) + name_and_desc = get_name_for_tree(obj.config, verbosity) + return f"{severity_text}{name_and_desc}" + + +def should_show_in_tree( + item: Union[PreparedComparison, Exception, FailedConfiguration, None], + verbosity: VerbositySetting = VerbositySetting.default +) -> bool: + """ + Should ``item`` be shown in the tree, based on the verbosity settings? + + Parameters + ---------- + item : Union[PreparedComparison, Exception, FailedConfiguration, None] + The item to check. + verbosity : VerbositySetting, optional + The verbosity settings. + + Returns + ------- + bool + True to show it in the tree, False to not show it. + """ + _, result = get_result_from_comparison(item) + if result is None: + # Error - always show it + return True + + if result.severity == Severity.success: + return VerbositySetting.show_passed_tests in verbosity + return True + + +def group_to_rich_tree( + group: PreparedGroup, + verbosity: VerbositySetting = VerbositySetting.default, + severity_to_rich: Optional[Dict[Severity, str]] = None, +) -> rich.tree.Tree: + """ + Convert a `PreparedGroup` into a `rich.tree.Tree`. + + Parameters + ---------- + group : PreparedGroup + The group to convert. Comparisons must be complete to generate the + tree effectively. + verbosity : VerbositySetting, optional + The verbosity settings. + severity_to_rich : Dict[Severity, str], optional + A mapping of severity values to rich colors. + + Returns + ------- + rich.tree.Tree + """ + severity_to_rich = severity_to_rich or default_severity_to_rich + + tree = rich.tree.Tree( + get_tree_heading(group, severity_to_rich=severity_to_rich, verbosity=verbosity) + ) + for failure in group.prepare_failures: + if should_show_in_tree(failure, verbosity): + tree.add( + get_comparison_text_for_tree(failure, verbosity=verbosity) + ) + + for config in group.configs: + if isinstance(config, PreparedGroup): + tree.add( + group_to_rich_tree( + config, + verbosity=verbosity, + severity_to_rich=severity_to_rich + ) + ) + else: + subtree = rich.tree.Tree( + get_tree_heading( + config, severity_to_rich=severity_to_rich, verbosity=verbosity + ) + ) + severity = getattr(config.result, "severity", Severity.error) + if config.result is not None and severity == Severity.success: + if VerbositySetting.show_passed_tests in verbosity: + tree.add(subtree) + else: + tree.add(subtree) + + for comparison in itertools.chain( + config.comparisons, config.prepare_failures + ): + if should_show_in_tree(comparison, verbosity): + subtree.add( + get_comparison_text_for_tree(comparison, verbosity=verbosity) + ) + + return tree + + +async def check_and_log( + config: ConfigurationFile, + console: rich.console.Console, + verbosity: VerbositySetting = VerbositySetting.default, + client: Optional[happi.Client] = None, + name_filter: Optional[Sequence[str]] = None, + parallel: bool = True, + cache: Optional[DataCache] = None, + filename: Optional[str] = None, +) -> PreparedFile: + """ + Check a configuration and log the results. + + Parameters + ---------- + config : ConfigurationFile + The configuration to check. + console : rich.console.Console + The rich console to write output to. + verbosity : VerbositySetting, optional + The verbosity settings. + client : happi.Client, optional + The happi client, if available. + name_filter : Sequence[str], optional + A filter for names. + parallel : bool, optional + Pre-fill cache in parallel when possible. + cache : DataCache + The data cache instance. + + Returns + ------- + PreparedFile + The completed checkout file, with results recorded + """ + name_filter = list(name_filter or []) + + if cache is None: + cache = DataCache() + + prepared_file = PreparedFile.from_config(config, cache=cache, client=client) + + cache_fill_tasks = [] + if parallel: + try: + cache_fill_tasks = await prepared_file.fill_cache() + except asyncio.CancelledError: + console.print("Tests interrupted; no results available.") + return + + try: + await prepared_file.compare() + except asyncio.CancelledError: + console.print("Tests interrupted; showing partial results.") + for task in cache_fill_tasks or []: + task.cancel() + + root_tree = rich.tree.Tree(str(filename)) + tree = group_to_rich_tree(prepared_file.root, verbosity=verbosity) + root_tree.add(tree) + + if filename is not None: + console.print(root_tree) + else: + console.print(tree) + + return prepared_file + + +def save_report(prep_file: PreparedFile, report_path: str): + # Normalize report path + from pathlib import Path + save_path = Path(report_path).resolve() + + doc = PassiveAtefReport(str(save_path), config=prep_file) + doc.create_report() + + +async def main( + filename: str, + name_filter: Optional[Sequence[str]] = None, + parallel: bool = False, + *, + cleanup: bool = True, + signal_cache: Optional[_SignalCache] = None, + show_severity_emoji: bool = True, + show_severity_description: bool = True, + show_config_description: bool = False, + show_tags: bool = False, + show_passed_tests: bool = False, + report_path: Optional[str] = None, +): + + verbosity = VerbositySetting.from_kwargs( + show_severity_emoji=show_severity_emoji, + show_severity_description=show_severity_description, + show_config_description=show_config_description, + show_tags=show_tags, + show_passed_tests=show_passed_tests, + ) + + config_file = ConfigurationFile.from_filename(filename) + + console = rich.console.Console() + cache = DataCache(signals=signal_cache or get_signal_cache()) + try: + with console.status("[bold green] Performing checks..."): + prep_file = await check_and_log( + config_file, + console=console, + name_filter=name_filter, + parallel=parallel, + cache=cache, + filename=filename, + verbosity=verbosity, + ) + if report_path is not None: + with console.status("[bold green] Saving report..."): + save_report(prep_file, report_path) + finally: + if cleanup: + ophyd_cleanup() diff --git a/atef/bin/config.py b/atef/bin/config.py index 2699cbcf..2c513cad 100644 --- a/atef/bin/config.py +++ b/atef/bin/config.py @@ -1,18 +1,6 @@ -""" -`atef config` opens up a graphical config file editor. -""" import argparse -import logging -import sys -from typing import List, Optional -from pydm import exception -from qtpy.QtWidgets import QApplication, QStyleFactory - -from ..type_hints import AnyPath -from ..widgets.config.window import Window - -logger = logging.getLogger(__name__) +from qtpy.QtWidgets import QStyleFactory def build_arg_parser(argparser=None): @@ -77,18 +65,6 @@ def build_arg_parser(argparser=None): return argparser -def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs): - app = QApplication(sys.argv) - main_window = Window(cache_size=cache_size, show_welcome=not filenames) - main_window.show() - exception.install() - - for filename in filenames or []: - try: - main_window.open_file(filename=filename) - except FileNotFoundError: - logger.error( - "File specified on the command-line not found: %s", filename - ) - - app.exec() +def main(*args, **kwargs): + from atef.bin.config_main import main + main(*args, **kwargs) diff --git a/atef/bin/config_main.py b/atef/bin/config_main.py new file mode 100644 index 00000000..699fc25b --- /dev/null +++ b/atef/bin/config_main.py @@ -0,0 +1,31 @@ +""" +`atef config` opens up a graphical config file editor. +""" +import logging +import sys +from typing import List, Optional + +from pydm import exception +from qtpy.QtWidgets import QApplication + +from ..type_hints import AnyPath +from ..widgets.config.window import Window + +logger = logging.getLogger(__name__) + + +def main(cache_size: int, filenames: Optional[List[AnyPath]] = None, **kwargs): + app = QApplication(sys.argv) + main_window = Window(cache_size=cache_size, show_welcome=not filenames) + main_window.show() + exception.install() + + for filename in filenames or []: + try: + main_window.open_file(filename=filename) + except FileNotFoundError: + logger.error( + "File specified on the command-line not found: %s", filename + ) + + app.exec() diff --git a/atef/bin/scripts.py b/atef/bin/scripts.py index a5084259..b9e37e4a 100644 --- a/atef/bin/scripts.py +++ b/atef/bin/scripts.py @@ -24,6 +24,8 @@ def gather_scripts() -> Dict[str, Tuple[Callable, Callable]]: scripts_module = importlib.import_module("atef.scripts") for sub_module in iter_modules(scripts_module.__path__): module_name = sub_module.name + if "_main" in module_name: + continue try: module = importlib.import_module(f".{module_name}", "atef.scripts") except Exception as ex: diff --git a/atef/scripts/converter_v0.py b/atef/scripts/converter_v0.py index a1e5859d..6f57d37f 100644 --- a/atef/scripts/converter_v0.py +++ b/atef/scripts/converter_v0.py @@ -2,272 +2,12 @@ This script will convert a prototype atef configuration file to the latest supported (and numbered) version. """ - -from __future__ import annotations - import argparse -import json import logging -import pathlib -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Tuple, Union, cast - -import apischema -import yaml - -import atef -import atef.config -from atef import serialization, tools -from atef.check import Comparison -from atef.type_hints import AnyPath logger = logging.getLogger(__name__) -DESCRIPTION = __doc__ - - -@dataclass -class IdentifierAndComparison: - """ - Set of identifiers (IDs) and comparisons to perform on those identifiers. - """ - - #: An optional identifier for this set. - name: Optional[str] = None - #: PV name, attribute name, or test-specific identifier. - ids: List[str] = field(default_factory=list) - #: The comparisons to perform for *each* of the ids. - comparisons: List[Comparison] = field(default_factory=list) - - -@dataclass -@serialization.as_tagged_union -class Configuration: - """ - Configuration base class for shared settings between all configurations. - - Subclasses of Comparison will be serialized as a tagged union. This means - that the subclass name will be used as an identifier for the generated - serialized dictionary (and JSON object). - """ - - #: Name tied to this configuration. - name: Optional[str] = None - #: Description tied to this configuration. - description: Optional[str] = None - #: Tags tied to this configuration. - tags: Optional[List[str]] = None - #: Comparison checklist for this configuration. - checklist: List[IdentifierAndComparison] = field(default_factory=list) - - -@dataclass -class DeviceConfiguration(Configuration): - """ - A configuration that is built to check one or more devices. - - Identifiers are by default assumed to be attribute (component) names of the - devices. Identifiers may refer to components on the device - (``"component"`` would mean to access each device's ``.component``) or may - refer to any level of sub-device components (``"sub_device.component"`` - would mean to access each device's ``.sub_device`` and that sub-device's - ``.a`` component). - """ - - #: Happi device names which give meaning to self.checklist[].ids. - devices: List[str] = field(default_factory=list) - - -@dataclass -class PVConfiguration(Configuration): - """ - A configuration that is built to check live EPICS PVs. - - Identifiers are by default assumed to be PV names. - """ - - ... - - -@dataclass -class ToolConfiguration(Configuration): - """ - A configuration unrelated to PVs or Devices which verifies status via some - tool. - - Comparisons can optionally be run on the tool's results. - """ - - tool: tools.Tool = field(default_factory=tools.Ping) - - -AnyConfiguration = Union[ - PVConfiguration, - DeviceConfiguration, - ToolConfiguration, -] -PathItem = Union[ - AnyConfiguration, - IdentifierAndComparison, - Comparison, - str, -] - -@dataclass -class PrototypeConfigurationFile: - #: configs: PVConfiguration, DeviceConfiguration, or ToolConfiguration. - configs: List[Configuration] - - @classmethod - def from_file(cls, filename: AnyPath) -> PrototypeConfigurationFile: - """Load a configuration file from JSON or yaml.""" - filename = pathlib.Path(filename) - if filename.suffix.lower() in (".yml", ".yaml"): - return cls.from_yaml(filename) - return cls.from_json(filename) - - @classmethod - def from_json(cls, filename: AnyPath) -> PrototypeConfigurationFile: - """Load a configuration file from JSON.""" - with open(filename) as fp: - serialized_config = json.load(fp) - return apischema.deserialize(cls, serialized_config) - - @classmethod - def from_yaml(cls, filename: AnyPath) -> PrototypeConfigurationFile: - """Load a configuration file from yaml.""" - with open(filename) as fp: - serialized_config = yaml.safe_load(fp) - return apischema.deserialize(cls, serialized_config) - - -def _split_shared_checklist( - checklist: List[IdentifierAndComparison], -) -> Tuple[List[Comparison], Dict[str, List[Comparison]]]: - """ - Split a prototype "checklist", consisting of pairs of identifiers and - comparisons into the new format of "shared" and "per-identifier" (i.e., - pv/attr) comparisons. - - Parameters - ---------- - checklist : List[IdentifierAndComparison] - The prototype checklist. - - Returns - ------- - List[Comparison] - Shared comparisons. - Dict[str, List[Comparison]] - Per-identifier comparisons, with the identifier as the key. - """ - shared = [] - by_identifier = {} - if len(checklist) == 1: - # If there is only one checklist, the comparisons can be considered - # "shared". - for check in checklist: - for comparison in check.comparisons: - shared.append(comparison) - for identifier in check.ids: - by_identifier.setdefault(identifier, []) - else: - # Otherwise, comparisons from every checklist will become - # per-identifier. - for check in checklist: - for comparison in check.comparisons: - for identifier in check.ids: - by_identifier.setdefault(identifier, []).append(comparison) - return shared, by_identifier - - -def convert_configuration(config: AnyConfiguration) -> atef.config.AnyConfiguration: - """ - Convert a prototype Configuration to a supported one. - - Parameters - ---------- - config : AnyConfiguration - The old prototype configuration. - - Returns - ------- - atef.config.AnyConfiguration - The new and supported configuration. - """ - if not isinstance(config, (DeviceConfiguration, PVConfiguration, ToolConfiguration)): - raise ValueError(f"Unexpected and unsupported config type: {type(config)}") - - shared, by_identifier = _split_shared_checklist(config.checklist) - if isinstance(config, DeviceConfiguration): - return atef.config.DeviceConfiguration( - name=config.name, - description=config.description, - tags=config.tags, - devices=config.devices, - by_attr=by_identifier, - shared=shared, - ) - - if isinstance(config, PVConfiguration): - return atef.config.PVConfiguration( - name=config.name, - description=config.description, - tags=config.tags, - by_pv=by_identifier, - shared=shared, - ) - - if isinstance(config, ToolConfiguration): - return atef.config.ToolConfiguration( - name=config.name, - description=config.description, - tags=config.tags, - tool=config.tool, - shared=shared, - by_attr=by_identifier, - ) - - -def load(filename: AnyPath) -> atef.config.ConfigurationFile: - """ - Load the provided prototype atef configuration file to the latest - supported (and numbered) version. - - Parameters - ---------- - filename : AnyPath - The filename to open. - - Returns - ------- - atef.config.ConfigurationFile - The converted configuration file. - """ - old = PrototypeConfigurationFile.from_file(filename) - new = atef.config.ConfigurationFile() - for config in old.configs: - config = cast(AnyConfiguration, config) - new.root.configs.append(convert_configuration(config)) - return new - - -def convert(fn: AnyPath) -> str: - """ - Convert the provided prototype atef configuration file, returning JSON - to be saved. - - Parameters - ---------- - filename : AnyPath - The filename to open. - - Returns - ------- - str - The new file contents. - """ - return json.dumps(load(fn).to_json(), indent=2) +DESCRIPTION = __doc__ def build_arg_parser(argparser=None) -> argparse.ArgumentParser: @@ -303,21 +43,9 @@ def build_arg_parser(argparser=None) -> argparse.ArgumentParser: return argparser -def main( - filename: str, - write: bool -): - for filename in filename: - converted = convert(filename) - - if write: - logger.warning("Overwriting converted file: %s", filename) - with open(filename, "wt") as fp: - print(converted, file=fp) - else: - print(f"-- {filename} --") - print(converted) - print() +def main(*args, **kwargs): + from atef.scripts.converter_v0_main import main + main(*args, **kwargs) def main_script(args=None) -> None: diff --git a/atef/scripts/converter_v0_main.py b/atef/scripts/converter_v0_main.py new file mode 100644 index 00000000..b0c7eff8 --- /dev/null +++ b/atef/scripts/converter_v0_main.py @@ -0,0 +1,286 @@ +""" +This script will convert a prototype atef configuration file to the latest +supported (and numbered) version. +""" + +from __future__ import annotations + +import json +import logging +import pathlib +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple, Union, cast + +import apischema +import yaml + +import atef +import atef.config +from atef import serialization, tools +from atef.check import Comparison +from atef.type_hints import AnyPath + +logger = logging.getLogger(__name__) +DESCRIPTION = __doc__ + + +@dataclass +class IdentifierAndComparison: + """ + Set of identifiers (IDs) and comparisons to perform on those identifiers. + """ + + #: An optional identifier for this set. + name: Optional[str] = None + #: PV name, attribute name, or test-specific identifier. + ids: List[str] = field(default_factory=list) + #: The comparisons to perform for *each* of the ids. + comparisons: List[Comparison] = field(default_factory=list) + + +@dataclass +@serialization.as_tagged_union +class Configuration: + """ + Configuration base class for shared settings between all configurations. + + Subclasses of Comparison will be serialized as a tagged union. This means + that the subclass name will be used as an identifier for the generated + serialized dictionary (and JSON object). + """ + + #: Name tied to this configuration. + name: Optional[str] = None + #: Description tied to this configuration. + description: Optional[str] = None + #: Tags tied to this configuration. + tags: Optional[List[str]] = None + #: Comparison checklist for this configuration. + checklist: List[IdentifierAndComparison] = field(default_factory=list) + + +@dataclass +class DeviceConfiguration(Configuration): + """ + A configuration that is built to check one or more devices. + + Identifiers are by default assumed to be attribute (component) names of the + devices. Identifiers may refer to components on the device + (``"component"`` would mean to access each device's ``.component``) or may + refer to any level of sub-device components (``"sub_device.component"`` + would mean to access each device's ``.sub_device`` and that sub-device's + ``.a`` component). + """ + + #: Happi device names which give meaning to self.checklist[].ids. + devices: List[str] = field(default_factory=list) + + +@dataclass +class PVConfiguration(Configuration): + """ + A configuration that is built to check live EPICS PVs. + + Identifiers are by default assumed to be PV names. + """ + + ... + + +@dataclass +class ToolConfiguration(Configuration): + """ + A configuration unrelated to PVs or Devices which verifies status via some + tool. + + Comparisons can optionally be run on the tool's results. + """ + + tool: tools.Tool = field(default_factory=tools.Ping) + + +AnyConfiguration = Union[ + PVConfiguration, + DeviceConfiguration, + ToolConfiguration, +] +PathItem = Union[ + AnyConfiguration, + IdentifierAndComparison, + Comparison, + str, +] + + +@dataclass +class PrototypeConfigurationFile: + #: configs: PVConfiguration, DeviceConfiguration, or ToolConfiguration. + configs: List[Configuration] + + @classmethod + def from_file(cls, filename: AnyPath) -> PrototypeConfigurationFile: + """Load a configuration file from JSON or yaml.""" + filename = pathlib.Path(filename) + if filename.suffix.lower() in (".yml", ".yaml"): + return cls.from_yaml(filename) + return cls.from_json(filename) + + @classmethod + def from_json(cls, filename: AnyPath) -> PrototypeConfigurationFile: + """Load a configuration file from JSON.""" + with open(filename) as fp: + serialized_config = json.load(fp) + return apischema.deserialize(cls, serialized_config) + + @classmethod + def from_yaml(cls, filename: AnyPath) -> PrototypeConfigurationFile: + """Load a configuration file from yaml.""" + with open(filename) as fp: + serialized_config = yaml.safe_load(fp) + return apischema.deserialize(cls, serialized_config) + + +def _split_shared_checklist( + checklist: List[IdentifierAndComparison], +) -> Tuple[List[Comparison], Dict[str, List[Comparison]]]: + """ + Split a prototype "checklist", consisting of pairs of identifiers and + comparisons into the new format of "shared" and "per-identifier" (i.e., + pv/attr) comparisons. + + Parameters + ---------- + checklist : List[IdentifierAndComparison] + The prototype checklist. + + Returns + ------- + List[Comparison] + Shared comparisons. + Dict[str, List[Comparison]] + Per-identifier comparisons, with the identifier as the key. + """ + shared = [] + by_identifier = {} + if len(checklist) == 1: + # If there is only one checklist, the comparisons can be considered + # "shared". + for check in checklist: + for comparison in check.comparisons: + shared.append(comparison) + for identifier in check.ids: + by_identifier.setdefault(identifier, []) + else: + # Otherwise, comparisons from every checklist will become + # per-identifier. + for check in checklist: + for comparison in check.comparisons: + for identifier in check.ids: + by_identifier.setdefault(identifier, []).append(comparison) + return shared, by_identifier + + +def convert_configuration(config: AnyConfiguration) -> atef.config.AnyConfiguration: + """ + Convert a prototype Configuration to a supported one. + + Parameters + ---------- + config : AnyConfiguration + The old prototype configuration. + + Returns + ------- + atef.config.AnyConfiguration + The new and supported configuration. + """ + if not isinstance(config, (DeviceConfiguration, PVConfiguration, ToolConfiguration)): + raise ValueError(f"Unexpected and unsupported config type: {type(config)}") + + shared, by_identifier = _split_shared_checklist(config.checklist) + if isinstance(config, DeviceConfiguration): + return atef.config.DeviceConfiguration( + name=config.name, + description=config.description, + tags=config.tags, + devices=config.devices, + by_attr=by_identifier, + shared=shared, + ) + + if isinstance(config, PVConfiguration): + return atef.config.PVConfiguration( + name=config.name, + description=config.description, + tags=config.tags, + by_pv=by_identifier, + shared=shared, + ) + + if isinstance(config, ToolConfiguration): + return atef.config.ToolConfiguration( + name=config.name, + description=config.description, + tags=config.tags, + tool=config.tool, + shared=shared, + by_attr=by_identifier, + ) + + +def load(filename: AnyPath) -> atef.config.ConfigurationFile: + """ + Load the provided prototype atef configuration file to the latest + supported (and numbered) version. + + Parameters + ---------- + filename : AnyPath + The filename to open. + + Returns + ------- + atef.config.ConfigurationFile + The converted configuration file. + """ + old = PrototypeConfigurationFile.from_file(filename) + new = atef.config.ConfigurationFile() + for config in old.configs: + config = cast(AnyConfiguration, config) + new.root.configs.append(convert_configuration(config)) + return new + + +def convert(fn: AnyPath) -> str: + """ + Convert the provided prototype atef configuration file, returning JSON + to be saved. + + Parameters + ---------- + filename : AnyPath + The filename to open. + + Returns + ------- + str + The new file contents. + """ + return json.dumps(load(fn).to_json(), indent=2) + + +def main( + filename: str, + write: bool +): + for filename in filename: + converted = convert(filename) + + if write: + logger.warning("Overwriting converted file: %s", filename) + with open(filename, "wt") as fp: + print(converted, file=fp) + else: + print(f"-- {filename} --") + print(converted) + print() diff --git a/atef/scripts/pmgr_check.py b/atef/scripts/pmgr_check.py index a9251ffa..02db058e 100644 --- a/atef/scripts/pmgr_check.py +++ b/atef/scripts/pmgr_check.py @@ -7,123 +7,10 @@ python scripts/pmgr_check.py cxi test_pmgr_checkout.json --names "KB1 DS SLIT LEF" --prefix CXI:KB1:MMS:13 """ import argparse -import json import logging -from typing import Any, Dict, List - -import apischema -from pmgr import pmgrAPI - -from atef.check import Equals -from atef.config import ConfigurationFile, ConfigurationGroup, PVConfiguration +logger = logging.getLogger(__name__) DESCRIPTION = __doc__ -logger = logging.getLogger() - - -def get_pv(prefix: str, key: str): - """ - Parse key from pmgr configuration data dictionary. Keys are of the form: - 'FLD_ACCL' or 'FLD_BDST', denoting the suffixes to append to `prefix`. - - Ignores unrecognized keys (keys without expected prefixes) - - Parameters - ---------- - prefix : str - the EPICS PV prefix - key : str - the key from a pmgr configuration data dictionary - - Returns - ------- - str - a fully qualified EPICS PV - """ - if 'FLD_' in key: - suffix = key.removeprefix('FLD') - elif 'PV_' in key: - suffix = key.removeprefix('PV') - else: - logger.debug(f'Unrecognized key provided: {key}') - return - - # general string fixing... ew - suffix_parts = suffix.split("__") - new_suffix_list = [":".join(substr.split('_')) for substr in suffix_parts] - suffix = '_'.join(new_suffix_list) - if 'FLD_' in key: - suffix = ".".join(suffix.rsplit(":", 1)) - pv = prefix + suffix - return pv - - -def get_cfg_data( - hutch: str, - config_name: str, - table_name: str = 'ims_motor' -) -> Dict[str, Any]: - """ - Get pmgr config data corresponding to ``config_name`` and ``hutch`` - - Parameters - ---------- - hutch : str - the hutch name, e.g. 'cxi' - config_name : str - the pmgr config name, e.g. 'KB1 DS SLIT LEF' - table_name : str - the name of the pmgr table to examine, by default 'ims_motor' - - Returns - ------- - Dict[str, Any] - The configuration values dictionary - """ - pm = pmgrAPI.pmgrAPI(table_name, hutch.lower()) - cfg_data = pm.get_config_values(config_name) - - return cfg_data - - -def create_atef_check( - config_name: str, - cfg_data: Dict[str, Any], - prefix: str -) -> PVConfiguration: - """ - Construct the full atef checkout. Simply creates an Equals comparison for each - value in the pmgr configuration, and groups it in a PVConfiguration - - Parameters - ---------- - config_name : str - the pmgr config name, e.g. 'KB1 DS SLIT LEF' - cfg_data : Dict[str, Any] - the configuration values dictionary, as returned from `get_cfg_data` - prefix : str - the EPICS Prefix - - Returns - ------- - PVConfiguration - The completed atef checkout - """ - pv_config = PVConfiguration(name=f'check motor config: {config_name}', - description='Configuration pulled from pmgr') - - for key, value in cfg_data.items(): - pv = get_pv(prefix, key) - if pv is None: - continue - - comp = Equals(name=f'check for {pv}', description=f'Checking {pv} == {value}', - value=value or 0) - - # would need to handle first-time additions - pv_config.by_pv[pv] = [comp] - - return pv_config def build_arg_parser(argparser=None) -> argparse.ArgumentParser: @@ -177,28 +64,9 @@ def build_arg_parser(argparser=None) -> argparse.ArgumentParser: return argparser -def main( - hutch: str, - filename: str, - pmgr_names: List[str], - prefixes: List[str], - table_name: str = 'ims_motor' -) -> None: - if len(prefixes) != len(pmgr_names): - raise ValueError('Must provide the same number of configuration names ' - f'{len(pmgr_names)} and prefixes {len(prefixes)}') - - file = ConfigurationFile(root=ConfigurationGroup(name='base group', configs=[])) - for prefix, name in zip(prefixes, pmgr_names): - cfg_data = get_cfg_data(hutch, name, table_name=table_name) - pv_config = create_atef_check(name, cfg_data, prefix) - - file.root.configs.append(pv_config) - - ser = apischema.serialize(ConfigurationFile, file) - - with open(filename, 'w') as fd: - json.dump(ser, fd, indent=2) +def main(*args, **kwargs): + from atef.scripts.pmgr_check_main import main + main(*args, **kwargs) def main_script(args=None) -> None: diff --git a/atef/scripts/pmgr_check_main.py b/atef/scripts/pmgr_check_main.py new file mode 100644 index 00000000..66e52f39 --- /dev/null +++ b/atef/scripts/pmgr_check_main.py @@ -0,0 +1,149 @@ +""" +This script creates an atef check from a pmgr configuration. The configuration will +be converted into a PVConfiguration. Note that default tolerances will be used for +checks. + +An example invocation might be: +python scripts/pmgr_check.py cxi test_pmgr_checkout.json --names "KB1 DS SLIT LEF" --prefix CXI:KB1:MMS:13 +""" +import json +import logging +from typing import Any, Dict, List + +import apischema +from pmgr import pmgrAPI + +from atef.check import Equals +from atef.config import ConfigurationFile, ConfigurationGroup, PVConfiguration + +DESCRIPTION = __doc__ +logger = logging.getLogger() + + +def get_pv(prefix: str, key: str): + """ + Parse key from pmgr configuration data dictionary. Keys are of the form: + 'FLD_ACCL' or 'FLD_BDST', denoting the suffixes to append to `prefix`. + + Ignores unrecognized keys (keys without expected prefixes) + + Parameters + ---------- + prefix : str + the EPICS PV prefix + key : str + the key from a pmgr configuration data dictionary + + Returns + ------- + str + a fully qualified EPICS PV + """ + if 'FLD_' in key: + suffix = key.removeprefix('FLD') + elif 'PV_' in key: + suffix = key.removeprefix('PV') + else: + logger.debug(f'Unrecognized key provided: {key}') + return + + # general string fixing... ew + suffix_parts = suffix.split("__") + new_suffix_list = [":".join(substr.split('_')) for substr in suffix_parts] + suffix = '_'.join(new_suffix_list) + if 'FLD_' in key: + suffix = ".".join(suffix.rsplit(":", 1)) + pv = prefix + suffix + return pv + + +def get_cfg_data( + hutch: str, + config_name: str, + table_name: str = 'ims_motor' +) -> Dict[str, Any]: + """ + Get pmgr config data corresponding to ``config_name`` and ``hutch`` + + Parameters + ---------- + hutch : str + the hutch name, e.g. 'cxi' + config_name : str + the pmgr config name, e.g. 'KB1 DS SLIT LEF' + table_name : str + the name of the pmgr table to examine, by default 'ims_motor' + + Returns + ------- + Dict[str, Any] + The configuration values dictionary + """ + pm = pmgrAPI.pmgrAPI(table_name, hutch.lower()) + cfg_data = pm.get_config_values(config_name) + + return cfg_data + + +def create_atef_check( + config_name: str, + cfg_data: Dict[str, Any], + prefix: str +) -> PVConfiguration: + """ + Construct the full atef checkout. Simply creates an Equals comparison for each + value in the pmgr configuration, and groups it in a PVConfiguration + + Parameters + ---------- + config_name : str + the pmgr config name, e.g. 'KB1 DS SLIT LEF' + cfg_data : Dict[str, Any] + the configuration values dictionary, as returned from `get_cfg_data` + prefix : str + the EPICS Prefix + + Returns + ------- + PVConfiguration + The completed atef checkout + """ + pv_config = PVConfiguration(name=f'check motor config: {config_name}', + description='Configuration pulled from pmgr') + + for key, value in cfg_data.items(): + pv = get_pv(prefix, key) + if pv is None: + continue + + comp = Equals(name=f'check for {pv}', description=f'Checking {pv} == {value}', + value=value or 0) + + # would need to handle first-time additions + pv_config.by_pv[pv] = [comp] + + return pv_config + + +def main( + hutch: str, + filename: str, + pmgr_names: List[str], + prefixes: List[str], + table_name: str = 'ims_motor' +) -> None: + if len(prefixes) != len(pmgr_names): + raise ValueError('Must provide the same number of configuration names ' + f'{len(pmgr_names)} and prefixes {len(prefixes)}') + + file = ConfigurationFile(root=ConfigurationGroup(name='base group', configs=[])) + for prefix, name in zip(prefixes, pmgr_names): + cfg_data = get_cfg_data(hutch, name, table_name=table_name) + pv_config = create_atef_check(name, cfg_data, prefix) + + file.root.configs.append(pv_config) + + ser = apischema.serialize(ConfigurationFile, file) + + with open(filename, 'w') as fd: + json.dump(ser, fd, indent=2) diff --git a/atef/tests/test_commandline.py b/atef/tests/test_commandline.py index 8a16c02b..8c28d211 100644 --- a/atef/tests/test_commandline.py +++ b/atef/tests/test_commandline.py @@ -4,7 +4,7 @@ import pytest import atef.bin.main as atef_main -from atef.bin import check as bin_check +from atef.bin import check_main as bin_check from .. import util from .conftest import CONFIG_PATH From 225ede38ed0a96795dff1bb41efde07c3df462d0 Mon Sep 17 00:00:00 2001 From: tangkong Date: Tue, 17 Sep 2024 10:42:52 -0700 Subject: [PATCH 2/2] DOC: pre-release notes --- .../upcoming_release_notes/257-perf_cli2.rst | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 docs/source/upcoming_release_notes/257-perf_cli2.rst diff --git a/docs/source/upcoming_release_notes/257-perf_cli2.rst b/docs/source/upcoming_release_notes/257-perf_cli2.rst new file mode 100644 index 00000000..b83c91dd --- /dev/null +++ b/docs/source/upcoming_release_notes/257-perf_cli2.rst @@ -0,0 +1,22 @@ +257 perf_cli2 +############# + +API Breaks +---------- +- N/A + +Features +-------- +- N/A + +Bugfixes +-------- +- N/A + +Maintenance +----------- +- improves the performance of the CLI entrypoint, deferring functional imports as long as possible + +Contributors +------------ +- tangkong