Create a destination connector for nomicdb #175
base: main
@@ -207,3 +207,5 @@ metricsdiff.txt
annotated/
tmp_ingest/
data/
@@ -0,0 +1,14 @@
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Test ingest nomicdb",
            "env": {"PYTHONPATH": "/workspaces/unstructured-ingest/"},
            "type": "debugpy",
            "request": "launch",
            "program": "./test_e2e/python/test-ingest-nomicdb.py",
            "console": "integratedTerminal",
            "justMyCode": false
        }
    ]
}
@@ -0,0 +1,3 @@
-c ../common/constraints.txt

nomic
@@ -0,0 +1,5 @@
UNSTRUCTURED_API_KEY=
UNSTRUCTURED_API_URL=
NOMIC_ORG_NAME=
NOMIC_ADS_NAME=
NOMIC_API_KEY=
@@ -0,0 +1,80 @@
import os
Review comment: In general, we've moved to adding integration tests rather than e2e tests for new connectors. Take a look at this s3 example: test_s3.py. This should help isolate the connector code and make testing it easier. (A minimal sketch of that approach appears after this file.)
from unstructured_client.models import operations, shared
from unstructured_ingest.connector.nomicdb import (
    NomicAccessConfig,
    NomicWriteConfig,
    SimpleNomicConfig
)

from unstructured_ingest.connector.local import SimpleLocalConfig

from unstructured_ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured_ingest.runner import SharePointRunner, LocalRunner
from unstructured_ingest.runner.writers.base_writer import Writer
from unstructured_ingest.runner.writers.nomicdb import (
    NomicWriter,
)
from dotenv import load_dotenv
load_dotenv('./test_e2e/python/.env')


def get_writer(
    organisation_name: str,
    dataset_name: str,
    description: str,
    api_key: str,
    domain: str = "atlas.nomic.ai",
    tenant: str = "production",
    is_public: bool = False
) -> Writer:
    return NomicWriter(
        connector_config=SimpleNomicConfig(
            organisation_name=organisation_name,
            dataset_name=dataset_name,
            description=description,
            domain=domain,
            tenant=tenant,
            is_public=is_public,
            access_config=NomicAccessConfig(
                api_key=api_key
            ),
        ),
        write_config=NomicWriteConfig(
            num_processes=2,
            batch_size=80
        )
    )


def main():
    """
    parse data and ingest into nomicdb and construct nomic atlas map
    """
    writer = get_writer(
        organisation_name=os.getenv('NOMIC_ORG_NAME'),
        dataset_name=os.getenv('NOMIC_ADS_NAME'),
        description='a dataset created by test-ingest-nomicdb',
        api_key=os.getenv('NOMIC_API_KEY'),
        is_public=True)

    runner = LocalRunner(
        processor_config=ProcessorConfig(
            verbose=True,
            output_dir="./data/local-ingest-output/",
            num_processes=2
        ),
        read_config=ReadConfig(),
        writer=writer,
        partition_config=PartitionConfig(
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            strategy="auto",
        ),
        connector_config=SimpleLocalConfig(
            input_path='./example-docs/',
            recursive=False,
        ),
    )
    runner.run()


if __name__ == '__main__':
    main()
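
Following up on the review note above about preferring integration tests over e2e scripts, the snippet below is a minimal sketch of what such a test could look like for this connector. It is modeled loosely on the s3 example the reviewer points to, but the test name, the skip condition, and the idea of constructing NomicDestinationConnector directly with its two configs are assumptions for illustration, not the repository's actual test harness.

import os

import pytest

from unstructured_ingest.connector.nomicdb import (
    NomicAccessConfig,
    NomicDestinationConnector,
    NomicWriteConfig,
    SimpleNomicConfig,
)


@pytest.mark.skipif("NOMIC_API_KEY" not in os.environ, reason="Nomic credentials not configured")
def test_nomicdb_destination_writes_elements():
    # Hypothetical integration test: build the destination connector directly
    # instead of driving a full end-to-end pipeline.
    connector = NomicDestinationConnector(
        connector_config=SimpleNomicConfig(
            organisation_name=os.environ["NOMIC_ORG_NAME"],
            dataset_name="test-ingest-nomicdb",
            description="dataset created by an integration test",
            is_public=False,
            access_config=NomicAccessConfig(api_key=os.environ["NOMIC_API_KEY"]),
        ),
        write_config=NomicWriteConfig(batch_size=10, num_processes=1),
    )
    connector.initialize()
    connector.check_connection()

    # A tiny hand-written batch of already-partitioned elements.
    elements = [
        {
            "element_id": "element-1",
            "text": "hello from the integration test",
            "type": "NarrativeText",
            "metadata": {"filename": "fake-doc.txt"},
        },
    ]
    connector.write_dict(
        elements_dict=[connector.normalize_dict(element) for element in elements],
    )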
@@ -0,0 +1,125 @@
import multiprocessing as mp
Review comments: For new connectors, this should live in the v2 directory using the new ingest framework: unstructured_ingest/v2/processes/connectors. This will change the general approach you've introduced here, so I'll wait to review the actual connector files until that's been updated.
import typing as t
from dataclasses import dataclass

from unstructured_ingest.enhanced_dataclass import enhanced_field
from unstructured_ingest.error import DestinationConnectionError, WriteError
from unstructured_ingest.interfaces import (
    AccessConfig,
    BaseConnectorConfig,
    BaseDestinationConnector,
    ConfigSessionHandleMixin,
    IngestDocSessionHandleMixin,
    WriteConfig,
)
from unstructured_ingest.logger import logger
from unstructured_ingest.utils.data_prep import batch_generator, flatten_dict
from unstructured_ingest.utils.dep_check import requires_dependencies
import nomic
from nomic import atlas

if t.TYPE_CHECKING:
    from nomic import AtlasDataset


@dataclass
class NomicAccessConfig(AccessConfig):
    api_key: t.Optional[str] = enhanced_field(sensitive=True)


@dataclass
class SimpleNomicConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
    organisation_name: str
    dataset_name: str
    description: str
    domain: t.Optional[str] = None
    tenant: t.Optional[str] = None
    is_public: t.Optional[bool] = False
    access_config: t.Optional[NomicAccessConfig] = None


@dataclass
class NomicWriteConfig(WriteConfig):
    batch_size: int = 50
    num_processes: int = 1


@dataclass
class NomicDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
    write_config: NomicWriteConfig
    connector_config: SimpleNomicConfig
    _dataset: t.Optional["AtlasDataset"] = None

    @property
    def nomic_dataset(self):
        if self._dataset is None:
            self._dataset = self.create_dataset()
        return self._dataset

    def initialize(self):
        nomic.cli.login(
            token=self.connector_config.access_config.api_key,
            domain=self.connector_config.domain,
            tenant=self.connector_config.tenant
        )

    @requires_dependencies(["nomic"], extras="nomic")
    def create_dataset(self) -> "AtlasDataset":
        from nomic import AtlasDataset

        dataset = AtlasDataset(
            identifier=f"{self.connector_config.organisation_name}/{self.connector_config.dataset_name}",
            unique_id_field='element_id',
            description=self.connector_config.description,
            is_public=self.connector_config.is_public,
        )

        return dataset

    @DestinationConnectionError.wrap
    def check_connection(self):
        nomic.cli.login(
            token=self.connector_config.access_config.api_key,
            domain=self.connector_config.domain,
            tenant=self.connector_config.tenant
        )

    @DestinationConnectionError.wrap
    @requires_dependencies(["nomic"], extras="nomic")
    def upsert_batch(self, batch: t.List[t.Dict[str, t.Any]]):
        dataset = self.nomic_dataset
        try:
            dataset.add_data(list(batch))
            # logger.debug(f"Successfully add {len(batch)} into dataset {dataset.id}")
        except Exception as api_error:
            raise WriteError(f"Nomic error: {api_error}") from api_error

    def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        logger.info(
            f"Upserting {len(elements_dict)} elements to "
            f"{self.connector_config.organisation_name}",
        )

        nomicdb_batch_size = self.write_config.batch_size

        logger.info(f'using {self.write_config.num_processes} processes to upload')
        if self.write_config.num_processes == 1:
            for chunk in batch_generator(elements_dict, nomicdb_batch_size):
                self.upsert_batch(chunk)
        else:
            with mp.Pool(
                processes=self.write_config.num_processes,
            ) as pool:
                pool.map(self.upsert_batch, list(batch_generator(elements_dict, nomicdb_batch_size)))

        dataset = self.nomic_dataset
        dataset.create_index(
            indexed_field='text',
            topic_model=True,
            duplicate_detection=True,
            projection=None
        )

    def normalize_dict(self, element_dict: dict) -> dict:
        return {
            "element_id": element_dict['element_id'],
            "text": element_dict['text'],
            "type": element_dict['type'],
            "filename": element_dict['metadata']['filename']
        }
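
To make the connector's behavior easier to follow outside the ingest framework, here is a small standalone sketch that exercises the same nomic client calls the connector above relies on (cli.login, AtlasDataset, add_data, create_index). The dataset identifier, the sample record, and reading the API key from an environment variable are illustrative assumptions; the call sequence simply mirrors initialize, upsert_batch, and the index creation at the end of write_dict.

import os

import nomic
from nomic import AtlasDataset

# Authenticate the same way the connector's initialize()/check_connection() do.
nomic.cli.login(token=os.environ["NOMIC_API_KEY"])

# Create (or open) the dataset keyed on element_id, as in create_dataset().
dataset = AtlasDataset(
    identifier="my-org/my-test-dataset",  # hypothetical identifier
    unique_id_field="element_id",
    description="standalone sketch of the nomicdb destination flow",
    is_public=False,
)

# Upload one normalized element, mirroring upsert_batch() / normalize_dict().
dataset.add_data([
    {
        "element_id": "element-1",
        "text": "hello from the standalone sketch",
        "type": "NarrativeText",
        "filename": "fake-doc.txt",
    },
])

# Build the Atlas map over the text field, as write_dict() does after uploading.
dataset.create_index(
    indexed_field="text",
    topic_model=True,
    duplicate_detection=True,
    projection=None,
)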
@@ -0,0 +1,19 @@
import typing as t
from dataclasses import dataclass

from unstructured_ingest.interfaces import BaseDestinationConnector
from unstructured_ingest.runner.writers.base_writer import Writer

if t.TYPE_CHECKING:
    from unstructured_ingest.connector.nomicdb import NomicWriteConfig, SimpleNomicConfig


@dataclass
class NomicWriter(Writer):
    write_config: "NomicWriteConfig"
    connector_config: "SimpleNomicConfig"

    def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
        from unstructured_ingest.connector.nomicdb import NomicDestinationConnector

        return NomicDestinationConnector
Review comment: This looks like an IDE-specific file; it shouldn't be in this PR.