Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Envs][Execution] Add general task archival/restore utilities and refactor existing code #124

Merged
merged 7 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ src/conductor_cli.egg-info
.mypy_cache
build
dist
env

cond-out

Expand Down
6 changes: 6 additions & 0 deletions errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@
The provided archive contains task output(s) that already exist in the output
directory '{output_dir}'.

4007:
name: UnsupportedArchiveType
message: >-
The provided archive file was compressed as {archive_type}, which is not
supported on your platform.


# General Conductor errors (error code 5xxx)
5001:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ exclude = '''
| src/conductor/errors/generated.py
| src/conductor/envs/proto_gen/
| explorer/
| env/
)
'''

Expand Down
80 changes: 20 additions & 60 deletions src/conductor/cli/archive.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import pathlib
import datetime
import subprocess
from typing import List, Optional

import conductor.filename as f
from conductor.config import ARCHIVE_VERSION_INDEX
from conductor.context import Context
from conductor.errors import (
CreateArchiveFailed,
OutputFileExists,
OutputPathDoesNotExist,
NoTaskOutputsToArchive,
)
from conductor.task_identifier import TaskIdentifier
from conductor.task_types.base import TaskType
from conductor.execution.version_index import VersionIndex
from conductor.utils.user_code import cli_command
from conductor.utils.output_archiving import (
create_archive,
platform_archive_type,
ArchiveType,
)


def register_command(subparsers):
Expand Down Expand Up @@ -49,16 +50,18 @@ def register_command(subparsers):
parser.set_defaults(func=main)


def generate_archive_name() -> str:
def generate_archive_name(archive_type: ArchiveType) -> str:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d+%H-%M-%S")
return f.archive(timestamp=timestamp)
return f.archive(timestamp=timestamp, archive_type=archive_type)


def handle_output_path(ctx: Context, raw_output_path: Optional[str]) -> pathlib.Path:
def handle_output_path(
ctx: Context, raw_output_path: Optional[str], archive_type: ArchiveType
) -> pathlib.Path:
if raw_output_path is None:
output_path = pathlib.Path(
ctx.output_path,
generate_archive_name(),
generate_archive_name(archive_type),
)
return output_path

Expand All @@ -70,7 +73,7 @@ def handle_output_path(ctx: Context, raw_output_path: Optional[str]) -> pathlib.
if output_path.is_dir():
# Corresponds to the case where the user provides a path to a
# directory where the archive should be stored
return output_path / generate_archive_name()
return output_path / generate_archive_name(archive_type)
raise OutputFileExists()

elif output_path.parent.exists() and output_path.parent.is_dir():
Expand Down Expand Up @@ -105,67 +108,27 @@ def append_if_archivable(task: TaskType):
return relevant_tasks


def create_archive(
ctx: Context,
archive_index: VersionIndex,
output_archive_path: pathlib.Path,
archive_index_path: pathlib.Path,
) -> None:
output_dirs_str = [
str(
pathlib.Path(
task_id.path,
f.task_output_dir(task_id, version),
)
)
for task_id, version in archive_index.get_all_versions()
]

try:
process = subprocess.Popen(
[
"tar",
"czf", # Create a new archive and use gzip to compress
str(output_archive_path),
"-C", # Files to put in the archive are relative to `ctx.output_path`
str(ctx.output_path),
str(archive_index_path.relative_to(ctx.output_path)),
*output_dirs_str,
],
shell=False,
)
process.wait()
if process.returncode != 0:
raise CreateArchiveFailed().add_extra_context(
"The tar utility returned a non-zero error code."
)

except OSError as ex:
raise CreateArchiveFailed().add_extra_context(str(ex))


@cli_command
def main(args):
ctx = Context.from_cwd()
output_archive_path = handle_output_path(ctx, args.output)
archive_type = platform_archive_type()
output_archive_path = handle_output_path(ctx, args.output, archive_type)

# If `None`, we should archive all tasks
tasks_to_archive = compute_tasks_to_archive(ctx, args.task_identifier)
if tasks_to_archive is not None and len(tasks_to_archive) == 0:
raise NoTaskOutputsToArchive()

try:
archive_index_path = pathlib.Path(ctx.output_path, ARCHIVE_VERSION_INDEX)
archive_index_path.unlink(missing_ok=True)
archive_index = VersionIndex.create_or_load(archive_index_path)
total_entry_count = ctx.version_index.copy_entries_to(
dest=archive_index, tasks=tasks_to_archive, latest_only=args.latest
tasks_to_archive_with_versions = ctx.version_index.get_versioned_tasks(
tasks=tasks_to_archive, latest_only=args.latest
)
if total_entry_count == 0:
if len(tasks_to_archive_with_versions) == 0:
raise NoTaskOutputsToArchive()

archive_index.commit_changes()
create_archive(ctx, archive_index, output_archive_path, archive_index_path)
create_archive(
ctx, tasks_to_archive_with_versions, output_archive_path, archive_type
)

# Compute a relative path to the current working directory, if possible
try:
Expand All @@ -177,6 +140,3 @@ def main(args):
except:
output_archive_path.unlink(missing_ok=True)
raise

finally:
archive_index_path.unlink(missing_ok=True)
84 changes: 8 additions & 76 deletions src/conductor/cli/restore.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import pathlib
import subprocess
import shutil
import sqlite3

import conductor.filename as f
from conductor.config import ARCHIVE_STAGING, ARCHIVE_VERSION_INDEX
from conductor.context import Context
from conductor.errors import ArchiveFileInvalid, DuplicateTaskOutput
from conductor.execution.version_index import VersionIndex
from conductor.errors import ArchiveFileInvalid
from conductor.utils.user_code import cli_command
from conductor.utils.output_archiving import restore_archive


def register_command(subparsers):
Expand All @@ -21,25 +16,14 @@ def register_command(subparsers):
type=str,
help="Path to the archive file to restore.",
)
parser.add_argument(
"--strict",
action="store_true",
help="If set, the restore operation will fail if any task output is already present.",
)
parser.set_defaults(func=main)


def extract_archive(archive_file: pathlib.Path, staging_path: pathlib.Path):
try:
process = subprocess.Popen(
["tar", "xzf", str(archive_file), "-C", str(staging_path)],
shell=False,
)
process.wait()
if process.returncode != 0:
raise ArchiveFileInvalid().add_extra_context(
"The tar utility returned a non-zero error code."
)

except OSError as ex:
raise ArchiveFileInvalid().add_extra_context(str(ex))


@cli_command
def main(args):
ctx = Context.from_cwd()
Expand All @@ -48,56 +32,4 @@ def main(args):
if not archive_file.is_file():
raise ArchiveFileInvalid()

try:
archive_version_index = None
staging_path = ctx.output_path / ARCHIVE_STAGING
staging_path.mkdir(exist_ok=True)
extract_archive(archive_file, staging_path)

archive_version_index_path = staging_path / ARCHIVE_VERSION_INDEX
if not archive_version_index_path.is_file():
raise ArchiveFileInvalid().add_extra_context(
"Could not locate the archive version index."
)

archive_version_index = VersionIndex.create_or_load(archive_version_index_path)
try:
archive_version_index.copy_entries_to(
dest=ctx.version_index, tasks=None, latest_only=False
)
except sqlite3.IntegrityError as ex:
raise DuplicateTaskOutput(output_dir=str(ctx.output_path)) from ex

# Copy over all archived task outputs
for task_id, version in archive_version_index.get_all_versions():
src_task_path = pathlib.Path(
staging_path, task_id.path, f.task_output_dir(task_id, version)
)
if not src_task_path.is_dir():
raise ArchiveFileInvalid().add_extra_context(
"Missing archived task output for '{}' at version {} in the "
"archive.".format(str(task_id), str(version))
)

dest_task_path = pathlib.Path(
ctx.output_path, task_id.path, f.task_output_dir(task_id, version)
)
dest_task_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src_task_path, dest_task_path)
if not dest_task_path.is_dir():
raise ArchiveFileInvalid().add_extra_context(
"Missing copied archived task output for '{}' at version {}.".format(
str(task_id), str(version)
)
)

# Everything was copied over and verified - safe to commit the index changes
ctx.version_index.commit_changes()

except:
ctx.version_index.rollback_changes()
raise

finally:
del archive_version_index
shutil.rmtree(staging_path, ignore_errors=True)
restore_archive(ctx, archive_file, expect_no_duplicates=args.strict)
2 changes: 1 addition & 1 deletion src/conductor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
SLOT_ENV_VARIABLE_NAME = "COND_SLOT"

# A template for the default name of a Conductor archive.
ARCHIVE_FILE_NAME_TEMPLATE = "cond-archive+{timestamp}.tar.gz"
ARCHIVE_FILE_NAME_TEMPLATE = "cond-archive+{timestamp}.tar.{extension}"

# The file name of the version index used in a Conductor archive.
ARCHIVE_VERSION_INDEX = "version_index_archive.sqlite"
Expand Down
2 changes: 2 additions & 0 deletions src/conductor/envs/maestro/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ async def execute_task(
)
executor = Executor(execution_slots=1, silent=True)
executor.run_plan(plan, ctx)
# Make sure any new versions are committed.
ctx.version_index.commit_changes()

end_timestamp = int(time.time())
return ExecuteTaskResponse(
Expand Down
16 changes: 16 additions & 0 deletions src/conductor/errors/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ def _message(self):
)


class UnsupportedArchiveType(ConductorError):
error_code = 4007

def __init__(self, **kwargs):
super().__init__()
self.kwargs = kwargs
self.archive_type = kwargs["archive_type"]

def _message(self):
return "The provided archive file was compressed as {archive_type}, which is not supported on your platform.".format(
archive_type=self.archive_type,
)


class ConfigParseError(ConductorError):
error_code = 5001

Expand Down Expand Up @@ -775,6 +789,7 @@ def _message(self):
4004: CreateArchiveFailed,
4005: ArchiveFileInvalid,
4006: DuplicateTaskOutput,
4007: UnsupportedArchiveType,
5001: ConfigParseError,
5002: ConfigInvalidValue,
5003: UnsupportedPlatform,
Expand Down Expand Up @@ -830,6 +845,7 @@ def _message(self):
"CreateArchiveFailed",
"ArchiveFileInvalid",
"DuplicateTaskOutput",
"UnsupportedArchiveType",
"ConfigParseError",
"ConfigInvalidValue",
"UnsupportedPlatform",
Expand Down
Loading