diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/data_assets/organize_batches_in_pandas_filesystem_datasource.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/data_assets/organize_batches_in_pandas_filesystem_datasource.py index 1767f5b13ad0..382d850047b3 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/data_assets/organize_batches_in_pandas_filesystem_datasource.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/data_assets/organize_batches_in_pandas_filesystem_datasource.py @@ -46,23 +46,20 @@ # Python # my_batch_request = my_asset.build_batch_request() -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) +batch = my_asset.get_batch(my_batch_request) # assert my_batch_request.datasource_name == "my_datasource" assert my_batch_request.data_asset_name == "my_taxi_data_asset" assert my_batch_request.options == {} -assert len(batches) == 3 -for batch in batches: - batch_spec = batch.batch_spec - assert batch_spec.reader_method == "read_csv" - assert batch_spec.reader_options == {} - assert batch.data.dataframe.shape == (10000, 18) +batch_spec = batch.batch_spec +assert batch_spec.reader_method == "read_csv" +assert batch_spec.reader_options == {} +assert batch.data.dataframe.shape == (10000, 18) # Python # -for batch in batches: - print(batch.batch_spec) +print(batch.batch_spec) # diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_postgresql_data.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_postgresql_data.py index 582de0771f65..83934e881a51 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_postgresql_data.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_postgresql_data.py @@ -98,9 +98,8 @@ assert asset my_batch_request = asset.build_batch_request() - batches = asset.get_batch_list_from_batch_request(my_batch_request) - assert len(batches) == 1 - assert set(batches[0].columns()) == { + batch = asset.get_batch(my_batch_request) + assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data.py index 00db8e52aad4..67d7d27ca98c 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data.py @@ -59,9 +59,8 @@ assert my_asset my_batch_request = my_asset.build_batch_request() -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = my_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data_using_a_query.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data_using_a_query.py index 3cc7373a3c06..97949ab9ce52 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data_using_a_query.py +++ 
b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/database/how_to_connect_to_sql_data_using_a_query.py @@ -56,9 +56,8 @@ assert my_asset my_batch_request = my_asset.build_batch_request() -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = my_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "passenger_count", "total_amount", } diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_azure_blob_storage_using_spark.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_azure_blob_storage_using_spark.py index 8569c32e0927..1afbbe0143c1 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_azure_blob_storage_using_spark.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_azure_blob_storage_using_spark.py @@ -54,9 +54,8 @@ assert datasource.get_asset_names() == {"my_taxi_data_asset"} my_batch_request = data_asset.build_batch_request({"year": "2019", "month": "03"}) -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_pandas.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_pandas.py index ed1e34dd8359..77a0537324fb 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_pandas.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_pandas.py @@ -45,9 +45,8 @@ my_batch_request = my_batch_definition.build_batch_request( {"year": "2019", "month": "03"} ) -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_spark.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_spark.py index 0504919c8e5a..200324a081f1 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_spark.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_gcs_using_spark.py @@ -46,9 +46,8 @@ assert datasource.get_asset_names() == {"my_taxi_data_asset"} my_batch_request = data_asset.build_batch_request({"year": "2019", "month": "03"}) -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_s3_using_spark.py 
b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_s3_using_spark.py index 06d614138b1f..473f4c0c3bb1 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_s3_using_spark.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_data_on_s3_using_spark.py @@ -46,9 +46,8 @@ assert datasource.get_asset_names() == {"my_taxi_data_asset"} my_batch_request = data_asset.build_batch_request({"year": "2019", "month": "03"}) -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_pandas.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_pandas.py index 36a19edb7390..6b9f90b9fe4e 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_pandas.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_pandas.py @@ -54,9 +54,8 @@ assert my_asset my_batch_request = my_asset.build_batch_request({"year": "2019", "month": "03"}) -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = my_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_spark.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_spark.py index 89a948608c78..7f810e52f16a 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_spark.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/filesystem/how_to_connect_to_one_or_more_files_using_spark.py @@ -59,9 +59,8 @@ my_batch_request = my_batch_definition.build_batch_request( batch_parameters={"year": "2019", "month": "03"} ) -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = my_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_pandas.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_pandas.py index 77cd6e58c475..7d61e2cf5a19 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_pandas.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_pandas.py @@ -43,9 +43,8 @@ assert my_batch_request.data_asset_name == "taxi_dataframe" assert my_batch_request.options == {} -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert 
set(batches[0].columns()) == { +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", diff --git a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_spark.py b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_spark.py index 730d3c6f083d..fae3ade9b16c 100644 --- a/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_spark.py +++ b/docs/docusaurus/docs/oss/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_spark.py @@ -63,6 +63,5 @@ assert my_batch_request.data_asset_name == "my_df_asset" assert my_batch_request.options == {} -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == {"a", "b", "c"} +batch = data_asset.get_batch(my_batch_request) +assert set(batch.columns()) == {"a", "b", "c"} diff --git a/docs/docusaurus/docs/oss/guides/validation/checkpoints/how_to_validate_multiple_batches_within_single_checkpoint.py b/docs/docusaurus/docs/oss/guides/validation/checkpoints/how_to_validate_multiple_batches_within_single_checkpoint.py deleted file mode 100644 index ed54cbf42b97..000000000000 --- a/docs/docusaurus/docs/oss/guides/validation/checkpoints/how_to_validate_multiple_batches_within_single_checkpoint.py +++ /dev/null @@ -1,53 +0,0 @@ -import great_expectations as gx -from great_expectations.core.expectation_suite import ExpectationSuite -from great_expectations.expectations.expectation_configuration import ( - ExpectationConfiguration, -) - -context = gx.get_context() - -datasource = context.data_sources.add_pandas_filesystem( - name="example_datasource", base_directory="./data" -) - -MY_BATCHING_REGEX = r"yellow_tripdata_sample_(?P\d{4})-(?P\d{2})\.csv" - -asset = datasource.add_csv_asset("asset", batching_regex=MY_BATCHING_REGEX) - -ec = ExpectationConfiguration( - type="expect_column_values_to_not_be_null", - kwargs={"column": "passenger_count"}, -) -suite = context.suites.add(ExpectationSuite(name="example_suite", expectations=[ec])) - -# -batch_request = asset.build_batch_request() -# - -# -batch_list = asset.get_batch_list_from_batch_request(batch_request) -# - -# -batch_request_list = [batch.batch_request for batch in batch_list] -# - -# -validations = [ - {"batch_request": batch.batch_request, "expectation_suite_name": "example_suite"} - for batch in batch_list -] -# - -# -checkpoint = context.add_or_update_checkpoint( - name="my_taxi_validator_checkpoint", validations=validations -) - -checkpoint_result = checkpoint.run() -# - -# -context.build_data_docs() -context.open_data_docs() -# diff --git a/docs/docusaurus/docs/snippets/aws_cloud_storage_spark.py b/docs/docusaurus/docs/snippets/aws_cloud_storage_spark.py index dec7c72d1fd4..642dea4806be 100644 --- a/docs/docusaurus/docs/snippets/aws_cloud_storage_spark.py +++ b/docs/docusaurus/docs/snippets/aws_cloud_storage_spark.py @@ -247,7 +247,7 @@ # -batches = asset.get_batch_list_from_batch_request(request) +batch = asset.get_batch(request) # config = context.fluent_datasources["s3_datasource"].yaml() diff --git a/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py b/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py index 
721d97021652..efb3c48df731 100644 --- a/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py +++ b/docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py @@ -72,19 +72,15 @@ # Python # -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) +batch = my_asset.get_batch(my_batch_request) # -assert len(batches) == 3 - -for batch in batches: - batch_spec = batch.batch_spec - assert batch_spec.reader_method == "read_csv" - assert batch_spec.reader_options == {} - assert batch.data.dataframe.shape == (10000, 18) +batch_spec = batch.batch_spec +assert batch_spec.reader_method == "read_csv" +assert batch_spec.reader_options == {} +assert batch.data.dataframe.shape == (10000, 18) # Python # -for batch in batches: - print(batch.batch_spec) +print(batch.batch_spec) # diff --git a/docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py b/docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py index 64d4f1db174e..ce27f23493a1 100644 --- a/docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py +++ b/docs/docusaurus/docs/snippets/how_to_connect_to_a_sql_table.py @@ -54,9 +54,8 @@ assert my_asset my_batch_request = my_asset.build_batch_request() -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert set(batches[0].columns()) == { +batch = my_asset.get_batch(my_batch_request) +assert set(batch.columns()) == { "vendor_id", "pickup_datetime", "dropoff_datetime", @@ -88,4 +87,4 @@ ) # -batches = my_asset.get_batch_list_from_batch_request(my_batch_request) +batch = my_asset.get_batch(my_batch_request) diff --git a/docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py b/docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py index 2d209fc2d678..65616655fcc0 100644 --- a/docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py +++ b/docs/docusaurus/docs/snippets/organize_batches_in_sqlite_datasource.py @@ -43,9 +43,8 @@ # my_batch_request = my_table_asset.build_batch_request(partitioner=partitioner) -batches = my_table_asset.get_batch_list_from_batch_request(my_batch_request) +batch = my_table_asset.get_batch(my_batch_request) -assert len(batches) == 12 assert my_table_asset.get_batch_parameters_keys(partitioner=partitioner) == ( "year", @@ -55,17 +54,14 @@ # Python # my_batch_request = my_table_asset.build_batch_request(partitioner=partitioner) -batches = my_table_asset.get_batch_list_from_batch_request(my_batch_request) +batch = my_table_asset.get_batch(my_batch_request) # assert my_batch_request.datasource_name == "my_datasource" assert my_batch_request.data_asset_name == "my_table_asset" assert my_batch_request.options == {} -assert len(batches) == 12 - # Python # -for batch in batches: - print(batch.batch_spec) +print(batch.batch_spec) # diff --git a/docs/sphinx_api_docs_source/public_api_missing_threshold.py b/docs/sphinx_api_docs_source/public_api_missing_threshold.py index 127781a227a9..0d8580ad6b84 100644 --- a/docs/sphinx_api_docs_source/public_api_missing_threshold.py +++ b/docs/sphinx_api_docs_source/public_api_missing_threshold.py @@ -45,20 +45,14 @@ "File: great_expectations/datasource/fluent/config.py Name: pop", "File: great_expectations/datasource/fluent/config.py Name: yaml", "File: great_expectations/datasource/fluent/config_str.py Name: ConfigStr", - "File: great_expectations/datasource/fluent/file_path_data_asset.py Name: 
get_batch_list_from_batch_request", "File: great_expectations/datasource/fluent/fluent_base_model.py Name: dict", "File: great_expectations/datasource/fluent/fluent_base_model.py Name: yaml", "File: great_expectations/datasource/fluent/invalid_datasource.py Name: build_batch_request", "File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_asset", - "File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_batch_list_from_batch_request", "File: great_expectations/datasource/fluent/invalid_datasource.py Name: get_batch_parameters_keys", "File: great_expectations/datasource/fluent/pandas_datasource.py Name: dict", - "File: great_expectations/datasource/fluent/pandas_datasource.py Name: get_batch_list_from_batch_request", "File: great_expectations/datasource/fluent/sources.py Name: add_datasource", "File: great_expectations/datasource/fluent/sources.py Name: delete_datasource", - "File: great_expectations/datasource/fluent/spark_datasource.py Name: get_batch_list_from_batch_request", - "File: great_expectations/datasource/fluent/sql_datasource.py Name: get_batch_list_from_batch_request", - "File: great_expectations/datasource/new_datasource.py Name: get_batch_list_from_batch_request", "File: great_expectations/exceptions/exceptions.py Name: InvalidExpectationConfigurationError", "File: great_expectations/expectations/expectation.py Name: validate_configuration", "File: great_expectations/expectations/expectation_configuration.py Name: to_domain_obj", @@ -68,7 +62,6 @@ "File: great_expectations/expectations/set_based_column_map_expectation.py Name: register_metric", "File: great_expectations/expectations/set_based_column_map_expectation.py Name: validate_configuration", "File: great_expectations/experimental/datasource/fabric.py Name: build_batch_request", - "File: great_expectations/experimental/datasource/fabric.py Name: get_batch_list_from_batch_request", "File: great_expectations/experimental/metric_repository/metric_retriever.py Name: get_validator", "File: great_expectations/experimental/rule_based_profiler/helpers/util.py Name: build_batch_request", "File: great_expectations/experimental/rule_based_profiler/rule_based_profiler.py Name: run", diff --git a/great_expectations/core/batch_definition.py b/great_expectations/core/batch_definition.py index d731f3a8cb3a..ea67a6d0ed3f 100644 --- a/great_expectations/core/batch_definition.py +++ b/great_expectations/core/batch_definition.py @@ -73,14 +73,8 @@ def get_batch(self, batch_parameters: Optional[BatchParameters] = None) -> Batch Returns: A Batch of data. """ - batch_list = self.data_asset.get_batch_list_from_batch_request( - self.build_batch_request(batch_parameters=batch_parameters) - ) - - if len(batch_list) == 0: - raise ValueError("No batch found") # noqa: TRY003 - - return batch_list[-1] + batch_request = self.build_batch_request(batch_parameters=batch_parameters) + return self.data_asset.get_batch(batch_request) def is_added(self) -> BatchDefinitionAddedDiagnostics: return BatchDefinitionAddedDiagnostics( diff --git a/great_expectations/data_context/data_context/abstract_data_context.py b/great_expectations/data_context/data_context/abstract_data_context.py index da307c5fce6a..849c076c0182 100644 --- a/great_expectations/data_context/data_context/abstract_data_context.py +++ b/great_expectations/data_context/data_context/abstract_data_context.py @@ -1183,8 +1183,8 @@ def _get_batch_list_from_inputs( # noqa: PLR0913 # sum check above while here we do a truthy check. 
batch_request_list = [batch_request] # type: ignore[list-item] for batch_req in batch_request_list: - computed_batch_list.extend( - self.get_batch_list( + computed_batch_list.append( + self.get_last_batch( datasource_name=datasource_name, data_connector_name=data_connector_name, data_asset_name=data_asset_name, @@ -1304,7 +1304,7 @@ def get_validator_using_batch_list( return validator - def get_batch_list( # noqa: PLR0913 + def get_last_batch( # noqa: PLR0913 self, datasource_name: Optional[str] = None, data_connector_name: Optional[str] = None, @@ -1327,7 +1327,7 @@ def get_batch_list( # noqa: PLR0913 batch_spec_passthrough: Optional[dict] = None, batch_parameters: Optional[Union[dict, BatchParameters]] = None, **kwargs: Optional[dict], - ) -> List[Batch]: + ) -> Batch: """Get the list of zero or more batches, based on a variety of flexible input types. `get_batch_list` is the main user-facing API for getting batches. @@ -1377,7 +1377,7 @@ def get_batch_list( # noqa: PLR0913 of `batch_data`, `query` or `path`) """ # noqa: E501 - return self._get_batch_list( + return self._get_last_batch( datasource_name=datasource_name, data_connector_name=data_connector_name, data_asset_name=data_asset_name, @@ -1401,7 +1401,7 @@ def get_batch_list( # noqa: PLR0913 **kwargs, ) - def _get_batch_list( # noqa: PLR0913 + def _get_last_batch( # noqa: PLR0913 self, datasource_name: Optional[str] = None, data_connector_name: Optional[str] = None, @@ -1424,7 +1424,7 @@ def _get_batch_list( # noqa: PLR0913 batch_spec_passthrough: Optional[dict] = None, batch_parameters: Optional[Union[dict, BatchParameters]] = None, **kwargs: Optional[dict], - ) -> List[Batch]: + ) -> Batch: result = get_batch_request_from_acceptable_arguments( datasource_name=datasource_name, data_connector_name=data_connector_name, @@ -1458,7 +1458,7 @@ def _get_batch_list( # noqa: PLR0913 "please confirm that your configuration is accurate.", ) - return datasource.get_batch_list_from_batch_request(batch_request=result) + return datasource.get_batch(batch_request=result) def _validate_datasource_names(self, datasource_names: list[str] | str | None) -> list[str]: if datasource_names is None: diff --git a/great_expectations/datasource/fluent/data_asset/path/file_asset.py b/great_expectations/datasource/fluent/data_asset/path/file_asset.py index 765016f8584f..4d493ed227f9 100644 --- a/great_expectations/datasource/fluent/data_asset/path/file_asset.py +++ b/great_expectations/datasource/fluent/data_asset/path/file_asset.py @@ -234,8 +234,8 @@ def build_batch_request( partitioner: A Partitioner used to narrow the data returned from the asset. Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. Note: Option "batch_slice" is supported for all "DataAsset" extensions of this class identically. 
This mechanism diff --git a/great_expectations/datasource/fluent/data_asset/path/path_data_asset.py b/great_expectations/datasource/fluent/data_asset/path/path_data_asset.py index cc3381a15609..70856142f023 100644 --- a/great_expectations/datasource/fluent/data_asset/path/path_data_asset.py +++ b/great_expectations/datasource/fluent/data_asset/path/path_data_asset.py @@ -31,6 +31,7 @@ PartitionerSortingProtocol, TestConnectionError, ) +from great_expectations.exceptions.exceptions import NoAvailableBatchesError if TYPE_CHECKING: from great_expectations.alias_types import PathStr @@ -116,55 +117,62 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None: ) @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> List[Batch]: - """A list of batches that match the BatchRequest. + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + batch_definition_list = self._get_batch_definition_list(batch_request) + batch_identifiers_list: List[dict] = [ + batch_definition_list.batch_identifiers + for batch_definition_list in batch_definition_list + ] + if sortable_partitioner := self._get_sortable_partitioner(batch_request.partitioner): + batch_identifiers_list = self.sort_batch_identifiers_list( + batch_identifiers_list, sortable_partitioner + ) - Args: - batch_request: A batch request for this asset. Usually obtained by calling - build_batch_request on the asset. + return batch_identifiers_list - Returns: - A list of batches that match the options specified in the batch request. - """ + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + """Get a batch from the data asset using a batch request.""" self._validate_batch_request(batch_request) execution_engine: PandasExecutionEngine | SparkDFExecutionEngine = ( self.datasource.get_execution_engine() ) - batch_definition_list = self._get_batch_definition_list(batch_request) - - batch_list: List[Batch] = [] + batch_definitions = self._get_batch_definition_list(batch_request) + if not batch_definitions: + raise NoAvailableBatchesError() - for batch_definition in batch_definition_list: - batch_spec = self._data_connector.build_batch_spec(batch_definition=batch_definition) - batch_spec_options = self._batch_spec_options_from_batch_request(batch_request) - batch_spec.update(batch_spec_options) - - data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec) - - fully_specified_batch_request = copy.deepcopy(batch_request) - fully_specified_batch_request.options.update(batch_definition.batch_identifiers) - batch_metadata = self._get_batch_metadata_from_batch_request( - batch_request=fully_specified_batch_request + # Pick the last, which most likely corresponds to the most recent, batch in the list + if sortable_partitioner := self._get_sortable_partitioner(batch_request.partitioner): + batch_definitions = self.sort_legacy_batch_definitions( + batch_definitions, + sortable_partitioner, ) + batch_definition = batch_definitions[-1] - batch = Batch( - datasource=self.datasource, - data_asset=self, - batch_request=fully_specified_batch_request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec, - batch_definition=batch_definition, - ) - batch_list.append(batch) + batch_spec = self._data_connector.build_batch_spec(batch_definition=batch_definition) + batch_spec_options = self._batch_spec_options_from_batch_request(batch_request) + batch_spec.update(batch_spec_options) - if sortable_partitioner := 
self._get_sortable_partitioner(batch_request.partitioner): - self.sort_batches(batch_list, sortable_partitioner) + data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec) + + fully_specified_batch_request = copy.deepcopy(batch_request) + fully_specified_batch_request.options.update(batch_definition.batch_identifiers) + batch_metadata = self._get_batch_metadata_from_batch_request( + batch_request=fully_specified_batch_request + ) - return batch_list + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=fully_specified_batch_request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec, + batch_definition=batch_definition, + ) def _get_batch_definition_list( self, batch_request: BatchRequest diff --git a/great_expectations/datasource/fluent/data_connector/file_path_data_connector.py b/great_expectations/datasource/fluent/data_connector/file_path_data_connector.py index 7468edec55a8..b1228db4b701 100644 --- a/great_expectations/datasource/fluent/data_connector/file_path_data_connector.py +++ b/great_expectations/datasource/fluent/data_connector/file_path_data_connector.py @@ -93,7 +93,7 @@ def get_batch_definition_list(self, batch_request: BatchRequest) -> List[LegacyB A list of BatchDefinition objects that match BatchRequest """ # noqa: E501 - batch_definition_list: List[LegacyBatchDefinition] = ( + legacy_batch_definition_list: List[LegacyBatchDefinition] = ( self._get_unfiltered_batch_definition_list(batch_request=batch_request) ) @@ -114,11 +114,11 @@ def get_batch_definition_list(self, batch_request: BatchRequest) -> List[LegacyB batch_filter_obj: BatchFilter = build_batch_filter( data_connector_query_dict=data_connector_query_dict # type: ignore[arg-type] ) - batch_definition_list = batch_filter_obj.select_from_data_connector_query( - batch_definition_list=batch_definition_list + legacy_batch_definition_list = batch_filter_obj.select_from_data_connector_query( + batch_definition_list=legacy_batch_definition_list ) - return batch_definition_list + return legacy_batch_definition_list @override def build_batch_spec(self, batch_definition: LegacyBatchDefinition) -> PathBatchSpec: diff --git a/great_expectations/datasource/fluent/interfaces.py b/great_expectations/datasource/fluent/interfaces.py index ab9d60bf16ae..5335f7fb6ff9 100644 --- a/great_expectations/datasource/fluent/interfaces.py +++ b/great_expectations/datasource/fluent/interfaces.py @@ -6,6 +6,7 @@ import logging import uuid import warnings +from abc import ABC, abstractmethod from pprint import pformat as pf from typing import ( TYPE_CHECKING, @@ -99,6 +100,8 @@ Validator as V1Validator, ) +_T = TypeVar("_T") + class PartitionerSortingProtocol(Protocol): """Interface defining the fields a Partitioner must contain for sorting.""" @@ -283,7 +286,7 @@ def _sorter_from_str(sort_key: str) -> Sorter: DatasourceT = TypeVar("DatasourceT", bound="Datasource") -class DataAsset(GenericBaseModel, Generic[DatasourceT, PartitionerT]): +class DataAsset(GenericBaseModel, Generic[DatasourceT, PartitionerT], ABC): # To subclass a DataAsset one must define `type` as a Class literal explicitly on the sublass # as well as implementing the methods in the `Abstract Methods` section below. # Some examples: @@ -342,15 +345,18 @@ def build_batch_request( partitioner: A Partitioner used to narrow the data returned from the asset. 
Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. + A BatchRequest object that can be used to obtain a batch from an asset by calling the + get_batch method. """ # noqa: E501 raise NotImplementedError( """One must implement "build_batch_request" on a DataAsset subclass.""" ) - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> List[Batch]: - raise NotImplementedError + @abstractmethod + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: ... + + @abstractmethod + def get_batch(self, batch_request: BatchRequest) -> Batch: ... def _validate_batch_request(self, batch_request: BatchRequest) -> None: """Validates the batch_request has the correct form. @@ -511,53 +517,86 @@ def _order_by_validator( def sort_batches( self, batch_list: List[Batch], partitioner: PartitionerSortingProtocol - ) -> None: + ) -> List[Batch]: """Sorts batch_list in place in the order configured in this DataAsset. Args: batch_list: The list of batches to sort in place. partitioner: Configuration used to determine sort. """ + + def get_value(key: str) -> Callable[[Batch], Any]: + return lambda bd: bd.metadata[key] + + return self._sort_batch_data_list(batch_list, partitioner, get_value) + + def sort_legacy_batch_definitions( + self, + legacy_batch_definition_list: List[LegacyBatchDefinition], + partitioner: PartitionerSortingProtocol, + ) -> List[LegacyBatchDefinition]: + """Sorts batch_definition_list in the order configured by the partitioner.""" + + def get_value(key: str) -> Callable[[LegacyBatchDefinition], Any]: + return lambda bd: bd.batch_identifiers[key] + + return self._sort_batch_data_list(legacy_batch_definition_list, partitioner, get_value) + + def sort_batch_identifiers_list( + self, batch_identfiers_list: List[dict], partitioner: PartitionerSortingProtocol + ) -> List[dict]: + """Sorts batch_identfiers_list in the order configured by the partitioner.""" + + def get_value(key: str) -> Callable[[dict], Any]: + return lambda d: d[key] + + return self._sort_batch_data_list(batch_identfiers_list, partitioner, get_value) + + def _sort_batch_data_list( + self, + batch_data_list: List[_T], + partitioner: PartitionerSortingProtocol, + get_value: Callable[[str], Any], + ) -> List[_T]: + """Sorts batch_data_list in the order configured by the partitioner.""" reverse = not partitioner.sort_ascending for key in reversed(partitioner.param_names): try: - batch_list.sort( - key=functools.cmp_to_key(_sort_batches_with_none_metadata_values(key)), + batch_data_list = sorted( + batch_data_list, + key=functools.cmp_to_key( + _sort_batch_identifiers_with_none_metadata_values(get_value(key)) + ), reverse=reverse, ) except KeyError as e: raise KeyError( # noqa: TRY003 - f"Trying to sort {self.name} table asset batches on key {key} " + f"Trying to sort {self.name}'s batches on key {key}, " "which isn't available on all batches." 
) from e + return batch_data_list -def _sort_batches_with_none_metadata_values( - key: str, -) -> Callable[[Batch, Batch], int]: - def _compare_function(a: Batch, b: Batch) -> int: - if a.metadata[key] is not None and b.metadata[key] is not None: - if a.metadata[key] < b.metadata[key]: - return -1 +def _sort_batch_identifiers_with_none_metadata_values( + get_val: Callable[[_T], Any], +) -> Callable[[_T, _T], int]: + def _compare_function(a: _T, b: _T) -> int: + a_val = get_val(a) + b_val = get_val(b) - if a.metadata[key] > b.metadata[key]: + if a_val is not None and b_val is not None: + if a_val < b_val: + return -1 + elif a_val > b_val: return 1 - - return 0 - - if a.metadata[key] is None and b.metadata[key] is None: + else: + return 0 + elif a_val is None and b_val is None: return 0 - - if a.metadata[key] is None: # b.metadata[key] is not None + elif a_val is None: # b.metadata_val is not None return -1 - - if a.metadata[key] is not None: # b.metadata[key] is None + else: # b[key] is None return 1 - # This line should never be reached; hence, "ValueError" with corresponding error message is raised. # noqa: E501 - raise ValueError( # noqa: TRY003 - f'Unexpected Batch metadata key combination, "{a.metadata[key]}" and "{b.metadata[key]}", was encountered.' # noqa: E501 - ) - return _compare_function @@ -712,18 +751,22 @@ def get_execution_engine(self) -> _ExecutionEngineT: self._cached_execution_engine_kwargs = current_execution_engine_kwargs return self._execution_engine - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> List[Batch]: - """A list of batches that correspond to the BatchRequest. + def get_batch(self, batch_request: BatchRequest) -> Batch: + """A Batch that corresponds to the BatchRequest. Args: batch_request: A batch request for this asset. Usually obtained by calling build_batch_request on the asset. Returns: - A list of batches that match the options specified in the batch request. + A Batch that matches the options specified in the batch request. """ data_asset = self.get_asset(batch_request.data_asset_name) - return data_asset.get_batch_list_from_batch_request(batch_request) + return data_asset.get_batch(batch_request) + + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + data_asset = self.get_asset(batch_request.data_asset_name) + return data_asset.get_batch_identifiers_list(batch_request) def get_assets_as_dict(self) -> MutableMapping[str, _DataAssetT]: """Returns available DataAsset objects as dictionary, with corresponding name as key. 
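The interfaces.py refactor above funnels Batch, legacy-batch-definition, and identifier-dict ordering through a single None-aware comparator. Below is a minimal standalone sketch of that sorting pattern, shown for plain identifier dicts; the function and variable names are illustrative, not the GX API.

import functools
from typing import Any, Callable, Dict, List

def _compare_allowing_none(get_val: Callable[[Dict], Any]) -> Callable[[Dict, Dict], int]:
    # None sorts before any non-None value, mirroring the comparator above.
    def _compare(a: Dict, b: Dict) -> int:
        a_val, b_val = get_val(a), get_val(b)
        if a_val is not None and b_val is not None:
            return -1 if a_val < b_val else (1 if a_val > b_val else 0)
        if a_val is None and b_val is None:
            return 0
        return -1 if a_val is None else 1
    return _compare

def sort_identifiers(identifiers: List[Dict], param_names: List[str], sort_ascending: bool = True) -> List[Dict]:
    # Apply keys in reverse so the first name in param_names is the primary sort key.
    for key in reversed(param_names):
        identifiers = sorted(
            identifiers,
            key=functools.cmp_to_key(_compare_allowing_none(lambda d, k=key: d[k])),
            reverse=not sort_ascending,
        )
    return identifiers

print(sort_identifiers(
    [{"year": "2020", "month": None}, {"year": "2019", "month": "03"}],
    param_names=["year", "month"],
))
# -> [{'year': '2019', 'month': '03'}, {'year': '2020', 'month': None}]

Because sorted() is stable, the successive passes give a multi-key sort without materializing Batch objects, which is what lets get_batch_identifiers_list and sort_legacy_batch_definitions reuse the same helper as sort_batches.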
diff --git a/great_expectations/datasource/fluent/invalid_datasource.py b/great_expectations/datasource/fluent/invalid_datasource.py index 1e37d6c6a4c9..c2c672f66e4d 100644 --- a/great_expectations/datasource/fluent/invalid_datasource.py +++ b/great_expectations/datasource/fluent/invalid_datasource.py @@ -34,7 +34,8 @@ # Controls which methods should raise an error when called on an InvalidDatasource METHOD_SHOULD_RAISE_ERROR: Final[set] = { - "get_batch_list_from_batch_request", + "get_batch", + "get_batch_identifiers_list", "add_batch_definition", } @@ -92,13 +93,17 @@ def build_batch_request( self._raise_type_error() @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> NoReturn: + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + self._raise_type_error() + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: self._raise_type_error() @override def sort_batches( self, batch_list: List[Batch], partitioner: PartitionerSortingProtocol - ) -> None: + ) -> List[Batch]: self._raise_type_error() @override diff --git a/great_expectations/datasource/fluent/pandas_datasource.py b/great_expectations/datasource/fluent/pandas_datasource.py index 487fd94c7bba..484f684d9048 100644 --- a/great_expectations/datasource/fluent/pandas_datasource.py +++ b/great_expectations/datasource/fluent/pandas_datasource.py @@ -32,7 +32,9 @@ from great_expectations.compatibility import pydantic, sqlalchemy from great_expectations.compatibility.sqlalchemy import sqlalchemy as sa from great_expectations.compatibility.typing_extensions import override +from great_expectations.core.batch import LegacyBatchDefinition from great_expectations.core.batch_spec import PandasBatchSpec, RuntimeDataBatchSpec +from great_expectations.core.id_dict import IDDict from great_expectations.datasource.fluent import BatchParameters, BatchRequest from great_expectations.datasource.fluent.batch_identifier_util import make_batch_identifier from great_expectations.datasource.fluent.constants import ( @@ -119,9 +121,12 @@ def get_batch_parameters_keys( ) @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + return [IDDict(batch_request.options)] + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: self._validate_batch_request(batch_request) - batch_list: List[Batch] = [] batch_spec = PandasBatchSpec( reader_method=self._get_reader_method(), @@ -138,9 +143,6 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list # batch_definition (along with batch_spec and markers) is only here to satisfy a # legacy constraint when computing usage statistics in a validator. We hope to remove # it in the future. 
- # imports are done inline to prevent a circular dependency with core/batch.py - from great_expectations.core.batch import LegacyBatchDefinition - batch_definition = LegacyBatchDefinition( datasource_name=self.datasource.name, data_connector_name=_DATA_CONNECTOR_NAME, @@ -153,19 +155,16 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list batch_request=batch_request, ignore_options=("dataframe",) ) - batch_list.append( - Batch( - datasource=self.datasource, - data_asset=self, - batch_request=batch_request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec, - batch_definition=batch_definition, - ) + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=batch_request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec, + batch_definition=batch_definition, ) - return batch_list @override def build_batch_request( @@ -182,9 +181,9 @@ def build_batch_request( partitioner: This is not currently supported and must be None for this data asset. Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. - """ # noqa: E501 + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. + """ if options: raise BuildBatchRequestError( message="options is not currently supported for this DataAsset " @@ -388,9 +387,9 @@ def build_batch_request( partitioner: This is not currently supported and must be None for this data asset. Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. - """ # noqa: E501 + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. + """ if batch_slice is not None: raise BuildBatchRequestError( message="batch_slice is not currently supported for this DataAsset " @@ -445,7 +444,11 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None: ) @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + return [IDDict(batch_request.options)] + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: self._validate_batch_request(batch_request) batch_spec = RuntimeDataBatchSpec(batch_data=batch_request.options["dataframe"]) @@ -455,9 +458,6 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list # batch_definition (along with batch_spec and markers) is only here to satisfy a # legacy constraint when computing usage statistics in a validator. We hope to remove # it in the future. 
- # imports are done inline to prevent a circular dependency with core/batch.py - from great_expectations.core.batch import LegacyBatchDefinition - batch_definition = LegacyBatchDefinition( datasource_name=self.datasource.name, data_connector_name=_DATA_CONNECTOR_NAME, @@ -470,18 +470,16 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list batch_request=batch_request, ignore_options=("dataframe",) ) - return [ - Batch( - datasource=self.datasource, - data_asset=self, - batch_request=batch_request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec, - batch_definition=batch_definition, - ) - ] + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=batch_request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec, + batch_definition=batch_definition, + ) class _PandasDatasource(Datasource, Generic[_DataAssetT]): @@ -666,7 +664,7 @@ def _get_batch(self, asset: _PandasDataAsset, dataframe: pd.DataFrame | None = N else: batch_request = asset.build_batch_request() - return asset.get_batch_list_from_batch_request(batch_request)[-1] + return asset.get_batch(batch_request) @public_api def add_dataframe_asset( diff --git a/great_expectations/datasource/fluent/pandas_datasource.pyi b/great_expectations/datasource/fluent/pandas_datasource.pyi index a84457d1b929..1e434a9d2a25 100644 --- a/great_expectations/datasource/fluent/pandas_datasource.pyi +++ b/great_expectations/datasource/fluent/pandas_datasource.pyi @@ -70,7 +70,9 @@ class _PandasDataAsset(DataAsset): def test_connection(self) -> None: ... def batch_parameters_template(self) -> BatchParameters: ... @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: ... + def get_batch(self, batch_request: BatchRequest) -> Batch: ... + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> list[dict]: ... @override def build_batch_request( self, @@ -129,7 +131,9 @@ class DataFrameAsset(_PandasDataAsset): partitioner: Optional[ColumnPartitioner] = ..., ) -> BatchRequest: ... @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: ... + def get_batch(self, batch_request: BatchRequest) -> Batch: ... + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: ... _PandasDataAssetT = TypeVar("_PandasDataAssetT", bound=_PandasDataAsset) diff --git a/great_expectations/datasource/fluent/spark_datasource.py b/great_expectations/datasource/fluent/spark_datasource.py index 073021562949..169ffdfe650d 100644 --- a/great_expectations/datasource/fluent/spark_datasource.py +++ b/great_expectations/datasource/fluent/spark_datasource.py @@ -29,6 +29,8 @@ ) from great_expectations.compatibility.pyspark import DataFrame, pyspark from great_expectations.compatibility.typing_extensions import override +from great_expectations.core import IDDict +from great_expectations.core.batch import LegacyBatchDefinition from great_expectations.core.batch_spec import RuntimeDataBatchSpec from great_expectations.datasource.fluent import BatchParameters, BatchRequest from great_expectations.datasource.fluent.batch_identifier_util import make_batch_identifier @@ -211,9 +213,9 @@ def build_batch_request( Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. 
- """ # noqa: E501 + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. + """ if batch_slice is not None: raise BuildBatchRequestError( message="batch_slice is not currently supported for this DataAsset " @@ -268,7 +270,11 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None: ) @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + return [IDDict(batch_request.options)] + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: self._validate_batch_request(batch_request) batch_spec = RuntimeDataBatchSpec(batch_data=batch_request.options["dataframe"]) @@ -278,9 +284,6 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list # batch_definition (along with batch_spec and markers) is only here to satisfy a # legacy constraint when computing usage statistics in a validator. We hope to remove # it in the future. - # imports are done inline to prevent a circular dependency with core/batch.py - from great_expectations.core.batch import LegacyBatchDefinition - batch_definition = LegacyBatchDefinition( datasource_name=self.datasource.name, data_connector_name=_DATA_CONNECTOR_NAME, @@ -293,18 +296,16 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list batch_request=batch_request, ignore_options=("dataframe",) ) - return [ - Batch( - datasource=self.datasource, - data_asset=self, - batch_request=batch_request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec, - batch_definition=batch_definition, - ) - ] + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=batch_request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec, + batch_definition=batch_definition, + ) @public_api def add_batch_definition_whole_dataframe(self, name: str) -> BatchDefinition: diff --git a/great_expectations/datasource/fluent/sql_datasource.py b/great_expectations/datasource/fluent/sql_datasource.py index 537ca5e301e4..c5dfda6278f8 100644 --- a/great_expectations/datasource/fluent/sql_datasource.py +++ b/great_expectations/datasource/fluent/sql_datasource.py @@ -32,6 +32,8 @@ from great_expectations.compatibility.pydantic import Field from great_expectations.compatibility.sqlalchemy import sqlalchemy as sa from great_expectations.compatibility.typing_extensions import override +from great_expectations.core import IDDict +from great_expectations.core.batch import LegacyBatchDefinition from great_expectations.core.batch_spec import ( BatchSpec, RuntimeQueryBatchSpec, @@ -49,7 +51,6 @@ PartitionerModInteger, PartitionerMultiColumnValue, ) -from great_expectations.datasource.fluent.batch_identifier_util import make_batch_identifier from great_expectations.datasource.fluent.batch_request import ( BatchRequest, ) @@ -68,6 +69,7 @@ PartitionerProtocol, TestConnectionError, ) +from great_expectations.exceptions.exceptions import NoAvailableBatchesError from great_expectations.execution_engine import SqlAlchemyExecutionEngine from great_expectations.execution_engine.partition_and_sample.data_partitioner import ( DatePart, @@ -581,75 +583,93 @@ def _fully_specified_batch_requests(self, batch_request: BatchRequest) -> List[B return batch_requests @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> List[Batch]: - """A list 
of batches that match the BatchRequest. + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + self._validate_batch_request(batch_request) + if batch_request.partitioner: + sql_partitioner = self.get_partitioner_implementation(batch_request.partitioner) + else: + sql_partitioner = None + + requests = self._fully_specified_batch_requests(batch_request) + metadata_dicts = [self._get_batch_metadata_from_batch_request(r) for r in requests] + + if sql_partitioner: + metadata_dicts = self.sort_batch_identifiers_list(metadata_dicts, sql_partitioner) + + return metadata_dicts[batch_request.batch_slice] + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + """Batch that matches the BatchRequest. Args: batch_request: A batch request for this asset. Usually obtained by calling build_batch_request on the asset. Returns: - A list of batches that match the options specified in the batch request. + A list Batch that matches the options specified in the batch request. """ self._validate_batch_request(batch_request) - batch_list: List[Batch] = [] if batch_request.partitioner: sql_partitioner = self.get_partitioner_implementation(batch_request.partitioner) else: sql_partitioner = None + batch_spec_kwargs: dict[str, str | dict | None] - for request in self._fully_specified_batch_requests(batch_request): - batch_metadata: BatchMetadata = self._get_batch_metadata_from_batch_request( - batch_request=request - ) - batch_spec_kwargs = self._create_batch_spec_kwargs() - if sql_partitioner: - batch_spec_kwargs["partitioner_method"] = sql_partitioner.method_name - batch_spec_kwargs["partitioner_kwargs"] = ( - sql_partitioner.partitioner_method_kwargs() - ) - # mypy infers that batch_spec_kwargs["batch_identifiers"] is a collection, but - # it is hardcoded to a dict above, so we cast it here. - cast(Dict, batch_spec_kwargs["batch_identifiers"]).update( - sql_partitioner.batch_parameters_to_batch_spec_kwarg_identifiers( - request.options - ) - ) - # Creating the batch_spec is our hook into the execution engine. - batch_spec = self._create_batch_spec(batch_spec_kwargs) - execution_engine: SqlAlchemyExecutionEngine = self.datasource.get_execution_engine() - data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec) - - # batch_definition (along with batch_spec and markers) is only here to satisfy a - # legacy constraint when computing usage statistics in a validator. We hope to remove - # it in the future. 
- # imports are done inline to prevent a circular dependency with core/batch.py - from great_expectations.core.batch import LegacyBatchDefinition - - batch_definition = LegacyBatchDefinition( - datasource_name=self.datasource.name, - data_connector_name=_DATA_CONNECTOR_NAME, - data_asset_name=self.name, - batch_identifiers=make_batch_identifier(batch_spec["batch_identifiers"]), - batch_spec_passthrough=None, - ) + requests = self._fully_specified_batch_requests(batch_request) + unsorted_metadata_dicts = [self._get_batch_metadata_from_batch_request(r) for r in requests] - batch_list.append( - Batch( - datasource=self.datasource, - data_asset=self, - batch_request=request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec, - batch_definition=batch_definition, - ) + if not unsorted_metadata_dicts: + raise NoAvailableBatchesError() + + if sql_partitioner: + sorted_metadata_dicts = self.sort_batch_identifiers_list( + unsorted_metadata_dicts, sql_partitioner ) + else: + sorted_metadata_dicts = unsorted_metadata_dicts + + sorted_metadata_dicts = sorted_metadata_dicts[batch_request.batch_slice] + batch_metadata = sorted_metadata_dicts[-1] + + # we've sorted the metadata, but not the requests, so we need the index of our + # batch_metadata from the original unsorted list so that we get the right request + request_index = unsorted_metadata_dicts.index(batch_metadata) + + request = requests[request_index] + batch_spec_kwargs = self._create_batch_spec_kwargs() if sql_partitioner: - self.sort_batches(batch_list, sql_partitioner) - return batch_list[batch_request.batch_slice] + batch_spec_kwargs["partitioner_method"] = sql_partitioner.method_name + batch_spec_kwargs["partitioner_kwargs"] = sql_partitioner.partitioner_method_kwargs() + # mypy infers that batch_spec_kwargs["batch_identifiers"] is a collection, but + # it is hardcoded to a dict above, so we cast it here. + cast(Dict, batch_spec_kwargs["batch_identifiers"]).update( + sql_partitioner.batch_parameters_to_batch_spec_kwarg_identifiers(request.options) + ) + # Creating the batch_spec is our hook into the execution engine. + batch_spec = self._create_batch_spec(batch_spec_kwargs) + execution_engine: SqlAlchemyExecutionEngine = self.datasource.get_execution_engine() + data, markers = execution_engine.get_batch_data_and_markers(batch_spec=batch_spec) + + batch_definition = LegacyBatchDefinition( + datasource_name=self.datasource.name, + data_connector_name=_DATA_CONNECTOR_NAME, + data_asset_name=self.name, + batch_identifiers=IDDict(batch_spec["batch_identifiers"]), + batch_spec_passthrough=None, + ) + + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec, + batch_definition=batch_definition, + ) @override def build_batch_request( @@ -669,8 +689,8 @@ def build_batch_request( partitioner: A Partitioner used to narrow the data returned from the asset. Returns: - A BatchRequest object that can be used to obtain a batch list from a Datasource by calling the - get_batch_list_from_batch_request method. + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. 
""" # noqa: E501 if options is not None and not self._batch_parameters_are_valid( options=options, partitioner=partitioner @@ -770,7 +790,7 @@ def _validate_batch_request(self, batch_request: BatchRequest) -> None: def _create_batch_spec_kwargs(self) -> dict[str, Any]: """Creates batch_spec_kwargs used to instantiate a SqlAlchemyDatasourceBatchSpec or RuntimeQueryBatchSpec - This is called by get_batch_list_from_batch_request to generate the batches. + This is called by get_batch to generate the batch. Returns: A dictionary that will be passed to self._create_batch_spec(**returned_dict) diff --git a/great_expectations/exceptions/exceptions.py b/great_expectations/exceptions/exceptions.py index fcd798ed5f02..1dcec3560223 100644 --- a/great_expectations/exceptions/exceptions.py +++ b/great_expectations/exceptions/exceptions.py @@ -191,6 +191,11 @@ def __init__(self, message: str): super().__init__(f"Bad input to build_batch_request: {message}") +class NoAvailableBatchesError(GreatExpectationsError): + def __init__(self) -> None: + super().__init__("No available batches found.") + + class InvalidBatchIdError(GreatExpectationsError): pass diff --git a/great_expectations/experimental/datasource/fabric.py b/great_expectations/experimental/datasource/fabric.py index 04b21da2253e..4485087a6c23 100644 --- a/great_expectations/experimental/datasource/fabric.py +++ b/great_expectations/experimental/datasource/fabric.py @@ -27,6 +27,8 @@ from great_expectations._docs_decorators import public_api from great_expectations.compatibility import pydantic from great_expectations.compatibility.typing_extensions import override +from great_expectations.core import IDDict +from great_expectations.core.batch import LegacyBatchDefinition from great_expectations.core.batch_spec import FabricBatchSpec from great_expectations.datasource.fluent import BatchRequest from great_expectations.datasource.fluent.batch_identifier_util import make_batch_identifier @@ -80,9 +82,12 @@ def test_connection(self) -> None: LOGGER.debug(f"Testing connection to {self.__class__.__name__} has not been implemented") @override - def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list[Batch]: + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + return [IDDict(batch_request.options)] + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: self._validate_batch_request(batch_request) - batch_list: List[Batch] = [] reader_options = { "workspace": self._datasource.workspace, @@ -106,9 +111,6 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list # batch_definition (along with batch_spec and markers) is only here to satisfy a # legacy constraint when computing usage statistics in a validator. We hope to remove # it in the future. 
- # imports are done inline to prevent a circular dependency with core/batch.py - from great_expectations.core.batch import LegacyBatchDefinition - batch_definition = LegacyBatchDefinition( datasource_name=self.datasource.name, data_connector_name=_DATA_CONNECTOR_NAME, @@ -121,19 +123,16 @@ def get_batch_list_from_batch_request(self, batch_request: BatchRequest) -> list batch_request=batch_request, ignore_options=("dataframe",) ) - batch_list.append( - Batch( - datasource=self.datasource, - data_asset=self, - batch_request=batch_request, - data=data, - metadata=batch_metadata, - batch_markers=markers, - batch_spec=batch_spec.to_json_dict(), # type: ignore[arg-type] # will be coerced to BatchSpec - batch_definition=batch_definition, - ) + return Batch( + datasource=self.datasource, + data_asset=self, + batch_request=batch_request, + data=data, + metadata=batch_metadata, + batch_markers=markers, + batch_spec=batch_spec.to_json_dict(), # type: ignore[arg-type] # will be coerced to BatchSpec + batch_definition=batch_definition, ) - return batch_list @override def build_batch_request( @@ -150,8 +149,8 @@ def build_batch_request( partitioner: This is not currently supported and must be None for this data asset. Returns: - A `BatchRequest` object that can be used to obtain a batch list from a Datasource by - calling the `get_batch_list_from_batch_request()` method. + A BatchRequest object that can be used to obtain a batch from an Asset by calling the + get_batch method. """ asset_type_name: str = self.__class__.__name__ if options: diff --git a/great_expectations/experimental/rule_based_profiler/helpers/util.py b/great_expectations/experimental/rule_based_profiler/helpers/util.py index b8e2ff821f08..0c40eca98dcb 100644 --- a/great_expectations/experimental/rule_based_profiler/helpers/util.py +++ b/great_expectations/experimental/rule_based_profiler/helpers/util.py @@ -173,7 +173,7 @@ def get_batch_ids( # noqa: PLR0913 parameters=parameters, ) - batch_list = data_context.get_batch_list(batch_request=batch_request) + batch_list = [data_context.get_last_batch(batch_request=batch_request)] batch_ids: List[str] = [batch.id for batch in batch_list] diff --git a/tests/core/test_batch_definition.py b/tests/core/test_batch_definition.py index 06263d81743a..82a60906f5ab 100644 --- a/tests/core/test_batch_definition.py +++ b/tests/core/test_batch_definition.py @@ -6,6 +6,7 @@ import pytest +from great_expectations.compatibility.typing_extensions import override from great_expectations.core.batch_definition import BatchDefinition from great_expectations.core.partitioners import FileNamePartitionerYearly from great_expectations.datasource.fluent.batch_request import BatchParameters @@ -15,13 +16,27 @@ ) if TYPE_CHECKING: + from typing import List + import pytest_mock + from great_expectations.datasource.fluent.batch_request import BatchRequest + + +class DataAssetForTests(DataAsset): + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + raise NotImplementedError + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + raise NotImplementedError + @pytest.fixture def mock_data_asset(monkeypatch, mocker: pytest_mock.MockerFixture) -> DataAsset: monkeypatch.setattr(DataAsset, "build_batch_request", mocker.Mock()) - data_asset: DataAsset = DataAsset(name="my_data_asset", type="table") + data_asset: DataAsset = DataAssetForTests(name="my_data_asset", type="table") return data_asset @@ -64,64 +79,15 @@ def test_get_batch_retrieves_only_batch(mocker: 
pytest_mock.MockFixture): mock_asset = mocker.Mock(spec=DataAsset) batch_definition.set_data_asset(mock_asset) - mock_get_batch_list_from_batch_request = mock_asset.get_batch_list_from_batch_request - mock_batch = mocker.Mock(spec=Batch) - mock_get_batch_list_from_batch_request.return_value = [mock_batch] + mock_asset.get_batch.return_value = mock_batch # Act batch = batch_definition.get_batch() # Assert assert batch == mock_batch - mock_get_batch_list_from_batch_request.assert_called_once_with( - batch_definition.build_batch_request() - ) - - -@pytest.mark.unit -def test_get_batch_retrieves_last_batch(mocker: pytest_mock.MockFixture): - # Arrange - batch_definition = BatchDefinition[None](name="test_batch_definition") - mock_asset = mocker.Mock(spec=DataAsset) - batch_definition.set_data_asset(mock_asset) - - mock_get_batch_list_from_batch_request = mock_asset.get_batch_list_from_batch_request - - batch_a = mocker.Mock(spec=Batch) - batch_b = mocker.Mock(spec=Batch) - batch_list = [batch_a, batch_b] - mock_get_batch_list_from_batch_request.return_value = batch_list - - # Act - batch = batch_definition.get_batch() - - # Assert - assert batch == batch_b - mock_get_batch_list_from_batch_request.assert_called_once_with( - batch_definition.build_batch_request() - ) - - -@pytest.mark.unit -def test_get_batch_raises_error_with_empty_batch_list(mocker: pytest_mock.MockFixture): - # Arrange - batch_definition = BatchDefinition[None](name="test_batch_definition") - mock_asset = mocker.Mock(spec=DataAsset) - batch_definition.set_data_asset(mock_asset) - - mock_get_batch_list_from_batch_request = mock_asset.get_batch_list_from_batch_request - - mock_get_batch_list_from_batch_request.return_value = [] - - # Act - with pytest.raises(ValueError): - batch_definition.get_batch() - - # Assert - mock_get_batch_list_from_batch_request.assert_called_once_with( - batch_definition.build_batch_request() - ) + mock_asset.get_batch.assert_called_once_with(batch_definition.build_batch_request()) @pytest.mark.unit diff --git a/tests/datasource/fluent/data_asset/test_data_asset.py b/tests/datasource/fluent/data_asset/test_data_asset.py index f0db20b91b0f..fc7e2d208b87 100644 --- a/tests/datasource/fluent/data_asset/test_data_asset.py +++ b/tests/datasource/fluent/data_asset/test_data_asset.py @@ -411,7 +411,7 @@ def test_sort_batches__ascending( metadata_none_none, ] - empty_data_asset.sort_batches(batches, partitioner) + batches = empty_data_asset.sort_batches(batches, partitioner) assert batches == [ metadata_none_none, @@ -446,7 +446,7 @@ def test_sort_batches__descending( metadata_none_none, ] - empty_data_asset.sort_batches(batches, partitioner) + batches = empty_data_asset.sort_batches(batches, partitioner) assert batches == [ metadata_2_2, @@ -466,7 +466,7 @@ def test_sort_batches__requires_keys(empty_data_asset, mocker): wheres_my_b = mocker.MagicMock(spec=Batch, metadata={"a": 1}) i_have_a_b = mocker.MagicMock(spec=Batch, metadata={"a": 1, "b": 2}) - expected_error = "Trying to sort my data asset for batch configs table asset batches on key b" + expected_error = "Trying to sort my data asset for batch configs's batches on key b" with pytest.raises(KeyError, match=expected_error): empty_data_asset.sort_batches([wheres_my_b, i_have_a_b], partitioner) diff --git a/tests/datasource/fluent/data_asset/test_path_asset.py b/tests/datasource/fluent/data_asset/test_path_asset.py index 1b3668b1ccf1..0fe7892acf50 100644 --- a/tests/datasource/fluent/data_asset/test_path_asset.py +++ 
b/tests/datasource/fluent/data_asset/test_path_asset.py @@ -110,11 +110,11 @@ def validated_pandas_filesystem_datasource( ), ], ) -def test_get_batch_list_from_batch_request__sort_ascending( +def test_get_batch_identifiers_list__sort_ascending( validated_pandas_filesystem_datasource: PandasFilesystemDatasource, batching_regex: Union[str, re.Pattern], ): - """Verify that get_batch_list_from_batch_request respects a partitioner's ascending sort order. + """Verify that get_batch_identifiers_list respects a partitioner's ascending sort order. NOTE: we just happen to be using pandas as the concrete class. """ @@ -126,15 +126,15 @@ def test_get_batch_list_from_batch_request__sort_ascending( ) batch_request = batch_definition.build_batch_request() - batches = asset.get_batch_list_from_batch_request(batch_request) + batch_identifiers_list = asset.get_batch_identifiers_list(batch_request) expected_years = ["2018"] * 12 + ["2019"] * 12 + ["2020"] * 12 expected_months = [format(m, "02d") for m in range(1, 13)] * 3 - assert (len(batches)) == 36 - for i, batch in enumerate(batches): - assert batch.metadata["year"] == str(expected_years[i]) - assert batch.metadata["month"] == str(expected_months[i]) + assert (len(batch_identifiers_list)) == 36 + for i, batch_identifiers in enumerate(batch_identifiers_list): + assert batch_identifiers["year"] == str(expected_years[i]) + assert batch_identifiers["month"] == str(expected_months[i]) @pytest.mark.filesystem @@ -148,11 +148,11 @@ def test_get_batch_list_from_batch_request__sort_ascending( ), ], ) -def test_get_batch_list_from_batch_request__sort_descending( +def test_get_batch_identifiers_list__sort_descending( validated_pandas_filesystem_datasource: PandasFilesystemDatasource, batching_regex: Union[str, re.Pattern], ): - """Verify that get_batch_list_from_batch_request respects a partitioner's descending sort order. + """Verify that get_batch_identifiers_list respects a partitioner's descending sort order. NOTE: we just happen to be using pandas as the concrete class. 
""" @@ -164,15 +164,15 @@ def test_get_batch_list_from_batch_request__sort_descending( ) batch_request = batch_definition.build_batch_request() - batches = asset.get_batch_list_from_batch_request(batch_request) + batch_identifiers_list = asset.get_batch_identifiers_list(batch_request) expected_years = list(reversed(["2018"] * 12 + ["2019"] * 12 + ["2020"] * 12)) expected_months = list(reversed([format(m, "02d") for m in range(1, 13)] * 3)) - assert (len(batches)) == 36 - for i, batch in enumerate(batches): - assert batch.metadata["year"] == str(expected_years[i]) - assert batch.metadata["month"] == str(expected_months[i]) + assert (len(batch_identifiers_list)) == 36 + for i, batch_identifiers in enumerate(batch_identifiers_list): + assert batch_identifiers["year"] == str(expected_years[i]) + assert batch_identifiers["month"] == str(expected_months[i]) @pytest.fixture diff --git a/tests/datasource/fluent/data_asset/test_sql_asset.py b/tests/datasource/fluent/data_asset/test_sql_asset.py index 098aff50e3be..ca5a668014bc 100644 --- a/tests/datasource/fluent/data_asset/test_sql_asset.py +++ b/tests/datasource/fluent/data_asset/test_sql_asset.py @@ -41,9 +41,9 @@ def postgres_asset(empty_data_context, create_source: CreateSourceFixture, monke @pytest.mark.postgresql -def test_get_batch_list_from_batch_request__sort_ascending(postgres_asset): +def test_get_batch_identifiers_list__sort_ascending(postgres_asset): years = [2021, 2022] - batches = postgres_asset.get_batch_list_from_batch_request( + batches = postgres_asset.get_batch_identifiers_list( postgres_asset.build_batch_request( partitioner=ColumnPartitionerYearly(column_name="year", sort_ascending=True) ) @@ -51,13 +51,13 @@ def test_get_batch_list_from_batch_request__sort_ascending(postgres_asset): assert len(batches) == len(years) for i, year in enumerate([2021, 2022]): - batches[i].metadata["year"] = year + batches[i]["year"] = year @pytest.mark.postgresql -def test_get_batch_list_from_batch_request__sort_descending(postgres_asset): +def test_get_batch_identifiers_list__sort_descending(postgres_asset): years = [2021, 2022] - batches = postgres_asset.get_batch_list_from_batch_request( + batches = postgres_asset.get_batch_identifiers_list( postgres_asset.build_batch_request( partitioner=ColumnPartitionerYearly(column_name="year", sort_ascending=False) ) @@ -65,7 +65,7 @@ def test_get_batch_list_from_batch_request__sort_descending(postgres_asset): assert len(batches) == len(years) for i, year in enumerate([2022, 2021]): - batches[i].metadata["year"] = year + batches[i]["year"] = year @pytest.fixture diff --git a/tests/datasource/fluent/integration/integration_test_utils.py b/tests/datasource/fluent/integration/integration_test_utils.py index 2923d212a618..fc03249d3e5e 100644 --- a/tests/datasource/fluent/integration/integration_test_utils.py +++ b/tests/datasource/fluent/integration/integration_test_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Dict, Tuple +from typing import Dict, Tuple import pytest @@ -23,10 +23,6 @@ from great_expectations.validator.metrics_calculator import MetricsCalculator from tests.expectations.test_util import get_table_columns_metric -if TYPE_CHECKING: - from great_expectations.datasource.fluent.interfaces import Batch - - logger = logging.getLogger(__name__) @@ -122,20 +118,15 @@ def run_checkpoint_and_data_doc( assert "ge-failed-icon" not in data_doc_index -def run_batch_head( # noqa: C901, PLR0915 +def run_batch_head( # noqa: C901 
datasource_test_data: tuple[AbstractDataContext, Datasource, DataAsset, BatchRequest], fetch_all: bool | str, n_rows: int | float | str | None, # noqa: PYI041 success: bool, ) -> None: _, datasource, _, batch_request = datasource_test_data - batch_list: list[Batch] = datasource.get_batch_list_from_batch_request( - batch_request=batch_request - ) - assert len(batch_list) > 0 + batch = datasource.get_batch(batch_request=batch_request) - # arbitrarily select the first batch for testing - batch: Batch = batch_list[0] if success: assert n_rows is None or isinstance(n_rows, int) assert isinstance(fetch_all, bool) diff --git a/tests/datasource/fluent/integration/test_integration_datasource.py b/tests/datasource/fluent/integration/test_integration_datasource.py index 00f5d45d30b8..3534ece989c2 100644 --- a/tests/datasource/fluent/integration/test_integration_datasource.py +++ b/tests/datasource/fluent/integration/test_integration_datasource.py @@ -297,16 +297,16 @@ def test_partitioner( ) partitioner = partitioner_class(**partitioner_kwargs) # Test getting all batches - all_batches = asset.get_batch_list_from_batch_request( + all_batches = asset.get_batch_identifiers_list( asset.build_batch_request(partitioner=partitioner) ) assert len(all_batches) == all_batches_cnt # Test getting specified batches - specified_batches = asset.get_batch_list_from_batch_request( + specified_batches = asset.get_batch_identifiers_list( asset.build_batch_request(specified_batch_request, partitioner=partitioner) ) assert len(specified_batches) == specified_batch_cnt - assert specified_batches[-1].metadata == last_specified_batch_metadata + assert specified_batches[-1] == last_specified_batch_metadata @pytest.mark.sqlite @@ -322,7 +322,7 @@ def test_partitioner_build_batch_request_allows_selecting_by_date_and_datetime_a ) partitioner = PartitionerColumnValue(column_name="pickup_date") # Test getting all batches - all_batches = asset.get_batch_list_from_batch_request( + all_batches = asset.get_batch_identifiers_list( asset.build_batch_request(partitioner=partitioner) ) assert len(all_batches) == 28 @@ -334,7 +334,7 @@ def test_partitioner_build_batch_request_allows_selecting_by_date_and_datetime_a {"pickup_date": datetime.date(2019, 2, 1)}, {"pickup_date": datetime.date(2019, 2, 2)}, ] - specified_batches = asset.get_batch_list_from_batch_request( + specified_batches = asset.get_batch_identifiers_list( asset.build_batch_request( options={"pickup_date": "2019-02-01"}, partitioner=partitioner ) @@ -348,7 +348,7 @@ def test_partitioner_build_batch_request_allows_selecting_by_date_and_datetime_a {"pickup_date": datetime.datetime(2019, 2, 1)}, # noqa: DTZ001 {"pickup_date": datetime.datetime(2019, 2, 2)}, # noqa: DTZ001 ] - specified_batches = asset.get_batch_list_from_batch_request( + specified_batches = asset.get_batch_identifiers_list( asset.build_batch_request( options={"pickup_date": "2019-02-01 00:00:00"}, partitioner=partitioner ) @@ -419,12 +419,9 @@ def test_asset_specified_metadata(empty_data_context, add_asset_method, add_asse ) partitioner = ColumnPartitionerMonthly(column_name="pickup_datetime") # Test getting all batches - batches = asset.get_batch_list_from_batch_request( - asset.build_batch_request(partitioner=partitioner) - ) - assert len(batches) == 1 + batch = asset.get_batch(asset.build_batch_request(partitioner=partitioner)) # Update the batch_metadata from the request with the metadata inherited from the asset - assert batches[0].metadata == {**asset_specified_metadata, "year": 2019, "month": 2} + assert 
batch.metadata == {**asset_specified_metadata, "year": 2019, "month": 2} # This is marked by the various backend used in testing in the datasource_test_data fixture. diff --git a/tests/datasource/fluent/test_batch.py b/tests/datasource/fluent/test_batch.py index b2eb129e84e4..d365d93488ca 100644 --- a/tests/datasource/fluent/test_batch.py +++ b/tests/datasource/fluent/test_batch.py @@ -26,7 +26,7 @@ def pandas_setup(csv_path: pathlib.Path) -> Tuple[AbstractDataContext, Batch]: / "yellow_tripdata_sample_10_trips_from_each_month.csv" ) asset = source.add_csv_asset(ASSET_NAME, filepath_or_buffer=filepath) - batch = asset.get_batch_list_from_batch_request(asset.build_batch_request())[0] + batch = asset.get_batch(asset.build_batch_request()) return context, batch diff --git a/tests/datasource/fluent/test_fabric.py b/tests/datasource/fluent/test_fabric.py index 9e176c922e55..ce4a52eddeb1 100644 --- a/tests/datasource/fluent/test_fabric.py +++ b/tests/datasource/fluent/test_fabric.py @@ -132,7 +132,7 @@ def test_reader_options_passthrough( ) my_asset = add_asset_fn(f"my_{asset_type}_asset", **asset_kwargs) batch_request = my_asset.build_batch_request() - my_asset.get_batch_list_from_batch_request(batch_request) + my_asset.get_batch(batch_request) _, captured_kwargs = capture_reader_fn_params print(f"keyword args:\n{pf(captured_kwargs[-1])}") diff --git a/tests/datasource/fluent/test_invalid_datasource.py b/tests/datasource/fluent/test_invalid_datasource.py index a93017d30de5..5804d099cfec 100644 --- a/tests/datasource/fluent/test_invalid_datasource.py +++ b/tests/datasource/fluent/test_invalid_datasource.py @@ -188,7 +188,7 @@ def test_get_batch_list_raises_informative_error( ): invalid_ds = invalid_datasource_factory(invalid_ds_cfg) with pytest.raises(TypeError) as err: - invalid_ds.get_batch_list_from_batch_request({}) # type: ignore[arg-type] # expect error + invalid_ds.get_batch({}) # type: ignore[arg-type] # expect error assert invalid_ds.config_error == err.value.__cause__ def test_random_attribute_access_raises_informative_error( diff --git a/tests/datasource/fluent/test_metadatasource.py b/tests/datasource/fluent/test_metadatasource.py index 7b64161ba86e..02296b490bbd 100644 --- a/tests/datasource/fluent/test_metadatasource.py +++ b/tests/datasource/fluent/test_metadatasource.py @@ -40,6 +40,7 @@ if TYPE_CHECKING: from great_expectations.core.config_provider import _ConfigurationProvider from great_expectations.datasource.datasource_dict import DatasourceDict + from great_expectations.datasource.fluent.interfaces import Batch logger = logging.getLogger(__name__) @@ -128,6 +129,14 @@ def build_batch_request( ) -> BatchRequest: return BatchRequest("datasource_name", "data_asset_name", options or {}) + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + raise NotImplementedError + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + raise NotImplementedError + @pytest.fixture(scope="function") def context_sources_cleanup() -> Generator[DataSourceManager, None, None]: @@ -307,7 +316,13 @@ def test_ds_assets_type_field_not_set(self, empty_sources: DataSourceManager): ): class MissingTypeAsset(DataAsset): - pass + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + raise NotImplementedError + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + raise NotImplementedError class BadAssetDatasource(Datasource): type: str = "valid" @@ -341,12 +356,21 @@ def 
execution_engine_type(self) -> Type[ExecutionEngine]: def test_minimal_ds_to_asset_flow(context_sources_cleanup): # 1. Define Datasource & Assets - class RedAsset(DataAsset): + class SampleAsset(DataAsset): + @override + def get_batch_identifiers_list(self, batch_request: BatchRequest) -> List[dict]: + raise NotImplementedError + + @override + def get_batch(self, batch_request: BatchRequest) -> Batch: + raise NotImplementedError + + class RedAsset(SampleAsset): type = "red" def test_connection(self): ... # type: ignore[explicit-override] # FIXME - class BlueAsset(DataAsset): + class BlueAsset(SampleAsset): type = "blue" @override diff --git a/tests/datasource/fluent/test_pandas_datasource.py b/tests/datasource/fluent/test_pandas_datasource.py index dd0bd19b1350..acc60635a2aa 100644 --- a/tests/datasource/fluent/test_pandas_datasource.py +++ b/tests/datasource/fluent/test_pandas_datasource.py @@ -359,7 +359,7 @@ def test_positional_arguments( # read_json on a csv file). We patch the internal call that actually tries to read and create the batch. # noqa: E501 # Ideally, we would rewrite this test so we wouldn't need to mock like this. mocker.patch( - "great_expectations.datasource.fluent.pandas_datasource._PandasDataAsset.get_batch_list_from_batch_request" + "great_expectations.datasource.fluent.pandas_datasource._PandasDataAsset.get_batch" ) # read_* normally returns batch but, since we've added a mock in the line above, we get a mock object returned. # noqa: E501 # We are calling it here for it's side effect on the default asset so get and inspect that afterwards. # noqa: E501 @@ -509,11 +509,10 @@ def test_pandas_data_asset_batch_metadata( ) assert csv_asset.batch_metadata == batch_metadata - batch_list = csv_asset.get_batch_list_from_batch_request(csv_asset.build_batch_request()) - assert len(batch_list) == 1 + batch = csv_asset.get_batch(csv_asset.build_batch_request()) # allow mutation of this attribute - batch_list[0].metadata["also_this_one"] = "other_batch-level_value" + batch.metadata["also_this_one"] = "other_batch-level_value" substituted_batch_metadata = copy.deepcopy(batch_metadata) substituted_batch_metadata.update( @@ -523,7 +522,7 @@ def test_pandas_data_asset_batch_metadata( "also_this_one": "other_batch-level_value", } ) - assert batch_list[0].metadata == substituted_batch_metadata + assert batch.metadata == substituted_batch_metadata @pytest.mark.filesystem diff --git a/tests/datasource/fluent/test_pandas_filesystem_datasource.py b/tests/datasource/fluent/test_pandas_filesystem_datasource.py index 1c5fb87f303e..f885e35b6a5e 100644 --- a/tests/datasource/fluent/test_pandas_filesystem_datasource.py +++ b/tests/datasource/fluent/test_pandas_filesystem_datasource.py @@ -30,6 +30,7 @@ from great_expectations.datasource.fluent.dynamic_pandas import PANDAS_VERSION from great_expectations.datasource.fluent.interfaces import TestConnectionError from great_expectations.datasource.fluent.sources import _get_field_details +from great_expectations.exceptions.exceptions import NoAvailableBatchesError if TYPE_CHECKING: from great_expectations.alias_types import PathStr @@ -429,7 +430,7 @@ def test_get_batch_list_from_fully_specified_batch_request( (None, "03", "yellow_tripdata_sample_2018-04.csv", 0), ], ) -def test_get_batch_list_batch_count( +def test_get_batch_identifiers_list_count( year: Optional[str], month: Optional[str], path: Optional[str], @@ -445,12 +446,12 @@ def test_get_batch_list_batch_count( regex=re.compile(r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv") ), ) -
batches = asset.get_batch_list_from_batch_request(request) - assert len(batches) == batch_count + batch_identifier_list = asset.get_batch_identifiers_list(request) + assert len(batch_identifier_list) == batch_count @pytest.mark.unit -def test_get_batch_list_from_partially_specified_batch_request( +def test_get_batch_identifiers_list_from_partially_specified_batch_request( pandas_filesystem_datasource: PandasFilesystemDatasource, ): # Verify test directory has files that don't match what we will query for @@ -474,9 +475,9 @@ def test_get_batch_list_from_partially_specified_batch_request( regex=re.compile(r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv") ), ) - batches = asset.get_batch_list_from_batch_request(request) + batches = asset.get_batch_identifiers_list(request) assert (len(batches)) == 12 - batch_filenames = [pathlib.Path(batch.metadata["path"]).stem for batch in batches] + batch_filenames = [pathlib.Path(batch["path"]).stem for batch in batches] assert set(files_for_2018) == set(batch_filenames) @dataclass(frozen=True) @@ -485,9 +486,7 @@ class YearMonth: month: str expected_year_month = {YearMonth(year="2018", month=format(m, "02d")) for m in range(1, 13)} - batch_year_month = { - YearMonth(year=batch.metadata["year"], month=batch.metadata["month"]) for batch in batches - } + batch_year_month = {YearMonth(year=batch["year"], month=batch["month"]) for batch in batches} assert expected_year_month == batch_year_month @@ -525,8 +524,8 @@ def test_pandas_slice_batch_count( regex=re.compile(r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv") ), ) - batches = asset.get_batch_list_from_batch_request(batch_request=batch_request) - assert len(batches) == expected_batch_count + batch_identifiers_list = asset.get_batch_identifiers_list(batch_request=batch_request) + assert len(batch_identifiers_list) == expected_batch_count def bad_batching_regex_config( @@ -596,7 +595,7 @@ def test_csv_asset_batch_metadata( ) ) - batches = pandas_filesystem_datasource.get_batch_list_from_batch_request(batch_request) + batch = pandas_filesystem_datasource.get_batch(batch_request) substituted_batch_metadata: BatchMetadata = copy.deepcopy(asset_specified_metadata) substituted_batch_metadata.update( @@ -606,12 +605,44 @@ def test_csv_asset_batch_metadata( } ) - months = ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"] + actual_metadata = copy.deepcopy(batch.metadata) + + actual_metadata.pop("path") + actual_metadata.pop("year") + actual_metadata.pop("month") + + assert len(actual_metadata) + assert actual_metadata == substituted_batch_metadata + + +@pytest.mark.parametrize( + ("sort_ascending", "expected_metadata"), + [ + (True, {"year": "2020", "month": "12", "path": "yellow_tripdata_sample_2020-12.csv"}), + (False, {"year": "2018", "month": "01", "path": "yellow_tripdata_sample_2018-01.csv"}), + ], +) +@pytest.mark.unit +def test_get_batch_respects_order_ascending( + pandas_filesystem_datasource: PandasFilesystemDatasource, + sort_ascending: bool, + expected_metadata: dict, +) -> None: + asset = pandas_filesystem_datasource.add_csv_asset(name="csv_asset") + regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv" + batch_def = asset.add_batch_definition_monthly( + name="batch def", regex=regex, sort_ascending=sort_ascending ) + batch = batch_def.get_batch() + assert batch.metadata == expected_metadata + - for i, month in enumerate(months): - substituted_batch_metadata["month"] = month - actual_metadata = copy.deepcopy(batches[i].metadata) - # not testing path for the purposes of
this test - actual_metadata.pop("path") - actual_metadata.pop("year") - assert actual_metadata == substituted_batch_metadata +@pytest.mark.unit +def test_raises_if_no_matching_batches( + pandas_filesystem_datasource: PandasFilesystemDatasource, +) -> None: + asset = pandas_filesystem_datasource.add_csv_asset(name="csv_asset") + regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv" + batch_def = asset.add_batch_definition_monthly(name="batch def", regex=regex) + with pytest.raises(NoAvailableBatchesError): + batch_def.get_batch(batch_parameters={"year": "1995", "month": "01"}) diff --git a/tests/datasource/fluent/test_pandas_s3_datasource.py b/tests/datasource/fluent/test_pandas_s3_datasource.py index 74db2b13a03d..73ffd4b92f2e 100644 --- a/tests/datasource/fluent/test_pandas_s3_datasource.py +++ b/tests/datasource/fluent/test_pandas_s3_datasource.py @@ -279,16 +279,14 @@ def test_add_csv_asset_with_recursive_file_discovery_to_datasource( s3_recursive_file_discovery=False, ) found_files_without_recursion = len( - no_recursion_asset.get_batch_list_from_batch_request( - no_recursion_asset.build_batch_request() - ) + no_recursion_asset.get_batch_identifiers_list(no_recursion_asset.build_batch_request()) ) recursion_asset = pandas_s3_datasource.add_csv_asset( name="csv_asset_recursive", s3_recursive_file_discovery=True, ) found_files_with_recursion = len( - recursion_asset.get_batch_list_from_batch_request(recursion_asset.build_batch_request()) + recursion_asset.get_batch_identifiers_list(recursion_asset.build_batch_request()) ) # Only 1 additional file was added to the subfolder assert found_files_without_recursion + 1 == found_files_with_recursion diff --git a/tests/datasource/fluent/test_postgres_datasource.py b/tests/datasource/fluent/test_postgres_datasource.py index dcb66b8a31af..6dcb9dd852f9 100644 --- a/tests/datasource/fluent/test_postgres_datasource.py +++ b/tests/datasource/fluent/test_postgres_datasource.py @@ -10,6 +10,7 @@ Optional, Tuple, ) +from unittest.mock import ANY import pytest from sqlalchemy.exc import SQLAlchemyError @@ -17,6 +18,7 @@ import great_expectations.exceptions as ge_exceptions from great_expectations.compatibility.pydantic import ValidationError from great_expectations.core.batch_spec import SqlAlchemyDatasourceBatchSpec +from great_expectations.core.id_dict import IDDict from great_expectations.core.partitioners import ( ColumnPartitioner, ColumnPartitionerDaily, @@ -36,9 +38,7 @@ BatchParameters, BatchRequest, ) -from great_expectations.datasource.fluent.interfaces import ( - TestConnectionError, -) +from great_expectations.datasource.fluent.interfaces import TestConnectionError from great_expectations.datasource.fluent.postgres_datasource import ( PostgresDatasource, ) @@ -46,6 +46,7 @@ SqlPartitionerYearAndMonth, TableAsset, ) +from great_expectations.exceptions.exceptions import NoAvailableBatchesError from great_expectations.execution_engine import SqlAlchemyExecutionEngine from tests.datasource.fluent.conftest import ( _DEFAULT_TEST_MONTHS, @@ -61,7 +62,6 @@ from great_expectations.datasource.fluent.interfaces import ( BatchMetadata, - BatchSlice, ) # We set a default time range that we use for testing.
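For orientation, an illustrative sketch (not part of this changeset) of the batch-definition route the pandas filesystem tests above exercise; the asset name, regex, and month are modeled on the test fixtures and are hypothetical, while add_csv_asset, add_batch_definition_monthly, and get_batch are used exactly as in this diff.

# `pandas_filesystem_datasource` is assumed to be an existing fluent pandas filesystem datasource.
asset = pandas_filesystem_datasource.add_csv_asset(name="csv_asset")
regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv"
batch_definition = asset.add_batch_definition_monthly(
    name="monthly", regex=regex, sort_ascending=True
)

# Request one month explicitly; get_batch returns that single Batch rather than a list.
batch = batch_definition.get_batch(batch_parameters={"year": "2018", "month": "05"})
assert batch.metadata["month"] == "05"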
@@ -258,13 +258,12 @@ def validate_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: ) = create_and_add_table_asset_without_testing_connection( source=source, name="my_asset", table_name="my_table" ) - source.get_batch_list_from_batch_request(asset.build_batch_request()) + source.get_batch_identifiers_list(asset.build_batch_request()) def assert_batch_specs_correct_with_year_month_partitioner_defaults(batch_specs): - # We should have 1 batch_spec per (year, month) pair - expected_batch_spec_num = len(_DEFAULT_TEST_YEARS) * len(_DEFAULT_TEST_MONTHS) - assert len(batch_specs) == expected_batch_spec_num + # We should have only collected 1 batch_spec + assert len(batch_specs) == 1 for year in _DEFAULT_TEST_YEARS: for month in _DEFAULT_TEST_MONTHS: spec = { @@ -272,25 +271,26 @@ def assert_batch_specs_correct_with_year_month_partitioner_defaults(batch_specs) "data_asset_name": "my_asset", "table_name": "my_table", "schema_name": None, - "batch_identifiers": {"my_col": {"year": year, "month": month}}, + "batch_identifiers": {"my_col": {"year": ANY, "month": ANY}}, "partitioner_method": "partition_on_year_and_month", "partitioner_kwargs": {"column_name": "my_col"}, } assert spec in batch_specs -def assert_batches_correct_with_year_month_partitioner_defaults(batches): +def assert_batch_identifiers_correct_with_year_month_partitioner_defaults( + batch_identifiers_list: list[dict], +): # We should have 1 batch_spec per (year, month) pair expected_batch_spec_num = len(_DEFAULT_TEST_YEARS) * len(_DEFAULT_TEST_MONTHS) - assert len(batches) == expected_batch_spec_num - metadatas = [batch.metadata for batch in batches] + assert len(batch_identifiers_list) == expected_batch_spec_num for year in _DEFAULT_TEST_YEARS: for month in _DEFAULT_TEST_MONTHS: - assert {"year": year, "month": month} in metadatas + assert {"year": year, "month": month} in batch_identifiers_list @pytest.mark.postgresql -def test_datasource_gets_batch_list_partitioner_with_unspecified_batch_parameters( +def test_datasource_gets_batch_partitioner_with_unspecified_batch_parameters( empty_data_context, create_source: CreateSourceFixture, ): @@ -313,9 +313,14 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: partitioner = ColumnPartitionerMonthly(column_name="my_col") empty_batch_request = asset.build_batch_request(partitioner=partitioner) assert empty_batch_request.options == {} - batches = source.get_batch_list_from_batch_request(empty_batch_request) + batch_identifiers_list = source.get_batch_identifiers_list(empty_batch_request) + batch = source.get_batch(empty_batch_request) + + assert batch.metadata == {"month": 12, "year": 2022} assert_batch_specs_correct_with_year_month_partitioner_defaults(batch_specs) - assert_batches_correct_with_year_month_partitioner_defaults(batches) + assert_batch_identifiers_correct_with_year_month_partitioner_defaults( + [IDDict(bi) for bi in batch_identifiers_list] + ) @pytest.mark.postgresql @@ -348,14 +353,20 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: options={"year": None, "month": None}, partitioner=partitioner ) assert batch_request_with_none.options == {"year": None, "month": None} - batches = source.get_batch_list_from_batch_request(batch_request_with_none) - # We should have 1 batch_spec per (year, month) pair + batch_identifiers_list = source.get_batch_identifiers_list(batch_request_with_none) + batch = source.get_batch(batch_request_with_none) + + # We should have 1 batch_identifier per (year, month) pair + + assert batch.metadata == 
{"month": 12, "year": 2022} assert_batch_specs_correct_with_year_month_partitioner_defaults(batch_specs) - assert_batches_correct_with_year_month_partitioner_defaults(batches) + assert_batch_identifiers_correct_with_year_month_partitioner_defaults( + [IDDict(bi) for bi in batch_identifiers_list] + ) @pytest.mark.postgresql -def test_datasource_gets_batch_list_partitioner_with_partially_specified_batch_parameters( +def test_datasource_gets_batch_partitioner_with_partially_specified_batch_parameters( empty_data_context, create_source: CreateSourceFixture, ): @@ -378,31 +389,29 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: source=source, name="my_asset", table_name="my_table" ) partitioner = ColumnPartitionerMonthly(column_name="my_col") - batches = source.get_batch_list_from_batch_request( - asset.build_batch_request(options={"year": year}, partitioner=partitioner) - ) - assert len(batch_specs) == len(_DEFAULT_TEST_MONTHS) - for month in _DEFAULT_TEST_MONTHS: - spec = { - "type": "table", - "data_asset_name": "my_asset", - "table_name": "my_table", - "schema_name": None, - "batch_identifiers": {"my_col": {"year": year, "month": month}}, - "partitioner_method": "partition_on_year_and_month", - "partitioner_kwargs": {"column_name": "my_col"}, - } - assert spec in batch_specs + batch_request = asset.build_batch_request(options={"year": year}, partitioner=partitioner) + identifiers_list = source.get_batch_identifiers_list(batch_request) + batch = source.get_batch(batch_request) + assert len(batch_specs) == 1 + assert batch_specs[0] == { + "type": "table", + "data_asset_name": "my_asset", + "table_name": "my_table", + "schema_name": None, + "batch_identifiers": {"my_col": {"year": 2022, "month": 12}}, + "partitioner_method": "partition_on_year_and_month", + "partitioner_kwargs": {"column_name": "my_col"}, + } - assert len(batches) == len(_DEFAULT_TEST_MONTHS) - metadatas = [batch.metadata for batch in batches] + assert len(identifiers_list) == len(_DEFAULT_TEST_MONTHS) for month in _DEFAULT_TEST_MONTHS: - expected_metadata = {"month": month, "year": year} - assert expected_metadata in metadatas + expected_metadata = IDDict({"month": month, "year": year}) + assert expected_metadata in identifiers_list + assert batch.metadata == {"month": 12, "year": 2022} @pytest.mark.postgresql -def test_datasource_gets_batch_list_with_fully_specified_batch_parameters( +def test_datasource_gets_batch_with_fully_specified_batch_parameters( empty_data_context, create_source: CreateSourceFixture, ): @@ -433,13 +442,12 @@ def validate_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: source=source, name="my_asset", table_name="my_table" ) partitioner = ColumnPartitionerMonthly(column_name="my_col") - batches = source.get_batch_list_from_batch_request( + batches = source.get_batch( asset.build_batch_request( options={"month": month, "year": year}, partitioner=partitioner ) ) - assert len(batches) == 1 - assert batches[0].metadata == {"month": month, "year": year} + assert batches.metadata == {"month": month, "year": year} @pytest.mark.postgresql @@ -494,7 +502,7 @@ def test_datasource_gets_nonexistent_asset(create_source: CreateSourceFixture): ), ], ) -def test_bad_batch_request_passed_into_get_batch_list_from_batch_request( +def test_bad_batch_request_passed_into_get_batch( create_source: CreateSourceFixture, PartitionerClass, add_partitioner_kwargs, @@ -520,7 +528,7 @@ def test_bad_batch_request_passed_into_get_batch_list_from_batch_request( LookupError, ) ): - 
source.get_batch_list_from_batch_request(batch_request) + source.get_batch(batch_request) @pytest.mark.postgresql @@ -528,7 +536,7 @@ def test_bad_batch_request_passed_into_get_batch_list_from_batch_request( "batch_parameters", [{}, {"year": 2021}, {"year": 2021, "month": 10}, {"year": None, "month": 10}], ) -def test_get_batch_list_from_batch_request_with_good_batch_request( +def test_get_batch_with_good_batch_request( empty_data_context, create_source: CreateSourceFixture, batch_parameters, @@ -552,7 +560,7 @@ def test_get_batch_list_from_batch_request_with_good_batch_request( partitioner=partitioner, ) # No exception should get thrown - asset.get_batch_list_from_batch_request(batch_request) + asset.get_batch(batch_request) @pytest.mark.postgresql @@ -565,7 +573,7 @@ def test_get_batch_list_from_batch_request_with_good_batch_request( ("bad", "bad", None), ], ) -def test_get_batch_list_from_batch_request_with_malformed_batch_request( +def test_get_batch_with_malformed_batch_request( create_source: CreateSourceFixture, batch_request_args ): with create_source(validate_batch_spec=lambda _: None, dialect="postgresql") as source: @@ -584,7 +592,7 @@ def test_get_batch_list_from_batch_request_with_malformed_batch_request( partitioner=partitioner, ) with pytest.raises(ge_exceptions.InvalidBatchRequestError): - asset.get_batch_list_from_batch_request(batch_request) + asset.get_batch(batch_request) @pytest.mark.postgresql @@ -601,50 +609,6 @@ def test_get_bad_batch_request(create_source: CreateSourceFixture): asset.build_batch_request(options={"invalid_key": None}, partitioner=partitioner) -@pytest.mark.postgresql -@pytest.mark.parametrize( - "batch_slice,expected_batch_count", - [ - ("[-3:]", 3), - ("[5:9]", 4), - ("[:10:2]", 5), - (slice(-3, None), 3), - (slice(5, 9), 4), - (slice(0, 10, 2), 5), - ("-5", 1), - ("-1", 1), - (11, 1), - (0, 1), - ([3], 1), - (None, 12), - ("", 12), - ], -) -def test_postgres_slice_batch_count( - empty_data_context, - create_source: CreateSourceFixture, - batch_slice: BatchSlice, - expected_batch_count: int, -) -> None: - with create_source( - validate_batch_spec=lambda _: None, - dialect="postgresql", - data_context=empty_data_context, - ) as source: - ( - source, # noqa: PLW2901 - asset, - ) = create_and_add_table_asset_without_testing_connection( - source=source, name="my_asset", table_name="my_table" - ) - partitioner = ColumnPartitionerMonthly(column_name="my_col") - batch_request = asset.build_batch_request( - options={"year": 2021}, batch_slice=batch_slice, partitioner=partitioner - ) - batches = asset.get_batch_list_from_batch_request(batch_request=batch_request) - assert len(batches) == expected_batch_count - - @pytest.mark.postgresql def test_data_source_json_has_properties(create_source: CreateSourceFixture): with create_source(validate_batch_spec=lambda _: None, dialect="postgresql") as source: @@ -897,7 +861,7 @@ def validate_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: asset = source.add_query_asset(name="query_asset", query="SELECT * FROM my_table") assert asset.name == "query_asset" assert asset.query.lower() == query.lower() - source.get_batch_list_from_batch_request(asset.build_batch_request()) + source.get_batch(asset.build_batch_request()) @pytest.mark.postgresql @@ -953,18 +917,19 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: # in this unit test. 
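An illustrative sketch, not part of this changeset, of the pattern the reworked postgres tests follow: list the matching identifiers cheaply, then materialize only one batch. `asset` stands for a fluent query asset partitioned on a year column like the one in these tests; the column name is hypothetical.

from great_expectations.core.partitioners import ColumnPartitionerYearly

# `asset` is assumed to be a fluent query asset partitioned on a year column.
partitioner = ColumnPartitionerYearly(column_name="my_col")
batch_request = asset.build_batch_request(partitioner=partitioner)

identifiers = asset.get_batch_identifiers_list(batch_request)  # one dict per year, in sort order
batch = asset.get_batch(batch_request)  # only the last batch in sort order is read

assert all("year" in identifier for identifier in identifiers)
assert batch.metadata["year"] == identifiers[-1]["year"]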
asset = source.add_query_asset(name="my_asset", query="select * from table") partitioner = ColumnPartitionerYearly(column_name="my_col") - batches = source.get_batch_list_from_batch_request( - asset.build_batch_request(partitioner=partitioner) - ) + batch_request = asset.build_batch_request(partitioner=partitioner) + batches = source.get_batch_identifiers_list(batch_request) + batch = source.get_batch(batch_request) assert len(batches) == len(years) for i, year in enumerate(years): - assert "year" in batches[i].metadata - assert batches[i].metadata["year"] == year + assert "year" in batches[i] + assert batches[i]["year"] == year - assert len(batch_specs) == len(years) + assert len(batch_specs) == 1 for spec in batch_specs: assert "partitioner_method" in spec assert spec["partitioner_method"] == "partition_on_year" + assert batch.metadata == {"year": 2021} @pytest.mark.postgresql @@ -991,22 +956,23 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: # in this unit test. asset = source.add_query_asset(name="my_asset", query="select * from table") partitioner = ColumnPartitionerMonthly(column_name="my_col") - batches = source.get_batch_list_from_batch_request( - asset.build_batch_request(partitioner=partitioner) - ) - assert len(batches) == len(years) * len(months) + batch_request = asset.build_batch_request(partitioner=partitioner) + batch_identifiers = source.get_batch_identifiers_list(batch_request) + batch = source.get_batch(batch_request) + assert len(batch_identifiers) == len(years) * len(months) for i, year in enumerate(years): for j, month in enumerate(months): batch_index = i * len(months) + j - assert "year" in batches[batch_index].metadata - assert "month" in batches[batch_index].metadata - assert batches[batch_index].metadata["year"] == year - assert batches[batch_index].metadata["month"] == month + assert "year" in batch_identifiers[batch_index] + assert "month" in batch_identifiers[batch_index] + assert batch_identifiers[batch_index]["year"] == year + assert batch_identifiers[batch_index]["month"] == month - assert len(batch_specs) == len(years) * len(months) - for spec in batch_specs: - assert "partitioner_method" in spec - assert spec["partitioner_method"] == "partition_on_year_and_month" + assert len(batch_identifiers) == len(years) * len(months) + assert len(batch_specs) == 1 + assert "partitioner_method" in batch_specs[0] + assert batch_specs[0]["partitioner_method"] == "partition_on_year_and_month" + assert batch.metadata == {"year": 2021, "month": 9} @pytest.mark.postgresql @@ -1040,25 +1006,87 @@ def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: query="select * from table", ) partitioner = ColumnPartitionerDaily(column_name="my_col") - batches = source.get_batch_list_from_batch_request( - asset.build_batch_request(partitioner=partitioner) - ) + batch_request = asset.build_batch_request(partitioner=partitioner) + batches = source.get_batch_identifiers_list(batch_request) + batch = source.get_batch(batch_request) assert len(batches) == len(years) * len(months) * len(days) for i, year in enumerate(years): for j, month in enumerate(months): for k, day in enumerate(days): batch_index = i * len(months) * len(days) + j * len(days) + k - assert "year" in batches[batch_index].metadata - assert "month" in batches[batch_index].metadata - assert "day" in batches[batch_index].metadata - assert batches[batch_index].metadata["year"] == year - assert batches[batch_index].metadata["month"] == month - assert batches[batch_index].metadata["day"] == 
day - - assert len(batch_specs) == len(years) * len(months) * len(days) - for spec in batch_specs: - assert "partitioner_method" in spec - assert spec["partitioner_method"] == "partition_on_year_and_month_and_day" + assert "year" in batches[batch_index] + assert "month" in batches[batch_index] + assert "day" in batches[batch_index] + assert batches[batch_index]["year"] == year + assert batches[batch_index]["month"] == month + assert batches[batch_index]["day"] == day + + assert len(batch_specs) == 1 + assert "partitioner_method" in batch_specs[0] + assert batch_specs[0]["partitioner_method"] == "partition_on_year_and_month_and_day" + assert batch.metadata == {"year": 2021, "month": 9, "day": 30} + + +@pytest.mark.parametrize( + ("sort_ascending", "expected_metadata"), [(True, {"year": 2021}), (False, {"year": 2020})] +) +@pytest.mark.postgresql +def test_get_batch_partitioner__sort_ascending_respected( + empty_data_context, + create_source: CreateSourceFixture, + sort_ascending: bool, + expected_metadata: dict, +): + years = [2020, 2021] + batch_specs = [] + + def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: + batch_specs.append(spec) + + with create_source( + validate_batch_spec=collect_batch_spec, + dialect="postgresql", + data_context=empty_data_context, + partitioner_query_response=[{"year": year} for year in years], + ) as source: + # We use a query asset because then we don't have to mock out db connection tests + # in this unit test. + asset = source.add_query_asset( + name="my_asset", + query="select * from table", + ) + partitioner = ColumnPartitionerYearly(column_name="my_col", sort_ascending=sort_ascending) + batch_request = asset.build_batch_request(partitioner=partitioner) + batch = source.get_batch(batch_request) + assert batch.metadata == expected_metadata + + +@pytest.mark.postgresql +def test_get_batch_raises_if_no_batches_available( + empty_data_context, + create_source: CreateSourceFixture, +): + batch_specs = [] + + def collect_batch_spec(spec: SqlAlchemyDatasourceBatchSpec) -> None: + batch_specs.append(spec) + + with create_source( + validate_batch_spec=collect_batch_spec, + dialect="postgresql", + data_context=empty_data_context, + partitioner_query_response=[], + ) as source: + # We use a query asset because then we don't have to mock out db connection tests + # in this unit test. 
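A small sketch, not part of this changeset, of the behavior the sort_ascending test above pins down: get_batch always returns the last batch in sorted order, so the flag decides whether that is the newest or the oldest partition. `asset` is assumed to be a partitioned fluent asset with data for 2020 and 2021 only.

from great_expectations.core.partitioners import ColumnPartitionerYearly

# Ascending sort puts the newest year last, so get_batch returns the 2021 batch ...
newest = asset.get_batch(
    asset.build_batch_request(
        partitioner=ColumnPartitionerYearly(column_name="my_col", sort_ascending=True)
    )
)
# ... while descending sort puts the oldest year last, so get_batch returns 2020.
oldest = asset.get_batch(
    asset.build_batch_request(
        partitioner=ColumnPartitionerYearly(column_name="my_col", sort_ascending=False)
    )
)
assert newest.metadata["year"] == 2021
assert oldest.metadata["year"] == 2020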
+ asset = source.add_query_asset( + name="my_asset", + query="select * from table", + ) + partitioner = ColumnPartitionerYearly(column_name="my_col") + batch_request = asset.build_batch_request(partitioner=partitioner, options={"year": 1995}) + with pytest.raises(NoAvailableBatchesError): + source.get_batch(batch_request) @pytest.mark.postgresql @@ -1185,17 +1213,16 @@ def test_partitioner( ) as source: asset = source.add_query_asset(name="query_asset", query="SELECT * from table") partitioner = PartitionerClass(**partitioner_kwargs) - # Test getting all batches - all_batches = asset.get_batch_list_from_batch_request( + # Test getting all batch identifiers + all_batches = asset.get_batch_identifiers_list( asset.build_batch_request(partitioner=partitioner) ) assert len(all_batches) == all_batches_cnt # Test getting specified batches - specified_batches = asset.get_batch_list_from_batch_request( - asset.build_batch_request(specified_batch_request, partitioner=partitioner) - ) + batch_request = asset.build_batch_request(specified_batch_request, partitioner=partitioner) + specified_batches = asset.get_batch_identifiers_list(batch_request) assert len(specified_batches) == specified_batch_cnt - assert specified_batches[-1].metadata == last_specified_batch_metadata + assert asset.get_batch(batch_request).metadata == last_specified_batch_metadata @pytest.mark.postgresql @@ -1215,11 +1242,10 @@ def test_sorting_none_in_metadata( # in this unit test. asset = source.add_query_asset(name="my_asset", query="select * from table") partitioner = ColumnPartitionerYearly(column_name="my_col", sort_ascending=False) - batches = source.get_batch_list_from_batch_request( - asset.build_batch_request(partitioner=partitioner) - ) + batch_request = asset.build_batch_request(partitioner=partitioner) + batches = source.get_batch_identifiers_list(batch_request) assert len(batches) == len(years) - assert batches[-1].metadata["year"] is None + assert asset.get_batch(batch_request).metadata["year"] is None @pytest.mark.postgresql @@ -1232,7 +1258,7 @@ def test_create_temp_table(empty_data_context, create_source): ) as source: assert source.create_temp_table is False asset = source.add_query_asset(name="query_asset", query="SELECT * from table") - _ = asset.get_batch_list_from_batch_request(asset.build_batch_request()) + _ = asset.get_batch(asset.build_batch_request()) assert source._execution_engine._create_temp_table is False @@ -1264,7 +1290,7 @@ def test_add_postgres_query_asset_with_batch_metadata( ) assert asset.batch_metadata == asset_specified_metadata partitioner = ColumnPartitionerYearly(column_name="col") - batches = source.get_batch_list_from_batch_request( + batches = source.get_batch_identifiers_list( asset.build_batch_request(partitioner=partitioner) ) assert len(batches) == len(years) @@ -1277,7 +1303,7 @@ def test_add_postgres_query_asset_with_batch_metadata( ) for i, year in enumerate(years): substituted_batch_metadata["year"] = year - assert batches[i].metadata == substituted_batch_metadata + assert batches[i] == substituted_batch_metadata @pytest.mark.postgresql @@ -1308,7 +1334,7 @@ def test_add_postgres_table_asset_with_batch_metadata( ) assert asset.batch_metadata == asset_specified_metadata partitioner = ColumnPartitionerYearly(column_name="my_col") - batches = source.get_batch_list_from_batch_request( + batches = source.get_batch_identifiers_list( asset.build_batch_request(partitioner=partitioner) ) assert len(batches) == len(years) @@ -1321,4 +1347,4 @@ def
test_add_postgres_table_asset_with_batch_metadata( ) for i, year in enumerate(years): substituted_batch_metadata["year"] = year - assert batches[i].metadata == substituted_batch_metadata + assert batches[i] == substituted_batch_metadata diff --git a/tests/datasource/fluent/test_spark_datasource.py b/tests/datasource/fluent/test_spark_datasource.py index 42ad508fb9d6..44f09c3a205a 100644 --- a/tests/datasource/fluent/test_spark_datasource.py +++ b/tests/datasource/fluent/test_spark_datasource.py @@ -88,10 +88,9 @@ def test_spark_data_asset_batch_metadata( ) assert dataframe_asset.batch_metadata == batch_metadata - batch_list = dataframe_asset.get_batch_list_from_batch_request( + batch = dataframe_asset.get_batch( dataframe_asset.build_batch_request(options={"dataframe": spark_df}) ) - assert len(batch_list) == 1 substituted_batch_metadata = copy.deepcopy(batch_metadata) substituted_batch_metadata.update( { @@ -99,7 +98,7 @@ def test_spark_data_asset_batch_metadata( "curly_pipeline_filename": __file__, } ) - assert batch_list[0].metadata == substituted_batch_metadata + assert batch.metadata == substituted_batch_metadata @pytest.mark.spark diff --git a/tests/datasource/fluent/test_spark_dbfs_datasource.py b/tests/datasource/fluent/test_spark_dbfs_datasource.py index 4ade45abfe3d..3e21c07349db 100644 --- a/tests/datasource/fluent/test_spark_dbfs_datasource.py +++ b/tests/datasource/fluent/test_spark_dbfs_datasource.py @@ -104,9 +104,7 @@ def test_get_batch_list_from_fully_specified_batch_request( options={"name": "alex", "timestamp": "20200819", "price": "1300"}, partitioner=FileNamePartitionerPath(regex=batching_regex), ) - batches = asset.get_batch_list_from_batch_request(request) - assert len(batches) == 1 - batch = batches[0] + batch = asset.get_batch(request) assert batch.batch_request.datasource_name == spark_dbfs_datasource.name assert batch.batch_request.data_asset_name == asset.name assert batch.batch_request.options == ( @@ -127,5 +125,4 @@ def test_get_batch_list_from_fully_specified_batch_request( request = asset.build_batch_request( options={"name": "alex"}, partitioner=FileNamePartitionerPath(regex=batching_regex) ) - batches = asset.get_batch_list_from_batch_request(request) - assert len(batches) == 2 + assert len(asset.get_batch_identifiers_list(request)) == 2 diff --git a/tests/datasource/fluent/test_spark_filesystem_datasource.py b/tests/datasource/fluent/test_spark_filesystem_datasource.py index 51a824ea2712..1f1ddb0c591b 100644 --- a/tests/datasource/fluent/test_spark_filesystem_datasource.py +++ b/tests/datasource/fluent/test_spark_filesystem_datasource.py @@ -756,7 +756,7 @@ def test_csv_asset_with_non_string_batching_regex_named_parameters( pytest.param(pathlib.Path("samples_2020"), id="pathlib.Path"), ], ) -def test_get_batch_list_from_directory_one_batch( +def test_get_batch_identifiers_list_from_directory_one_batch( path: PathStr, spark_filesystem_datasource: SparkFilesystemDatasource, ): @@ -771,7 +771,7 @@ def test_get_batch_list_from_directory_one_batch( ) batch_def = asset.add_batch_definition_whole_directory(name="test batch def") request = batch_def.build_batch_request() - batches = asset.get_batch_list_from_batch_request(request) + batches = asset.get_batch_identifiers_list(request) assert len(batches) == 1 @@ -801,8 +801,8 @@ def test_get_batch_list_from_directory_merges_files( ) batch_def = asset.add_batch_definition_whole_directory("test batch def") request = batch_def.build_batch_request() - batches = asset.get_batch_list_from_batch_request(request) - 
batch_data = batches[0].data + batch = asset.get_batch(request) + batch_data = batch.data # The directory contains 12 files with 10,000 records each so the batch data # (spark dataframe) should contain 120,000 records: assert batch_data.dataframe.count() == 12 * 10000 # type: ignore[attr-defined] @@ -860,9 +860,9 @@ def test_get_batch_list_from_partially_specified_batch_request( request = batch_def.build_batch_request(batch_parameters={"year": "2018"}) - batches = asset.get_batch_list_from_batch_request(request) - assert (len(batches)) == 12 - batch_filenames = [pathlib.Path(batch.metadata["path"]).stem for batch in batches] + batch_identifiers_list = asset.get_batch_identifiers_list(request) + assert (len(batch_identifiers_list)) == 12 + batch_filenames = [pathlib.Path(batch["path"]).stem for batch in batch_identifiers_list] assert set(files_for_2018) == set(batch_filenames) @dataclass(frozen=True) @@ -872,7 +872,7 @@ class YearMonth: expected_year_month = {YearMonth(year="2018", month=format(m, "02d")) for m in range(1, 13)} batch_year_month = { - YearMonth(year=batch.metadata["year"], month=batch.metadata["month"]) for batch in batches + YearMonth(year=batch["year"], month=batch["month"]) for batch in batch_identifiers_list } assert expected_year_month == batch_year_month @@ -896,7 +896,7 @@ def test_spark_sorter(spark_filesystem_datasource: SparkFilesystemDatasource, so batch_request = batch_def.build_batch_request() # act - batches = asset.get_batch_list_from_batch_request(batch_request) + batches = asset.get_batch_identifiers_list(batch_request) # assert assert (len(batches)) == 36 @@ -904,7 +904,7 @@ def test_spark_sorter(spark_filesystem_datasource: SparkFilesystemDatasource, so reverse = sort_ascending is False sorted_batches = sorted( batches, - key=lambda batch: (batch.metadata.get("year"), batch.metadata.get("month")), + key=lambda batch: (batch.get("year"), batch.get("month")), reverse=reverse, ) assert sorted_batches == batches @@ -943,7 +943,7 @@ def test_spark_slice_batch_count( batch_slice=batch_slice, partitioner=FileNamePartitionerMonthly(regex=batching_regex), ) - batches = asset.get_batch_list_from_batch_request(batch_request=batch_request) + batches = asset.get_batch_identifiers_list(batch_request=batch_request) assert len(batches) == expected_batch_count @@ -982,7 +982,7 @@ def datasource_test_connection_error_messages( @pytest.mark.spark -def test_get_batch_list_from_batch_request_does_not_modify_input_batch_request( +def test_get_batch_identifiers_list_does_not_modify_input_batch_request( spark_filesystem_datasource: SparkFilesystemDatasource, ): asset = spark_filesystem_datasource.add_csv_asset( @@ -994,14 +994,33 @@ def test_get_batch_list_from_batch_request_does_not_modify_input_batch_request( batch_def = asset.add_batch_definition_monthly(name="Test Batch Definition", regex=regex) request = batch_def.build_batch_request({"year": "2018"}) request_before_call = copy.deepcopy(request) - batches = asset.get_batch_list_from_batch_request(request) - # We assert the request before the call to get_batch_list_from_batch_request is equal to the request after the # noqa: E501 + batches = asset.get_batch_identifiers_list(request) + # We assert the request before the call to get_batch_identifiers_list is equal to the request after the # noqa: E501 # call. This test exists because this call was modifying the request. assert request == request_before_call # We get all 12 batches, one for each month of 2018. 
     assert len(batches) == 12


+@pytest.mark.spark
+def test_get_batch_does_not_modify_input_batch_request(
+    spark_filesystem_datasource: SparkFilesystemDatasource,
+):
+    asset = spark_filesystem_datasource.add_csv_asset(
+        name="csv_asset",
+        header=True,
+        infer_schema=True,
+    )
+    regex = r"yellow_tripdata_sample_(?P<year>\d{4})-(?P<month>\d{2})\.csv"
+    batch_def = asset.add_batch_definition_monthly(name="Test Batch Definition", regex=regex)
+    request = batch_def.build_batch_request({"year": "2018"})
+    request_before_call = copy.deepcopy(request)
+    _ = asset.get_batch(request)
+    # We assert the request before the call to get_batch is equal to the request after the  # noqa: E501
+    # call. This test exists because this call was modifying the request.
+    assert request == request_before_call
+
+
 @pytest.mark.spark
 def test_add_csv_asset_with_batch_metadata(
     spark_filesystem_datasource: SparkFilesystemDatasource,
@@ -1020,9 +1039,8 @@ def test_add_csv_asset_with_batch_metadata(
     batch_parameters = {"year": "2018", "month": "05"}
     request = batch_def.build_batch_request(batch_parameters)
-    batches = asset.get_batch_list_from_batch_request(request)
-    assert len(batches) == 1
-    assert batches[0].metadata == {
+    batch = asset.get_batch(request)
+    assert batch.metadata == {
         "path": "yellow_tripdata_sample_2018-05.csv",
         **batch_parameters,
         **asset_specified_metadata,
@@ -1046,10 +1064,10 @@ def directory_asset_with_no_partitioner(
 def expected_num_records_directory_asset_no_partitioner_2020_passenger_count_2(
     directory_asset_with_no_partitioner: DirectoryCSVAsset,
 ) -> int:
-    pre_partitioner_batches = directory_asset_with_no_partitioner.get_batch_list_from_batch_request(
+    pre_partitioner_batch = directory_asset_with_no_partitioner.get_batch(
         directory_asset_with_no_partitioner.build_batch_request()
     )
-    pre_partitioner_batch_data = pre_partitioner_batches[0].data
+    pre_partitioner_batch_data = pre_partitioner_batch.data
     expected_num_records = pre_partitioner_batch_data.dataframe.filter(  # type: ignore[attr-defined]
         F.col("pickup_datetime").contains("2018-01-11")
     ).count()
@@ -1193,20 +1211,6 @@ def test_get_batch_parameters_keys_with_partitioner(
     ):
         assert directory_asset.get_batch_parameters_keys(partitioner=partitioner) == expected_keys

-    @pytest.mark.spark
-    def test_get_batch_list_from_batch_request_returns_single_batch(
-        self,
-        directory_asset: DirectoryCSVAsset,
-        daily_partitioner: ColumnPartitionerDaily,
-    ):
-        batch_parameters = {"year": 2018, "month": 1, "day": 11}
-        batch_request = directory_asset.build_batch_request(
-            options=batch_parameters, partitioner=daily_partitioner
-        )
-        batch_list = directory_asset.get_batch_list_from_batch_request(batch_request)
-        expected_batch_count = 1
-        assert len(batch_list) == expected_batch_count
-

 @pytest.fixture
 def file_asset_with_no_partitioner(
@@ -1227,11 +1231,8 @@ def expected_num_records_file_asset_no_partitioner_2020_10_passenger_count_2(
     single_batch_batch_request = file_asset_with_no_partitioner.build_batch_request(
         {"year": "2020", "month": "11"}
     )
-    single_batch_list = file_asset_with_no_partitioner.get_batch_list_from_batch_request(
-        single_batch_batch_request
-    )
-    assert len(single_batch_list) == 1
-    pre_partitioner_batch_data = single_batch_list[0].data
+    batch = file_asset_with_no_partitioner.get_batch(single_batch_batch_request)
+    pre_partitioner_batch_data = batch.data
     expected_num_records = pre_partitioner_batch_data.dataframe.filter(  # type: ignore[attr-defined]
         F.col("passenger_count") == 2
     ).count()
@@ -1251,11 +1252,9 @@ def expected_num_records_file_asset_no_partitioner_2020_10(
             )
         ),
     )
-    single_batch_list = file_asset_with_no_partitioner.get_batch_list_from_batch_request(
-        single_batch_batch_request
-    )
+    batch = file_asset_with_no_partitioner.get_batch(single_batch_batch_request)

-    pre_partitioner_batch_data = single_batch_list[0].data
+    pre_partitioner_batch_data = batch.data

     expected_num_records = (
         pre_partitioner_batch_data.dataframe.filter(  # type: ignore[attr-defined]
@@ -1283,7 +1282,7 @@ def file_asset(
 class TestPartitionerFileAsset:
     @pytest.mark.spark
     @pytest.mark.xfail(strict=True, reason="Will fix or refactor as part of V1-306")
-    def test_get_batch_list_from_batch_request_with_partitioner_file_asset_batch_parameters(
+    def test_parameter_keys_with_partitioner_file_asset_batch_parameters(
         self, file_asset, daily_partitioner
     ):
         assert file_asset.get_batch_parameters_keys(partitioner=daily_partitioner) == (
@@ -1295,25 +1294,7 @@ def test_get_batch_list_from_batch_request_with_partitioner_file_asset_batch_par

     @pytest.mark.spark
     @pytest.mark.xfail(strict=True, reason="Will fix or refactor as part of V1-306")
-    def test_get_batch_list_from_batch_request_with_partitioner_file_asset_one_batch(
-        self, file_asset, daily_partitioner
-    ):
-        post_passenger_count_partitioner_batch_request = file_asset.build_batch_request(
-            options={"year": "2020", "month": "11", "passenger_count": 2},
-            partitioner=daily_partitioner,
-        )
-        post_passenger_count_partitioner_batch_list = file_asset.get_batch_list_from_batch_request(
-            post_passenger_count_partitioner_batch_request
-        )
-        post_partitioner_expected_num_batches = 1
-        assert (
-            len(post_passenger_count_partitioner_batch_list)
-            == post_partitioner_expected_num_batches
-        )
-
-    @pytest.mark.spark
-    @pytest.mark.xfail(strict=True, reason="Will fix or refactor as part of V1-306")
-    def test_get_batch_list_from_batch_request_with_partitioner_file_asset_one_batch_size(
+    def test_get_batch_with_partitioner_file_asset_one_batch_size(
         self,
         file_asset,
         daily_partitioner,
@@ -1323,12 +1304,10 @@ def test_get_batch_list_from_batch_request_with_partitioner_file_asset_one_batch
             options={"year": "2020", "month": "11", "passenger_count": 2},
             partitioner=daily_partitioner,
         )
-        post_partitioner_batch_list = file_asset.get_batch_list_from_batch_request(
-            post_partitioner_batch_request
-        )
+        post_partitioner_batch_list = file_asset.get_batch(post_partitioner_batch_request)

         # Make sure we only have passenger_count == 2 in our batch data
-        post_partitioner_batch_data = post_partitioner_batch_list[0].data
+        post_partitioner_batch_data = post_partitioner_batch_list.data

         assert (
             post_partitioner_batch_data.dataframe.filter(F.col("passenger_count") == 2).count()
@@ -1357,7 +1336,7 @@ def test_add_file_csv_asset_with_partitioner_conflicting_identifier_batch_parame
         )

     @pytest.mark.spark
-    def test_add_file_csv_asset_with_partitioner_conflicting_identifier_gets_one_batch(
+    def test_add_file_csv_asset_with_partitioner_conflicting_identifier_gets_a_batch(
         self, file_asset_with_no_partitioner: CSVAsset
     ):
         regex = re.compile(
@@ -1370,11 +1349,8 @@ def test_add_file_csv_asset_with_partitioner_conflicting_identifier_gets_one_bat
         post_partitioner_batch_request = asset.build_batch_request(
             options={"year": "2020", "month": "11"}, partitioner=partitioner
         )
-        post_partitioner_batches = asset.get_batch_list_from_batch_request(
-            post_partitioner_batch_request
-        )
-        post_partitioner_expected_num_batches = 1
-        assert len(post_partitioner_batches) == post_partitioner_expected_num_batches
+        asset.get_batch(post_partitioner_batch_request)
+        # no errors!

     @pytest.mark.spark
     def test_add_file_csv_asset_with_partitioner_conflicting_identifier_gets_correct_data(
@@ -1391,10 +1367,8 @@ def test_add_file_csv_asset_with_partitioner_conflicting_identifier_gets_correct
         post_partitioner_batch_request = asset.build_batch_request(
             options={"year": "2020", "month": "11"}, partitioner=partitioner
         )
-        post_partitioner_batches = asset.get_batch_list_from_batch_request(
-            post_partitioner_batch_request
-        )
-        post_partitioner_batch_data = post_partitioner_batches[0].data
+        post_partitioner_batch = asset.get_batch(post_partitioner_batch_request)
+        post_partitioner_batch_data = post_partitioner_batch.data

         assert (
             post_partitioner_batch_data.dataframe.count()  # type: ignore[attr-defined]
diff --git a/tests/datasource/fluent/test_sqlite_datasource.py b/tests/datasource/fluent/test_sqlite_datasource.py
index 15323c38f086..c06d717a9cfc 100644
--- a/tests/datasource/fluent/test_sqlite_datasource.py
+++ b/tests/datasource/fluent/test_sqlite_datasource.py
@@ -163,14 +163,15 @@ def test_sqlite_specific_partitioner(
     # Test getting all batches
     partitioner = partitioner_class(**partitioner_kwargs)
     batch_request = asset.build_batch_request(partitioner=partitioner)
-    all_batches = asset.get_batch_list_from_batch_request(batch_request=batch_request)
+    all_batches = asset.get_batch_identifiers_list(batch_request=batch_request)
     assert len(all_batches) == all_batches_cnt

     # Test getting specified batches
-    specified_batches = asset.get_batch_list_from_batch_request(
-        asset.build_batch_request(specified_batch_request, partitioner=partitioner)
-    )
+    batch_request = asset.build_batch_request(specified_batch_request, partitioner=partitioner)
+    specified_batches = asset.get_batch_identifiers_list(batch_request)
     assert len(specified_batches) == specified_batch_cnt
-    assert specified_batches[-1].metadata == last_specified_batch_metadata
+
+    batch = asset.get_batch(batch_request)
+    assert batch.metadata == last_specified_batch_metadata


 @pytest.mark.unit
@@ -178,5 +179,5 @@ def test_create_temp_table(empty_data_context, create_sqlite_source):
     with create_sqlite_source(data_context=empty_data_context, create_temp_table=False) as source:
         assert source.create_temp_table is False
         asset = source.add_query_asset(name="query_asset", query="SELECT * from table")
-        _ = asset.get_batch_list_from_batch_request(asset.build_batch_request())
+        _ = asset.get_batch(asset.build_batch_request())
         assert source._execution_engine._create_temp_table is False
diff --git a/tests/expectations/core/test_expect_column_values_to_be_present_in_other_table.py b/tests/expectations/core/test_expect_column_values_to_be_present_in_other_table.py
index 78959480d1d8..5f19bc7565c9 100644
--- a/tests/expectations/core/test_expect_column_values_to_be_present_in_other_table.py
+++ b/tests/expectations/core/test_expect_column_values_to_be_present_in_other_table.py
@@ -77,7 +77,7 @@ def test_successful_expectation_run(sqlite_datasource):
     datasource = sqlite_datasource
     asset_name = "order_table_1"
     asset = datasource.add_table_asset(name=asset_name, table_name="order_table_1")
-    batch = asset.get_batch_list_from_batch_request(asset.build_batch_request())[0]
+    batch = asset.get_batch(asset.build_batch_request())
     res = batch.validate(
         ExpectColumnValuesToBePresentInOtherTable(
             foreign_key_column="CUSTOMER_ID",
@@ -93,7 +93,7 @@ def test_failed_expectation_run(sqlite_datasource):
     datasource = sqlite_datasource
     asset_name = "order_table_2"
     asset = datasource.add_table_asset(name=asset_name, table_name="order_table_2")
-    batch = asset.get_batch_list_from_batch_request(asset.build_batch_request())[0]
+    batch = asset.get_batch(asset.build_batch_request())
     res = batch.validate(
         ExpectColumnValuesToBePresentInOtherTable(
             foreign_key_column="CUSTOMER_ID",
@@ -119,7 +119,7 @@ def test_configuration_invalid_column_name(sqlite_datasource):
     datasource = sqlite_datasource
     asset_name = "order_table_2"
     asset = datasource.add_table_asset(name=asset_name, table_name="order_table_2")
-    batch = asset.get_batch_list_from_batch_request(asset.build_batch_request())[0]
+    batch = asset.get_batch(asset.build_batch_request())
     res = batch.validate(
         ExpectColumnValuesToBePresentInOtherTable(
             foreign_key_column="I_DONT_EXIST",
diff --git a/tests/expectations/core/test_unexpected_rows_expectation.py b/tests/expectations/core/test_unexpected_rows_expectation.py
index ec2e92658fa9..2e72dd649b6d 100644
--- a/tests/expectations/core/test_unexpected_rows_expectation.py
+++ b/tests/expectations/core/test_unexpected_rows_expectation.py
@@ -36,7 +36,7 @@ def sqlite_batch(sqlite_datasource: SqliteDatasource) -> Batch:
     asset = datasource.add_table_asset("yellow_tripdata_sample_2022_01")
     batch_request = asset.build_batch_request()
-    return asset.get_batch_list_from_batch_request(batch_request)[0]
+    return asset.get_batch(batch_request)


 @pytest.mark.unit
diff --git a/tests/integration/db/taxi_data_utils.py b/tests/integration/db/taxi_data_utils.py
index 0ff05769e47e..b93a833d3c7c 100644
--- a/tests/integration/db/taxi_data_utils.py
+++ b/tests/integration/db/taxi_data_utils.py
@@ -144,9 +144,9 @@ def _execute_taxi_partitioning_test_cases(
         # 3. Check if resulting batches are as expected
         batch_request = batch_definition.build_batch_request()
-        batch_list = asset.get_batch_list_from_batch_request(batch_request)
-        assert len(batch_list) == test_case.num_expected_batch_definitions, (
-            f"Found {len(batch_list)} batch definitions "
+        batch_identifiers_list = asset.get_batch_identifiers_list(batch_request)
+        assert len(batch_identifiers_list) == test_case.num_expected_batch_definitions, (
+            f"Found {len(batch_identifiers_list)} batch definitions "
             f"but expected {test_case.num_expected_batch_definitions}"
         )
@@ -160,20 +160,19 @@ def _execute_taxi_partitioning_test_cases(
         else:
             raise ValueError("Missing test_column_names or test_column_names attribute.")

-        actual_batch_metadata = [batch.metadata for batch in batch_list]
-        assert actual_batch_metadata == expected_batch_metadata, (
+        assert batch_identifiers_list == expected_batch_metadata, (
             f"Batch metadata lists don't match.\n\n"
-            f"batch_list:\n{batch_list}\n\n"
+            f"batch_list:\n{batch_identifiers_list}\n\n"
             f"expected_batch metadata:\n{expected_batch_metadata}"
         )
         # 4. Check that loaded data is as expected, using correctness
-        # of the first batch as a proxy for correctness of the whole list
+        # of arbitrary batch as a proxy for correctness of the whole list

-        first_batch = batch_list[0]
+        batch = asset.get_batch(batch_request)

         execution_engine = datasource.get_execution_engine()
         batch_data: SqlAlchemyBatchData = execution_engine.get_batch_data(
-            batch_spec=first_batch.batch_spec
+            batch_spec=batch.batch_spec
         )

         num_rows: int = execution_engine.execute_query(
diff --git a/tests/integration/test_definitions/abs/partitioned_on_datetime.py b/tests/integration/test_definitions/abs/partitioned_on_datetime.py
index 14be70ffab9d..cad621482f7f 100644
--- a/tests/integration/test_definitions/abs/partitioned_on_datetime.py
+++ b/tests/integration/test_definitions/abs/partitioned_on_datetime.py
@@ -33,23 +33,22 @@
     regex=re.compile(r"yellow_tripdata_sample_(?P<year>.*)-(?P<month>.*)\.csv"),
 )

-# first batch request: not passing any parameters
+# first check: getting all identifiers
 batch_request = batch_definition.build_batch_request()
-batch_list = asset.get_batch_list_from_batch_request(batch_request)
-assert len(batch_list) == 3
-assert batch_list[0].metadata == {
+batch_identifiers_list = asset.get_batch_identifiers_list(batch_request)
+assert len(batch_identifiers_list) == 3
+assert batch_identifiers_list[0] == {
     "year": "2019",
     "month": "01",
     "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-01.csv",
 }

-# second batch request: passing in parameters
+# second batch request: getting the batch
 second_batch_request = batch_definition.build_batch_request(
     batch_parameters={"year": "2019", "month": "02"}
 )
-second_batch_list = asset.get_batch_list_from_batch_request(second_batch_request)
-assert len(second_batch_list) == 1
-assert second_batch_list[0].metadata == {
+second_batch_list = asset.get_batch(second_batch_request)
+assert second_batch_list.metadata == {
     "year": "2019",
     "month": "02",
     "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv",
diff --git a/tests/integration/test_definitions/abs/select_batch_by_path.py b/tests/integration/test_definitions/abs/select_batch_by_path.py
index 94b627413e4c..d82331c540d7 100644
--- a/tests/integration/test_definitions/abs/select_batch_by_path.py
+++ b/tests/integration/test_definitions/abs/select_batch_by_path.py
@@ -32,9 +32,8 @@
 )

 batch_request = batch_definition.build_batch_request()
-batch_list = asset.get_batch_list_from_batch_request(batch_request)
+batch = asset.get_batch(batch_request)

-assert len(batch_list) == 1
-assert batch_list[0].metadata == {
+assert batch.metadata == {
     "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv"
 }
diff --git a/tests/integration/test_definitions/gcs/partitioned_on_datetime.py b/tests/integration/test_definitions/gcs/partitioned_on_datetime.py
index ae9b1c41178f..8785baa0bc66 100644
--- a/tests/integration/test_definitions/gcs/partitioned_on_datetime.py
+++ b/tests/integration/test_definitions/gcs/partitioned_on_datetime.py
@@ -25,9 +25,9 @@
 # not passing in batch parameters
 my_batch_request = batch_definition.build_batch_request()
-batches = data_asset.get_batch_list_from_batch_request(my_batch_request)
-assert len(batches) == 3
-assert batches[0].metadata == {
+batch_identifiers_list = data_asset.get_batch_identifiers_list(my_batch_request)
+assert len(batch_identifiers_list) == 3
+assert batch_identifiers_list[0] == {
     "month": "01",
     "year": "2019",
"data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-01.csv", @@ -37,9 +37,8 @@ my_batch_request = batch_definition.build_batch_request( batch_parameters={"year": "2019", "month": "02"} ) -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert batches[0].metadata == { +batch = data_asset.get_batch(my_batch_request) +assert batch.metadata == { "month": "02", "year": "2019", "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv", diff --git a/tests/integration/test_definitions/gcs/select_batch_by_path.py b/tests/integration/test_definitions/gcs/select_batch_by_path.py index dc9edc250777..4c5f653815d5 100644 --- a/tests/integration/test_definitions/gcs/select_batch_by_path.py +++ b/tests/integration/test_definitions/gcs/select_batch_by_path.py @@ -23,8 +23,7 @@ ) my_batch_request = batch_definition.build_batch_request() -batches = data_asset.get_batch_list_from_batch_request(my_batch_request) -assert len(batches) == 1 -assert batches[0].metadata == { +batches = data_asset.get_batch(my_batch_request) +assert batches.metadata == { "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv" } diff --git a/tests/integration/test_definitions/s3/partition_on_datetime.py b/tests/integration/test_definitions/s3/partition_on_datetime.py index 9b8704e6c4a4..eaeb3e50cb74 100644 --- a/tests/integration/test_definitions/s3/partition_on_datetime.py +++ b/tests/integration/test_definitions/s3/partition_on_datetime.py @@ -20,9 +20,9 @@ # Get all batches by month batch_request = batch_definition.build_batch_request() -batch_list = data_asset.get_batch_list_from_batch_request(batch_request) -assert len(batch_list) == 3 -assert batch_list[0].metadata == { +batch_identifiers_list = data_asset.get_batch_identifiers_list(batch_request) +assert len(batch_identifiers_list) == 3 +assert batch_identifiers_list[0] == { "year": "2019", "month": "01", "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-01.csv", @@ -30,9 +30,8 @@ # Get a specific batch by month batch_request = batch_definition.build_batch_request({"year": "2019", "month": "02"}) -batch_list = data_asset.get_batch_list_from_batch_request(batch_request) -assert len(batch_list) == 1 -assert batch_list[0].metadata == { +batch = data_asset.get_batch(batch_request) +assert batch.metadata == { "year": "2019", "month": "02", "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-02.csv", diff --git a/tests/integration/test_definitions/s3/select_batch_by_path.py b/tests/integration/test_definitions/s3/select_batch_by_path.py index 3c9c712fa141..2458c09ff975 100644 --- a/tests/integration/test_definitions/s3/select_batch_by_path.py +++ b/tests/integration/test_definitions/s3/select_batch_by_path.py @@ -19,9 +19,8 @@ ) batch_request = batch_definition.build_batch_request() -batch_list = data_asset.get_batch_list_from_batch_request(batch_request) +batch_list = data_asset.get_batch(batch_request) -assert len(batch_list) == 1 -assert batch_list[0].metadata == { +assert batch_list.metadata == { "path": "data/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2019-03.csv" }