diff --git a/unstructured_ingest/v2/interfaces/file_data.py b/unstructured_ingest/v2/interfaces/file_data.py
index b364caf0..be975d23 100644
--- a/unstructured_ingest/v2/interfaces/file_data.py
+++ b/unstructured_ingest/v2/interfaces/file_data.py
@@ -50,7 +50,7 @@ def from_file(cls, path: str) -> "FileData":
             raise ValueError(f"file path not valid: {path}")
         with open(str(path.resolve()), "rb") as f:
             file_data_dict = json.load(f)
-        file_data = FileData.model_validate(file_data_dict)
+        file_data = cls.model_validate(file_data_dict)
         return file_data
 
     @classmethod
@@ -99,7 +99,7 @@ def populate_identifier(cls, data: Any) -> Any:
 
 
 def file_data_from_file(path: str) -> FileData:
     try:
-        BatchFileData.from_file(path=path)
+        return BatchFileData.from_file(path=path)
     except ValidationError:
         logger.debug(f"{path} not valid for batch file data")
diff --git a/unstructured_ingest/v2/pipeline/steps/chunk.py b/unstructured_ingest/v2/pipeline/steps/chunk.py
index 0ae05169..519dcc8f 100644
--- a/unstructured_ingest/v2/pipeline/steps/chunk.py
+++ b/unstructured_ingest/v2/pipeline/steps/chunk.py
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.chunker import Chunker
@@ -51,7 +52,7 @@ async def _run_async(
         self, fn: Callable, path: str, file_data_path: str, **kwargs
     ) -> ChunkStepResponse:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_chunk(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping chunking, output already exists: {output_filepath}")
diff --git a/unstructured_ingest/v2/pipeline/steps/download.py b/unstructured_ingest/v2/pipeline/steps/download.py
index 560d292d..929abb19 100644
--- a/unstructured_ingest/v2/pipeline/steps/download.py
+++ b/unstructured_ingest/v2/pipeline/steps/download.py
@@ -8,6 +8,7 @@
 
 from unstructured_ingest.v2.interfaces import FileData, download_responses
 from unstructured_ingest.v2.interfaces.downloader import Downloader
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.utils import serialize_base_model_json
@@ -92,7 +93,7 @@ def update_file_data(
             json.dump(file_data.model_dump(), file, indent=2)
 
     async def _run_async(self, fn: Callable, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         download_path = self.process.get_download_path(file_data=file_data)
         if not self.should_download(file_data=file_data, file_data_path=file_data_path):
             logger.debug(f"skipping download, file already exists locally: {download_path}")
diff --git a/unstructured_ingest/v2/pipeline/steps/embed.py b/unstructured_ingest/v2/pipeline/steps/embed.py
index aa718dd3..ba3431a8 100644
--- a/unstructured_ingest/v2/pipeline/steps/embed.py
+++ b/unstructured_ingest/v2/pipeline/steps/embed.py
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.embedder import Embedder
@@ -49,7 +50,7 @@ def _save_output(self, output_filepath: str, embedded_content: list[dict]):
 
     async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping embedding, output already exists: {output_filepath}")
diff --git a/unstructured_ingest/v2/pipeline/steps/filter.py b/unstructured_ingest/v2/pipeline/steps/filter.py
index 48ed23f1..4aacc1b9 100644
--- a/unstructured_ingest/v2/pipeline/steps/filter.py
+++ b/unstructured_ingest/v2/pipeline/steps/filter.py
@@ -2,7 +2,7 @@
 from dataclasses import dataclass
 from typing import Callable, Optional
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.filter import Filterer
@@ -20,7 +20,7 @@ def __post_init__(self):
         logger.info(f"created {self.identifier} with configs: {config}")
 
     async def _run_async(self, fn: Callable, file_data_path: str, **kwargs) -> Optional[dict]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         fn_kwargs = {"file_data": file_data}
         if not asyncio.iscoroutinefunction(fn):
             resp = fn(**fn_kwargs)
diff --git a/unstructured_ingest/v2/pipeline/steps/partition.py b/unstructured_ingest/v2/pipeline/steps/partition.py
index e1f57c87..4ffd549f 100644
--- a/unstructured_ingest/v2/pipeline/steps/partition.py
+++ b/unstructured_ingest/v2/pipeline/steps/partition.py
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.partitioner import Partitioner
@@ -51,7 +52,7 @@ async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
     ) -> Optional[PartitionStepResponse]:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=Path(file_data_path))
         if not self.should_partition(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping partitioning, output already exists: {output_filepath}")
diff --git a/unstructured_ingest/v2/pipeline/steps/stage.py b/unstructured_ingest/v2/pipeline/steps/stage.py
index a23ebbf7..3e8b62e8 100644
--- a/unstructured_ingest/v2/pipeline/steps/stage.py
+++ b/unstructured_ingest/v2/pipeline/steps/stage.py
@@ -4,7 +4,7 @@
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.interfaces.upload_stager import UploadStager
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
@@ -43,7 +43,7 @@ async def _run_async(
         output_filename = f"{self.get_hash(extras=[path.name])}{path.suffix}"
         fn_kwargs = {
             "elements_filepath": path,
-            "file_data": FileData.from_file(path=file_data_path),
+            "file_data": file_data_from_file(path=file_data_path),
             "output_dir": self.cache_dir,
             "output_filename": output_filename,
         }
diff --git a/unstructured_ingest/v2/pipeline/steps/uncompress.py b/unstructured_ingest/v2/pipeline/steps/uncompress.py
index a99eda88..80e7603d 100644
--- a/unstructured_ingest/v2/pipeline/steps/uncompress.py
+++ b/unstructured_ingest/v2/pipeline/steps/uncompress.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Callable, TypedDict
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.uncompress import Uncompressor
@@ -28,7 +28,7 @@ def __post_init__(self):
     async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
     ) -> list[UncompressStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         fn_kwargs = {"file_data": file_data}
         if not asyncio.iscoroutinefunction(fn):
             new_file_data = fn(**fn_kwargs)
diff --git a/unstructured_ingest/v2/pipeline/steps/upload.py b/unstructured_ingest/v2/pipeline/steps/upload.py
index 176f8015..21da4c5f 100644
--- a/unstructured_ingest/v2/pipeline/steps/upload.py
+++ b/unstructured_ingest/v2/pipeline/steps/upload.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
-from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.interfaces.uploader import UploadContent
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import BatchPipelineStep
@@ -41,14 +41,14 @@ def __post_init__(self):
     @instrument(span_name=STEP_ID)
     def _run_batch(self, contents: list[UploadStepContent]) -> None:
         upload_contents = [
-            UploadContent(path=Path(c["path"]), file_data=FileData.from_file(c["file_data_path"]))
+            UploadContent(path=Path(c["path"]), file_data=file_data_from_file(c["file_data_path"]))
             for c in contents
         ]
         self.process.run_batch(contents=upload_contents)
 
     async def _run_async(self, path: str, file_data_path: str, fn: Optional[Callable] = None):
         fn = fn or self.process.run_async
-        fn_kwargs = {"path": Path(path), "file_data": FileData.from_file(path=file_data_path)}
+        fn_kwargs = {"path": Path(path), "file_data": file_data_from_file(path=file_data_path)}
         if not asyncio.iscoroutinefunction(fn):
             fn(**fn_kwargs)
         elif semaphore := self.context.semaphore:
diff --git a/unstructured_ingest/v2/processes/connectors/astradb.py b/unstructured_ingest/v2/processes/connectors/astradb.py
index 38f679de..d30c53d8 100644
--- a/unstructured_ingest/v2/processes/connectors/astradb.py
+++ b/unstructured_ingest/v2/processes/connectors/astradb.py
@@ -219,7 +219,7 @@ def run(self, **kwargs: Any) -> Generator[AstraDBBatchFileData, None, None]:
                     collection_name=self.index_config.collection_name,
                     keyspace=self.index_config.keyspace,
                 ),
-                batch_items=[BatchItem(identifier=str(b)) for b in batch],
+                batch_items=[BatchItem(identifier=b) for b in batch],
             )
             yield fd
 