feat: add web URL loader & refine indexing logics (#397)
* feat: add web URL loader & refine indexing logics

* fix: comfort mypy
taprosoft authored Oct 15, 2024
1 parent 41966fc commit b113efc
Showing 7 changed files with 183 additions and 60 deletions.
2 changes: 2 additions & 0 deletions libs/kotaemon/kotaemon/indices/ingests/files.py
@@ -21,8 +21,10 @@
PDFThumbnailReader,
TxtReader,
UnstructuredReader,
WebReader,
)

web_reader = WebReader()
unstructured = UnstructuredReader()
adobe_reader = AdobeReader()
azure_reader = AzureAIDocumentIntelligenceLoader(
2 changes: 2 additions & 0 deletions libs/kotaemon/kotaemon/loaders/__init__.py
@@ -10,6 +10,7 @@
from .pdf_loader import PDFThumbnailReader
from .txt_loader import TxtReader
from .unstructured_loader import UnstructuredReader
from .web_loader import WebReader

__all__ = [
"AutoReader",
@@ -28,4 +29,5 @@
"AdobeReader",
"TxtReader",
"PDFThumbnailReader",
"WebReader",
]
43 changes: 43 additions & 0 deletions libs/kotaemon/kotaemon/loaders/web_loader.py
@@ -0,0 +1,43 @@
from pathlib import Path
from typing import Optional

import requests
from decouple import config

from kotaemon.base import Document

from .base import BaseReader

JINA_API_KEY = config("JINA_API_KEY", default="")
JINA_URL = config("JINA_URL", default="https://r.jina.ai/")


class WebReader(BaseReader):
def run(
self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
) -> list[Document]:
return self.load_data(Path(file_path), extra_info=extra_info, **kwargs)

def fetch_url(self, url: str):
# setup the request
api_url = f"https://r.jina.ai/{url}"
headers = {
"X-With-Links-Summary": "true",
}
if JINA_API_KEY:
headers["Authorization"] = f"Bearer {JINA_API_KEY}"

response = requests.get(api_url, headers=headers)
response.raise_for_status()

data = response.text
return data

def load_data(
self, file_path: str | Path, extra_info: Optional[dict] = None, **kwargs
) -> list[Document]:
file_path = str(file_path)
output = self.fetch_url(file_path)
metadata = extra_info or {}

return [Document(text=output, metadata=metadata)]
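
For orientation, a minimal usage sketch of the new reader (the target URL and extra_info payload are illustrative; JINA_API_KEY is optional and, together with JINA_URL, is read from the environment via python-decouple as shown above — network access to the reader endpoint is assumed):

```python
from kotaemon.loaders import WebReader

# Fetch a page through the Jina Reader endpoint and wrap it as a kotaemon Document.
reader = WebReader()
docs = reader.load_data("https://example.com", extra_info={"source": "web"})

print(docs[0].text[:200])   # page content returned by the reader endpoint
print(docs[0].metadata)     # {'source': 'web'}
```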
2 changes: 1 addition & 1 deletion libs/ktem/ktem/index/file/graph/pipelines.py
@@ -57,7 +57,7 @@ def prepare_graph_index_path(graph_id: str):
class GraphRAGIndexingPipeline(IndexDocumentPipeline):
"""GraphRAG specific indexing pipeline"""

def route(self, file_path: Path) -> IndexPipeline:
def route(self, file_path: str | Path) -> IndexPipeline:
"""Simply disable the splitter (chunking) for this pipeline"""
pipeline = super().route(file_path)
pipeline.splitter = None
2 changes: 1 addition & 1 deletion libs/ktem/ktem/index/file/knet/pipelines.py
@@ -32,7 +32,7 @@ def get_user_settings(cls):
},
}

def route(self, file_path: Path) -> IndexPipeline:
def route(self, file_path: str | Path) -> IndexPipeline:
"""Simply disable the splitter (chunking) for this pipeline"""
pipeline = super().route(file_path)
pipeline.splitter = None
129 changes: 96 additions & 33 deletions libs/ktem/ktem/index/file/pipelines.py
@@ -39,6 +39,7 @@
adobe_reader,
azure_reader,
unstructured,
web_reader,
)
from kotaemon.indices.rankings import BaseReranking, LLMReranking, LLMTrulensScoring
from kotaemon.indices.splitters import BaseSplitter, TokenSplitter
@@ -444,7 +445,7 @@ def handle_chunks_vectorstore(self, chunks, file_id):
session.add_all(nodes)
session.commit()

def get_id_if_exists(self, file_path: Path) -> Optional[str]:
def get_id_if_exists(self, file_path: str | Path) -> Optional[str]:
"""Check if the file is already indexed
Args:
@@ -453,13 +454,14 @@ def get_id_if_exists(self, file_path: Path) -> Optional[str]:
Returns:
the file id if the file is indexed, otherwise None
"""
file_name = file_path.name if isinstance(file_path, Path) else file_path
if self.private:
cond: tuple = (
self.Source.name == file_path.name,
self.Source.name == file_name,
self.Source.user == self.user_id,
)
else:
cond = (self.Source.name == file_path.name,)
cond = (self.Source.name == file_name,)

with Session(engine) as session:
stmt = select(self.Source).where(*cond)
@@ -469,6 +471,29 @@

return None

def store_url(self, url: str) -> str:
"""Store URL into the database and storage, return the file id
Args:
url: the URL
Returns:
the file id
"""
file_hash = sha256(url.encode()).hexdigest()
source = self.Source(
name=url,
path=file_hash,
size=0,
user=self.user_id, # type: ignore
)
with Session(engine) as session:
session.add(source)
session.commit()
file_id = source.id

return file_id

def store_file(self, file_path: Path) -> str:
"""Store file into the database and storage, return the file id
@@ -495,7 +520,7 @@ def store_file(self, file_path: Path) -> str:

return file_id

def finish(self, file_id: str, file_path: Path) -> str:
def finish(self, file_id: str, file_path: str | Path) -> str:
"""Finish the indexing"""
with Session(engine) as session:
stmt = select(self.Source).where(self.Source.id == file_id)
@@ -561,37 +586,55 @@ def run(
def stream(
self, file_path: str | Path, reindex: bool, **kwargs
) -> Generator[Document, None, tuple[str, list[Document]]]:
# check for duplication
file_path = Path(file_path).resolve()
# check if the file is already indexed
if isinstance(file_path, Path):
file_path = file_path.resolve()

file_id = self.get_id_if_exists(file_path)
if file_id is not None:
if not reindex:
raise ValueError(
f"File {file_path.name} already indexed. Please rerun with "
"reindex=True to force reindexing."
)

if isinstance(file_path, Path):
if file_id is not None:
if not reindex:
raise ValueError(
f"File {file_path.name} already indexed. Please rerun with "
"reindex=True to force reindexing."
)
else:
# remove the existing records
yield Document(
f" => Removing old {file_path.name}", channel="debug"
)
self.delete_file(file_id)
file_id = self.store_file(file_path)
else:
# remove the existing records
yield Document(f" => Removing old {file_path.name}", channel="debug")
self.delete_file(file_id)
# add record to db
file_id = self.store_file(file_path)
else:
# add record to db
file_id = self.store_file(file_path)
if file_id is not None:
raise ValueError(f"URL {file_path} already indexed.")
else:
# add record to db
file_id = self.store_url(file_path)

# extract the file
extra_info = default_file_metadata_func(str(file_path))
if isinstance(file_path, Path):
extra_info = default_file_metadata_func(str(file_path))
file_name = file_path.name
else:
extra_info = {"file_name": file_path}
file_name = file_path

extra_info["file_id"] = file_id
extra_info["collection_name"] = self.collection_name

yield Document(f" => Converting {file_path.name} to text", channel="debug")
yield Document(f" => Converting {file_name} to text", channel="debug")
docs = self.loader.load_data(file_path, extra_info=extra_info)
yield Document(f" => Converted {file_path.name} to text", channel="debug")
yield from self.handle_docs(docs, file_id, file_path.name)
yield Document(f" => Converted {file_name} to text", channel="debug")
yield from self.handle_docs(docs, file_id, file_name)

self.finish(file_id, file_path)

yield Document(f" => Finished indexing {file_path.name}", channel="debug")
yield Document(f" => Finished indexing {file_name}", channel="debug")
return file_id, docs
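
One behavioral nuance in stream() above: a local file that is already indexed can be re-ingested with reindex=True, whereas an already-indexed URL is rejected outright. A condensed, standalone sketch of that duplicate check (simplified names, not the actual pipeline class):

```python
def check_duplicate(existing_id, is_local_file: bool, reindex: bool, name: str) -> None:
    # Mirrors the branching in IndexPipeline.stream: URLs cannot be re-indexed.
    if existing_id is None:
        return  # new source: proceed to store_file() / store_url()
    if not is_local_file:
        raise ValueError(f"URL {name} already indexed.")
    if not reindex:
        raise ValueError(
            f"File {name} already indexed. Please rerun with "
            "reindex=True to force reindexing."
        )
    # otherwise: remove the old records, then re-ingest the file
```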


@@ -658,20 +701,30 @@ def get_pipeline(cls, user_settings, index_settings) -> BaseFileIndexIndexing:
)
return obj

def route(self, file_path: Path) -> IndexPipeline:
def is_url(self, file_path: str | Path) -> bool:
return isinstance(file_path, str) and (
file_path.startswith("http://") or file_path.startswith("https://")
)

def route(self, file_path: str | Path) -> IndexPipeline:
"""Decide the pipeline based on the file type
Can subclass this method for a more elaborate pipeline routing strategy.
"""
_, chunk_size, chunk_overlap = dev_settings()

ext = file_path.suffix.lower()
reader = self.readers.get(ext, unstructured)
if reader is None:
raise NotImplementedError(
f"No supported pipeline to index {file_path.name}. Please specify "
"the suitable pipeline for this file type in the settings."
)
# check if file_path is a URL
if self.is_url(file_path):
reader = web_reader
else:
assert isinstance(file_path, Path)
ext = file_path.suffix.lower()
reader = self.readers.get(ext, unstructured)
if reader is None:
raise NotImplementedError(
f"No supported pipeline to index {file_path.name}. Please specify "
"the suitable pipeline for this file type in the settings."
)

print("Using reader", reader)
pipeline: IndexPipeline = IndexPipeline(
@@ -715,9 +768,14 @@ def stream(

n_files = len(file_paths)
for idx, file_path in enumerate(file_paths):
file_path = Path(file_path)
if self.is_url(file_path):
file_name = file_path
else:
file_path = Path(file_path)
file_name = file_path.name

yield Document(
content=f"Indexing [{idx + 1}/{n_files}]: {file_path.name}",
content=f"Indexing [{idx + 1}/{n_files}]: {file_name}",
channel="debug",
)

@@ -730,7 +788,11 @@ def stream(
file_ids.append(file_id)
errors.append(None)
yield Document(
content={"file_path": file_path, "status": "success"},
content={
"file_path": file_path,
"file_name": file_name,
"status": "success",
},
channel="index",
)
except Exception as e:
@@ -740,6 +802,7 @@ def stream(
yield Document(
content={
"file_path": file_path,
"file_name": file_name,
"status": "failed",
"message": str(e),
},
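
Taken together, the routing decision the new code makes is roughly the following (a standalone sketch with simplified names and assumed parameters, not the actual ktem classes):

```python
from pathlib import Path

def is_url(file_path: str | Path) -> bool:
    # URLs arrive as plain strings carrying an http(s) scheme.
    return isinstance(file_path, str) and file_path.startswith(("http://", "https://"))

def pick_reader(file_path: str | Path, readers: dict, web_reader, fallback):
    # URLs go to the web reader; local files are dispatched on their extension,
    # falling back to a generic reader when the extension is not registered.
    if is_url(file_path):
        return web_reader
    ext = Path(file_path).suffix.lower()
    return readers.get(ext, fallback)
```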