feat/vectara-destination-to-v2 #158
Merged
59 commits
6f6a4dd
vectara v2 still work in progress
guilherme-uns abbab10
taking errors out
guilherme-uns a11f8a3
merge
guilherme-uns 9a1fc1c
fix lint
guilherme-uns 0bf69f5
linting
guilherme-uns 265d0e8
order imports
guilherme-uns eba45e8
ruff --fix
guilherme-uns c4f208f
fix version
guilherme-uns 4805c55
fixing PR issues by Roman
guilherme-uns c6fa937
ruff
guilherme-uns 9d2cf32
working get metadata on stager
guilherme-uns 8ca8f7a
remove comments
guilherme-uns 7c38cf7
changing conform dict to the correct way and including vdoc on stager…
guilherme-uns 4f64fdd
taking path out from file by Potter chat to leave only url, even thoug…
guilherme-uns 5b8de54
fix potter comments
guilherme-uns 47b252d
change version
guilherme-uns 39a0422
version update
guilherme-uns 7df96a3
merge
guilherme-uns ccfb736
make tidy
guilherme-uns d56b77c
add secret to access config
guilherme-uns d139e82
merge
guilherme-uns a02f310
make tidy
guilherme-uns 1b1a011
.
guilherme-uns 121d939
get secret value
guilherme-uns 884e788
change version
guilherme-uns 7ed0d51
add async
guilherme-uns bf18c04
mrg
guilherme-uns d2f5965
change import
guilherme-uns 5df25c0
vectara requirements
guilherme-uns 265740e
make tidy
guilherme-uns 4c1e9a5
mt
guilherme-uns babc450
mrge
guilherme-uns 42ed602
fix PR comments
guilherme-uns 2eacc95
vectara example to be able to debug async
guilherme-uns 1da9c7c
no wait worn
guilherme-uns ca6efbd
improving logging
guilherme-uns b8808a1
precheck without async.
guilherme-uns a5e3b61
merge
guilherme-uns 549451c
make tidy
guilherme-uns a2048a4
lint
guilherme-uns dde17f0
linting
guilherme-uns 45fbab9
some fixes vectara
guilherme-uns 85a5810
migrate to vectara v2 api
bryan-unstructured 8c7d510
add integration test for vectara
bryan-unstructured cf0560c
fix syntax
bryan-unstructured 82999eb
divide elements to batches
bryan-unstructured 4f7b3af
Add retry logic to document query
bryan-unstructured 8a3999a
change integration test to regular function
bryan-unstructured a536674
clean up corpus after integration test
bryan-unstructured 43ace32
fix syntax error
bryan-unstructured 14d5f86
update connection config in example
bryan-unstructured 7b12e0c
Remove unnecessary var
bryan-unstructured 606e3a6
remove batch_size because Vectara api does not support batch indexing…
bryan-unstructured 1f8e1c3
remove asyncio.run to avoid conflict with async context
bryan-unstructured 0cf0c14
Resolve merge conflict by incorporating both suggestions
bryan-unstructured 29175ba
update stager to reflect new structure
bryan-unstructured 2212caa
update uploader to reflect new structure
bryan-unstructured 0faf4df
fix syntax
bryan-unstructured 086cc53
Merge branch 'main' into DS-92-vectara-destination-to-v2
bryan-unstructured
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,10 @@ | ||
## 0.2.0-dev0 | ||
|
||
### Enhancements | ||
|
||
* **Migrate Vectara Destination Connector to v2** | ||
|
||
|
||
## 0.2.0 | ||
|
||
### Enhancements | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
-c ../common/constraints.txt | ||
|
||
requests | ||
aiofiles | ||
httpx |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,267 @@ | ||
import json | ||
import os | ||
import time | ||
from pathlib import Path | ||
from typing import Generator | ||
from uuid import uuid4 | ||
|
||
import pytest | ||
import requests | ||
|
||
from test.integration.connectors.utils.constants import DESTINATION_TAG | ||
from test.integration.utils import requires_env | ||
from unstructured_ingest.v2.interfaces.file_data import FileData, SourceIdentifiers | ||
from unstructured_ingest.v2.logger import logger | ||
from unstructured_ingest.v2.processes.connectors.vectara import ( | ||
CONNECTOR_TYPE as VECTARA_CONNECTOR_TYPE, | ||
) | ||
from unstructured_ingest.v2.processes.connectors.vectara import ( | ||
VectaraAccessConfig, | ||
VectaraConnectionConfig, | ||
VectaraUploader, | ||
VectaraUploaderConfig, | ||
VectaraUploadStager, | ||
VectaraUploadStagerConfig, | ||
) | ||
|
||
|
||
def validate_upload(response: dict, expected_data: dict): | ||
element_id = expected_data["element_id"] | ||
expected_text = expected_data["text"] | ||
filename = expected_data["metadata"]["filename"] | ||
filetype = expected_data["metadata"]["filetype"] | ||
page_number = expected_data["metadata"]["page_number"] | ||
|
||
response = response["search_results"][0] | ||
|
||
assert response is not None | ||
assert response["text"] == expected_text | ||
assert response["part_metadata"]["element_id"] == element_id | ||
assert response["part_metadata"]["filename"] == filename | ||
assert response["part_metadata"]["filetype"] == filetype | ||
assert response["part_metadata"]["page_number"] == page_number | ||
|
||
|
||
@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID") | ||
def _get_jwt_token(): | ||
"""Connect to the server and get a JWT token.""" | ||
customer_id = os.environ["VECTARA_CUSTOMER_ID"] | ||
token_endpoint = ( | ||
f"https://vectara-prod-{customer_id}.auth.us-west-2.amazoncognito.com/oauth2/token" | ||
) | ||
headers = { | ||
"Content-Type": "application/x-www-form-urlencoded", | ||
} | ||
data = { | ||
"grant_type": "client_credentials", | ||
"client_id": os.environ["VECTARA_OAUTH_CLIENT_ID"], | ||
"client_secret": os.environ["VECTARA_OAUTH_SECRET"], | ||
} | ||
|
||
response = requests.post(token_endpoint, headers=headers, data=data) | ||
response.raise_for_status() | ||
response_json = response.json() | ||
|
||
return response_json.get("access_token") | ||
|
||
|
||
def query_data(corpus_key: str, element_id: str) -> dict: | ||
|
||
url = f"https://api.vectara.io/v2/corpora/{corpus_key}/query" | ||
|
||
# the query below requires the corpus to have filter attributes for element_id | ||
|
||
data = json.dumps( | ||
{ | ||
"query": "string", | ||
"search": { | ||
"metadata_filter": f"part.element_id = '{element_id}'", | ||
"lexical_interpolation": 1, | ||
"limit": 10, | ||
}, | ||
} | ||
) | ||
|
||
jwt_token = _get_jwt_token() | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {jwt_token}", | ||
"X-source": "unstructured", | ||
} | ||
|
||
response = requests.post(url, headers=headers, data=data) | ||
response.raise_for_status() | ||
response_json = response.json() | ||
|
||
return response_json | ||
|
||
|
||
def create_corpora(corpus_key: str, corpus_name: str) -> None: | ||
url = "https://api.vectara.io/v2/corpora" | ||
data = json.dumps({"key": corpus_key, "name": corpus_name, "description": "integration test"}) | ||
jwt_token = _get_jwt_token() | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {jwt_token}", | ||
"X-source": "unstructured", | ||
} | ||
|
||
response = requests.post(url, headers=headers, data=data) | ||
response.raise_for_status() | ||
|
||
|
||
def replace_filter_attributes(corpus_key: str) -> None: | ||
url = f"https://api.vectara.io/v2/corpora/{corpus_key}/replace_filter_attributes" | ||
data = json.dumps( | ||
{ | ||
"filter_attributes": [ | ||
{"name": "element_id", "level": "part", "indexed": True, "type": "text"} | ||
] | ||
} | ||
) | ||
jwt_token = _get_jwt_token() | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {jwt_token}", | ||
"X-source": "unstructured", | ||
} | ||
|
||
response = requests.post(url, headers=headers, data=data) | ||
response.raise_for_status() | ||
|
||
|
||
def delete_corpora(corpus_key: str) -> None: | ||
url = f"https://api.vectara.io/v2/corpora/{corpus_key}" | ||
|
||
jwt_token = _get_jwt_token() | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {jwt_token}", | ||
"X-source": "unstructured", | ||
} | ||
|
||
response = requests.delete(url, headers=headers) | ||
response.raise_for_status() | ||
|
||
|
||
def list_corpora() -> list: | ||
url = "https://api.vectara.io/v2/corpora?limit=100" | ||
jwt_token = _get_jwt_token() | ||
headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {jwt_token}", | ||
"X-source": "unstructured", | ||
} | ||
response = requests.get(url, headers=headers) | ||
response.raise_for_status() | ||
response_json = response.json() | ||
if response_json.get("corpora"): | ||
return [item["key"] for item in response_json.get("corpora")] | ||
else: | ||
return [] | ||
|
||
|
||
def wait_for_ready(corpus_key: str, timeout=60, interval=2) -> None: | ||
def is_ready_status(): | ||
corpora_list = list_corpora() | ||
return corpus_key in corpora_list | ||
|
||
start = time.time() | ||
is_ready = is_ready_status() | ||
while not is_ready and time.time() - start < timeout: | ||
time.sleep(interval) | ||
is_ready = is_ready_status() | ||
if not is_ready: | ||
raise TimeoutError("timed out waiting for corpus to be ready") | ||
|
||
|
||
def wait_for_delete(corpus_key: str, timeout=60, interval=2) -> None: | ||
start = time.time() | ||
while time.time() - start < timeout: | ||
corpora_list = list_corpora() | ||
if corpus_key not in corpora_list: | ||
return | ||
time.sleep(interval) | ||
|
||
raise TimeoutError("timed out waiting for corpus to delete") | ||
|
||
|
||
@pytest.fixture | ||
def corpora_util() -> Generator[str, None, None]: | ||
random_id = str(uuid4()).split("-")[0] | ||
corpus_key = f"ingest-test-{random_id}" | ||
corpus_name = "ingest-test" | ||
logger.info(f"Creating corpus with key: {corpus_key}") | ||
try: | ||
create_corpora(corpus_key, corpus_name) | ||
replace_filter_attributes(corpus_key) | ||
wait_for_ready(corpus_key=corpus_key) | ||
yield corpus_key | ||
except Exception as e: | ||
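logger.error(f"failed to create corpus {corpus_key}: {e}") | ||
raise  # re-raise so pytest reports the real setup error instead of a fixture that never yields | ||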
finally: | ||
logger.info(f"deleting corpus: {corpus_key}") | ||
delete_corpora(corpus_key) | ||
wait_for_delete(corpus_key=corpus_key) | ||
|
||
|
||
@pytest.mark.asyncio | ||
@pytest.mark.tags(VECTARA_CONNECTOR_TYPE, DESTINATION_TAG, "vectara") | ||
@requires_env("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID") | ||
async def test_vectara_destination( | ||
upload_file: Path, tmp_path: Path, corpora_util: str, retries=30, interval=10 | ||
): | ||
corpus_key = corpora_util | ||
connection_kwargs = { | ||
"customer_id": os.environ["VECTARA_CUSTOMER_ID"], | ||
"corpus_key": corpus_key, | ||
} | ||
|
||
oauth_client_id = os.environ["VECTARA_OAUTH_CLIENT_ID"] | ||
oauth_secret = os.environ["VECTARA_OAUTH_SECRET"] | ||
|
||
file_data = FileData( | ||
source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name), | ||
connector_type=VECTARA_CONNECTOR_TYPE, | ||
identifier="mock-file-data", | ||
) | ||
|
||
stager_config = VectaraUploadStagerConfig(batch_size=10) | ||
stager = VectaraUploadStager(upload_stager_config=stager_config) | ||
new_upload_file = stager.run( | ||
elements_filepath=upload_file, | ||
output_dir=tmp_path, | ||
output_filename=upload_file.name, | ||
file_data=file_data, | ||
) | ||
|
||
uploader = VectaraUploader( | ||
connection_config=VectaraConnectionConfig( | ||
**connection_kwargs, | ||
access_config=VectaraAccessConfig( | ||
oauth_client_id=oauth_client_id, oauth_secret=oauth_secret | ||
), | ||
), | ||
upload_config=VectaraUploaderConfig(), | ||
) | ||
|
||
if uploader.is_async(): | ||
await uploader.run_async(path=new_upload_file, file_data=file_data) | ||
|
||
with upload_file.open() as upload_fp: | ||
elements = json.load(upload_fp) | ||
first_element = elements[0] | ||
|
||
for _ in range(retries): | ||
response = query_data(corpus_key, first_element["element_id"]) | ||
if not response["search_results"]: | ||
time.sleep(interval) | ||
else: | ||
break | ||
|
||
validate_upload(response=response, expected_data=first_element) |
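The retry loop at the end of the test falls through to `validate_upload` even when every query came back empty, which would surface as an `IndexError` rather than a clear timeout. A minimal sketch of a polling helper that fails explicitly is given below. `poll_until` and its parameters are hypothetical names introduced for illustration; they are not part of this PR or of `unstructured_ingest`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    retries: int = 30,
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() up to `retries` times and return the first result accepted by is_done()."""
    for attempt in range(retries):
        result = fetch()
        if is_done(result):
            return result
        # Skip the pause after the final attempt; there is nothing left to wait for.
        if attempt < retries - 1:
            sleep(interval)
    # Hypothetical failure mode: raise instead of handing an empty result to the caller.
    raise TimeoutError(f"condition not met after {retries} attempts")
```

In the test, the loop could then become something like `poll_until(lambda: query_data(corpus_key, first_element["element_id"]), lambda r: bool(r["search_results"]))`. The `sleep` parameter is injectable so the helper can be exercised without real delays.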
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
-__version__ = "0.2.0" # pragma: no cover | ||
+__version__ = "0.2.0-dev0" # pragma: no cover |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import random | ||
from pathlib import Path | ||
|
||
from unstructured_ingest.v2.interfaces import ProcessorConfig | ||
from unstructured_ingest.v2.logger import logger | ||
from unstructured_ingest.v2.pipeline.pipeline import Pipeline | ||
from unstructured_ingest.v2.processes.chunker import ChunkerConfig | ||
from unstructured_ingest.v2.processes.connectors.local import ( | ||
LocalConnectionConfig, | ||
LocalDownloaderConfig, | ||
LocalIndexerConfig, | ||
) | ||
from unstructured_ingest.v2.processes.connectors.vectara import ( | ||
CONNECTOR_TYPE, | ||
VectaraAccessConfig, | ||
VectaraConnectionConfig, | ||
VectaraUploaderConfig, | ||
VectaraUploadStagerConfig, | ||
) | ||
from unstructured_ingest.v2.processes.embedder import EmbedderConfig | ||
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig | ||
|
||
base_path = Path(__file__).parent.parent.parent.parent | ||
docs_path = base_path / "example-docs" | ||
work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE | ||
output_path = work_dir / "output" | ||
download_path = work_dir / "download" | ||
|
||
if __name__ == "__main__": | ||
logger.info(f"writing all content in: {work_dir.resolve()}") | ||
Pipeline.from_configs( | ||
context=ProcessorConfig(work_dir=str(work_dir.resolve())), | ||
indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"), | ||
downloader_config=LocalDownloaderConfig(download_dir=download_path), | ||
source_connection_config=LocalConnectionConfig(), | ||
partitioner_config=PartitionerConfig(strategy="fast"), | ||
chunker_config=ChunkerConfig( | ||
chunking_strategy="by_title", | ||
chunk_include_orig_elements=False, | ||
chunk_max_characters=1500, | ||
chunk_multipage_sections=True, | ||
), | ||
embedder_config=EmbedderConfig(embedding_provider="huggingface"), | ||
destination_connection_config=VectaraConnectionConfig( | ||
access_config=VectaraAccessConfig(oauth_client_id=None, oauth_secret=None), | ||
customer_id="2268229652", | ||
corpus_name=f"test-corpus-vectara-{random.randint(1000,9999)}", | ||
corpus_key="3232", | ||
token_url="https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token", | ||
), | ||
stager_config=VectaraUploadStagerConfig(batch_size=10), | ||
uploader_config=VectaraUploaderConfig(), | ||
).run() |
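The example above passes `None` for both OAuth values and hard-codes `customer_id`, so it cannot authenticate as written. One possible way to supply real values is to read them from the environment variables that the integration test already uses. The sketch below is only an illustration: `load_vectara_settings`, `REQUIRED_VARS`, and `TOKEN_URL_TEMPLATE` are hypothetical names, and filling the `{}` placeholder with the customer ID is an assumption based on how the integration test builds the same URL.

```python
import os
from typing import Mapping

# Template copied from the example; "{}" is ASSUMED to be filled with the customer ID.
TOKEN_URL_TEMPLATE = "https://vectara-prod-{}.auth.us-west-2.amazoncognito.com/oauth2/token"

# Environment variable names taken from the integration test in this PR.
REQUIRED_VARS = ("VECTARA_OAUTH_CLIENT_ID", "VECTARA_OAUTH_SECRET", "VECTARA_CUSTOMER_ID")


def load_vectara_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect Vectara credentials from the environment, failing early if any are missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    customer_id = env["VECTARA_CUSTOMER_ID"]
    return {
        "oauth_client_id": env["VECTARA_OAUTH_CLIENT_ID"],
        "oauth_secret": env["VECTARA_OAUTH_SECRET"],
        "customer_id": customer_id,
        "token_url": TOKEN_URL_TEMPLATE.format(customer_id),
    }
```

The returned dictionary could be split between `VectaraAccessConfig` (the two OAuth values) and `VectaraConnectionConfig` (the rest); `corpus_key` still has to name a corpus that already exists.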
General question: does this example run out of the box (as is), or should the user first, e.g., create a new Vectara cluster and replace some variables here, such as corpus_id or customer_id?
As far as I remember, connectors are not supposed to create a cluster/db/index if one doesn't exist, so this example won't work out of the box. The user needs to create the corpus first, then replace the IDs here.
OK, then adding a short comment about it would be helpful for users, so they don't have to read the connector code to learn that this has to be done, or run the example without creating the corpus and get confused by the errors. Other connectors don't have such comments, but we could start raising the standard here.
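Beyond a comment, the prerequisite discussed in this thread could also be made explicit with a small pre-flight check that fails with an actionable message. This is a sketch only: `ensure_corpus_exists` is a hypothetical helper, not something the connector provides, and `existing_keys` is assumed to come from a listing call such as the `list_corpora()` helper in the integration test.

```python
from typing import Iterable


def ensure_corpus_exists(corpus_key: str, existing_keys: Iterable[str]) -> None:
    """Raise a descriptive error when the target corpus has not been created yet."""
    if corpus_key not in set(existing_keys):
        raise RuntimeError(
            f"Vectara corpus '{corpus_key}' was not found. The connector does not create "
            "corpora; create the corpus first, then update corpus_key and customer_id "
            "in this example."
        )
```

Taking the list of keys as an argument keeps the check free of network calls, so it can be unit-tested on its own.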