Skip to content

Commit

Permalink
[Envs][Execution] Add general task archival/restore utilities and ref…
Browse files Browse the repository at this point in the history
…actor existing code (#124)

This adds generic task result archival/restore utilities that will be
used for results transfer to/from an environment. This also refactors
the existing archive/restore tools to use these new utilities.

This PR also:
- Resolves #82 
- Resolves #106 (we switch to `zstdmt` by default on Linux platforms)
  • Loading branch information
geoffxy authored Dec 24, 2024
1 parent 775a6f7 commit 303ea3e
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 154 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ src/conductor_cli.egg-info
.mypy_cache
build
dist
env

cond-out

Expand Down
6 changes: 6 additions & 0 deletions errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@
The provided archive contains task output(s) that already exist in the output
directory '{output_dir}'.
4007:
name: UnsupportedArchiveType
message: >-
The provided archive file was compressed as {archive_type}, which is not
supported on your platform.
# General Conductor errors (error code 5xxx)
5001:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ exclude = '''
| src/conductor/errors/generated.py
| src/conductor/envs/proto_gen/
| explorer/
| env/
)
'''

Expand Down
80 changes: 20 additions & 60 deletions src/conductor/cli/archive.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import pathlib
import datetime
import subprocess
from typing import List, Optional

import conductor.filename as f
from conductor.config import ARCHIVE_VERSION_INDEX
from conductor.context import Context
from conductor.errors import (
CreateArchiveFailed,
OutputFileExists,
OutputPathDoesNotExist,
NoTaskOutputsToArchive,
)
from conductor.task_identifier import TaskIdentifier
from conductor.task_types.base import TaskType
from conductor.execution.version_index import VersionIndex
from conductor.utils.user_code import cli_command
from conductor.utils.output_archiving import (
create_archive,
platform_archive_type,
ArchiveType,
)


def register_command(subparsers):
Expand Down Expand Up @@ -49,16 +50,18 @@ def register_command(subparsers):
parser.set_defaults(func=main)


def generate_archive_name() -> str:
def generate_archive_name(archive_type: ArchiveType) -> str:
timestamp = datetime.datetime.now().strftime("%Y-%m-%d+%H-%M-%S")
return f.archive(timestamp=timestamp)
return f.archive(timestamp=timestamp, archive_type=archive_type)


def handle_output_path(ctx: Context, raw_output_path: Optional[str]) -> pathlib.Path:
def handle_output_path(
ctx: Context, raw_output_path: Optional[str], archive_type: ArchiveType
) -> pathlib.Path:
if raw_output_path is None:
output_path = pathlib.Path(
ctx.output_path,
generate_archive_name(),
generate_archive_name(archive_type),
)
return output_path

Expand All @@ -70,7 +73,7 @@ def handle_output_path(ctx: Context, raw_output_path: Optional[str]) -> pathlib.
if output_path.is_dir():
# Corresponds to the case where the user provides a path to a
# directory where the archive should be stored
return output_path / generate_archive_name()
return output_path / generate_archive_name(archive_type)
raise OutputFileExists()

elif output_path.parent.exists() and output_path.parent.is_dir():
Expand Down Expand Up @@ -105,67 +108,27 @@ def append_if_archivable(task: TaskType):
return relevant_tasks


def create_archive(
ctx: Context,
archive_index: VersionIndex,
output_archive_path: pathlib.Path,
archive_index_path: pathlib.Path,
) -> None:
output_dirs_str = [
str(
pathlib.Path(
task_id.path,
f.task_output_dir(task_id, version),
)
)
for task_id, version in archive_index.get_all_versions()
]

try:
process = subprocess.Popen(
[
"tar",
"czf", # Create a new archive and use gzip to compress
str(output_archive_path),
"-C", # Files to put in the archive are relative to `ctx.output_path`
str(ctx.output_path),
str(archive_index_path.relative_to(ctx.output_path)),
*output_dirs_str,
],
shell=False,
)
process.wait()
if process.returncode != 0:
raise CreateArchiveFailed().add_extra_context(
"The tar utility returned a non-zero error code."
)

except OSError as ex:
raise CreateArchiveFailed().add_extra_context(str(ex))


@cli_command
def main(args):
ctx = Context.from_cwd()
output_archive_path = handle_output_path(ctx, args.output)
archive_type = platform_archive_type()
output_archive_path = handle_output_path(ctx, args.output, archive_type)

# If `None`, we should archive all tasks
tasks_to_archive = compute_tasks_to_archive(ctx, args.task_identifier)
if tasks_to_archive is not None and len(tasks_to_archive) == 0:
raise NoTaskOutputsToArchive()

try:
archive_index_path = pathlib.Path(ctx.output_path, ARCHIVE_VERSION_INDEX)
archive_index_path.unlink(missing_ok=True)
archive_index = VersionIndex.create_or_load(archive_index_path)
total_entry_count = ctx.version_index.copy_entries_to(
dest=archive_index, tasks=tasks_to_archive, latest_only=args.latest
tasks_to_archive_with_versions = ctx.version_index.get_versioned_tasks(
tasks=tasks_to_archive, latest_only=args.latest
)
if total_entry_count == 0:
if len(tasks_to_archive_with_versions) == 0:
raise NoTaskOutputsToArchive()

archive_index.commit_changes()
create_archive(ctx, archive_index, output_archive_path, archive_index_path)
create_archive(
ctx, tasks_to_archive_with_versions, output_archive_path, archive_type
)

# Compute a relative path to the current working directory, if possible
try:
Expand All @@ -177,6 +140,3 @@ def main(args):
except:
output_archive_path.unlink(missing_ok=True)
raise

finally:
archive_index_path.unlink(missing_ok=True)
84 changes: 8 additions & 76 deletions src/conductor/cli/restore.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import pathlib
import subprocess
import shutil
import sqlite3

import conductor.filename as f
from conductor.config import ARCHIVE_STAGING, ARCHIVE_VERSION_INDEX
from conductor.context import Context
from conductor.errors import ArchiveFileInvalid, DuplicateTaskOutput
from conductor.execution.version_index import VersionIndex
from conductor.errors import ArchiveFileInvalid
from conductor.utils.user_code import cli_command
from conductor.utils.output_archiving import restore_archive


def register_command(subparsers):
Expand All @@ -21,25 +16,14 @@ def register_command(subparsers):
type=str,
help="Path to the archive file to restore.",
)
parser.add_argument(
"--strict",
action="store_true",
help="If set, the restore operation will fail if any task output is already present.",
)
parser.set_defaults(func=main)


def extract_archive(archive_file: pathlib.Path, staging_path: pathlib.Path):
try:
process = subprocess.Popen(
["tar", "xzf", str(archive_file), "-C", str(staging_path)],
shell=False,
)
process.wait()
if process.returncode != 0:
raise ArchiveFileInvalid().add_extra_context(
"The tar utility returned a non-zero error code."
)

except OSError as ex:
raise ArchiveFileInvalid().add_extra_context(str(ex))


@cli_command
def main(args):
ctx = Context.from_cwd()
Expand All @@ -48,56 +32,4 @@ def main(args):
if not archive_file.is_file():
raise ArchiveFileInvalid()

try:
archive_version_index = None
staging_path = ctx.output_path / ARCHIVE_STAGING
staging_path.mkdir(exist_ok=True)
extract_archive(archive_file, staging_path)

archive_version_index_path = staging_path / ARCHIVE_VERSION_INDEX
if not archive_version_index_path.is_file():
raise ArchiveFileInvalid().add_extra_context(
"Could not locate the archive version index."
)

archive_version_index = VersionIndex.create_or_load(archive_version_index_path)
try:
archive_version_index.copy_entries_to(
dest=ctx.version_index, tasks=None, latest_only=False
)
except sqlite3.IntegrityError as ex:
raise DuplicateTaskOutput(output_dir=str(ctx.output_path)) from ex

# Copy over all archived task outputs
for task_id, version in archive_version_index.get_all_versions():
src_task_path = pathlib.Path(
staging_path, task_id.path, f.task_output_dir(task_id, version)
)
if not src_task_path.is_dir():
raise ArchiveFileInvalid().add_extra_context(
"Missing archived task output for '{}' at version {} in the "
"archive.".format(str(task_id), str(version))
)

dest_task_path = pathlib.Path(
ctx.output_path, task_id.path, f.task_output_dir(task_id, version)
)
dest_task_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src_task_path, dest_task_path)
if not dest_task_path.is_dir():
raise ArchiveFileInvalid().add_extra_context(
"Missing copied archived task output for '{}' at version {}.".format(
str(task_id), str(version)
)
)

# Everything was copied over and verified - safe to commit the index changes
ctx.version_index.commit_changes()

except:
ctx.version_index.rollback_changes()
raise

finally:
del archive_version_index
shutil.rmtree(staging_path, ignore_errors=True)
restore_archive(ctx, archive_file, expect_no_duplicates=args.strict)
2 changes: 1 addition & 1 deletion src/conductor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
SLOT_ENV_VARIABLE_NAME = "COND_SLOT"

# A template for the default name of a Conductor archive.
ARCHIVE_FILE_NAME_TEMPLATE = "cond-archive+{timestamp}.tar.gz"
ARCHIVE_FILE_NAME_TEMPLATE = "cond-archive+{timestamp}.tar.{extension}"

# The file name of the version index used in a Conductor archive.
ARCHIVE_VERSION_INDEX = "version_index_archive.sqlite"
Expand Down
2 changes: 2 additions & 0 deletions src/conductor/envs/maestro/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ async def execute_task(
)
executor = Executor(execution_slots=1, silent=True)
executor.run_plan(plan, ctx)
# Make sure any new versions are committed.
ctx.version_index.commit_changes()

end_timestamp = int(time.time())
return ExecuteTaskResponse(
Expand Down
16 changes: 16 additions & 0 deletions src/conductor/errors/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ def _message(self):
)


class UnsupportedArchiveType(ConductorError):
error_code = 4007

def __init__(self, **kwargs):
super().__init__()
self.kwargs = kwargs
self.archive_type = kwargs["archive_type"]

def _message(self):
return "The provided archive file was compressed as {archive_type}, which is not supported on your platform.".format(
archive_type=self.archive_type,
)


class ConfigParseError(ConductorError):
error_code = 5001

Expand Down Expand Up @@ -775,6 +789,7 @@ def _message(self):
4004: CreateArchiveFailed,
4005: ArchiveFileInvalid,
4006: DuplicateTaskOutput,
4007: UnsupportedArchiveType,
5001: ConfigParseError,
5002: ConfigInvalidValue,
5003: UnsupportedPlatform,
Expand Down Expand Up @@ -830,6 +845,7 @@ def _message(self):
"CreateArchiveFailed",
"ArchiveFileInvalid",
"DuplicateTaskOutput",
"UnsupportedArchiveType",
"ConfigParseError",
"ConfigInvalidValue",
"UnsupportedPlatform",
Expand Down
Loading

0 comments on commit 303ea3e

Please sign in to comment.