Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/vectara-destination-to-v2 #158

Merged
merged 59 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
6f6a4dd
vectara v2 still work in progress
guilherme-uns Oct 8, 2024
abbab10
takingn errors out
guilherme-uns Oct 8, 2024
a11f8a3
merge
guilherme-uns Oct 8, 2024
9a1fc1c
fix lint
guilherme-uns Oct 8, 2024
0bf69f5
linting
guilherme-uns Oct 8, 2024
265d0e8
order imports
guilherme-uns Oct 8, 2024
eba45e8
ruff --fix
guilherme-uns Oct 9, 2024
c4f208f
fix version
guilherme-uns Oct 9, 2024
4805c55
fixing PR issues by Roman
guilherme-uns Oct 10, 2024
c6fa937
ruff
guilherme-uns Oct 10, 2024
9d2cf32
wrking get metadata on stager
guilherme-uns Oct 11, 2024
8ca8f7a
remove coments
guilherme-uns Oct 11, 2024
7c38cf7
changing conform dict to the correct way and including vdoc on stager…
guilherme-uns Oct 12, 2024
4f64fdd
taking path out from file by Potter chat to eave only url, even thoug…
guilherme-uns Oct 14, 2024
5b8de54
fix potter comments
guilherme-uns Oct 14, 2024
47b252d
change version
guilherme-uns Oct 15, 2024
39a0422
version update
guilherme-uns Oct 16, 2024
7df96a3
merge
guilherme-uns Oct 16, 2024
ccfb736
make tidy
guilherme-uns Oct 16, 2024
d56b77c
add secret to access config
guilherme-uns Oct 18, 2024
d139e82
merge
guilherme-uns Oct 18, 2024
a02f310
make tidy
guilherme-uns Oct 18, 2024
1b1a011
.
guilherme-uns Oct 18, 2024
121d939
get secret value
guilherme-uns Oct 18, 2024
884e788
change version
guilherme-uns Oct 18, 2024
7ed0d51
add async
guilherme-uns Oct 21, 2024
bf18c04
mrg
guilherme-uns Oct 21, 2024
d2f5965
change import
guilherme-uns Oct 21, 2024
5df25c0
vectara requirements
guilherme-uns Oct 21, 2024
265740e
make tidy
guilherme-uns Oct 21, 2024
4c1e9a5
mt
guilherme-uns Oct 21, 2024
babc450
mrge
guilherme-uns Oct 21, 2024
42ed602
fix PR comments
guilherme-uns Oct 22, 2024
2eacc95
vectara example to be able to debug async
guilherme-uns Oct 22, 2024
1da9c7c
no wait worn
guilherme-uns Oct 23, 2024
ca6efbd
improving logging
guilherme-uns Oct 24, 2024
b8808a1
precheck without async.
guilherme-uns Oct 24, 2024
a5e3b61
merge
guilherme-uns Oct 24, 2024
549451c
mke tidy
guilherme-uns Oct 24, 2024
a2048a4
lint
guilherme-uns Oct 24, 2024
dde17f0
linting
guilherme-uns Oct 24, 2024
45fbab9
some fixes vectara
guilherme-uns Oct 25, 2024
85a5810
migrate to vectara v2 api
bryan-unstructured Nov 27, 2024
8c7d510
add integration test for vectara
bryan-unstructured Nov 27, 2024
cf0560c
fix syntax
bryan-unstructured Nov 27, 2024
82999eb
divide elements to batches
bryan-unstructured Nov 27, 2024
4f7b3af
Add retry logic to document query
bryan-unstructured Nov 28, 2024
8a3999a
change integration test to regular function
bryan-unstructured Nov 28, 2024
a536674
clean up corpus after integration test
bryan-unstructured Nov 28, 2024
43ace32
fix syntax error
bryan-unstructured Dec 3, 2024
14d5f86
update connection config in example
bryan-unstructured Dec 4, 2024
7b12e0c
Remove unnecessary var
bryan-unstructured Dec 18, 2024
606e3a6
remove batch_size because Vectara api does not support batch indexing…
bryan-unstructured Dec 18, 2024
1f8e1c3
remove asyncio.run to avoid conflict with async context
bryan-unstructured Dec 19, 2024
0cf0c14
Resolve merge conflict by incorporating both suggestions
bryan-unstructured Dec 19, 2024
29175ba
update stager to reflect new structure
bryan-unstructured Dec 19, 2024
2212caa
update uploader to reflect new structure
bryan-unstructured Dec 19, 2024
0faf4df
fix syntax
bryan-unstructured Dec 20, 2024
086cc53
Merge branch 'main' into DS-92-vectara-destination-to-v2
bryan-unstructured Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.2.0-dev0

### Enhancements

* **Migrate Vectara Destination Connector to v2**


## 0.2.0

### Enhancements
Expand Down
2 changes: 2 additions & 0 deletions requirements/connectors/vectara.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-c ../common/constraints.txt

requests
aiofiles
httpx
2 changes: 2 additions & 0 deletions requirements/connectors/vectara.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ idna==3.10
# via requests
requests==2.32.3
# via -r ./connectors/vectara.in
aiofiles==24.1.0
# via -r ./connectors/vectara.in
urllib3==1.26.20
# via
# -c ./connectors/../common/constraints.txt
Expand Down
267 changes: 267 additions & 0 deletions test/integration/connectors/test_vectara.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import json
import os
import time
from pathlib import Path
from typing import Generator
from uuid import uuid4

import pytest
import requests

from test.integration.connectors.utils.constants import DESTINATION_TAG
from test.integration.utils import requires_env
from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers
from unstructured_ingest.v2.logger import logger
from unstructured_ingest.v2.processes.connectors.vectara import (
CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE,
)
from unstructured_ingest.v2.processes.connectors.vectara import (
VectaraAccessConfig,
VectaraConnectionConfig,
VectaraUploader,
VectaraUploaderConfig,
VectaraUploadStager,
VectaraUploadStagerConfig,
)


def validate_upload(response: dict, expected_data: dict):
element_id = expected_data["element_id"]
expected_text = expected_data["text"]
filename = expected_data["metadata"]["filename"]
filetype = expected_data["metadata"]["filetype"]
page_number = expected_data["metadata"]["page_number"]

response = response["search_results"][0]

assert response is not None
assert response["text"] == expected_text
assert response["part_metadata"]["element_id"] == element_id
assert response["part_metadata"]["filename"] == filename
assert response["part_metadata"]["filetype"] == filetype
assert response["part_metadata"]["page_number"] == page_number


@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID")
def _get_jwt_token():
"""Connect to the server and get a JWT token."""
customer_id = os.environ["VECTARA_CUSTOMER_ID"]
token_endpoint = (
f"https://vectara-prod-{customer_id}.auth.us-west-2.amazoncognito.com/oauth2/token"
)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
data = {
"grant_type": "client_credentials",
"client_id": os.environ["VECTARA_OAUTH_CLIENT_ID"],
"client_secret": os.environ["VECTARA_OAUTH_SECRET"],
}

response = requests.post(token_endpoint, headers=headers, data=data)
response.raise_for_status()
response_json = response.json()

return response_json.get("access_token")


def query_data(corpus_key: str, element_id: str) -> dict:

url = f"https://api.vectara.io/v2/corpora/{corpus_key}/query"

# the query below requires the corpus to have filter attributes for element_id

data = json.dumps(
{
"query": "string",
"search": {
"metadata_filter": f"part.element_id = '{element_id}'",
"lexical_interpolation": 1,
"limit": 10,
},
}
)

jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()
response_json = response.json()

return response_json


def create_corpora(corpus_key: str, corpus_name: str) -> None:
url = "https://api.vectara.io/v2/corpora"
data = json.dumps({"key": corpus_key, "name": corpus_name, "description": "integration test"})
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()


def replace_filter_attributes(corpus_key: str) -> None:
url = f"https://api.vectara.io/v2/corpora/{corpus_key}/replace_filter_attributes"
data = json.dumps(
{
"filter_attributes": [
{"name": "element_id", "level": "part", "indexed": True, "type": "text"}
]
}
)
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.post(url, headers=headers, data=data)
response.raise_for_status()


def delete_corpora(corpus_key: str) -> None:
url = f"https://api.vectara.io/v2/corpora/{corpus_key}"

jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}

response = requests.delete(url, headers=headers)
response.raise_for_status()


def list_corpora() -> list:
url = "https://api.vectara.io/v2/corpora?limit=100"
jwt_token = _get_jwt_token()
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {jwt_token}",
"X-source": "unstructured",
}
response = requests.get(url, headers=headers)
response.raise_for_status()
response_json = response.json()
if response_json.get("corpora"):
return [item["key"] for item in response_json.get("corpora")]
else:
return []


def wait_for_ready(corpus_key: str, timeout=60, interval=2) -> None:
def is_ready_status():
corpora_list = list_corpora()
return corpus_key in corpora_list

start = time.time()
is_ready = is_ready_status()
while not is_ready and time.time() - start < timeout:
time.sleep(interval)
is_ready = is_ready_status()
if not is_ready:
raise TimeoutError("time out waiting for corpus to be ready")


def wait_for_delete(corpus_key: str, timeout=60, interval=2) -> None:
start = time.time()
while time.time() - start < timeout:
corpora_list = list_corpora()
if corpus_key not in corpora_list:
return
time.sleep(interval)

raise TimeoutError("time out waiting for corpus to delete")


@pytest.fixture
def corpora_util() -> Generator[str, None, None]:
random_id = str(uuid4()).split("-")[0]
corpus_key = f"ingest-test-{random_id}"
corpus_name = "ingest-test"
logger.info(f"Creating corpus with key: {corpus_key}")
try:
create_corpora(corpus_key, corpus_name)
replace_filter_attributes(corpus_key)
wait_for_ready(corpus_key=corpus_key)
yield corpus_key
except Exception as e:
logger.error(f"failed to create corpus {corpus_key}: {e}")
finally:
logger.info(f"deleting corpus: {corpus_key}")
delete_corpora(corpus_key)
wait_for_delete(corpus_key=corpus_key)


@pytest.mark.asyncio
@pytest.mark.tags(VECTARA_CONNECTOR_TYPE, DESTINATION_TAG, "vectara")
@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID")
async def test_vectara_destination(
upload_file: Path, tmp_path: Path, corpora_util: str, retries=30, interval=10
):
corpus_key = corpora_util
connection_kwargs = {
"customer_id": os.environ["VECTARA_CUSTOMER_ID"],
"corpus_key": corpus_key,
}

oauth_client_id = os.environ["VECTARA_OAUTH_CLIENT_ID"]
oauth_secret = os.environ["VECTARA_OAUTH_SECRET"]

file_data = FileData(
source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
connector_type=VECTARA_CONNECTOR_TYPE,
identifier="mock-file-data",
)

stager_config = VectaraUploadStagerConfig(batch_size=10)
stager = VectaraUploadStager(upload_stager_config=stager_config)
new_upload_file = stager.run(
elements_filepath=upload_file,
output_dir=tmp_path,
output_filename=upload_file.name,
file_data=file_data,
)

uploader = VectaraUploader(
connection_config=VectaraConnectionConfig(
**connection_kwargs,
access_config=VectaraAccessConfig(
oauth_client_id=oauth_client_id, oauth_secret=oauth_secret
),
),
upload_config=VectaraUploaderConfig(),
)

if uploader.is_async():
await uploader.run_async(path=new_upload_file, file_data=file_data)

with upload_file.open() as upload_fp:
elements = json.load(upload_fp)
first_element = elements[0]

for i in range(retries):
response = query_data(corpus_key, first_element["element_id"])
if not response["search_results"]:
time.sleep(interval)
else:
break

validate_upload(response=response, expected_data=first_element)
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.0" # pragma: no cover
__version__ = "0.2.0-dev0" # pragma: no cover
53 changes: 53 additions & 0 deletions unstructured_ingest/v2/examples/vectara.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import random
from pathlib import Path

from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.logger import logger
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured_ingest.v2.processes.connectors.vectara import (
CONNECTOR_TYPE,
VectaraAccessConfig,
VectaraConnectionConfig,
VectaraUploaderConfig,
VectaraUploadStagerConfig,
)
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE
output_path = work_dir / "output"
download_path = work_dir / "download"

if __name__ == "__main__":
logger.info(f"writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve())),
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
downloader_config=LocalDownloaderConfig(download_dir=download_path),
source_connection_config=LocalConnectionConfig(),
partitioner_config=PartitionerConfig(strategy="fast"),
chunker_config=ChunkerConfig(
chunking_strategy="by_title",
chunk_include_orig_elements=False,
chunk_max_characters=1500,
chunk_multipage_sections=True,
),
embedder_config=EmbedderConfig(embedding_provider="huggingface"),
destination_connection_config=VectaraConnectionConfig(
access_config=VectaraAccessConfig(oauth_client_id=None, oauth_secret=None),
customer_id="2268229652",
corpus_name=f"test-corpus-vectara-{random.randint(1000,9999)}",
corpus_key="3232",
token_url="https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question, does this example run out of the box (as is), or should user eg. create new Vectara cluster and replace here some vars like corpus_id or customer_id?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my past impression, connectors are not supposed to create cluster/db/index if they don't exist. So this example won't work out of the box. User needs to create corpora first, then replace the ids here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, then adding a short comment about it would be good/helpful for the users, who won't need to look into code of connector to know that it has to be done. Or run it without creating, and getting confused by errors. Other connectors don't have such comments, but we could start by increasing the standard here.

),
stager_config=VectaraUploadStagerConfig(batch_size=10),
uploader_config=VectaraUploaderConfig(),
).run()
4 changes: 4 additions & 0 deletions unstructured_ingest/v2/processes/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
from .singlestore import singlestore_destination_entry
from .slack import CONNECTOR_TYPE as SLACK_CONNECTOR_TYPE
from .slack import slack_source_entry
from .vectara import CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE
from .vectara import vectara_destination_entry
from .weaviate import CONNECTOR_TYPE as WEAVIATE_CONNECTOR_TYPE
from .weaviate import weaviate_destination_entry

Expand Down Expand Up @@ -103,3 +105,5 @@
add_source_entry(source_type=OUTLOOK_CONNECTOR_TYPE, entry=outlook_source_entry)

add_source_entry(source_type=SLACK_CONNECTOR_TYPE, entry=slack_source_entry)

add_destination_entry(destination_type=VECTARA_CONNECTOR_TYPE, entry=vectara_destination_entry)
Loading