Skip to content

Commit

Permalink
feat: enable --input-dir flag to allow custom input directories (#29)
Browse files Browse the repository at this point in the history
* Search for AHB data in both the submodule and the project root

* Make CLI accept `--input-dir` flag

* Add import/export tests for CLI commands

* Log absolute path

Co-authored-by: konstantin <konstantin.klein@hochfrequenz.de>

* Log absolute path

Co-authored-by: konstantin <konstantin.klein@hochfrequenz.de>

* Use logger `exception` instead of `error`

Co-authored-by: konstantin <konstantin.klein@hochfrequenz.de>

* Rename `SUBMODULE` to `RELATIVE_PATH_TO_SUBMODULE`

* Remove unused variable

* Force `--output-dir` by removing default output destination

* Add `data/output` as output directory to CLI test workflow

* Use kwargs for `_process_files`

* Use lowercase `tuple[]`

* Add `_is_formatversion_dir()` to check if path leads to FV directory

* Raise error instead of returning empty lists and warnings

* Grammar and typo adjustments

* Remove (default) fallback input directory

* Add `-i` and `-o` aliases for input and output directory, respectively

* Add missing input flag to CLI testing workflow

* Fix typo in submodule path

* Catch 'em all

---------

Co-authored-by: konstantin <konstantin.klein@hochfrequenz.de>
  • Loading branch information
OLILHR and hf-kklein authored Nov 20, 2024
1 parent f561abb commit 07f6391
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cli_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ jobs:
- name: Try to install the script, then run the cli
run: |
pip install .
ahlbatross
ahlbatross -i data/machine-readable_anwendungshandbuecher -o data/output
29 changes: 23 additions & 6 deletions src/ahlbatross/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import logging
import sys
from pathlib import Path
from typing import Optional

import pandas as pd
import typer
from rich.console import Console

from ahlbatross.main import DEFAULT_OUTPUT_DIR, _process_submodule
from ahlbatross.main import process_ahb_data

app = typer.Typer(help="ahlbatross diffs machine-readable AHBs")
_logger = logging.getLogger(__name__)
Expand All @@ -20,14 +19,31 @@


@app.command()
def main(output_dir: Optional[Path] = None) -> None:
def main(
input_dir: Path = typer.Option(..., "--input-dir", "-i", help="Directory containing AHB data."),
output_dir: Path = typer.Option(
..., "--output-dir", "-o", help="Destination path to output directory containing processed files."
),
) -> None:
"""
main entrypoint for AHlBatross.
"""
try:
_process_submodule(output_dir or DEFAULT_OUTPUT_DIR)
if not input_dir.exists():
_logger.error("❌ Input directory does not exist: %s", input_dir.absolute())
sys.exit(1)
process_ahb_data(input_dir, output_dir)
except FileNotFoundError as e:
_logger.error("❌ Path error: %s", str(e))
sys.exit(1)
except PermissionError as e:
_logger.error("❌ Permission denied: %s", str(e))
sys.exit(1)
except (OSError, pd.errors.EmptyDataError, ValueError) as e:
_logger.error("❌error processing AHB files: %s", str(e))
_logger.exception("❌ Error processing AHB files: %s", str(e))
sys.exit(1)
except (RuntimeError, TypeError, AttributeError) as e:
_logger.exception("❌ Unexpected error: %s", str(e))
sys.exit(1)


Expand All @@ -38,6 +54,7 @@ def cli() -> None:
app()


# run locally using $ PYTHONPATH=src python -m ahlbatross.cli
# to run the script during local development, execute the following command:
# PYTHONPATH=src python -m ahlbatross.cli -i data/machine-readable_anwendungshandbuecher -o data/output
# Dispatch through the Typer app rather than calling `main()` directly: a bare
# `main()` call skips command-line parsing, so `input_dir` / `output_dir` would be
# the `typer.Option(...)` placeholder defaults instead of the paths passed via
# `-i` / `-o`.
if __name__ == "__main__":
    app()
79 changes: 43 additions & 36 deletions src/ahlbatross/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import re
import sys
from pathlib import Path
from typing import Any, Tuple, TypeAlias
from typing import Any, TypeAlias

import pandas as pd
from pandas.core.frame import DataFrame
Expand All @@ -19,26 +19,25 @@
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)
logger = logging.getLogger(__name__)

SUBMODULE = Path("data/machine-readable_anwendungshandbuecher")
DEFAULT_OUTPUT_DIR = Path("data/output")

XlsxFormat: TypeAlias = Format


def _get_available_formatversions() -> list[str]:
def _is_formatversion_dir(path: Path) -> bool:
"""
get all available <formatversion> directories in SUBMODULE, sorted from latest to oldest.
confirm if path is a formatversion directory - for instance "FV2504/".
"""
if not SUBMODULE.exists():
logger.error("❌Base directory does not exist: %s", SUBMODULE)
return []
return path.is_dir() and path.name.startswith("FV") and len(path.name) == 6

formatversion_dirs = [
d.name for d in SUBMODULE.iterdir() if d.is_dir() and d.name.startswith("FV") and len(d.name) == 6
]

formatversion_dirs.sort(key=parse_formatversions, reverse=True)
def _get_available_formatversions(root_dir: Path) -> list[str]:
"""
get all available <formatversion> directories, sorted from latest to oldest.
"""
if not root_dir.exists():
raise FileNotFoundError(f"❌ Submodule / base directory does not exist: {root_dir}")

formatversion_dirs = [d.name for d in root_dir.iterdir() if _is_formatversion_dir(d)]
formatversion_dirs.sort(key=parse_formatversions, reverse=True)
return formatversion_dirs


Expand All @@ -47,38 +46,39 @@ def _get_nachrichtenformat_dirs(formatversion_dir: Path) -> list[Path]:
get all <nachrichtenformat> directories that contain a csv subdirectory.
"""
if not formatversion_dir.exists():
logger.warning("❌formatversion directory not found: %s", formatversion_dir)
return []
raise FileNotFoundError(f"❌ Formatversion directory not found: {formatversion_dir.absolute()}")

return [d for d in formatversion_dir.iterdir() if d.is_dir() and (d / "csv").exists() and (d / "csv").is_dir()]


def _is_formatversion_dir_empty(formatversion: str) -> bool:
def _is_formatversion_dir_empty(root_dir: Path, formatversion: str) -> bool:
"""
check if a <formatversion> directory does not contain any <nachrichtenformat> directories.
"""
formatversion_dir = SUBMODULE / formatversion
formatversion_dir = root_dir / formatversion
if not formatversion_dir.exists():
return True

return len(_get_nachrichtenformat_dirs(formatversion_dir)) == 0


def determine_consecutive_formatversions() -> list[Tuple[str, str]]:
def determine_consecutive_formatversions(root_dir: Path) -> list[tuple[str, str]]:
"""
generate pairs of consecutive <formatversion> directories to compare and skip empty directories.
"""
formatversion_list = _get_available_formatversions()
formatversion_list = _get_available_formatversions(root_dir)
consecutive_formatversions = []

for i in range(len(formatversion_list) - 1):
subsequent_formatversion = formatversion_list[i]
previous_formatversion = formatversion_list[i + 1]

# skip if either directory is empty.
if _is_formatversion_dir_empty(subsequent_formatversion) or _is_formatversion_dir_empty(previous_formatversion):
if _is_formatversion_dir_empty(root_dir, subsequent_formatversion) or _is_formatversion_dir_empty(
root_dir, previous_formatversion
):
logger.warning(
"⚠️skipping empty consecutive formatversions: %s -> %s",
"❗️Skipping empty consecutive formatversions: %s -> %s",
subsequent_formatversion,
previous_formatversion,
)
Expand All @@ -91,16 +91,16 @@ def determine_consecutive_formatversions() -> list[Tuple[str, str]]:

# pylint:disable=too-many-locals
def get_matching_pruefid_files(
previous_formatversion: str, subsequent_formatversion: str
root_dir: Path, previous_formatversion: str, subsequent_formatversion: str
) -> list[tuple[Path, Path, str, str]]:
"""
find matching ahb/<pruefid>.csv files across <formatversion> and <nachrichtenformat> directories.
"""
previous_formatversion_dir = SUBMODULE / previous_formatversion
subsequent_formatversion_dir = SUBMODULE / subsequent_formatversion
previous_formatversion_dir = root_dir / previous_formatversion
subsequent_formatversion_dir = root_dir / subsequent_formatversion

if not all(d.exists() for d in [previous_formatversion_dir, subsequent_formatversion_dir]):
logger.error("❌at least one formatversion directory does not exist.")
logger.error("❌ At least one formatversion directory does not exist.")
return []

matching_files = []
Expand Down Expand Up @@ -417,12 +417,12 @@ def align_columns(


def _process_files(
previous_formatversion: str, subsequent_formatversion: str, output_dir: Path = DEFAULT_OUTPUT_DIR
root_dir: Path, previous_formatversion: str, subsequent_formatversion: str, output_dir: Path
) -> None:
"""
process all matching ahb/<pruefid>.csv files between two <formatversion> directories.
"""
matching_files = get_matching_pruefid_files(previous_formatversion, subsequent_formatversion)
matching_files = get_matching_pruefid_files(root_dir, previous_formatversion, subsequent_formatversion)

if not matching_files:
logger.warning("No matching files found to compare")
Expand Down Expand Up @@ -453,36 +453,43 @@ def _process_files(
merged_df.to_csv(csv_path, index=False)
export_to_excel(merged_df, str(xlsx_path))

logger.info("✅successfully processed %s/%s", nachrichtentyp, pruefid)
logger.info("✅ Successfully processed %s/%s", nachrichtentyp, pruefid)

except pd.errors.EmptyDataError:
logger.error("❌empty or corrupted CSV file for %s/%s", nachrichtentyp, pruefid)
logger.error("❌ Empty or corrupted CSV file for %s/%s", nachrichtentyp, pruefid)
except OSError as e:
logger.error("❌file system error for %s/%s: %s", nachrichtentyp, pruefid, str(e))
logger.error("❌ File system error for %s/%s: %s", nachrichtentyp, pruefid, str(e))
except ValueError as e:
logger.error("❌data processing error for %s/%s: %s", nachrichtentyp, pruefid, str(e))
logger.error("❌ Data processing error for %s/%s: %s", nachrichtentyp, pruefid, str(e))


def _process_submodule(output_dir: Path = DEFAULT_OUTPUT_DIR) -> None:
def process_ahb_data(input_dir: Path, output_dir: Path) -> None:
"""
processes all valid consecutive <formatversion> subdirectories.
"""
logger.info("Found AHB root directory at: %s", input_dir.absolute())
logger.info("The output dir is %s", output_dir.absolute())
consecutive_formatversions = determine_consecutive_formatversions()

consecutive_formatversions = determine_consecutive_formatversions(input_dir)

if not consecutive_formatversions:
logger.warning("⚠️no valid consecutive formatversion subdirectories found to compare")
logger.warning("❗️ No valid consecutive formatversion subdirectories found to compare.")
return

for subsequent_formatversion, previous_formatversion in consecutive_formatversions:
logger.info(
"⌛processing consecutive formatversions: %s -> %s", subsequent_formatversion, previous_formatversion
)
try:
_process_files(previous_formatversion, subsequent_formatversion, output_dir)
_process_files(
root_dir=input_dir,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
output_dir=output_dir,
)
except (OSError, pd.errors.EmptyDataError, ValueError) as e:
logger.error(
"❌error processing formatversions %s -> %s: %s",
"❌ Error processing formatversions %s -> %s: %s",
subsequent_formatversion,
previous_formatversion,
str(e),
Expand Down
30 changes: 30 additions & 0 deletions unittests/test_export.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
import os
import tempfile
from pathlib import Path

import pandas as pd
import pytest
from pandas.testing import assert_frame_equal
from typer.testing import CliRunner

from ahlbatross.cli import app
from ahlbatross.excel import export_to_excel
from ahlbatross.main import align_columns

Expand Down Expand Up @@ -62,3 +66,29 @@ def test_empty_dataframe_export() -> None:

assert os.path.exists(xlsx_path)
assert os.path.getsize(xlsx_path) > 0


def test_cli_with_custom_output_directory(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None:
    """
    the CLI accepts a custom --output-dir, exits cleanly and logs the absolute output path.
    """
    caplog.set_level(logging.INFO)

    # minimal AHB tree: <input>/FV2504/Nachrichtenformat_1/csv/test.csv
    source_root = tmp_path / "input"
    csv_folder = source_root / "FV2504" / "Nachrichtenformat_1" / "csv"
    csv_folder.mkdir(parents=True)
    (csv_folder / "test.csv").write_text("test data")

    target_root = tmp_path / "custom_output"
    target_root.mkdir()

    cli_args = ["--input-dir", str(source_root), "--output-dir", str(target_root)]
    outcome = CliRunner().invoke(app, cli_args, catch_exceptions=False)

    assert outcome.exit_code == 0
    assert str(target_root.absolute()) in caplog.text
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from pathlib import Path

import pytest
from _pytest.monkeypatch import MonkeyPatch
from typer.testing import CliRunner

from ahlbatross.cli import app
from ahlbatross.format_version_helpers import parse_formatversions
from ahlbatross.main import determine_consecutive_formatversions, get_matching_pruefid_files

Expand Down Expand Up @@ -34,11 +36,10 @@ def test_parse_invalid_formatversions() -> None:
parse_formatversions(invalid_input)


def test_get_matching_files(tmp_path: Path, monkeypatch: MonkeyPatch) -> None:
def test_get_matching_files(tmp_path: Path) -> None:
"""
test find matching files across formatversions.
"""
monkeypatch.setattr("ahlbatross.main.SUBMODULE", tmp_path)

submodule: dict[str, dict[str, dict[str, str]]] = {
"FV2504": {
Expand All @@ -60,20 +61,21 @@ def test_get_matching_files(tmp_path: Path, monkeypatch: MonkeyPatch) -> None:
for file, content in files.items():
(nachrichtenformat_dir / file).write_text(content)

matches = get_matching_pruefid_files("FV2410", "FV2504")
matches = get_matching_pruefid_files(
root_dir=tmp_path, previous_formatversion="FV2410", subsequent_formatversion="FV2504"
)

assert len(matches) == 2
assert matches[0][2] == "nachrichtenformat_1"
assert matches[0][3] == "pruefid_1"
assert matches[1][3] == "pruefid_2"


def test_determine_consecutive_formatversions(tmp_path: Path, monkeypatch: MonkeyPatch) -> None:
def test_determine_consecutive_formatversions(tmp_path: Path) -> None:
"""
test successful determination of consecutive formatversions.
"""
monkeypatch.setattr("ahlbatross.main.SUBMODULE", tmp_path)

# Create test directory structure with formatversions and add dummy file
submodule: dict[str, dict[str, bool | dict[str, str]]] = {
"FV2504": {"nachrichtenformat_1": True},
"FV2410": {"nachrichtenformat_1": True},
Expand All @@ -92,5 +94,42 @@ def test_determine_consecutive_formatversions(tmp_path: Path, monkeypatch: Monke
csv_dir.mkdir()
(csv_dir / "test.csv").write_text("test")

result = determine_consecutive_formatversions()
result = determine_consecutive_formatversions(root_dir=tmp_path)
assert result == [("FV2504", "FV2410")]


def test_cli_with_custom_input_directory(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None:
    """
    the CLI accepts a custom --input-dir; with a single formatversion it exits cleanly
    and logs that there is nothing to compare.
    """
    caplog.set_level(logging.INFO)

    # minimal AHB tree: <custom_input>/FV2504/Nachrichtenformat_1/csv/test.csv
    source_root = tmp_path / "custom_input"
    csv_folder = source_root / "FV2504" / "Nachrichtenformat_1" / "csv"
    csv_folder.mkdir(parents=True)
    (csv_folder / "test.csv").write_text("test data")

    cli_args = ["--input-dir", str(source_root), "--output-dir", str(tmp_path)]
    outcome = CliRunner().invoke(app, cli_args, catch_exceptions=False)

    assert outcome.exit_code == 0
    assert "No valid consecutive formatversion subdirectories found to compare." in caplog.text


def test_cli_with_invalid_input_directory(tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None:
    """
    a non-existent --input-dir is reported in the log and the CLI exits with code 1.
    """
    caplog.set_level(logging.INFO)

    missing_dir = tmp_path / "does_not_exist"
    cli_args = ["--input-dir", str(missing_dir), "--output-dir", str(tmp_path)]
    outcome = CliRunner().invoke(app, cli_args, catch_exceptions=False)

    logged = caplog.text
    assert "❌ Input directory does not exist:" in logged
    assert str(missing_dir) in logged
    assert outcome.exit_code == 1

0 comments on commit 07f6391

Please sign in to comment.