Skip to content

Commit

Permalink
[Tasks] Add run_experiment_group() task type (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy authored Feb 8, 2021
1 parent 9c8b9f1 commit cea027d
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 6 deletions.
15 changes: 15 additions & 0 deletions errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@
processing task '{identifier}'. All experiment option values must be
either a string, integer, floating point number, or boolean.
1014:
name: ExperimentGroupDuplicateName
message: >-
Encountered a duplicate experiment instance name '{instance_name}' when
processing a run_experiment_group() task with name '{task_name}'.
Experiment instance names must be unique.
1015:
name: ExperimentGroupInvalidExperimentInstance
message: >-
Encountered an experiment instance that was incorrectly formed when
processing a run_experiment_group() task with name '{task_name}'.
Experiments in an experiment group must be defined using an iterable of
ExperimentInstance named tuples.
# Task graph loading errors (error code 2xxx)
2001:
Expand Down
30 changes: 30 additions & 0 deletions src/conductor/errors/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ def _message(self):
)


class ExperimentGroupDuplicateName(ConductorError):
error_code = 1014

def __init__(self, **kwargs):
super().__init__()
self.instance_name = kwargs["instance_name"]
self.task_name = kwargs["task_name"]

def _message(self):
return "Encountered a duplicate experiment instance name '{instance_name}' when processing a run_experiment_group() task with name '{task_name}'. Experiment instance names must be unique.".format(
instance_name=self.instance_name,
task_name=self.task_name,
)


class ExperimentGroupInvalidExperimentInstance(ConductorError):
error_code = 1015

def __init__(self, **kwargs):
super().__init__()
self.task_name = kwargs["task_name"]

def _message(self):
return "Encountered an experiment instance that was incorrectly formed when processing a run_experiment_group() task with name '{task_name}'. Experiments in an experiment group must be defined using a list of ExperimentInstance named tuples.".format(
task_name=self.task_name,
)


class TaskNotFound(ConductorError):
error_code = 2001

Expand Down Expand Up @@ -386,6 +414,8 @@ def _message(self):
"CombineDuplicateDepName",
"ExperimentOptionsNonStringKey",
"ExperimentOptionsNonPrimitiveValue",
"ExperimentGroupDuplicateName",
"ExperimentGroupInvalidExperimentInstance",
"TaskNotFound",
"MissingProjectRoot",
"CyclicDependency",
Expand Down
26 changes: 20 additions & 6 deletions src/conductor/parsing/task_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@
ParsingUnknownNameError,
TaskSyntaxError,
)
from conductor.task_types.stdlib import STDLIB_FILES


class TaskLoader:
def __init__(self):
self._tasks = None
self._current_cond_file_path = None
self._task_constructors = {}
for raw_task_type in raw_task_types.values():
self._task_constructors[raw_task_type.name] = self._wrap_task_function(
raw_task_type.load_from_cond_file
)
self._conductor_scope = self._compile_scope()

def parse_cond_file(self, cond_file_path):
"""
Expand All @@ -29,7 +26,7 @@ def parse_cond_file(self, cond_file_path):
with open(cond_file_path) as file:
code = file.read()
# pylint: disable=exec-used
exec(code, self._task_constructors, self._task_constructors)
exec(code, self._conductor_scope.copy())
return tasks
except ConductorError as ex:
ex.add_file_context(file_path=cond_file_path)
Expand All @@ -53,6 +50,23 @@ def parse_cond_file(self, cond_file_path):
self._tasks = None
self._current_cond_file_path = None

def _compile_scope(self):
scope = {}
# Create the task constructors for Conductor's foundational task types.
for raw_task_type in raw_task_types.values():
scope[raw_task_type.name] = self._wrap_task_function(
raw_task_type.load_from_cond_file
)
# We need to explicitly `compile()` the Conductor standard library
# files here to ensure that any uses of Conductor's foundational task
# types bind to the task constructors defined above.
for lib_file_path in STDLIB_FILES:
with open(lib_file_path, "r") as lib_file:
code = compile(lib_file.read(), str(lib_file_path), "exec")
# pylint: disable=exec-used
exec(code, scope)
return scope

def _wrap_task_function(self, task_constructor):
def shim(**kwargs):
raw_task = task_constructor(**kwargs)
Expand Down
5 changes: 5 additions & 0 deletions src/conductor/task_types/stdlib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import pathlib

_MODULE_PATH = pathlib.Path(__file__).resolve().parent

STDLIB_FILES = [_MODULE_PATH / "run_experiment_group.py"]
52 changes: 52 additions & 0 deletions src/conductor/task_types/stdlib/run_experiment_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Dict, Iterable, NamedTuple, Optional, Sequence
from conductor.utils.experiment_options import OptionValue
from conductor.errors import (
ExperimentGroupDuplicateName,
ExperimentGroupInvalidExperimentInstance,
)


class ExperimentInstance(NamedTuple):
name: str
options: Dict[str, OptionValue]


def run_experiment_group(
name: str,
run: str,
experiments: Iterable[ExperimentInstance],
deps: Optional[Sequence[str]] = None,
) -> None:
task_deps = deps if deps is not None else []
relative_experiment_identifiers = []

try:
seen_experiment_names = set()
for experiment in experiments:
if not isinstance(experiment, ExperimentInstance):
raise ExperimentGroupInvalidExperimentInstance(task_name=name)
if experiment.name in seen_experiment_names:
raise ExperimentGroupDuplicateName(
task_name=name, instance_name=experiment.name
)

seen_experiment_names.add(experiment.name)
# run_experiment(): Defined by Conductor at runtime
# pylint: disable=undefined-variable
run_experiment( # type: ignore
name=experiment.name,
run=run,
options=experiment.options,
deps=task_deps,
)
relative_experiment_identifiers.append(":" + experiment.name)

except TypeError as ex:
raise ExperimentGroupInvalidExperimentInstance(task_name=name) from ex

# combine(): Defined by Conductor at runtime
# pylint: disable=undefined-variable
combine( # type: ignore
name=name,
deps=relative_experiment_identifiers,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
run_experiment_group(
name="test",
run="exit 0",
experiments=[
ExperimentInstance(name="test-inst", options={}),
ExperimentInstance(name="test-inst", options={}),
]
)
8 changes: 8 additions & 0 deletions tests/fixture-projects/experiments/invalid-group-type/COND
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
run_experiment_group(
name="test",
run="exit 0",
experiments=[
{"name": "test-inst", "options": {}},
{"name": "test-inst-2", "options": {}},
]
)
11 changes: 11 additions & 0 deletions tests/fixture-projects/experiments/sweep/COND
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
run_experiment_group(
name="threads",
run="./run.sh",
experiments=[
ExperimentInstance(
name="threads-{}".format(threads),
options={"threads": threads},
)
for threads in range(1, 5)
],
)
3 changes: 3 additions & 0 deletions tests/fixture-projects/experiments/sweep/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /bin/bash

echo $1 > ${COND_OUT}/output.txt
46 changes: 46 additions & 0 deletions tests/run_experiment_group_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pathlib
from conductor.config import TASK_OUTPUT_DIR_SUFFIX
from .conductor_runner import (
ConductorRunner,
FIXTURE_TEMPLATES,
)


def test_run_experiment_group(tmp_path: pathlib.Path):
cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"])
result = cond.run("//sweep:threads")
assert result.returncode == 0
assert cond.output_path.is_dir()

combined_dir = pathlib.Path(
cond.output_path, "sweep", ("threads" + TASK_OUTPUT_DIR_SUFFIX)
)
assert combined_dir.is_dir()

expected_tasks = ["threads-{}".format(threads) for threads in range(1, 5)]

# Ensure combined task dirs all exist and are non-empty.
combined_dir_names = [path.name for path in combined_dir.iterdir()]
for task_name in expected_tasks:
assert task_name in combined_dir_names
assert any(True for _ in (combined_dir / task_name).iterdir())

# Ensure individual experiment dirs also exist.
sweep_output = combined_dir.parent
assert sweep_output.is_dir()
sweep_output_count = len(list(sweep_output.iterdir()))

# 4 experiment instances plus the combined output dir.
assert sweep_output_count == 5


def test_run_experiment_group_invalid_duplicate(tmp_path: pathlib.Path):
cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"])
result = cond.run("//invalid-group-duplicate:test")
assert result.returncode != 0


def test_run_experiment_group_invalid_type(tmp_path: pathlib.Path):
cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"])
result = cond.run("//invalid-group-type:test")
assert result.returncode != 0

0 comments on commit cea027d

Please sign in to comment.