leverage from file util for all file data
rbiseck3 committed Dec 17, 2024
1 parent b510b60 commit 675dbee
Showing 10 changed files with 20 additions and 16 deletions.
unstructured_ingest/v2/interfaces/file_data.py (2 additions & 2 deletions)
@@ -50,7 +50,7 @@ def from_file(cls, path: str) -> "FileData":
             raise ValueError(f"file path not valid: {path}")
         with open(str(path.resolve()), "rb") as f:
             file_data_dict = json.load(f)
-        file_data = FileData.model_validate(file_data_dict)
+        file_data = cls.model_validate(file_data_dict)
         return file_data
 
     @classmethod
@@ -99,7 +99,7 @@ def populate_identifier(cls, data: Any) -> Any:
 
 def file_data_from_file(path: str) -> FileData:
     try:
-        BatchFileData.from_file(path=path)
+        return BatchFileData.from_file(path=path)
     except ValidationError:
         logger.debug(f"{path} not valid for batch file data")
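Read together, the two hunks above are what make the batch-aware path work: `from_file` now validates against `cls` instead of the hardcoded `FileData`, so `BatchFileData.from_file` genuinely round-trips batch payloads, and `file_data_from_file` now returns the parsed result instead of silently discarding it. A minimal sketch of the full helper, assuming the fallback below the visible hunk parses the file as plain `FileData`:

```python
from pydantic import ValidationError

# Imports shown for a standalone reading; inside file_data.py these
# names are defined locally.
from unstructured_ingest.v2.interfaces.file_data import BatchFileData, FileData
from unstructured_ingest.v2.logger import logger


def file_data_from_file(path: str) -> FileData:
    try:
        # The added `return` is the fix: a valid BatchFileData used to be
        # parsed here and then thrown away.
        return BatchFileData.from_file(path=path)
    except ValidationError:
        logger.debug(f"{path} not valid for batch file data")
    # Assumed fallback: parse as the base FileData model.
    return FileData.from_file(path=path)
```

Every pipeline step below switches from `FileData.from_file` to this helper, so batch connectors and single-file connectors share one rehydration path.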
unstructured_ingest/v2/pipeline/steps/chunk.py (2 additions & 1 deletion)
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.chunker import Chunker
@@ -51,7 +52,7 @@ async def _run_async(
         self, fn: Callable, path: str, file_data_path: str, **kwargs
     ) -> ChunkStepResponse:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_chunk(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping chunking, output already exists: {output_filepath}")
unstructured_ingest/v2/pipeline/steps/download.py (2 additions & 1 deletion)
@@ -8,6 +8,7 @@
 
 from unstructured_ingest.v2.interfaces import FileData, download_responses
 from unstructured_ingest.v2.interfaces.downloader import Downloader
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.utils import serialize_base_model_json
@@ -92,7 +93,7 @@ def update_file_data(
             json.dump(file_data.model_dump(), file, indent=2)
 
     async def _run_async(self, fn: Callable, file_data_path: str) -> list[DownloadStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         download_path = self.process.get_download_path(file_data=file_data)
         if not self.should_download(file_data=file_data, file_data_path=file_data_path):
             logger.debug(f"skipping download, file already exists locally: {download_path}")
unstructured_ingest/v2/pipeline/steps/embed.py (2 additions & 1 deletion)
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.embedder import Embedder
@@ -49,7 +50,7 @@ def _save_output(self, output_filepath: str, embedded_content: list[dict]):
 
     async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=path)
         if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping embedding, output already exists: {output_filepath}")
unstructured_ingest/v2/pipeline/steps/filter.py (2 additions & 2 deletions)
@@ -2,7 +2,7 @@
 from dataclasses import dataclass
 from typing import Callable, Optional
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.filter import Filterer
@@ -20,7 +20,7 @@ def __post_init__(self):
         logger.info(f"created {self.identifier} with configs: {config}")
 
     async def _run_async(self, fn: Callable, file_data_path: str, **kwargs) -> Optional[dict]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         fn_kwargs = {"file_data": file_data}
         if not asyncio.iscoroutinefunction(fn):
             resp = fn(**fn_kwargs)
unstructured_ingest/v2/pipeline/steps/partition.py (2 additions & 1 deletion)
@@ -6,6 +6,7 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.partitioner import Partitioner
@@ -51,7 +52,7 @@ async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
     ) -> Optional[PartitionStepResponse]:
         path = Path(path)
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         output_filepath = self.get_output_filepath(filename=Path(file_data_path))
         if not self.should_partition(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping partitioning, output already exists: {output_filepath}")
unstructured_ingest/v2/pipeline/steps/stage.py (2 additions & 2 deletions)
@@ -4,7 +4,7 @@
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.interfaces.upload_stager import UploadStager
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
@@ -43,7 +43,7 @@ async def _run_async(
         output_filename = f"{self.get_hash(extras=[path.name])}{path.suffix}"
         fn_kwargs = {
             "elements_filepath": path,
-            "file_data": FileData.from_file(path=file_data_path),
+            "file_data": file_data_from_file(path=file_data_path),
             "output_dir": self.cache_dir,
             "output_filename": output_filename,
         }
unstructured_ingest/v2/pipeline/steps/uncompress.py (2 additions & 2 deletions)
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Callable, TypedDict
 
-from unstructured_ingest.v2.interfaces.file_data import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import PipelineStep
 from unstructured_ingest.v2.processes.uncompress import Uncompressor
@@ -28,7 +28,7 @@ def __post_init__(self):
     async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
     ) -> list[UncompressStepResponse]:
-        file_data = FileData.from_file(path=file_data_path)
+        file_data = file_data_from_file(path=file_data_path)
         fn_kwargs = {"file_data": file_data}
         if not asyncio.iscoroutinefunction(fn):
             new_file_data = fn(**fn_kwargs)
unstructured_ingest/v2/pipeline/steps/upload.py (3 additions & 3 deletions)
@@ -3,7 +3,7 @@
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
-from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.interfaces.uploader import UploadContent
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.interfaces import BatchPipelineStep
@@ -41,14 +41,14 @@ def __post_init__(self):
     @instrument(span_name=STEP_ID)
     def _run_batch(self, contents: list[UploadStepContent]) -> None:
         upload_contents = [
-            UploadContent(path=Path(c["path"]), file_data=FileData.from_file(c["file_data_path"]))
+            UploadContent(path=Path(c["path"]), file_data=file_data_from_file(c["file_data_path"]))
             for c in contents
         ]
         self.process.run_batch(contents=upload_contents)
 
     async def _run_async(self, path: str, file_data_path: str, fn: Optional[Callable] = None):
         fn = fn or self.process.run_async
-        fn_kwargs = {"path": Path(path), "file_data": FileData.from_file(path=file_data_path)}
+        fn_kwargs = {"path": Path(path), "file_data": file_data_from_file(path=file_data_path)}
         if not asyncio.iscoroutinefunction(fn):
             fn(**fn_kwargs)
         elif semaphore := self.context.semaphore:
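The tail of this hunk shows the dispatch idiom the pipeline steps share: call `fn` directly when it is a plain function, otherwise await it, under the context's semaphore when one is configured. A self-contained sketch of that pattern, with the function name and return handling assumed:

```python
import asyncio
from typing import Any, Callable, Optional


async def run_step_fn(
    fn: Callable,
    semaphore: Optional[asyncio.Semaphore] = None,
    **fn_kwargs: Any,
) -> Any:
    # Plain callables run synchronously; coroutine functions are awaited,
    # gated by the semaphore when one is present on the pipeline context.
    if not asyncio.iscoroutinefunction(fn):
        return fn(**fn_kwargs)
    if semaphore:
        async with semaphore:
            return await fn(**fn_kwargs)
    return await fn(**fn_kwargs)
```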
unstructured_ingest/v2/processes/connectors/astradb.py (1 addition & 1 deletion)
@@ -219,7 +219,7 @@ def run(self, **kwargs: Any) -> Generator[AstraDBBatchFileData, None, None]:
                 collection_name=self.index_config.collection_name,
                 keyspace=self.index_config.keyspace,
             ),
-            batch_items=[BatchItem(identifier=str(b)) for b in batch],
+            batch_items=[BatchItem(identifier=b) for b in batch],
         )
         yield fd
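The AstraDB indexer change is the one connector-side tweak: the ids coming back from the collection query are passed to `BatchItem` as-is rather than wrapped in `str(...)`, presumably because they are already strings. A hypothetical illustration of the effect; this `BatchItem` definition is an assumption, not the connector's actual model:

```python
from pydantic import BaseModel


class BatchItem(BaseModel):
    # Assumed shape; the real model lives in unstructured_ingest's file_data module.
    identifier: str


# ids as returned by the Astra DB collection query (assumed to already be strings)
batch = ["doc-1", "doc-2", "doc-3"]

# After this commit the redundant str() coercion is gone.
batch_items = [BatchItem(identifier=b) for b in batch]
```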
