diff --git a/tests/trestle/tasks/csv_to_oscal_cd_test.py b/tests/trestle/tasks/csv_to_oscal_cd_test.py index 67a5930bc..1eef1cd25 100644 --- a/tests/trestle/tasks/csv_to_oscal_cd_test.py +++ b/tests/trestle/tasks/csv_to_oscal_cd_test.py @@ -32,7 +32,7 @@ def monkey_exception() -> None: """Monkey exception.""" - raise Exception('foobar') + raise RuntimeError('foobar') def _get_rows(file_: str) -> List[List[str]]: @@ -139,7 +139,7 @@ def _get_config_section_init(tmp_path: pathlib.Path, fname: str) -> tuple: def test_print_info(tmp_path: pathlib.Path) -> None: """Test print_info.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.print_info() assert retval is None @@ -147,7 +147,7 @@ def test_print_info(tmp_path: pathlib.Path) -> None: def test_simulate(tmp_path: pathlib.Path) -> None: """Test simulate.""" - config, section = _get_config_section(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.simulate() assert retval == TaskOutcome.SIM_SUCCESS @@ -156,7 +156,7 @@ def test_simulate(tmp_path: pathlib.Path) -> None: def test_execute(tmp_path: pathlib.Path) -> None: """Test execute.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() assert retval == TaskOutcome.SUCCESS @@ -173,7 +173,7 @@ def test_config_missing(tmp_path: pathlib.Path) -> None: def test_config_missing_title(tmp_path: pathlib.Path) -> None: """Test config missing title.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') section.pop('title') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -182,7 +182,7 @@ def test_config_missing_title(tmp_path: pathlib.Path) -> None: def test_config_missing_version(tmp_path: pathlib.Path) -> None: """Test config missing version.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') section.pop('version') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -191,7 +191,7 @@ def test_config_missing_version(tmp_path: pathlib.Path) -> None: def test_config_missing_csv_file_spec(tmp_path: pathlib.Path) -> None: """Test config missing csv file spec.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') section['output-dir'] = str(tmp_path) section.pop('csv-file') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) @@ -201,7 +201,7 @@ def test_config_missing_csv_file_spec(tmp_path: pathlib.Path) -> None: def test_config_missing_csv_file(tmp_path: pathlib.Path) -> None: """Test config missing csv file.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') section['csv-file'] = 'foobar' tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -211,7 +211,7 @@ def test_config_missing_csv_file(tmp_path: pathlib.Path) -> None: def test_exception(tmp_path: pathlib.Path, monkeypatch: MonkeyPatch) -> None: """Test exception.""" monkeypatch.setattr(csv_to_oscal_cd._RuleSetIdMgr, 'get_next_rule_set_id', monkey_exception) - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() assert retval == TaskOutcome.FAILURE @@ -219,7 +219,7 @@ def test_exception(tmp_path: pathlib.Path, monkeypatch: MonkeyPatch) -> None: def test_execute_mock(tmp_path: pathlib.Path) -> None: """Test execute mock.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # get good data & test that mocking works rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') with mock.patch('trestle.tasks.csv_to_oscal_cd.csv.reader') as mock_csv_reader: @@ -260,7 +260,7 @@ def test_execute_validate_controls(tmp_path: pathlib.Path, monkeypatch: MonkeyPa cwd = os.getcwd() try: os.chdir(workspace) - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) section['validate-controls'] = 'warn' retval = tgt.execute() @@ -276,7 +276,7 @@ def test_execute_validate_controls_nist(tmp_path: pathlib.Path, monkeypatch: Mon cwd = os.getcwd() try: os.chdir(workspace) - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # replace resolved profile (catalog) rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') for i, row in enumerate(rows): @@ -305,7 +305,7 @@ def test_execute_control_invalid(tmp_path: pathlib.Path, monkeypatch: MonkeyPatc cwd = os.getcwd() try: os.chdir(workspace) - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[2] @@ -329,7 +329,7 @@ def test_execute_control_invalid_fail(tmp_path: pathlib.Path, monkeypatch: Monke cwd = os.getcwd() try: os.chdir(workspace) - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[2] @@ -348,7 +348,7 @@ def test_execute_control_invalid_fail(tmp_path: pathlib.Path, monkeypatch: Monke def test_execute_no_overwrite(tmp_path: pathlib.Path) -> None: """Test execute no overwrite.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() assert retval == TaskOutcome.SUCCESS @@ -359,7 +359,7 @@ def test_execute_no_overwrite(tmp_path: pathlib.Path) -> None: def test_execute_verbose(tmp_path: pathlib.Path) -> None: """Test execute verbose.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') section['quiet'] = 'False' tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -369,7 +369,7 @@ def test_execute_verbose(tmp_path: pathlib.Path) -> None: def test_execute_missing_heading(tmp_path: pathlib.Path) -> None: """Test execute missing heading.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[0] @@ -384,7 +384,7 @@ def test_execute_missing_heading(tmp_path: pathlib.Path) -> None: def test_execute_missing_value(tmp_path: pathlib.Path) -> None: """Test execute missing value.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[2] @@ -399,7 +399,7 @@ def test_execute_missing_value(tmp_path: pathlib.Path) -> None: def test_execute_missing_rule_id(tmp_path: pathlib.Path) -> None: """Test execute missing rule id.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[2] @@ -414,7 +414,7 @@ def test_execute_missing_rule_id(tmp_path: pathlib.Path) -> None: def test_execute_missing_control_id_list(tmp_path: pathlib.Path) -> None: """Test execute missing control id list.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') # inject error rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') row = rows[2] @@ -433,24 +433,9 @@ def test_execute_missing_control_id_list(tmp_path: pathlib.Path) -> None: assert len(component.props) == 423 -def test_execute_missing_parameter_id(tmp_path: pathlib.Path) -> None: - """Test execute missing parameter id.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd.config') - # inject error - rows = _get_rows('tests/data/csv/ocp4-user.v2.csv') - row = rows[2] - assert row[9] == 'scan_interval_max' - row[9] = '' - with mock.patch('trestle.tasks.csv_to_oscal_cd.csv.reader') as mock_csv_reader: - mock_csv_reader.return_value = rows - tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) - retval = tgt.execute() - assert retval == TaskOutcome.FAILURE - - def test_execute_bp_sample(tmp_path: pathlib.Path) -> None: """Test execute bp sample.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() assert retval == TaskOutcome.SUCCESS @@ -459,7 +444,7 @@ def test_execute_bp_sample(tmp_path: pathlib.Path) -> None: def test_execute_bp_cd(tmp_path: pathlib.Path) -> None: """Test execute bp cd.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -469,7 +454,7 @@ def test_execute_bp_cd(tmp_path: pathlib.Path) -> None: def test_execute_bp_cd_missing(tmp_path: pathlib.Path) -> None: """Test execute bp cd missing.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/foobar/component-definition.json' tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) retval = tgt.execute() @@ -478,7 +463,7 @@ def test_execute_bp_cd_missing(tmp_path: pathlib.Path) -> None: def test_execute_duplicate_rule(tmp_path: pathlib.Path) -> None: """Test execute duplicate rule.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # duplicate rule rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -492,7 +477,7 @@ def test_execute_duplicate_rule(tmp_path: pathlib.Path) -> None: def test_execute_delete_rule(tmp_path: pathlib.Path) -> None: """Test execute delete rule.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete rule rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -517,7 +502,7 @@ def test_execute_delete_rule(tmp_path: pathlib.Path) -> None: def test_execute_delete_all_rules_with_params(tmp_path: pathlib.Path) -> None: """Test execute delete all rules with params.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete all rules with params rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -543,7 +528,7 @@ def test_execute_delete_all_rules_with_params(tmp_path: pathlib.Path) -> None: def test_execute_delete_rule_with_params(tmp_path: pathlib.Path) -> None: """Test execute delete rule with params.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete rule with params rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -566,7 +551,7 @@ def test_execute_delete_rule_with_params(tmp_path: pathlib.Path) -> None: def test_execute_add_rule(tmp_path: pathlib.Path) -> None: """Test execute add rule.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add rule rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -650,7 +635,7 @@ def test_execute_add_rule(tmp_path: pathlib.Path) -> None: def test_execute_missing_param_default_value(tmp_path: pathlib.Path) -> None: """Test execute missing param default_value.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') # delete default param default value rows = _get_rows('tests/data/csv/bp.sample.v2.csv') row = rows[3] @@ -666,7 +651,7 @@ def test_execute_missing_param_default_value(tmp_path: pathlib.Path) -> None: def test_execute_change_param_default_value(tmp_path: pathlib.Path) -> None: """Test execute change param default_value.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # change default param default value rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -696,7 +681,7 @@ def test_execute_change_param_default_value(tmp_path: pathlib.Path) -> None: def test_execute_delete_param(tmp_path: pathlib.Path) -> None: """Test execute delete param.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete param rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -726,7 +711,7 @@ def test_execute_delete_param(tmp_path: pathlib.Path) -> None: def test_execute_delete_params(tmp_path: pathlib.Path) -> None: """Test execute delete params.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete params rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -750,7 +735,7 @@ def test_execute_delete_params(tmp_path: pathlib.Path) -> None: def test_execute_add_param(tmp_path: pathlib.Path) -> None: """Test execute add param.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add param rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -789,7 +774,7 @@ def test_execute_add_param(tmp_path: pathlib.Path) -> None: def test_execute_delete_all_control_id_list(tmp_path: pathlib.Path) -> None: """Test execute delete all control id list.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete all control lists rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -810,7 +795,7 @@ def test_execute_delete_all_control_id_list(tmp_path: pathlib.Path) -> None: def test_execute_delete_control_id(tmp_path: pathlib.Path) -> None: """Test execute delete control id.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete control id rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -835,7 +820,7 @@ def test_execute_delete_control_id(tmp_path: pathlib.Path) -> None: def test_execute_delete_control_id_multi(tmp_path: pathlib.Path) -> None: """Test execute delete control id multi.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete control id multi rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -860,7 +845,7 @@ def test_execute_delete_control_id_multi(tmp_path: pathlib.Path) -> None: def test_execute_delete_control_id_smt(tmp_path: pathlib.Path) -> None: """Test execute delete control id smt.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete control id smt rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -887,7 +872,7 @@ def test_execute_delete_control_id_smt(tmp_path: pathlib.Path) -> None: def test_execute_add_control_id(tmp_path: pathlib.Path) -> None: """Test execute add control id.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add control id rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -911,7 +896,7 @@ def test_execute_add_control_id(tmp_path: pathlib.Path) -> None: def test_execute_add_control_id_smt(tmp_path: pathlib.Path) -> None: """Test execute add control mapping smt.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add control mapping smt rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -941,7 +926,7 @@ def test_execute_add_control_id_smt(tmp_path: pathlib.Path) -> None: def test_execute_delete_property(tmp_path: pathlib.Path) -> None: """Test execute delete property.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # delete property rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -967,7 +952,7 @@ def test_execute_delete_property(tmp_path: pathlib.Path) -> None: def test_execute_add_property(tmp_path: pathlib.Path) -> None: """Test execute add property.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add property rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -999,7 +984,7 @@ def test_execute_add_property(tmp_path: pathlib.Path) -> None: def test_execute_add_user_property(tmp_path: pathlib.Path) -> None: """Test execute add user property.""" - config, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' # add user property rows = _get_rows('tests/data/csv/bp.sample.v2.csv') @@ -1020,3 +1005,54 @@ def test_execute_add_user_property(tmp_path: pathlib.Path) -> None: assert len(component.props) == 71 assert component.props[5].name == 'New_Column_Name' assert component.props[5].value == 'new-column-value-2' + + +def test_execute_validation(tmp_path: pathlib.Path) -> None: + """Test execute validation.""" + _, section = _get_config_section_init(tmp_path, 'test-csv-to-oscal-cd-bp.config') + section['component-definition'] = 'tests/data/csv/component-definitions/bp/component-definition.json' + # set validation component type + rows = _get_rows('tests/data/csv/bp.sample.v2.csv') + row = [ + 'validation-reference-id', + 'validation-rule-id', + 'validation-rule-description', + 'validation-check-id', + 'validation-check-description', + 'validation-fetcher', + 'validation-fetcher-description', + 'https://abc.com/validation-profile-reference-url', + 'validation-profile-description', + 'Validation', + 'validation-control-id-list', + 'IAM', + 'IAM', + 'validation-parameter-id', + 'validation-parameter-description', + 'validation-parameter-value-default', + 'validation-parameter-value-alternatives', + 'https://abc.com/validation-namespace' + ] + rows.append(row) + with mock.patch('trestle.tasks.csv_to_oscal_cd.csv.reader') as mock_csv_reader: + mock_csv_reader.return_value = rows + tgt = csv_to_oscal_cd.CsvToOscalComponentDefinition(section) + retval = tgt.execute() + assert retval == TaskOutcome.SUCCESS + # read component-definition + fp = pathlib.Path(tmp_path) / 'component-definition.json' + cd = ComponentDefinition.oscal_read(fp) + # spot check + component = cd.components[1] + assert component.type == 'Validation' + assert component.title == 'IAM' + assert component.description == 'IAM' + assert len(component.props) == 6 + assert component.props[0].name == 'Rule_Id' + assert component.props[0].value == 'validation-rule-id' + assert component.props[0].class_ == 'scc_class' + assert component.props[1].name == 'Check_Id' + assert component.props[1].value == 'validation-check-id' + assert component.props[2].name == 'Check_Description' + assert component.props[2].value == 'validation-check-description' + assert len(component.control_implementations) == 0 diff --git a/trestle/tasks/csv_to_oscal_cd.py b/trestle/tasks/csv_to_oscal_cd.py index 9a6f65d89..7fdf8e47a 100644 --- a/trestle/tasks/csv_to_oscal_cd.py +++ b/trestle/tasks/csv_to_oscal_cd.py @@ -54,11 +54,16 @@ PROFILE_DESCRIPTION = 'Profile_Description' CHECK_ID = 'Check_Id' CHECK_DESCRIPTION = 'Check_Description' +FETCHER_ID = 'Fetcher_Id' +FETCHER_DESCRIPTION = 'Fetcher_Description' PARAMETER_ID = 'Parameter_Id' PARAMETER_DESCRIPTION = 'Parameter_Description' PARAMETER_VALUE_DEFAULT = 'Parameter_Value_Default' PARAMETER_VALUE_ALTERNATIVES = 'Parameter_Value_Alternatives' +validation = 'validation' +prefix_rule_set = 'rule_set_' + logger = logging.getLogger(__name__) @@ -118,11 +123,20 @@ def print_info(self) -> None: text3 = ' [1st row are column headings; 2nd row are column descriptions; 3rd row and beyond is data]' logger.info(text1 + text2 + text3) text1 = ' required columns: ' - for text2 in CsvColumn.columns_required: + for text2 in CsvColumn.get_required_column_names(): + if text2 in ['Rule_Description', 'Profile_Source', 'Profile_Description', 'Control_Id_List']: + text2 += '*' logger.info(text1 + text2) text1 = ' ' text1 = ' optional columns: ' - for text2 in CsvColumn.columns_optional: + for text2 in CsvColumn.get_optional_column_names(): + if text2 in ['Parameter_Id', + 'Parameter_Description', + 'Parameter_Value_Alternatives', + 'Parameter_Value_Default']: + text2 += '*' + if text2 in ['Check_Id', 'Check_Description']: + text2 += '+' logger.info(text1 + text2) text1 = ' ' text1 = ' output-dir = ' @@ -140,6 +154,16 @@ def print_info(self) -> None: text1 = ' validate-controls = ' text2 = '(optional) on, warn, or off [default]; validate controls exist in resolved profile.' logger.info(text1 + text2) + # Notes + text1 = '' + text2 = '' + logger.info(text1 + text2) + text1 = 'Notes: ' + text2 = '* column is ignored for validation component type' + logger.info(text1 + text2) + text1 = ' ' + text2 = '+ column is required for validation component type' + logger.info(text1 + text2) def configure(self) -> bool: """Configure.""" @@ -464,18 +488,25 @@ def rules_add(self, add_rules: List[str]) -> None: # props component.props = as_list(component.props) component.props = component.props + self._create_rule_props(rule_key) - # control implementation - source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE) - description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION) - control_implementation = self._get_control_implementation(component, source, description) - # set-parameter - set_parameter = self._create_set_parameter(rule_key) - if set_parameter: - control_implementation.set_parameters = as_list(control_implementation.set_parameters) - _OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter) - # control-mappings - control_mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split() - self._add_rule_prop(control_implementation, control_mappings, rule_key) + # additional props, when not validation component + if not self._is_validation(rule_key): + # control implementation + source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE) + description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION) + control_implementation = self._get_control_implementation(component, source, description) + # set-parameter + set_parameter = self._create_set_parameter(rule_key) + if set_parameter: + control_implementation.set_parameters = as_list(control_implementation.set_parameters) + _OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter) + # control-mappings + control_mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split() + self._add_rule_prop(control_implementation, control_mappings, rule_key) + + def _is_validation(self, rule_key: tuple) -> bool: + """Check for validation component.""" + component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE) + return component_type.lower() == validation def _add_rule_prop( self, control_implementation: ControlImplementation, control_mappings: List[str], rule_key: tuple @@ -506,26 +537,29 @@ def _create_rule_props(self, rule_key: tuple) -> List[Property]: rule_set = self._rule_set_id_mgr.get_next_rule_set_id() row_number = self._csv_mgr.get_row_number(rule_key) rule_set_mgr = _RuleSetMgr(row_number, rule_set) - column_names = CsvColumn.get_filtered_required_column_names() + CsvColumn.get_filtered_optional_column_names() namespace = self._get_namespace(rule_key) + if self._is_validation(rule_key): + column_names = CsvColumn.get_check_property_column_names() + else: + column_names = CsvColumn.get_rule_property_column_names() # req'd & optional props for column_name in column_names: prop_name = self._get_prop_name(column_name) prop_value = self._csv_mgr.get_value(rule_key, column_name).strip() rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name)) - # parameter columns - column_names = CsvColumn.get_parameter_column_names() - for column_name in column_names: - prop_name = self._get_prop_name(column_name) - prop_value = self._csv_mgr.get_value(rule_key, column_name).strip() - rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name)) + if not self._is_validation(rule_key): + # parameter columns + column_names = CsvColumn.get_parameter_column_names() + for column_name in column_names: + prop_name = self._get_prop_name(column_name) + prop_value = self._csv_mgr.get_value(rule_key, column_name).strip() + rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name)) # user props column_names = self._csv_mgr.get_user_column_names() for column_name in column_names: prop_name = self._get_prop_name(column_name) prop_value = self._csv_mgr.get_value(rule_key, column_name).strip() rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name)) - rule_set_mgr.validate() return rule_set_mgr.get_props() def _get_control_implementation( @@ -832,7 +866,7 @@ def __init__(self, max_rule_set_number: int, add_rules_count: int) -> None: def get_next_rule_set_id(self) -> str: self._prev_rule_set_number += 1 - rval = f'rule_set_{str(self._prev_rule_set_number).zfill(self._fill_sz)}' + rval = f'{prefix_rule_set}{str(self._prev_rule_set_number).zfill(self._fill_sz)}' return rval @@ -857,15 +891,6 @@ def add_prop(self, name: str, value: str, ns: str, class_: str) -> None: ) self._props[name] = prop - def validate(self) -> None: - """Validate.""" - if PARAMETER_ID not in self._props.keys(): - forbidden = CsvColumn.get_parameter_dependent_column_names() - for name in self._props.keys(): - if name in forbidden: - text = f'row "{self._row_number}" invalid "{name}"' - raise RuntimeError(text) - def get_props(self) -> List[Property]: """Get props.""" rval = [] @@ -900,7 +925,7 @@ def _initialize(self): for profile in self._profile_list: catalog = ProfileResolver.get_resolved_profile_catalog( pathlib.Path(self._root), - pathlib.Path(profile), + profile, ) self._profile_map[profile] = catalog controls = CatalogInterface.get_control_ids_from_catalog(catalog) @@ -1046,7 +1071,7 @@ def accounting_rule_definitions(self, component: DefinedComponent) -> None: value = prop.remarks self._cd_rules_map[key] = value logger.debug(f'cd: {key} {self._cd_rules_map[key]}') - rule_set_number = int(value.replace('rule_set_', '')) + rule_set_number = int(value.replace(f'{prefix_rule_set}', '')) if rule_set_number > self._max_rule_set_number: self._max_rule_set_number = rule_set_number @@ -1207,7 +1232,7 @@ def delete_property(self, component: DefinedComponent, rule_set: str, name: str) class CsvColumn(): """CsvColumn.""" - columns_required = [ + _columns_required = [ f'{COMPONENT_TITLE}', f'{COMPONENT_DESCRIPTION}', f'{COMPONENT_TYPE}', @@ -1219,18 +1244,7 @@ class CsvColumn(): f'{NAMESPACE}', ] - # columns required which do not become properties - columns_required_filtered = [ - f'{COMPONENT_TITLE}', - f'{COMPONENT_DESCRIPTION}', - f'{COMPONENT_TYPE}', - f'{PROFILE_SOURCE}', - f'{PROFILE_DESCRIPTION}', - f'{CONTROL_ID_LIST}', - f'{NAMESPACE}', - ] - - columns_optional = [ + _columns_optional = [ f'{PARAMETER_ID}', f'{PARAMETER_DESCRIPTION}', f'{PARAMETER_VALUE_ALTERNATIVES}', @@ -1239,35 +1253,21 @@ class CsvColumn(): f'{CHECK_DESCRIPTION}', ] - # optional columns which do not become properties, initially - columns_optional_filtered = [ - f'{PARAMETER_ID}', - f'{PARAMETER_DESCRIPTION}', - f'{PARAMETER_VALUE_ALTERNATIVES}', - f'{PARAMETER_VALUE_DEFAULT}', - ] - - # optional columns which do become properties, afterwards - columns_parameters = [ - f'{PARAMETER_ID}', - f'{PARAMETER_DESCRIPTION}', - f'{PARAMETER_VALUE_ALTERNATIVES}', - ] - - # optional columns which require Param_Id be present in the row - columns_parameters_dependent = [ - f'{PARAMETER_DESCRIPTION}', - f'{PARAMETER_VALUE_ALTERNATIVES}', - f'{PARAMETER_VALUE_DEFAULT}', + _columns_required_validation = [ + f'{COMPONENT_TITLE}', + f'{COMPONENT_DESCRIPTION}', + f'{COMPONENT_TYPE}', + f'{RULE_ID}', + f'{NAMESPACE}', + f'{CHECK_ID}', + f'{CHECK_DESCRIPTION}', ] - columns_filtered = columns_required_filtered + columns_optional_filtered - @staticmethod def get_order(column_name: str) -> int: """Get order for column_name.""" rval = sys.maxsize - columns_ordered = CsvColumn.columns_required + CsvColumn.columns_optional + columns_ordered = CsvColumn._columns_required + CsvColumn._columns_optional if column_name in columns_ordered: rval = columns_ordered.index(column_name) return rval @@ -1276,23 +1276,74 @@ def get_order(column_name: str) -> int: def get_required_column_names() -> List[str]: """Get required column names.""" rval = [] - rval += CsvColumn.columns_required + rval += CsvColumn._columns_required return rval @staticmethod - def get_filtered_required_column_names() -> List[str]: - """Get filtered required column names.""" + def get_optional_column_names() -> List[str]: + """Get optional column names.""" rval = [] - for column_name in CsvColumn.get_required_column_names(): - if column_name not in CsvColumn.columns_filtered: - rval.append(column_name) + rval += CsvColumn._columns_optional return rval @staticmethod - def get_optional_column_names() -> List[str]: - """Get optional column names.""" + def get_reserved_column_names() -> List[str]: + """Get reserved column names.""" + rval = [] + rval += CsvColumn._columns_required + rval += CsvColumn._columns_optional + return rval + + @staticmethod + def get_required_column_names_validation() -> List[str]: + """Get required column names validation.""" + rval = [] + rval += CsvColumn._columns_required_validation + return rval + + _rule_property_column_names = [ + f'{RULE_ID}', + f'{RULE_DESCRIPTION}', + f'{PARAMETER_ID}', + f'{PARAMETER_DESCRIPTION}', + f'{PARAMETER_VALUE_ALTERNATIVES}', + f'{CHECK_ID}', + f'{CHECK_DESCRIPTION}', + ] + + @staticmethod + def get_rule_property_column_names() -> List[str]: + """Get rule property column names.""" + return CsvColumn._rule_property_column_names + + # columns required which do not become properties + _columns_required_filtered = [ + f'{COMPONENT_TITLE}', + f'{COMPONENT_DESCRIPTION}', + f'{COMPONENT_TYPE}', + f'{PROFILE_SOURCE}', + f'{PROFILE_DESCRIPTION}', + f'{CONTROL_ID_LIST}', + f'{NAMESPACE}', + ] + + # optional columns which do not become properties, initially + _columns_optional_filtered = [ + f'{PARAMETER_ID}', + f'{PARAMETER_DESCRIPTION}', + f'{PARAMETER_VALUE_ALTERNATIVES}', + f'{PARAMETER_VALUE_DEFAULT}', + ] + + _columns_filtered = _columns_required_filtered + _columns_optional_filtered + + @staticmethod + def get_filtered_required_column_names() -> List[str]: + """Get filtered required column names.""" rval = [] - rval += CsvColumn.columns_optional + for column_name in CsvColumn.get_required_column_names(): + if column_name not in CsvColumn._columns_filtered: + rval.append(column_name) return rval @staticmethod @@ -1300,31 +1351,41 @@ def get_filtered_optional_column_names() -> List[str]: """Get filtered optional column names.""" rval = [] for column_name in CsvColumn.get_optional_column_names(): - if column_name not in CsvColumn.columns_filtered: + if column_name not in CsvColumn._columns_filtered: rval.append(column_name) return rval + _check_property_column_names = [ + f'{RULE_ID}', + f'{CHECK_ID}', + f'{CHECK_DESCRIPTION}', + ] + @staticmethod - def get_reserved_column_names() -> List[str]: - """Get reserved column names.""" - rval = [] - rval += CsvColumn.columns_required - rval += CsvColumn.columns_optional - return rval + def get_check_property_column_names() -> List[str]: + """Get check property column names.""" + return CsvColumn._check_property_column_names + + # optional columns which do become properties, afterwards + _columns_parameters = [ + f'{PARAMETER_ID}', + f'{PARAMETER_DESCRIPTION}', + f'{PARAMETER_VALUE_ALTERNATIVES}', + ] @staticmethod def get_parameter_column_names() -> List[str]: """Get parameter column names.""" rval = [] - rval += CsvColumn.columns_parameters + rval += CsvColumn._columns_parameters return rval - @staticmethod - def get_parameter_dependent_column_names() -> List[str]: - """Get parameter dependent column names.""" - rval = [] - rval += CsvColumn.columns_parameters_dependent - return rval + # optional columns which require Param_Id be present in the row + _columns_parameters_dependent = [ + f'{PARAMETER_DESCRIPTION}', + f'{PARAMETER_VALUE_ALTERNATIVES}', + f'{PARAMETER_VALUE_DEFAULT}', + ] class _CsvMgr(): @@ -1344,6 +1405,8 @@ def __init__(self, csv_path: pathlib.Path) -> None: self._csv_controls_map = {} self._csv_profile_list = [] for row_num, row in self.row_generator(): + if self._is_no_control(row): + continue self._check_row_minimum_requirements(row_num, row) component_title = self.get_row_value(row, f'{COMPONENT_TITLE}') component_type = self.get_row_value(row, f'{COMPONENT_TYPE}') @@ -1393,20 +1456,42 @@ def row_generator(self) -> Generator[Union[int, Iterator[List[str]]], None, None index += 1 if index < 3: continue - control_mappings = self.get_row_value(row, CONTROL_ID_LIST).strip() - if not len(control_mappings): - continue logger.debug(f'row_gen: {index} {row}') yield index, row def _check_row_minimum_requirements(self, row_num: int, row: List) -> None: """Check row minimum requirements.""" - for column_name in CsvColumn.get_required_column_names(): + if self._is_component_type_validation(row): + column_names = CsvColumn.get_required_column_names_validation() + else: + column_names = CsvColumn.get_required_column_names() + for column_name in column_names: value = self.get_row_value(row, column_name) if value is None or value == '': text = f'row "{row_num}" missing value for "{column_name}"' raise RuntimeError(text) + def _is_no_control(self, row: List) -> bool: + """Check for no control.""" + if self._is_component_type_validation(row): + rval = False + else: + control_id_list = self.get_row_value(row, f'{CONTROL_ID_LIST}') + if control_id_list.strip() == '': + rval = True + else: + rval = False + return rval + + def _is_component_type_validation(self, row: List) -> bool: + """Check for component type validation.""" + component_type = self.get_row_value(row, f'{COMPONENT_TYPE}') + if component_type.lower().strip() == validation: + rval = True + else: + rval = False + return rval + def _undecorate_header(self) -> None: """Undecorate header.""" head_row = self._csv[0]