Skip to content

Commit

Permalink
File-based CDK: allow for extension mismatch (#29835)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Aug 25, 2023
1 parent 3a193c5 commit 82a96e0
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def check_availability_and_parsability(
"""
try:
files = self._check_list_files(stream)
self._check_extensions(stream, files)
self._check_parse_record(stream, files[0], logger)
except CheckAvailabilityError:
return False, "".join(traceback.format_exc())
Expand All @@ -73,11 +72,6 @@ def _check_list_files(self, stream: "AbstractFileBasedStream") -> List[RemoteFil

return files

def _check_extensions(self, stream: "AbstractFileBasedStream", files: List[RemoteFile]) -> None:
    """
    Ensure every file in `files` agrees with the file type configured on `stream`.

    Files are inspected in order; the first one for which `extension_agrees_with_file_type`
    returns a falsy value aborts the check.

    :param stream: stream whose `config.file_type` and `name` are read.
    :param files: files to inspect.
    :raises CheckAvailabilityError: with FileBasedSourceError.EXTENSION_MISMATCH when a file disagrees.
    """
    for candidate in files:
        # The configured file type is read per file on purpose: with an empty list it is never accessed.
        if candidate.extension_agrees_with_file_type(stream.config.file_type):
            continue
        raise CheckAvailabilityError(FileBasedSourceError.EXTENSION_MISMATCH, stream=stream.name)

def _check_parse_record(self, stream: "AbstractFileBasedStream", file: RemoteFile, logger: logging.Logger) -> None:
parser = stream.get_parser(stream.config.file_type)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

class FileBasedSourceError(Enum):
EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
EXTENSION_MISMATCH = "The file type that you specified for this stream does not agree with the extension of one or more files in the stream. You may need to modify your glob patterns."
GLOB_PARSE_ERROR = (
"Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from functools import cached_property, lru_cache
from typing import Any, Dict, Iterable, List, Mapping, Optional

from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
Expand Down Expand Up @@ -38,7 +38,7 @@ class AbstractFileBasedStream(Stream):
def __init__(
self,
config: FileBasedStreamConfig,
catalog_schema: Optional[ConfiguredAirbyteCatalog],
catalog_schema: Optional[Mapping[str, Any]],
stream_reader: AbstractFileBasedStreamReader,
availability_strategy: AbstractFileBasedAvailabilityStrategy,
discovery_policy: AbstractDiscoveryPolicy,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import unittest
from datetime import datetime
from unittest.mock import Mock, PropertyMock

from airbyte_cdk.sources.file_based.availability_strategy.default_file_based_availability_strategy import (
DefaultFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream

# Remote file whose URI extension (".unknown_extension") does not correspond to its declared file_type ("csv").
_FILE_WITH_UNKNOWN_EXTENSION = RemoteFile(uri="a.unknown_extension", last_modified=datetime.now(), file_type="csv")
# Stream configuration assigned to the mocked stream in setUp.
# NOTE(review): file_type is "parquet" while format is JsonlFormat() - presumably arbitrary placeholder values; confirm.
_ANY_CONFIG = FileBasedStreamConfig(
name="config.name",
file_type="parquet",
format=JsonlFormat(),
)
# Placeholder catalog schema assigned to the mocked stream in setUp.
_ANY_SCHEMA = {"key": "value"}


class DefaultFileBasedAvailabilityStrategyTest(unittest.TestCase):
    """Unit tests for DefaultFileBasedAvailabilityStrategy.check_availability_and_parsability."""

    def setUp(self) -> None:
        """Build the strategy under test around a mocked stream reader, plus a mocked stream and parser."""
        self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
        self._strategy = DefaultFileBasedAvailabilityStrategy(self._stream_reader)

        # The mocked stream hands out the mocked parser, so each test controls what "parsing" returns.
        self._parser = Mock(spec=FileTypeParser)
        self._stream = Mock(spec=AbstractFileBasedStream)
        self._stream.get_parser.return_value = self._parser
        self._stream.catalog_schema = _ANY_SCHEMA
        self._stream.config = _ANY_CONFIG
        self._stream.validation_policy = PropertyMock(validate_schema_before_sync=False)

    def test_given_file_extension_does_not_match_when_check_availability_and_parsability_then_stream_is_still_available(self) -> None:
        """
        Before, we had a validation on the file extension, but it turns out that in production users sometimes have a mismatch there.
        The example we've seen was a stream using the JSONL parser while the file extension was just `.json`. Note that there was more
        than one record extracted from this stream, so it's not just that the file was a single JSON object.
        """
        self._stream.list_files.return_value = [_FILE_WITH_UNKNOWN_EXTENSION]
        self._parser.parse_records.return_value = [{"a record": 1}]

        is_available, reason = self._strategy.check_availability_and_parsability(self._stream, Mock(), Mock())

        # Use the returned reason as the assertion message so a failure shows why the stream was unavailable.
        assert is_available, reason
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,6 @@
).build()


# Check scenario built from a copy of _base_failure_scenario with the file type set to "jsonl"; the expected
# check error carries the FileBasedSourceError.EXTENSION_MISMATCH message.
# NOTE(review): the meaning of the first argument (None) of set_expected_check_error is not visible here -
# presumably the expected exception type; confirm against the scenario builder.
error_extension_mismatch_scenario = (
_base_failure_scenario.copy()
.set_name("error_extension_mismatch_scenario")
.set_file_type("jsonl")
.set_expected_check_error(None, FileBasedSourceError.EXTENSION_MISMATCH.value)
).build()


error_listing_files_scenario = (
_base_failure_scenario.copy()
.set_name("error_listing_files_scenario")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
)
from unit_tests.sources.file_based.scenarios.check_scenarios import (
error_empty_stream_scenario,
error_extension_mismatch_scenario,
error_listing_files_scenario,
error_multi_stream_scenario,
error_reading_file_scenario,
Expand Down Expand Up @@ -309,7 +308,6 @@ def test_spec(capsys: CaptureFixture[str], scenario: TestScenario) -> None:

check_scenarios = [
error_empty_stream_scenario,
error_extension_mismatch_scenario,
error_listing_files_scenario,
error_reading_file_scenario,
error_record_validation_user_provided_schema_scenario,
Expand Down

0 comments on commit 82a96e0

Please sign in to comment.