diff --git a/supervisor/api/__init__.py b/supervisor/api/__init__.py index 3a436c94b28..00022c14cb7 100644 --- a/supervisor/api/__init__.py +++ b/supervisor/api/__init__.py @@ -219,6 +219,8 @@ def _register_jobs(self) -> None: web.get("/jobs/info", api_jobs.info), web.post("/jobs/options", api_jobs.options), web.post("/jobs/reset", api_jobs.reset), + web.get("/jobs/{uuid}", api_jobs.job_info), + web.delete("/jobs/{uuid}", api_jobs.remove_job), ] ) diff --git a/supervisor/api/backups.py b/supervisor/api/backups.py index 56518633b9b..288284394af 100644 --- a/supervisor/api/backups.py +++ b/supervisor/api/backups.py @@ -1,5 +1,6 @@ """Backups RESTful API.""" import asyncio +from collections.abc import Callable import errno import logging from pathlib import Path @@ -11,6 +12,7 @@ from aiohttp.hdrs import CONTENT_DISPOSITION import voluptuous as vol +from ..backups.backup import Backup from ..backups.validate import ALL_FOLDERS, FOLDER_HOMEASSISTANT, days_until_stale from ..const import ( ATTR_ADDONS, @@ -33,12 +35,15 @@ ATTR_TIMEOUT, ATTR_TYPE, ATTR_VERSION, + BusEvent, + CoreState, ) from ..coresys import CoreSysAttributes from ..exceptions import APIError +from ..jobs import JobSchedulerOptions from ..mounts.const import MountUsage from ..resolution.const import UnhealthyReason -from .const import CONTENT_TYPE_TAR +from .const import ATTR_BACKGROUND, ATTR_JOB_ID, CONTENT_TYPE_TAR from .utils import api_process, api_validate _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -50,17 +55,21 @@ _ALL_FOLDERS = ALL_FOLDERS + [FOLDER_HOMEASSISTANT] # pylint: disable=no-value-for-parameter -SCHEMA_RESTORE_PARTIAL = vol.Schema( +SCHEMA_RESTORE_FULL = vol.Schema( { vol.Optional(ATTR_PASSWORD): vol.Maybe(str), + vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(), + } +) + +SCHEMA_RESTORE_PARTIAL = SCHEMA_RESTORE_FULL.extend( + { vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(), vol.Optional(ATTR_ADDONS): vol.All([str], vol.Unique()), vol.Optional(ATTR_FOLDERS): vol.All([vol.In(_ALL_FOLDERS)], vol.Unique()), } ) -SCHEMA_RESTORE_FULL = vol.Schema({vol.Optional(ATTR_PASSWORD): vol.Maybe(str)}) - SCHEMA_BACKUP_FULL = vol.Schema( { vol.Optional(ATTR_NAME): str, @@ -68,6 +77,7 @@ vol.Optional(ATTR_COMPRESSED): vol.Maybe(vol.Boolean()), vol.Optional(ATTR_LOCATON): vol.Maybe(str), vol.Optional(ATTR_HOMEASSISTANT_EXCLUDE_DATABASE): vol.Boolean(), + vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(), } ) @@ -204,46 +214,109 @@ def _location_to_mount(self, body: dict[str, Any]) -> dict[str, Any]: return body + async def _background_backup_task( + self, backup_method: Callable, *args, **kwargs + ) -> tuple[asyncio.Task, str]: + """Start backup task in background and return task and job ID.""" + event = asyncio.Event() + job, backup_task = self.sys_jobs.schedule_job( + backup_method, JobSchedulerOptions(), *args, **kwargs + ) + + async def release_on_freeze(new_state: CoreState): + if new_state == CoreState.FREEZE: + event.set() + + # Wait for system to get into freeze state before returning + # If the backup fails validation it will raise before getting there + listener = self.sys_bus.register_event( + BusEvent.SUPERVISOR_STATE_CHANGE, release_on_freeze + ) + try: + await asyncio.wait( + ( + backup_task, + self.sys_create_task(event.wait()), + ), + return_when=asyncio.FIRST_COMPLETED, + ) + return (backup_task, job.uuid) + finally: + self.sys_bus.remove_listener(listener) + @api_process async def backup_full(self, request): """Create full backup.""" body = await api_validate(SCHEMA_BACKUP_FULL, request) - - backup = await asyncio.shield( - self.sys_backups.do_backup_full(**self._location_to_mount(body)) + background = body.pop(ATTR_BACKGROUND) + backup_task, job_id = await self._background_backup_task( + self.sys_backups.do_backup_full, **self._location_to_mount(body) ) + if background and not backup_task.done(): + return {ATTR_JOB_ID: job_id} + + backup: Backup = await backup_task if backup: - return {ATTR_SLUG: backup.slug} - return False + return {ATTR_JOB_ID: job_id, ATTR_SLUG: backup.slug} + raise APIError( + f"An error occurred while making backup, check job '{job_id}' or supervisor logs for details", + job_id=job_id, + ) @api_process async def backup_partial(self, request): """Create a partial backup.""" body = await api_validate(SCHEMA_BACKUP_PARTIAL, request) - backup = await asyncio.shield( - self.sys_backups.do_backup_partial(**self._location_to_mount(body)) + background = body.pop(ATTR_BACKGROUND) + backup_task, job_id = await self._background_backup_task( + self.sys_backups.do_backup_partial, **self._location_to_mount(body) ) + if background and not backup_task.done(): + return {ATTR_JOB_ID: job_id} + + backup: Backup = await backup_task if backup: - return {ATTR_SLUG: backup.slug} - return False + return {ATTR_JOB_ID: job_id, ATTR_SLUG: backup.slug} + raise APIError( + f"An error occurred while making backup, check job '{job_id}' or supervisor logs for details", + job_id=job_id, + ) @api_process async def restore_full(self, request): """Full restore of a backup.""" backup = self._extract_slug(request) body = await api_validate(SCHEMA_RESTORE_FULL, request) + background = body.pop(ATTR_BACKGROUND) + restore_task, job_id = await self._background_backup_task( + self.sys_backups.do_restore_full, backup, **body + ) - return await asyncio.shield(self.sys_backups.do_restore_full(backup, **body)) + if background and not restore_task.done() or await restore_task: + return {ATTR_JOB_ID: job_id} + raise APIError( + f"An error occurred during restore of {backup.slug}, check job '{job_id}' or supervisor logs for details", + job_id=job_id, + ) @api_process async def restore_partial(self, request): """Partial restore a backup.""" backup = self._extract_slug(request) body = await api_validate(SCHEMA_RESTORE_PARTIAL, request) + background = body.pop(ATTR_BACKGROUND) + restore_task, job_id = await self._background_backup_task( + self.sys_backups.do_restore_partial, backup, **body + ) - return await asyncio.shield(self.sys_backups.do_restore_partial(backup, **body)) + if background and not restore_task.done() or await restore_task: + return {ATTR_JOB_ID: job_id} + raise APIError( + f"An error occurred during restore of {backup.slug}, check job '{job_id}' or supervisor logs for details", + job_id=job_id, + ) @api_process async def freeze(self, request): diff --git a/supervisor/api/const.py b/supervisor/api/const.py index 576d2beb653..7c5dd72d190 100644 --- a/supervisor/api/const.py +++ b/supervisor/api/const.py @@ -13,6 +13,7 @@ ATTR_APPARMOR_VERSION = "apparmor_version" ATTR_ATTRIBUTES = "attributes" ATTR_AVAILABLE_UPDATES = "available_updates" +ATTR_BACKGROUND = "background" ATTR_BOOT_TIMESTAMP = "boot_timestamp" ATTR_BOOTS = "boots" ATTR_BROADCAST_LLMNR = "broadcast_llmnr" @@ -31,6 +32,7 @@ ATTR_FALLBACK = "fallback" ATTR_FILESYSTEMS = "filesystems" ATTR_IDENTIFIERS = "identifiers" +ATTR_JOB_ID = "job_id" ATTR_JOBS = "jobs" ATTR_LLMNR = "llmnr" ATTR_LLMNR_HOSTNAME = "llmnr_hostname" diff --git a/supervisor/api/jobs.py b/supervisor/api/jobs.py index 9aec7166f7c..c87b3ad8574 100644 --- a/supervisor/api/jobs.py +++ b/supervisor/api/jobs.py @@ -6,6 +6,7 @@ import voluptuous as vol from ..coresys import CoreSysAttributes +from ..exceptions import APIError from ..jobs import SupervisorJob from ..jobs.const import ATTR_IGNORE_CONDITIONS, JobCondition from .const import ATTR_JOBS @@ -21,7 +22,7 @@ class APIJobs(CoreSysAttributes): """Handle RESTful API for OS functions.""" - def _list_jobs(self) -> list[dict[str, Any]]: + def _list_jobs(self, start: SupervisorJob | None = None) -> list[dict[str, Any]]: """Return current job tree.""" jobs_by_parent: dict[str | None, list[SupervisorJob]] = {} for job in self.sys_jobs.jobs: @@ -34,9 +35,11 @@ def _list_jobs(self) -> list[dict[str, Any]]: jobs_by_parent[job.parent_id].append(job) job_list: list[dict[str, Any]] = [] - queue: list[tuple[list[dict[str, Any]], SupervisorJob]] = [ - (job_list, job) for job in jobs_by_parent.get(None, []) - ] + queue: list[tuple[list[dict[str, Any]], SupervisorJob]] = ( + [(job_list, start)] + if start + else [(job_list, job) for job in jobs_by_parent.get(None, [])] + ) while queue: (current_list, current_job) = queue.pop(0) @@ -78,3 +81,19 @@ async def options(self, request: web.Request) -> None: async def reset(self, request: web.Request) -> None: """Reset options for JobManager.""" self.sys_jobs.reset_data() + + @api_process + async def job_info(self, request: web.Request) -> dict[str, Any]: + """Get details of a job by ID.""" + job = self.sys_jobs.get_job(request.match_info.get("uuid")) + return self._list_jobs(job)[0] + + @api_process + async def remove_job(self, request: web.Request) -> None: + """Remove a completed job.""" + job = self.sys_jobs.get_job(request.match_info.get("uuid")) + + if not job.done: + raise APIError(f"Job {job.uuid} is not done!") + + self.sys_jobs.remove_job(job) diff --git a/supervisor/api/utils.py b/supervisor/api/utils.py index 02c277302eb..8c8db2259f7 100644 --- a/supervisor/api/utils.py +++ b/supervisor/api/utils.py @@ -13,6 +13,7 @@ HEADER_TOKEN, HEADER_TOKEN_OLD, JSON_DATA, + JSON_JOB_ID, JSON_MESSAGE, JSON_RESULT, REQUEST_FROM, @@ -124,11 +125,15 @@ def api_return_error( if check_exception_chain(error, DockerAPIError): message = format_message(message) + result = { + JSON_RESULT: RESULT_ERROR, + JSON_MESSAGE: message or "Unknown error, see supervisor", + } + if isinstance(error, APIError) and error.job_id: + result[JSON_JOB_ID] = error.job_id + return web.json_response( - { - JSON_RESULT: RESULT_ERROR, - JSON_MESSAGE: message or "Unknown error, see supervisor", - }, + result, status=400, dumps=json_dumps, ) diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index e9919a5b927..3f92c6b54cb 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -1,7 +1,9 @@ """Representation of a backup file.""" import asyncio from base64 import b64decode, b64encode +from collections import defaultdict from collections.abc import Awaitable +from copy import deepcopy from datetime import timedelta from functools import cached_property import json @@ -42,8 +44,11 @@ ATTR_VERSION, CRYPTO_AES128, ) -from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import AddonsError, BackupError +from ..coresys import CoreSys +from ..exceptions import AddonsError, BackupError, BackupInvalidError +from ..jobs.const import JOB_GROUP_BACKUP +from ..jobs.decorator import Job +from ..jobs.job_group import JobGroup from ..utils import remove_folder from ..utils.dt import parse_datetime, utcnow from ..utils.json import write_json_file @@ -54,14 +59,22 @@ _LOGGER: logging.Logger = logging.getLogger(__name__) -class Backup(CoreSysAttributes): +class Backup(JobGroup): """A single Supervisor backup.""" - def __init__(self, coresys: CoreSys, tar_file: Path): + def __init__( + self, + coresys: CoreSys, + tar_file: Path, + slug: str, + data: dict[str, Any] | None = None, + ): """Initialize a backup.""" - self.coresys: CoreSys = coresys + super().__init__( + coresys, JOB_GROUP_BACKUP.format_map(defaultdict(str, slug=slug)), slug + ) self._tarfile: Path = tar_file - self._data: dict[str, Any] = {} + self._data: dict[str, Any] = data or {ATTR_SLUG: slug} self._tmp = None self._key: bytes | None = None self._aes: Cipher | None = None @@ -87,7 +100,7 @@ def name(self) -> str: return self._data[ATTR_NAME] @property - def date(self): + def date(self) -> str: """Return backup date.""" return self._data[ATTR_DATE] @@ -102,32 +115,32 @@ def compressed(self) -> bool: return self._data[ATTR_COMPRESSED] @property - def addons(self): + def addons(self) -> list[dict[str, Any]]: """Return backup date.""" return self._data[ATTR_ADDONS] @property - def addon_list(self): + def addon_list(self) -> list[str]: """Return a list of add-ons slugs.""" return [addon_data[ATTR_SLUG] for addon_data in self.addons] @property - def folders(self): + def folders(self) -> list[str]: """Return list of saved folders.""" return self._data[ATTR_FOLDERS] @property - def repositories(self): + def repositories(self) -> list[str]: """Return backup date.""" return self._data[ATTR_REPOSITORIES] @repositories.setter - def repositories(self, value): + def repositories(self, value: list[str]) -> None: """Set backup date.""" self._data[ATTR_REPOSITORIES] = value @property - def homeassistant_version(self): + def homeassistant_version(self) -> AwesomeVersion: """Return backup Home Assistant version.""" if self.homeassistant is None: return None @@ -141,7 +154,7 @@ def homeassistant_exclude_database(self) -> bool: return self.homeassistant[ATTR_EXCLUDE_DATABASE] @property - def homeassistant(self): + def homeassistant(self) -> dict[str, Any]: """Return backup Home Assistant data.""" return self._data[ATTR_HOMEASSISTANT] @@ -151,12 +164,12 @@ def supervisor_version(self) -> AwesomeVersion: return self._data[ATTR_SUPERVISOR_VERSION] @property - def docker(self): + def docker(self) -> dict[str, Any]: """Return backup Docker config data.""" return self._data.get(ATTR_DOCKER, {}) @docker.setter - def docker(self, value): + def docker(self, value: dict[str, Any]) -> None: """Set the Docker config data.""" self._data[ATTR_DOCKER] = value @@ -169,32 +182,36 @@ def location(self) -> str | None: return None @property - def size(self): + def size(self) -> float: """Return backup size.""" if not self.tarfile.is_file(): return 0 return round(self.tarfile.stat().st_size / 1048576, 2) # calc mbyte @property - def is_new(self): + def is_new(self) -> bool: """Return True if there is new.""" return not self.tarfile.exists() @property - def tarfile(self): + def tarfile(self) -> Path: """Return path to backup tarfile.""" return self._tarfile @property - def is_current(self): + def is_current(self) -> bool: """Return true if backup is current, false if stale.""" return parse_datetime(self.date) >= utcnow() - timedelta( days=self.sys_backups.days_until_stale ) + @property + def data(self) -> dict[str, Any]: + """Returns a copy of the data.""" + return deepcopy(self._data) + def new( self, - slug: str, name: str, date: str, sys_type: BackupType, @@ -204,7 +221,6 @@ def new( """Initialize a new backup.""" # Init metadata self._data[ATTR_VERSION] = 2 - self._data[ATTR_SLUG] = slug self._data[ATTR_NAME] = name self._data[ATTR_DATE] = date self._data[ATTR_TYPE] = sys_type @@ -349,152 +365,240 @@ def _create_backup(): write_json_file(Path(self._tmp.name, "backup.json"), self._data) await self.sys_run_in_executor(_create_backup) except (OSError, json.JSONDecodeError) as err: + self.sys_jobs.current.capture_error(BackupError("Can't write backup")) _LOGGER.error("Can't write backup: %s", err) finally: self._tmp.cleanup() + @Job(name="backup_addon_save", cleanup=False) + async def _addon_save(self, addon: Addon) -> asyncio.Task | None: + """Store an add-on into backup.""" + self.sys_jobs.current.reference = addon.slug + + tar_name = f"{addon.slug}.tar{'.gz' if self.compressed else ''}" + addon_file = SecureTarFile( + Path(self._tmp.name, tar_name), + "w", + key=self._key, + gzip=self.compressed, + bufsize=BUF_SIZE, + ) + + # Take backup + try: + start_task = await addon.backup(addon_file) + except AddonsError as err: + raise BackupError( + f"Can't create backup for {addon.slug}", _LOGGER.error + ) from err + + # Store to config + self._data[ATTR_ADDONS].append( + { + ATTR_SLUG: addon.slug, + ATTR_NAME: addon.name, + ATTR_VERSION: addon.version, + ATTR_SIZE: addon_file.size, + } + ) + + return start_task + + @Job(name="backup_store_addons", cleanup=False) async def store_addons(self, addon_list: list[str]) -> list[asyncio.Task]: """Add a list of add-ons into backup. For each addon that needs to be started after backup, returns a Task which completes when that addon has state 'started' (see addon.start). """ - - async def _addon_save(addon: Addon) -> asyncio.Task | None: - """Task to store an add-on into backup.""" - tar_name = f"{addon.slug}.tar{'.gz' if self.compressed else ''}" - addon_file = SecureTarFile( - Path(self._tmp.name, tar_name), - "w", - key=self._key, - gzip=self.compressed, - bufsize=BUF_SIZE, - ) - - # Take backup - try: - start_task = await addon.backup(addon_file) - except AddonsError: - _LOGGER.error("Can't create backup for %s", addon.slug) - return - - # Store to config - self._data[ATTR_ADDONS].append( - { - ATTR_SLUG: addon.slug, - ATTR_NAME: addon.name, - ATTR_VERSION: addon.version, - ATTR_SIZE: addon_file.size, - } - ) - - return start_task - - # Save Add-ons sequential - # avoid issue on slow IO + # Save Add-ons sequential avoid issue on slow IO start_tasks: list[asyncio.Task] = [] for addon in addon_list: try: - if start_task := await _addon_save(addon): + if start_task := await self._addon_save(addon): start_tasks.append(start_task) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Can't save Add-on %s: %s", addon.slug, err) return start_tasks + @Job(name="backup_addon_restore", cleanup=False) + async def _addon_restore(self, addon_slug: str) -> asyncio.Task | None: + """Restore an add-on from backup.""" + self.sys_jobs.current.reference = addon_slug + + tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}" + addon_file = SecureTarFile( + Path(self._tmp.name, tar_name), + "r", + key=self._key, + gzip=self.compressed, + bufsize=BUF_SIZE, + ) + + # If exists inside backup + if not addon_file.path.exists(): + raise BackupError(f"Can't find backup {addon_slug}", _LOGGER.error) + + # Perform a restore + try: + return await self.sys_addons.restore(addon_slug, addon_file) + except AddonsError as err: + raise BackupError( + f"Can't restore backup {addon_slug}", _LOGGER.error + ) from err + + @Job(name="backup_restore_addons", cleanup=False) async def restore_addons( self, addon_list: list[str] ) -> tuple[bool, list[asyncio.Task]]: """Restore a list add-on from backup.""" - - async def _addon_restore(addon_slug: str) -> tuple[bool, asyncio.Task | None]: - """Task to restore an add-on into backup.""" - tar_name = f"{addon_slug}.tar{'.gz' if self.compressed else ''}" - addon_file = SecureTarFile( - Path(self._tmp.name, tar_name), - "r", - key=self._key, - gzip=self.compressed, - bufsize=BUF_SIZE, - ) - - # If exists inside backup - if not addon_file.path.exists(): - _LOGGER.error("Can't find backup %s", addon_slug) - return (False, None) - - # Perform a restore - try: - return (True, await self.sys_addons.restore(addon_slug, addon_file)) - except AddonsError: - _LOGGER.error("Can't restore backup %s", addon_slug) - return (False, None) - - # Save Add-ons sequential - # avoid issue on slow IO + # Save Add-ons sequential avoid issue on slow IO start_tasks: list[asyncio.Task] = [] success = True for slug in addon_list: try: - addon_success, start_task = await _addon_restore(slug) + start_task = await self._addon_restore(slug) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Can't restore Add-on %s: %s", slug, err) success = False else: - success = success and addon_success if start_task: start_tasks.append(start_task) return (success, start_tasks) - async def store_folders(self, folder_list: list[str]): - """Backup Supervisor data into backup.""" + @Job(name="backup_remove_delta_addons", cleanup=False) + async def remove_delta_addons(self) -> bool: + """Remove addons which are not in this backup.""" + success = True + for addon in self.sys_addons.installed: + if addon.slug in self.addon_list: + continue - async def _folder_save(name: str): - """Take backup of a folder.""" - slug_name = name.replace("/", "_") - tar_name = Path( - self._tmp.name, f"{slug_name}.tar{'.gz' if self.compressed else ''}" - ) - origin_dir = Path(self.sys_config.path_supervisor, name) + # Remove Add-on because it's not a part of the new env + # Do it sequential avoid issue on slow IO + try: + await self.sys_addons.uninstall(addon.slug) + except AddonsError as err: + self.sys_jobs.current.capture_error(err) + _LOGGER.warning("Can't uninstall Add-on %s: %s", addon.slug, err) + success = False - # Check if exists - if not origin_dir.is_dir(): - _LOGGER.warning("Can't find backup folder %s", name) - return + return success - def _save() -> None: - # Take backup - _LOGGER.info("Backing up folder %s", name) - with SecureTarFile( - tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE - ) as tar_file: - atomic_contents_add( - tar_file, - origin_dir, - excludes=[ - bound.bind_mount.local_where.as_posix() - for bound in self.sys_mounts.bound_mounts - if bound.bind_mount.local_where - ], - arcname=".", - ) + @Job(name="backup_folder_save", cleanup=False) + async def _folder_save(self, name: str): + """Take backup of a folder.""" + self.sys_jobs.current.reference = name + + slug_name = name.replace("/", "_") + tar_name = Path( + self._tmp.name, f"{slug_name}.tar{'.gz' if self.compressed else ''}" + ) + origin_dir = Path(self.sys_config.path_supervisor, name) + + # Check if exists + if not origin_dir.is_dir(): + _LOGGER.warning("Can't find backup folder %s", name) + return - _LOGGER.info("Backup folder %s done", name) + def _save() -> None: + # Take backup + _LOGGER.info("Backing up folder %s", name) + with SecureTarFile( + tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE + ) as tar_file: + atomic_contents_add( + tar_file, + origin_dir, + excludes=[ + bound.bind_mount.local_where.as_posix() + for bound in self.sys_mounts.bound_mounts + if bound.bind_mount.local_where + ], + arcname=".", + ) + _LOGGER.info("Backup folder %s done", name) + + try: await self.sys_run_in_executor(_save) - self._data[ATTR_FOLDERS].append(name) + except (tarfile.TarError, OSError) as err: + raise BackupError( + f"Can't backup folder {name}: {str(err)}", _LOGGER.error + ) from err - # Save folder sequential - # avoid issue on slow IO + self._data[ATTR_FOLDERS].append(name) + + @Job(name="backup_store_folders", cleanup=False) + async def store_folders(self, folder_list: list[str]): + """Backup Supervisor data into backup.""" + # Save folder sequential avoid issue on slow IO for folder in folder_list: + await self._folder_save(folder) + + @Job(name="backup_folder_restore", cleanup=False) + async def _folder_restore(self, name: str) -> None: + """Restore a folder.""" + self.sys_jobs.current.reference = name + + slug_name = name.replace("/", "_") + tar_name = Path( + self._tmp.name, f"{slug_name}.tar{'.gz' if self.compressed else ''}" + ) + origin_dir = Path(self.sys_config.path_supervisor, name) + + # Check if exists inside backup + if not tar_name.exists(): + raise BackupInvalidError( + f"Can't find restore folder {name}", _LOGGER.warning + ) + + # Unmount any mounts within folder + bind_mounts = [ + bound.bind_mount + for bound in self.sys_mounts.bound_mounts + if bound.bind_mount.local_where + and bound.bind_mount.local_where.is_relative_to(origin_dir) + ] + if bind_mounts: + await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts]) + + # Clean old stuff + if origin_dir.is_dir(): + await remove_folder(origin_dir, content_only=True) + + # Perform a restore + def _restore() -> bool: try: - await _folder_save(folder) + _LOGGER.info("Restore folder %s", name) + with SecureTarFile( + tar_name, + "r", + key=self._key, + gzip=self.compressed, + bufsize=BUF_SIZE, + ) as tar_file: + tar_file.extractall( + path=origin_dir, members=tar_file, filter="fully_trusted" + ) + _LOGGER.info("Restore folder %s done", name) except (tarfile.TarError, OSError) as err: raise BackupError( - f"Can't backup folder {folder}: {str(err)}", _LOGGER.error + f"Can't restore folder {name}: {err}", _LOGGER.warning ) from err + return True + + try: + return await self.sys_run_in_executor(_restore) + finally: + if bind_mounts: + await asyncio.gather( + *[bind_mount.mount() for bind_mount in bind_mounts] + ) + @Job(name="backup_restore_folders", cleanup=False) async def restore_folders(self, folder_list: list[str]) -> bool: """Backup Supervisor data into backup.""" success = True @@ -556,16 +660,16 @@ def _restore() -> bool: *[bind_mount.mount() for bind_mount in bind_mounts] ) - # Restore folder sequential - # avoid issue on slow IO + # Restore folder sequential avoid issue on slow IO for folder in folder_list: try: - success = success and await _folder_restore(folder) + await self._folder_restore(folder) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Can't restore folder %s: %s", folder, err) success = False return success + @Job(name="backup_store_homeassistant", cleanup=False) async def store_homeassistant(self, exclude_database: bool = False): """Backup Home Assistant Core configuration folder.""" self._data[ATTR_HOMEASSISTANT] = { @@ -586,6 +690,7 @@ async def store_homeassistant(self, exclude_database: bool = False): # Store size self.homeassistant[ATTR_SIZE] = homeassistant_file.size + @Job(name="backup_restore_homeassistant", cleanup=False) async def restore_homeassistant(self) -> Awaitable[None]: """Restore Home Assistant Core configuration folder.""" await self.sys_homeassistant.core.stop() @@ -619,7 +724,7 @@ async def _core_update(): return self.sys_create_task(_core_update()) - def store_repositories(self): + def store_repositories(self) -> None: """Store repository list into backup.""" self.repositories = self.sys_store.repository_urls diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index 7869a7acd7e..ef425b1d3bc 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -15,7 +15,7 @@ CoreState, ) from ..dbus.const import UnitActiveState -from ..exceptions import AddonsError, BackupError, BackupInvalidError, BackupJobError +from ..exceptions import BackupError, BackupInvalidError, BackupJobError from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit from ..jobs.decorator import Job from ..jobs.job_group import JobGroup @@ -139,8 +139,8 @@ def _create_backup( tar_file = Path(self._get_base_path(location), f"{slug}.tar") # init object - backup = Backup(self.coresys, tar_file) - backup.new(slug, name, date_str, sys_type, password, compressed) + backup = Backup(self.coresys, tar_file, slug) + backup.new(name, date_str, sys_type, password, compressed) # Add backup ID to job self.sys_jobs.current.reference = backup.slug @@ -165,9 +165,11 @@ async def reload(self) -> None: async def _load_backup(tar_file): """Load the backup.""" - backup = Backup(self.coresys, tar_file) + backup = Backup(self.coresys, tar_file, "temp") if await backup.load(): - self._backups[backup.slug] = backup + self._backups[backup.slug] = Backup( + self.coresys, tar_file, backup.slug, backup.data + ) tasks = [ self.sys_create_task(_load_backup(tar_file)) @@ -199,7 +201,7 @@ def remove(self, backup: Backup) -> bool: async def import_backup(self, tar_file: Path) -> Backup | None: """Check backup tarfile and import it.""" - backup = Backup(self.coresys, tar_file) + backup = Backup(self.coresys, tar_file, "temp") # Read meta data if not await backup.load(): @@ -222,7 +224,7 @@ async def import_backup(self, tar_file: Path) -> Backup | None: return None # Load new backup - backup = Backup(self.coresys, tar_origin) + backup = Backup(self.coresys, tar_origin, backup.slug, backup.data) if not await backup.load(): return None _LOGGER.info("Successfully imported %s", backup.slug) @@ -269,9 +271,15 @@ async def _do_backup( self._change_stage(BackupJobStage.FINISHING_FILE, backup) + except BackupError as err: + self.sys_jobs.current.capture_error(err) + return None except Exception as err: # pylint: disable=broad-except _LOGGER.exception("Backup %s error", backup.slug) capture_exception(err) + self.sys_jobs.current.capture_error( + BackupError(f"Backup {backup.slug} error, see supervisor logs") + ) return None else: self._backups[backup.slug] = backup @@ -290,6 +298,7 @@ async def _do_backup( conditions=[JobCondition.RUNNING], limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, + cleanup=False, ) async def do_backup_full( self, @@ -326,6 +335,7 @@ async def do_backup_full( conditions=[JobCondition.RUNNING], limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, + cleanup=False, ) async def do_backup_partial( self, @@ -410,17 +420,7 @@ async def _do_restore( # Delete delta add-ons if replace: self._change_stage(RestoreJobStage.REMOVE_DELTA_ADDONS, backup) - for addon in self.sys_addons.installed: - if addon.slug in backup.addon_list: - continue - - # Remove Add-on because it's not a part of the new env - # Do it sequential avoid issue on slow IO - try: - await self.sys_addons.uninstall(addon.slug) - except AddonsError: - _LOGGER.warning("Can't uninstall Add-on %s", addon.slug) - success = False + success = success and await backup.remove_delta_addons() if addon_list: self._change_stage(RestoreJobStage.ADDON_REPOSITORIES, backup) @@ -444,7 +444,7 @@ async def _do_restore( _LOGGER.exception("Restore %s error", backup.slug) capture_exception(err) raise BackupError( - f"Restore {backup.slug} error, check logs for details" + f"Restore {backup.slug} error, see supervisor logs" ) from err else: if addon_start_tasks: @@ -463,12 +463,16 @@ async def _do_restore( # Do we need start Home Assistant Core? if not await self.sys_homeassistant.core.is_running(): - await self.sys_homeassistant.core.start() + await self.sys_homeassistant.core.start( + _job_override__cleanup=False + ) # Check If we can access to API / otherwise restart if not await self.sys_homeassistant.api.check_api_state(): _LOGGER.warning("Need restart HomeAssistant for API") - await self.sys_homeassistant.core.restart() + await self.sys_homeassistant.core.restart( + _job_override__cleanup=False + ) @Job( name="backup_manager_full_restore", @@ -481,6 +485,7 @@ async def _do_restore( ], limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, + cleanup=False, ) async def do_restore_full( self, backup: Backup, password: str | None = None @@ -534,6 +539,7 @@ async def do_restore_full( ], limit=JobExecutionLimit.GROUP_ONCE, on_condition=BackupJobError, + cleanup=False, ) async def do_restore_partial( self, diff --git a/supervisor/const.py b/supervisor/const.py index 418c18a909c..9379565554d 100644 --- a/supervisor/const.py +++ b/supervisor/const.py @@ -68,6 +68,7 @@ JSON_DATA = "data" JSON_MESSAGE = "message" JSON_RESULT = "result" +JSON_JOB_ID = "job_id" RESULT_ERROR = "error" RESULT_OK = "ok" @@ -458,9 +459,11 @@ class HostFeature(StrEnum): class BusEvent(StrEnum): """Bus event type.""" + DOCKER_CONTAINER_STATE_CHANGE = "docker_container_state_change" HARDWARE_NEW_DEVICE = "hardware_new_device" HARDWARE_REMOVE_DEVICE = "hardware_remove_device" - DOCKER_CONTAINER_STATE_CHANGE = "docker_container_state_change" + SUPERVISOR_JOB_END = "supervisor_job_end" + SUPERVISOR_JOB_START = "supervisor_job_start" SUPERVISOR_STATE_CHANGE = "supervisor_state_change" diff --git a/supervisor/coresys.py b/supervisor/coresys.py index f8a8f7f71ef..10bf7e3c723 100644 --- a/supervisor/coresys.py +++ b/supervisor/coresys.py @@ -544,13 +544,44 @@ def run_in_executor( return self.loop.run_in_executor(None, funct, *args) - def create_task(self, coroutine: Coroutine) -> asyncio.Task: - """Create an async task.""" + def _create_context(self) -> Context: + """Create a new context for a task.""" context = copy_context() for callback in self._set_task_context: context = callback(context) + return context - return self.loop.create_task(coroutine, context=context) + def create_task(self, coroutine: Coroutine) -> asyncio.Task: + """Create an async task.""" + return self.loop.create_task(coroutine, context=self._create_context()) + + def call_later( + self, + delay: float, + funct: Callable[..., Coroutine[Any, Any, T]], + *args: tuple[Any], + **kwargs: dict[str, Any], + ) -> asyncio.TimerHandle: + """Start a task after a delay.""" + if kwargs: + funct = partial(funct, **kwargs) + + return self.loop.call_later(delay, funct, *args, context=self._create_context()) + + def call_at( + self, + when: datetime, + funct: Callable[..., Coroutine[Any, Any, T]], + *args: tuple[Any], + **kwargs: dict[str, Any], + ) -> asyncio.TimerHandle: + """Start a task at the specified datetime.""" + if kwargs: + funct = partial(funct, **kwargs) + + return self.loop.call_at( + when.timestamp(), funct, *args, context=self._create_context() + ) class CoreSysAttributes: @@ -731,3 +762,23 @@ def sys_run_in_executor( def sys_create_task(self, coroutine: Coroutine) -> asyncio.Task: """Create an async task.""" return self.coresys.create_task(coroutine) + + def sys_call_later( + self, + delay: float, + funct: Callable[..., Coroutine[Any, Any, T]], + *args: tuple[Any], + **kwargs: dict[str, Any], + ) -> asyncio.TimerHandle: + """Start a task after a delay.""" + return self.coresys.call_later(delay, funct, *args, **kwargs) + + def sys_call_at( + self, + when: datetime, + funct: Callable[..., Coroutine[Any, Any, T]], + *args: tuple[Any], + **kwargs: dict[str, Any], + ) -> asyncio.TimerHandle: + """Start a task at the specified datetime.""" + return self.coresys.call_at(when, funct, *args, **kwargs) diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 321630ab524..402853c3645 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -304,6 +304,16 @@ class HostLogError(HostError): class APIError(HassioError, RuntimeError): """API errors.""" + def __init__( + self, + message: str | None = None, + logger: Callable[..., None] | None = None, + job_id: str | None = None, + ) -> None: + """Raise & log, optionally with job.""" + super().__init__(message, logger) + self.job_id = job_id + class APIForbidden(APIError): """API forbidden error.""" diff --git a/supervisor/jobs/__init__.py b/supervisor/jobs/__init__.py index 8ab8eb720ee..fd5ee6cfc1d 100644 --- a/supervisor/jobs/__init__.py +++ b/supervisor/jobs/__init__.py @@ -1,7 +1,11 @@ """Supervisor job manager.""" -from collections.abc import Callable + +import asyncio +from collections.abc import Awaitable, Callable from contextlib import contextmanager from contextvars import Context, ContextVar, Token +from dataclasses import dataclass +from datetime import datetime import logging from typing import Any from uuid import UUID, uuid4 @@ -10,8 +14,9 @@ from attrs.setters import convert as attr_convert, frozen, validate as attr_validate from attrs.validators import ge, le +from ..const import BusEvent from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import JobNotFound, JobStartException +from ..exceptions import HassioError, JobNotFound, JobStartException from ..homeassistant.const import WSEvent from ..utils.common import FileConfiguration from ..utils.sentry import capture_exception @@ -27,6 +32,14 @@ _LOGGER: logging.Logger = logging.getLogger(__name__) +@dataclass +class JobSchedulerOptions: + """Options for scheduling a job.""" + + start_at: datetime | None = None + delayed_start: float = 0 # Ignored if start_at is set + + def _remove_current_job(context: Context) -> Context: """Remove the current job from the context.""" context.run(_CURRENT_JOB.set, None) @@ -48,11 +61,29 @@ def _on_change(instance: "SupervisorJob", attribute: Attribute, value: Any) -> A return value +def _invalid_if_started(instance: "SupervisorJob", *_) -> None: + """Validate that job has not been started.""" + if instance.done is not None: + raise ValueError("Field cannot be updated once job has started") + + +@define +class SupervisorJobError: + """Representation of an error occurring during a supervisor job.""" + + type_: type[HassioError] = HassioError + message: str = "Unknown error, see supervisor logs" + + def as_dict(self) -> dict[str, str]: + """Return dictionary representation.""" + return {"type": self.type_.__name__, "message": self.message} + + @define class SupervisorJob: """Representation of a job running in supervisor.""" - name: str = field(on_setattr=frozen) + name: str | None = field(default=None, validator=[_invalid_if_started]) reference: str | None = field(default=None, on_setattr=_on_change) progress: float = field( default=0, @@ -65,13 +96,17 @@ class SupervisorJob: ) uuid: UUID = field(init=False, factory=lambda: uuid4().hex, on_setattr=frozen) parent_id: UUID | None = field( - init=False, factory=lambda: _CURRENT_JOB.get(None), on_setattr=frozen + factory=lambda: _CURRENT_JOB.get(None), on_setattr=frozen ) done: bool | None = field(init=False, default=None, on_setattr=_on_change) on_change: Callable[["SupervisorJob", Attribute, Any], None] | None = field( default=None, on_setattr=frozen ) - internal: bool = field(default=False, on_setattr=frozen) + internal: bool = field(default=False) + errors: list[SupervisorJobError] = field( + init=False, factory=list, on_setattr=_on_change + ) + release_event: asyncio.Event | None = None def as_dict(self) -> dict[str, Any]: """Return dictionary representation.""" @@ -83,8 +118,17 @@ def as_dict(self) -> dict[str, Any]: "stage": self.stage, "done": self.done, "parent_id": self.parent_id, + "errors": [err.as_dict() for err in self.errors], } + def capture_error(self, err: HassioError | None = None) -> None: + """Capture an error or record that an unknown error has occurred.""" + if err: + new_error = SupervisorJobError(type(err), str(err)) + else: + new_error = SupervisorJobError() + self.errors += [new_error] + @contextmanager def start(self): """Start the job in the current task. @@ -156,17 +200,24 @@ def is_job(self) -> bool: def _notify_on_job_change( self, job: SupervisorJob, attribute: Attribute, value: Any ) -> None: - """Notify Home Assistant of a change to a job.""" + """Notify Home Assistant of a change to a job and bus on job start/end.""" self.sys_homeassistant.websocket.supervisor_event( WSEvent.JOB, job.as_dict() | {attribute.alias: value} ) + if attribute.name == "done": + if value is False: + self.sys_bus.fire_event(BusEvent.SUPERVISOR_JOB_START, job.uuid) + if value is True: + self.sys_bus.fire_event(BusEvent.SUPERVISOR_JOB_END, job.uuid) + def new_job( self, - name: str, + name: str | None = None, reference: str | None = None, initial_stage: str | None = None, internal: bool = False, + no_parent: bool = False, ) -> SupervisorJob: """Create a new job.""" job = SupervisorJob( @@ -175,6 +226,7 @@ def new_job( stage=initial_stage, on_change=None if internal else self._notify_on_job_change, internal=internal, + **({"parent_id": None} if no_parent else {}), ) self._jobs[job.uuid] = job return job @@ -194,3 +246,30 @@ def remove_job(self, job: SupervisorJob) -> None: _LOGGER.warning("Removing incomplete job %s from job manager", job.name) del self._jobs[job.uuid] + + # Clean up any completed sub jobs of this one + for sub_job in self.jobs: + if sub_job.parent_id == job.uuid and job.done: + self.remove_job(sub_job) + + def schedule_job( + self, + job_method: Callable[..., Awaitable[Any]], + options: JobSchedulerOptions, + *args, + **kwargs, + ) -> tuple[SupervisorJob, asyncio.Task | asyncio.TimerHandle]: + """Schedule a job to run later and return job and task or timer handle.""" + job = self.new_job(no_parent=True) + + def _wrap_task() -> asyncio.Task: + return self.sys_create_task( + job_method(*args, _job__use_existing=job, **kwargs) + ) + + if options.start_at: + return (job, self.sys_call_at(options.start_at, _wrap_task)) + if options.delayed_start: + return (job, self.sys_call_later(options.delayed_start, _wrap_task)) + + return (job, _wrap_task()) diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index 0528d143e4c..221b5972edb 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -9,6 +9,7 @@ ATTR_IGNORE_CONDITIONS = "ignore_conditions" JOB_GROUP_ADDON = "addon_{slug}" +JOB_GROUP_BACKUP = "backup_{slug}" JOB_GROUP_BACKUP_MANAGER = "backup_manager" JOB_GROUP_DOCKER_INTERFACE = "container_{name}" JOB_GROUP_HOME_ASSISTANT_CORE = "home_assistant_core" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index d7762584827..67b2198297e 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -6,6 +6,7 @@ import logging from typing import Any +from . import SupervisorJob from ..const import CoreState from ..coresys import CoreSys, CoreSysAttributes from ..exceptions import ( @@ -157,22 +158,23 @@ def _post_init(self, obj: JobGroup | CoreSysAttributes) -> JobGroup | None: self._lock = asyncio.Semaphore() # Job groups - if self.limit in ( + try: + is_job_group = obj.acquire and obj.release + except AttributeError: + is_job_group = False + + if not is_job_group and self.limit in ( JobExecutionLimit.GROUP_ONCE, JobExecutionLimit.GROUP_WAIT, JobExecutionLimit.GROUP_THROTTLE, JobExecutionLimit.GROUP_THROTTLE_WAIT, JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, ): - try: - _ = obj.acquire and obj.release - except AttributeError: - raise RuntimeError( - f"Job on {self.name} need to be a JobGroup to use group based limits!" - ) from None + raise RuntimeError( + f"Job on {self.name} need to be a JobGroup to use group based limits!" + ) from None - return obj - return None + return obj if is_job_group else None def _handle_job_condition_exception(self, err: JobConditionException) -> None: """Handle a job condition failure.""" @@ -187,7 +189,13 @@ def __call__(self, method): self._method = method @wraps(method) - async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: + async def wrapper( + obj: JobGroup | CoreSysAttributes, + *args, + _job__use_existing: SupervisorJob | None = None, + _job_override__cleanup: bool | None = None, + **kwargs, + ) -> Any: """Wrap the method. This method must be on an instance of CoreSysAttributes. If a JOB_GROUP limit @@ -195,11 +203,18 @@ async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: """ job_group = self._post_init(obj) group_name: str | None = job_group.group_name if job_group else None - job = self.sys_jobs.new_job( - self.name, - job_group.job_reference if job_group else None, - internal=self._internal, - ) + if _job__use_existing: + job = _job__use_existing + job.name = self.name + job.internal = self._internal + if job_group: + job.reference = job_group.job_reference + else: + job = self.sys_jobs.new_job( + self.name, + job_group.job_reference if job_group else None, + internal=self._internal, + ) try: # Handle condition @@ -293,9 +308,11 @@ async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: except JobConditionException as err: return self._handle_job_condition_exception(err) except HassioError as err: + job.capture_error(err) raise err except Exception as err: _LOGGER.exception("Unhandled exception: %s", err) + job.capture_error() capture_exception(err) raise JobException() from err finally: @@ -308,7 +325,12 @@ async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: # Jobs that weren't started are always cleaned up. Also clean up done jobs if required finally: - if job.done is None or self.cleanup: + if ( + job.done is None + or _job_override__cleanup + or _job_override__cleanup is None + and self.cleanup + ): self.sys_jobs.remove_job(job) return wrapper diff --git a/supervisor/misc/scheduler.py b/supervisor/misc/scheduler.py index a8583161fc7..9e4ac5c6162 100644 --- a/supervisor/misc/scheduler.py +++ b/supervisor/misc/scheduler.py @@ -74,7 +74,7 @@ async def _wrap_task(): def _schedule_task(self, task: _Task) -> None: """Schedule a task on loop.""" if isinstance(task.interval, (int, float)): - task.next = self.sys_loop.call_later(task.interval, self._run_task, task) + task.next = self.sys_call_later(task.interval, self._run_task, task) elif isinstance(task.interval, time): today = datetime.combine(date.today(), task.interval) tomorrow = datetime.combine(date.today() + timedelta(days=1), task.interval) @@ -85,7 +85,7 @@ def _schedule_task(self, task: _Task) -> None: else: calc = tomorrow - task.next = self.sys_loop.call_at(calc.timestamp(), self._run_task, task) + task.next = self.sys_call_at(calc, self._run_task, task) else: _LOGGER.critical( "Unknown interval %s (type: %s) for scheduler %s", diff --git a/supervisor/store/__init__.py b/supervisor/store/__init__.py index 0b91baaa0c0..f68affdc585 100644 --- a/supervisor/store/__init__.py +++ b/supervisor/store/__init__.py @@ -207,6 +207,7 @@ async def remove_repository(self, repository: Repository, *, persist: bool = Tru await self.data.update() self._read_addons() + @Job(name="store_manager_update_repositories") async def update_repositories( self, list_repositories: list[str], diff --git a/tests/api/test_backups.py b/tests/api/test_backups.py index 63fc9621f9c..e9e83f58704 100644 --- a/tests/api/test_backups.py +++ b/tests/api/test_backups.py @@ -2,17 +2,22 @@ import asyncio from pathlib import Path, PurePath -from unittest.mock import ANY, AsyncMock, patch +from typing import Any +from unittest.mock import ANY, AsyncMock, PropertyMock, patch from aiohttp.test_utils import TestClient from awesomeversion import AwesomeVersion import pytest +from supervisor.addons.addon import Addon from supervisor.backups.backup import Backup from supervisor.const import CoreState from supervisor.coresys import CoreSys +from supervisor.exceptions import AddonsError, HomeAssistantBackupError +from supervisor.homeassistant.core import HomeAssistantCore from supervisor.homeassistant.module import HomeAssistant from supervisor.mounts.mount import Mount +from supervisor.supervisor import Supervisor async def test_info(api_client, coresys: CoreSys, mock_full_backup: Backup): @@ -199,3 +204,256 @@ async def test_api_backup_exclude_database( backup.assert_awaited_once_with(ANY, True) assert resp.status == 200 + + +async def _get_job_info(api_client: TestClient, job_id: str) -> dict[str, Any]: + """Test background job progress and block until it is done.""" + resp = await api_client.get(f"/jobs/{job_id}") + assert resp.status == 200 + result = await resp.json() + return result["data"] + + +@pytest.mark.parametrize( + "backup_type,options", + [ + ("full", {}), + ( + "partial", + { + "homeassistant": True, + "folders": ["addons/local", "media", "share", "ssl"], + }, + ), + ], +) +async def test_api_backup_restore_background( + api_client: TestClient, + coresys: CoreSys, + backup_type: str, + options: dict[str, Any], + tmp_supervisor_data: Path, + path_extern, +): + """Test background option on backup/restore APIs.""" + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + coresys.homeassistant.version = AwesomeVersion("2023.09.0") + (tmp_supervisor_data / "addons/local").mkdir(parents=True) + + assert coresys.jobs.jobs == [] + + resp = await api_client.post( + f"/backups/new/{backup_type}", + json={"background": True, "name": f"{backup_type} backup"} | options, + ) + assert resp.status == 200 + result = await resp.json() + job_id = result["data"]["job_id"] + assert (await _get_job_info(api_client, job_id))["done"] is False + + while not (job := (await _get_job_info(api_client, job_id)))["done"]: + await asyncio.sleep(0) + + assert job["name"] == f"backup_manager_{backup_type}_backup" + assert (backup_slug := job["reference"]) + assert job["child_jobs"][0]["name"] == "backup_store_homeassistant" + assert job["child_jobs"][0]["reference"] == backup_slug + assert job["child_jobs"][1]["name"] == "backup_store_folders" + assert job["child_jobs"][1]["reference"] == backup_slug + assert {j["reference"] for j in job["child_jobs"][1]["child_jobs"]} == { + "addons/local", + "media", + "share", + "ssl", + } + + with patch.object(HomeAssistantCore, "start"): + resp = await api_client.post( + f"/backups/{backup_slug}/restore/{backup_type}", + json={"background": True} | options, + ) + assert resp.status == 200 + result = await resp.json() + job_id = result["data"]["job_id"] + assert (await _get_job_info(api_client, job_id))["done"] is False + + while not (job := (await _get_job_info(api_client, job_id)))["done"]: + await asyncio.sleep(0) + + assert job["name"] == f"backup_manager_{backup_type}_restore" + assert job["reference"] == backup_slug + assert job["child_jobs"][0]["name"] == "backup_restore_folders" + assert job["child_jobs"][0]["reference"] == backup_slug + assert {j["reference"] for j in job["child_jobs"][0]["child_jobs"]} == { + "addons/local", + "media", + "share", + "ssl", + } + assert job["child_jobs"][1]["name"] == "backup_restore_homeassistant" + assert job["child_jobs"][1]["reference"] == backup_slug + + if backup_type == "full": + assert job["child_jobs"][2]["name"] == "backup_remove_delta_addons" + assert job["child_jobs"][2]["reference"] == backup_slug + + +@pytest.mark.parametrize( + "backup_type,options", + [ + ("full", {}), + ( + "partial", + { + "homeassistant": True, + "folders": ["addons/local", "media", "share", "ssl"], + "addons": ["local_ssh"], + }, + ), + ], +) +async def test_api_backup_errors( + api_client: TestClient, + coresys: CoreSys, + backup_type: str, + options: dict[str, Any], + tmp_supervisor_data: Path, + install_addon_ssh, + path_extern, +): + """Test error reporting in backup job.""" + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + coresys.homeassistant.version = AwesomeVersion("2023.09.0") + (tmp_supervisor_data / "addons/local").mkdir(parents=True) + + assert coresys.jobs.jobs == [] + + with patch.object(Addon, "backup", side_effect=AddonsError("Backup error")): + resp = await api_client.post( + f"/backups/new/{backup_type}", + json={"name": f"{backup_type} backup"} | options, + ) + + assert resp.status == 200 + result = await resp.json() + job_id = result["data"]["job_id"] + slug = result["data"]["slug"] + job = await _get_job_info(api_client, job_id) + + assert job["name"] == f"backup_manager_{backup_type}_backup" + assert job["done"] is True + assert job["reference"] == slug + assert job["errors"] == [] + assert job["child_jobs"][0]["name"] == "backup_store_addons" + assert job["child_jobs"][0]["reference"] == slug + assert job["child_jobs"][0]["child_jobs"][0]["name"] == "backup_addon_save" + assert job["child_jobs"][0]["child_jobs"][0]["reference"] == "local_ssh" + assert job["child_jobs"][0]["child_jobs"][0]["errors"] == [ + {"type": "BackupError", "message": "Can't create backup for local_ssh"} + ] + assert job["child_jobs"][1]["name"] == "backup_store_homeassistant" + assert job["child_jobs"][1]["reference"] == slug + assert job["child_jobs"][2]["name"] == "backup_store_folders" + assert job["child_jobs"][2]["reference"] == slug + assert {j["reference"] for j in job["child_jobs"][2]["child_jobs"]} == { + "addons/local", + "media", + "share", + "ssl", + } + + with patch.object( + HomeAssistant, "backup", side_effect=HomeAssistantBackupError("Backup error") + ), patch.object(Addon, "backup"): + resp = await api_client.post( + f"/backups/new/{backup_type}", + json={"name": f"{backup_type} backup"} | options, + ) + + assert resp.status == 400 + result = await resp.json() + job_id = result["job_id"] + job = await _get_job_info(api_client, job_id) + + assert job["name"] == f"backup_manager_{backup_type}_backup" + assert job["done"] is True + assert job["errors"] == ( + err := [{"type": "HomeAssistantBackupError", "message": "Backup error"}] + ) + assert job["child_jobs"][0]["name"] == "backup_store_addons" + assert job["child_jobs"][1]["name"] == "backup_store_homeassistant" + assert job["child_jobs"][1]["errors"] == err + assert len(job["child_jobs"]) == 2 + + +async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys): + """Test backup errors that return immediately even in background mode.""" + coresys.core.state = CoreState.FREEZE + resp = await api_client.post( + "/backups/new/full", + json={"name": "Test", "background": True}, + ) + assert resp.status == 400 + assert "freeze" in (await resp.json())["message"] + + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 0.5 + resp = await api_client.post( + "/backups/new/partial", + json={"name": "Test", "homeassistant": True, "background": True}, + ) + assert resp.status == 400 + assert "not enough free space" in (await resp.json())["message"] + + +async def test_restore_immediate_errors( + request: pytest.FixtureRequest, + api_client: TestClient, + coresys: CoreSys, + mock_partial_backup: Backup, +): + """Test restore errors that return immediately even in background mode.""" + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + + resp = await api_client.post( + f"/backups/{mock_partial_backup.slug}/restore/full", json={"background": True} + ) + assert resp.status == 400 + assert "only a partial backup" in (await resp.json())["message"] + + with patch.object( + Backup, + "supervisor_version", + new=PropertyMock(return_value=AwesomeVersion("2024.01.0")), + ), patch.object( + Supervisor, + "version", + new=PropertyMock(return_value=AwesomeVersion("2023.12.0")), + ): + resp = await api_client.post( + f"/backups/{mock_partial_backup.slug}/restore/partial", + json={"background": True, "homeassistant": True}, + ) + assert resp.status == 400 + assert "Must update supervisor" in (await resp.json())["message"] + + with patch.object( + Backup, "protected", new=PropertyMock(return_value=True) + ), patch.object(Backup, "set_password", return_value=False): + resp = await api_client.post( + f"/backups/{mock_partial_backup.slug}/restore/partial", + json={"background": True, "homeassistant": True}, + ) + assert resp.status == 400 + assert "Invalid password" in (await resp.json())["message"] + + with patch.object(Backup, "homeassistant", new=PropertyMock(return_value=None)): + resp = await api_client.post( + f"/backups/{mock_partial_backup.slug}/restore/partial", + json={"background": True, "homeassistant": True}, + ) + assert resp.status == 400 + assert "No Home Assistant" in (await resp.json())["message"] diff --git a/tests/api/test_jobs.py b/tests/api/test_jobs.py index 69e79fd231e..f90e54461f7 100644 --- a/tests/api/test_jobs.py +++ b/tests/api/test_jobs.py @@ -107,6 +107,7 @@ async def test_jobs_tree_internal(self): "progress": 50, "stage": None, "done": False, + "errors": [], "child_jobs": [ { "name": "test_jobs_tree_inner", @@ -116,6 +117,7 @@ async def test_jobs_tree_internal(self): "stage": None, "done": False, "child_jobs": [], + "errors": [], }, ], }, @@ -127,6 +129,7 @@ async def test_jobs_tree_internal(self): "stage": "init", "done": False, "child_jobs": [], + "errors": [], }, ] @@ -144,5 +147,70 @@ async def test_jobs_tree_internal(self): "stage": "end", "done": True, "child_jobs": [], + "errors": [], }, ] + + +async def test_job_manual_cleanup(api_client: TestClient, coresys: CoreSys): + """Test manually cleaning up a job via API.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys): + """Initialize the test class.""" + self.coresys = coresys + self.event = asyncio.Event() + self.job_id: str | None = None + + @Job(name="test_job_manual_cleanup", cleanup=False) + async def test_job_manual_cleanup(self) -> None: + """Job that requires manual cleanup.""" + self.job_id = coresys.jobs.current.uuid + await self.event.wait() + + test = TestClass(coresys) + task = asyncio.create_task(test.test_job_manual_cleanup()) + await asyncio.sleep(0) + + # Check the job details + resp = await api_client.get(f"/jobs/{test.job_id}") + assert resp.status == 200 + result = await resp.json() + assert result["data"] == { + "name": "test_job_manual_cleanup", + "reference": None, + "uuid": test.job_id, + "progress": 0, + "stage": None, + "done": False, + "child_jobs": [], + "errors": [], + } + + # Only done jobs can be deleted via API + resp = await api_client.delete(f"/jobs/{test.job_id}") + assert resp.status == 400 + result = await resp.json() + assert result["message"] == f"Job {test.job_id} is not done!" + + # Let the job finish + test.event.set() + await task + + # Check that it is now done + resp = await api_client.get(f"/jobs/{test.job_id}") + assert resp.status == 200 + result = await resp.json() + assert result["data"]["done"] is True + + # Delete it + resp = await api_client.delete(f"/jobs/{test.job_id}") + assert resp.status == 200 + + # Confirm it no longer exists + resp = await api_client.get(f"/jobs/{test.job_id}") + assert resp.status == 400 + result = await resp.json() + assert result["message"] == f"No job found with id {test.job_id}" diff --git a/tests/backups/conftest.py b/tests/backups/conftest.py index 3b78617f436..ff4a0352a6a 100644 --- a/tests/backups/conftest.py +++ b/tests/backups/conftest.py @@ -25,6 +25,7 @@ def fixture_backup_mock(): backup_instance.restore_homeassistant = AsyncMock(return_value=None) backup_instance.restore_addons = AsyncMock(return_value=(True, [])) backup_instance.restore_repositories = AsyncMock(return_value=None) + backup_instance.remove_delta_addons = AsyncMock(return_value=True) yield backup_mock diff --git a/tests/backups/test_backup.py b/tests/backups/test_backup.py index 3e7b07e1bbb..ab63793337b 100644 --- a/tests/backups/test_backup.py +++ b/tests/backups/test_backup.py @@ -10,8 +10,8 @@ async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path): """Test making a new backup operates entirely within folder where backup will be stored.""" - backup = Backup(coresys, tmp_path / "my_backup.tar") - backup.new("test", "test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL) + backup = Backup(coresys, tmp_path / "my_backup.tar", "test") + backup.new("test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL) assert not listdir(tmp_path) async with backup: diff --git a/tests/backups/test_manager.py b/tests/backups/test_manager.py index 1f3e4752915..5ec632c7c7c 100644 --- a/tests/backups/test_manager.py +++ b/tests/backups/test_manager.py @@ -2,6 +2,7 @@ import asyncio import errno +from functools import partial from pathlib import Path from shutil import rmtree from unittest.mock import ANY, AsyncMock, MagicMock, Mock, PropertyMock, patch @@ -23,7 +24,6 @@ from supervisor.docker.homeassistant import DockerHomeAssistant from supervisor.docker.monitor import DockerContainerStateEvent from supervisor.exceptions import ( - AddonsError, BackupError, BackupInvalidError, BackupJobError, @@ -53,9 +53,9 @@ async def test_do_backup_full(coresys: CoreSys, backup_mock, install_addon_ssh): backup_instance: MagicMock = await manager.do_backup_full() # Check Backup has been created without password - assert backup_instance.new.call_args[0][3] == BackupType.FULL - assert backup_instance.new.call_args[0][4] is None - assert backup_instance.new.call_args[0][5] is True + assert backup_instance.new.call_args[0][2] == BackupType.FULL + assert backup_instance.new.call_args[0][3] is None + assert backup_instance.new.call_args[0][4] is True backup_instance.store_homeassistant.assert_called_once() backup_instance.store_repositories.assert_called_once() @@ -83,9 +83,9 @@ async def test_do_backup_full_uncompressed( backup_instance: MagicMock = await manager.do_backup_full(compressed=False) # Check Backup has been created without password - assert backup_instance.new.call_args[0][3] == BackupType.FULL - assert backup_instance.new.call_args[0][4] is None - assert backup_instance.new.call_args[0][5] is False + assert backup_instance.new.call_args[0][2] == BackupType.FULL + assert backup_instance.new.call_args[0][3] is None + assert backup_instance.new.call_args[0][4] is False backup_instance.store_homeassistant.assert_called_once() backup_instance.store_repositories.assert_called_once() @@ -114,9 +114,9 @@ async def test_do_backup_partial_minimal( backup_instance: MagicMock = await manager.do_backup_partial(homeassistant=False) # Check Backup has been created without password - assert backup_instance.new.call_args[0][3] == BackupType.PARTIAL - assert backup_instance.new.call_args[0][4] is None - assert backup_instance.new.call_args[0][5] is True + assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL + assert backup_instance.new.call_args[0][3] is None + assert backup_instance.new.call_args[0][4] is True backup_instance.store_homeassistant.assert_not_called() backup_instance.store_repositories.assert_called_once() @@ -144,9 +144,9 @@ async def test_do_backup_partial_minimal_uncompressed( ) # Check Backup has been created without password - assert backup_instance.new.call_args[0][3] == BackupType.PARTIAL - assert backup_instance.new.call_args[0][4] is None - assert backup_instance.new.call_args[0][5] is False + assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL + assert backup_instance.new.call_args[0][3] is None + assert backup_instance.new.call_args[0][4] is False backup_instance.store_homeassistant.assert_not_called() backup_instance.store_repositories.assert_called_once() @@ -176,9 +176,9 @@ async def test_do_backup_partial_maximal( ) # Check Backup has been created without password - assert backup_instance.new.call_args[0][3] == BackupType.PARTIAL - assert backup_instance.new.call_args[0][4] is None - assert backup_instance.new.call_args[0][5] is True + assert backup_instance.new.call_args[0][2] == BackupType.PARTIAL + assert backup_instance.new.call_args[0][3] is None + assert backup_instance.new.call_args[0][4] is True backup_instance.store_homeassistant.assert_called_once() backup_instance.store_repositories.assert_called_once() @@ -206,6 +206,10 @@ async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon manager = BackupManager(coresys) backup_instance = full_backup_mock.return_value + backup_instance.sys_addons = coresys.addons + backup_instance.remove_delta_addons = partial( + Backup.remove_delta_addons, backup_instance + ) assert await manager.do_restore_full(backup_instance) backup_instance.restore_homeassistant.assert_called_once() @@ -235,6 +239,10 @@ async def test_do_restore_full_different_addon( backup_instance = full_backup_mock.return_value backup_instance.addon_list = ["differentslug"] + backup_instance.sys_addons = coresys.addons + backup_instance.remove_delta_addons = partial( + Backup.remove_delta_addons, backup_instance + ) assert await manager.do_restore_full(backup_instance) backup_instance.restore_homeassistant.assert_called_once() @@ -371,7 +379,7 @@ async def test_backup_error( coresys.core.state = CoreState.RUNNING coresys.hardware.disk.get_disk_free_space = lambda x: 5000 - backup_mock.return_value.store_addons.side_effect = (err := AddonsError()) + backup_mock.return_value.store_folders.side_effect = (err := OSError()) await coresys.backups.do_backup_full() capture_exception.assert_called_once_with(err) @@ -937,6 +945,7 @@ def _make_backup_message_for_assert( "stage": stage, "done": done, "parent_id": None, + "errors": [], }, }, } diff --git a/tests/conftest.py b/tests/conftest.py index 365b4e1e2f1..1092fe89f2f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -395,6 +395,7 @@ async def tmp_supervisor_data(coresys: CoreSys, tmp_path: Path) -> Path: coresys.config.path_share.mkdir() coresys.config.path_addons_data.mkdir(parents=True) coresys.config.path_addon_configs.mkdir(parents=True) + coresys.config.path_ssl.mkdir() yield tmp_path @@ -538,8 +539,8 @@ def install_addon_example(coresys: CoreSys, repository): @pytest.fixture async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup: """Mock a full backup.""" - mock_backup = Backup(coresys, Path(tmp_path, "test_backup")) - mock_backup.new("test", "Test", utcnow().isoformat(), BackupType.FULL) + mock_backup = Backup(coresys, Path(tmp_path, "test_backup"), "test") + mock_backup.new("Test", utcnow().isoformat(), BackupType.FULL) mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"] mock_backup.docker = {} mock_backup._data[ATTR_ADDONS] = [ @@ -562,8 +563,8 @@ async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup: @pytest.fixture async def mock_partial_backup(coresys: CoreSys, tmp_path) -> Backup: """Mock a partial backup.""" - mock_backup = Backup(coresys, Path(tmp_path, "test_backup")) - mock_backup.new("test", "Test", utcnow().isoformat(), BackupType.PARTIAL) + mock_backup = Backup(coresys, Path(tmp_path, "test_backup"), "test") + mock_backup.new("Test", utcnow().isoformat(), BackupType.PARTIAL) mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"] mock_backup.docker = {} mock_backup._data[ATTR_ADDONS] = [ @@ -593,7 +594,7 @@ async def backups( temp_tar = Path(tmp_path, f"{slug}.tar") with SecureTarFile(temp_tar, "w"): pass - backup = Backup(coresys, temp_tar) + backup = Backup(coresys, temp_tar, slug) backup._data = { # pylint: disable=protected-access ATTR_SLUG: slug, ATTR_DATE: utcnow().isoformat(), diff --git a/tests/docker/test_interface.py b/tests/docker/test_interface.py index 3cf03c54948..1fc962c274d 100644 --- a/tests/docker/test_interface.py +++ b/tests/docker/test_interface.py @@ -159,23 +159,36 @@ async def test_attach_existing_container( ): await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3")) await asyncio.sleep(0) - fire_event.assert_called_once_with( - BusEvent.DOCKER_CONTAINER_STATE_CHANGE, - DockerContainerStateEvent("homeassistant", expected, "abc123", 1), - ) + assert [ + event + for event in fire_event.call_args_list + if event.args[0] == BusEvent.DOCKER_CONTAINER_STATE_CHANGE + ] == [ + call( + BusEvent.DOCKER_CONTAINER_STATE_CHANGE, + DockerContainerStateEvent("homeassistant", expected, "abc123", 1), + ) + ] fire_event.reset_mock() await coresys.homeassistant.core.instance.attach( AwesomeVersion("2022.7.3"), skip_state_event_if_down=True ) await asyncio.sleep(0) + docker_events = [ + event + for event in fire_event.call_args_list + if event.args[0] == BusEvent.DOCKER_CONTAINER_STATE_CHANGE + ] if fired_when_skip_down: - fire_event.assert_called_once_with( - BusEvent.DOCKER_CONTAINER_STATE_CHANGE, - DockerContainerStateEvent("homeassistant", expected, "abc123", 1), - ) + assert docker_events == [ + call( + BusEvent.DOCKER_CONTAINER_STATE_CHANGE, + DockerContainerStateEvent("homeassistant", expected, "abc123", 1), + ) + ] else: - fire_event.assert_not_called() + assert not docker_events async def test_attach_container_failure(coresys: CoreSys): @@ -195,7 +208,11 @@ async def test_attach_container_failure(coresys: CoreSys): type(coresys.bus), "fire_event" ) as fire_event: await coresys.homeassistant.core.instance.attach(AwesomeVersion("2022.7.3")) - fire_event.assert_not_called() + assert not [ + event + for event in fire_event.call_args_list + if event.args[0] == BusEvent.DOCKER_CONTAINER_STATE_CHANGE + ] assert coresys.homeassistant.core.instance.meta_config == image_config diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index d33451248e1..a3e174be857 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -1,7 +1,7 @@ """Test the condition decorators.""" # pylint: disable=protected-access,import-error import asyncio -from datetime import timedelta +from datetime import datetime, timedelta from unittest.mock import ANY, AsyncMock, Mock, PropertyMock, patch from uuid import uuid4 @@ -9,7 +9,7 @@ import pytest import time_machine -from supervisor.const import CoreState +from supervisor.const import BusEvent, CoreState from supervisor.coresys import CoreSys from supervisor.exceptions import ( AudioUpdateError, @@ -19,7 +19,7 @@ ) from supervisor.host.const import HostFeature from supervisor.host.manager import HostManager -from supervisor.jobs import SupervisorJob +from supervisor.jobs import JobSchedulerOptions, SupervisorJob from supervisor.jobs.const import JobExecutionLimit from supervisor.jobs.decorator import Job, JobCondition from supervisor.jobs.job_group import JobGroup @@ -979,6 +979,7 @@ async def execute_default(self) -> bool: "stage": None, "done": True, "parent_id": None, + "errors": [], }, }, } @@ -1095,3 +1096,104 @@ def release_limit_check(self): await task assert job.done assert coresys.jobs.jobs == [job] + + +async def test_job_scheduled_delay(coresys: CoreSys): + """Test job that schedules a job to start after delay.""" + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys) -> None: + """Initialize object.""" + self.coresys = coresys + + @Job(name="test_job_scheduled_delay_job_scheduler") + async def job_scheduler(self) -> tuple[SupervisorJob, asyncio.TimerHandle]: + """Schedule a job to run after delay.""" + return self.coresys.jobs.schedule_job( + self.job_task, JobSchedulerOptions(delayed_start=0.1) + ) + + @Job(name="test_job_scheduled_delay_job_task") + async def job_task(self) -> None: + """Do scheduled work.""" + self.coresys.jobs.current.stage = "work" + + test = TestClass(coresys) + + job, _ = await test.job_scheduler() + started = False + ended = False + + async def start_listener(job_id: str): + nonlocal started + started = started or job_id == job.uuid + + async def end_listener(job_id: str): + nonlocal ended + ended = ended or job_id == job.uuid + + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener) + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener) + + await asyncio.sleep(0.2) + + assert started + assert ended + assert job.done + assert job.name == "test_job_scheduled_delay_job_task" + assert job.stage == "work" + assert job.parent_id is None + + +async def test_job_scheduled_at(coresys: CoreSys): + """Test job that schedules a job to start at a specified time.""" + dt = datetime.now() + + class TestClass: + """Test class.""" + + def __init__(self, coresys: CoreSys) -> None: + """Initialize object.""" + self.coresys = coresys + + @Job(name="test_job_scheduled_at_job_scheduler") + async def job_scheduler(self) -> tuple[SupervisorJob, asyncio.TimerHandle]: + """Schedule a job to run at specified time.""" + return self.coresys.jobs.schedule_job( + self.job_task, JobSchedulerOptions(start_at=dt + timedelta(seconds=0.1)) + ) + + @Job(name="test_job_scheduled_at_job_task") + async def job_task(self) -> None: + """Do scheduled work.""" + self.coresys.jobs.current.stage = "work" + + test = TestClass(coresys) + + with time_machine.travel(dt): + job, _ = await test.job_scheduler() + + started = False + ended = False + + async def start_listener(job_id: str): + nonlocal started + started = started or job_id == job.uuid + + async def end_listener(job_id: str): + nonlocal ended + ended = ended or job_id == job.uuid + + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_START, start_listener) + coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, end_listener) + + await asyncio.sleep(0.2) + + assert started + assert ended + assert job.done + assert job.name == "test_job_scheduled_at_job_task" + assert job.stage == "work" + assert job.parent_id is None diff --git a/tests/jobs/test_job_manager.py b/tests/jobs/test_job_manager.py index b3d9537378f..a85a712de15 100644 --- a/tests/jobs/test_job_manager.py +++ b/tests/jobs/test_job_manager.py @@ -106,6 +106,7 @@ async def test_notify_on_change(coresys: CoreSys): "stage": None, "done": None, "parent_id": None, + "errors": [], }, }, } @@ -126,6 +127,7 @@ async def test_notify_on_change(coresys: CoreSys): "stage": "test", "done": None, "parent_id": None, + "errors": [], }, }, } @@ -146,6 +148,7 @@ async def test_notify_on_change(coresys: CoreSys): "stage": "test", "done": None, "parent_id": None, + "errors": [], }, }, } @@ -166,6 +169,7 @@ async def test_notify_on_change(coresys: CoreSys): "stage": "test", "done": False, "parent_id": None, + "errors": [], }, }, } @@ -185,6 +189,7 @@ async def test_notify_on_change(coresys: CoreSys): "stage": "test", "done": True, "parent_id": None, + "errors": [], }, }, }