
Create a destination connector for nomicdb #175

Open · wants to merge 2 commits into base: main
2 changes: 2 additions & 0 deletions .gitignore
@@ -207,3 +207,5 @@ metricsdiff.txt
annotated/

tmp_ingest/

data/
14 changes: 14 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,14 @@
{
Collaborator: This looks like an IDE-specific file; it shouldn't be in this PR.

"version": "0.2.0",
"configurations": [
{
"name": "Test ingest nomicdb",
"env": {"PYTHONPATH":"/workspaces/unstructured-ingest/"},
"type": "debugpy",
"request": "launch",
"program": "./test_e2e/python/test-ingest-nomicdb.py",
"console": "integratedTerminal",
"justMyCode": false
}
]
}
3 changes: 3 additions & 0 deletions requirements/connectors/nomicdb.in
@@ -0,0 +1,3 @@
-c ../common/constraints.txt

nomic
Empty file.
5 changes: 5 additions & 0 deletions test_e2e/python/.env.sample
@@ -0,0 +1,5 @@
UNSTRUCTURED_API_KEY=
UNSTRUCTURED_API_URL=
NOMIC_ORG_NAME=
NOMIC_ADS_NAME=
NOMIC_API_KEY=
80 changes: 80 additions & 0 deletions test_e2e/python/test-ingest-nomicdb.py
@@ -0,0 +1,80 @@
import os
Collaborator: In general, we've moved to adding integration tests rather than e2e tests for new connectors. Take a look at this s3 example: test_s3.py. This should help isolate the connector code and make testing it easier.

from dotenv import load_dotenv

from unstructured_ingest.connector.local import SimpleLocalConfig
from unstructured_ingest.connector.nomicdb import (
    NomicAccessConfig,
    NomicWriteConfig,
    SimpleNomicConfig,
)
from unstructured_ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured_ingest.runner import LocalRunner
from unstructured_ingest.runner.writers.base_writer import Writer
from unstructured_ingest.runner.writers.nomicdb import NomicWriter

load_dotenv("./test_e2e/python/.env")


def get_writer(
    organisation_name: str,
    dataset_name: str,
    description: str,
    api_key: str,
    domain: str = "atlas.nomic.ai",
    tenant: str = "production",
    is_public: bool = False,
) -> Writer:
    """Build a NomicWriter wired up with connector and write configs."""
    return NomicWriter(
        connector_config=SimpleNomicConfig(
            organisation_name=organisation_name,
            dataset_name=dataset_name,
            description=description,
            domain=domain,
            tenant=tenant,
            is_public=is_public,
            access_config=NomicAccessConfig(api_key=api_key),
        ),
        write_config=NomicWriteConfig(num_processes=2, batch_size=80),
    )


def main():
    """Parse local documents, ingest them into Nomic, and build an Atlas map."""
    writer = get_writer(
        organisation_name=os.getenv("NOMIC_ORG_NAME"),
        dataset_name=os.getenv("NOMIC_ADS_NAME"),
        description="a dataset created by test-ingest-nomicdb",
        api_key=os.getenv("NOMIC_API_KEY"),
        is_public=True,
    )

    runner = LocalRunner(
        processor_config=ProcessorConfig(
            verbose=True,
            output_dir="./data/local-ingest-output/",
            num_processes=2,
        ),
        read_config=ReadConfig(),
        writer=writer,
        partition_config=PartitionConfig(
            partition_by_api=True,
            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
            strategy="auto",
        ),
        connector_config=SimpleLocalConfig(
            input_path="./example-docs/",
            recursive=False,
        ),
    )
    runner.run()


if __name__ == "__main__":
    main()
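
Following the collaborator's integration-test suggestion above, a rough sketch of what such a test could look like. This is illustrative only: the fake dataset, the direct connector construction, and the assertions are assumptions modeled loosely on the test_s3.py pattern, not actual repo helpers.

# Hypothetical integration-test sketch (not part of this PR).
from unstructured_ingest.connector.nomicdb import (
    NomicAccessConfig,
    NomicDestinationConnector,
    NomicWriteConfig,
    SimpleNomicConfig,
)

FAKE_ELEMENTS = [
    {"element_id": "1", "text": "hello", "type": "NarrativeText",
     "metadata": {"filename": "a.txt"}},
    {"element_id": "2", "text": "world", "type": "NarrativeText",
     "metadata": {"filename": "a.txt"}},
]


def test_nomic_write():
    uploaded = []

    class FakeDataset:
        # Stands in for nomic.AtlasDataset so the test needs no network access.
        id = "fake-dataset"

        def add_data(self, data):
            uploaded.extend(data)

        def create_index(self, **kwargs):
            pass

    connector = NomicDestinationConnector(
        write_config=NomicWriteConfig(batch_size=1, num_processes=1),
        connector_config=SimpleNomicConfig(
            organisation_name="test-org",
            dataset_name="test-dataset",
            description="integration test",
            access_config=NomicAccessConfig(api_key="fake-key"),
        ),
    )
    # Inject the fake dataset so nomic_dataset never hits the real service.
    connector._dataset = FakeDataset()

    elements = [connector.normalize_dict(e) for e in FAKE_ELEMENTS]
    connector.write_dict(elements_dict=elements)

    assert len(uploaded) == len(FAKE_ELEMENTS)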
125 changes: 125 additions & 0 deletions unstructured_ingest/connector/nomicdb.py
@@ -0,0 +1,125 @@
import multiprocessing as mp
Collaborator: For new connectors, this should live in the v2 directory using the new ingest framework: unstructured_ingest/v2/processes/connectors

Collaborator: This will change the general approach you've introduced here, so I'll wait to review the actual connector files until that's been updated.

import typing as t
from dataclasses import dataclass

import nomic

from unstructured_ingest.enhanced_dataclass import enhanced_field
from unstructured_ingest.error import DestinationConnectionError, WriteError
from unstructured_ingest.interfaces import (
    AccessConfig,
    BaseConnectorConfig,
    BaseDestinationConnector,
    ConfigSessionHandleMixin,
    IngestDocSessionHandleMixin,
    WriteConfig,
)
from unstructured_ingest.logger import logger
from unstructured_ingest.utils.data_prep import batch_generator
from unstructured_ingest.utils.dep_check import requires_dependencies

if t.TYPE_CHECKING:
    from nomic import AtlasDataset


@dataclass
class NomicAccessConfig(AccessConfig):
    api_key: t.Optional[str] = enhanced_field(sensitive=True)


@dataclass
class SimpleNomicConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
    organisation_name: str
    dataset_name: str
    description: str
    domain: t.Optional[str] = None
    tenant: t.Optional[str] = None
    is_public: t.Optional[bool] = False
    access_config: t.Optional[NomicAccessConfig] = None


@dataclass
class NomicWriteConfig(WriteConfig):
    batch_size: int = 50
    num_processes: int = 1


@dataclass
class NomicDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
    write_config: NomicWriteConfig
    connector_config: SimpleNomicConfig
    _dataset: t.Optional["AtlasDataset"] = None

    @property
    def nomic_dataset(self) -> "AtlasDataset":
        # Lazily create the dataset on first access and cache it.
        if self._dataset is None:
            self._dataset = self.create_dataset()
        return self._dataset

    def initialize(self):
        nomic.cli.login(
            token=self.connector_config.access_config.api_key,
            domain=self.connector_config.domain,
            tenant=self.connector_config.tenant,
        )

    @requires_dependencies(["nomic"], extras="nomic")
    def create_dataset(self) -> "AtlasDataset":
        from nomic import AtlasDataset

        return AtlasDataset(
            identifier=f"{self.connector_config.organisation_name}/{self.connector_config.dataset_name}",
            unique_id_field="element_id",
            description=self.connector_config.description,
            is_public=self.connector_config.is_public,
        )

    @DestinationConnectionError.wrap
    def check_connection(self):
        nomic.cli.login(
            token=self.connector_config.access_config.api_key,
            domain=self.connector_config.domain,
            tenant=self.connector_config.tenant,
        )

    @DestinationConnectionError.wrap
    @requires_dependencies(["nomic"], extras="nomic")
    def upsert_batch(self, batch: t.List[t.Dict[str, t.Any]]):
        dataset = self.nomic_dataset
        try:
            dataset.add_data(list(batch))
            logger.debug(f"Successfully added {len(batch)} elements to dataset {dataset.id}")
        except Exception as api_error:
            raise WriteError(f"Nomic error: {api_error}") from api_error

    def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
        logger.info(
            f"Upserting {len(elements_dict)} elements to "
            f"{self.connector_config.organisation_name}/{self.connector_config.dataset_name}",
        )

        nomicdb_batch_size = self.write_config.batch_size

        logger.info(f"using {self.write_config.num_processes} processes to upload")
        if self.write_config.num_processes == 1:
            for chunk in batch_generator(elements_dict, nomicdb_batch_size):
                self.upsert_batch(chunk)
        else:
            with mp.Pool(processes=self.write_config.num_processes) as pool:
                pool.map(self.upsert_batch, list(batch_generator(elements_dict, nomicdb_batch_size)))

        # Build the Atlas index (topic model, duplicate detection) once all batches are uploaded.
        dataset = self.nomic_dataset
        dataset.create_index(
            indexed_field="text",
            topic_model=True,
            duplicate_detection=True,
            projection=None,
        )

    def normalize_dict(self, element_dict: dict) -> dict:
        # Keep only the fields Nomic needs; element_id doubles as the dataset's unique id field.
        return {
            "element_id": element_dict["element_id"],
            "text": element_dict["text"],
            "type": element_dict["type"],
            "filename": element_dict["metadata"]["filename"],
        }
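
For reference, normalize_dict reduces a partitioned element to the four fields the dataset consumes. An illustrative before/after on a typical element dict (the field values here are made up):

# Illustrative only: a typical partitioned element and what normalize_dict keeps.
element = {
    "element_id": "4f2c9a",
    "text": "Lorem ipsum dolor sit amet.",
    "type": "NarrativeText",
    "metadata": {"filename": "report.pdf", "page_number": 3, "languages": ["eng"]},
}

normalized = {
    "element_id": element["element_id"],  # unique_id_field of the AtlasDataset
    "text": element["text"],              # the field create_index() embeds
    "type": element["type"],
    "filename": element["metadata"]["filename"],
}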
2 changes: 2 additions & 0 deletions unstructured_ingest/runner/writers/__init__.py
@@ -21,6 +21,7 @@
from .sql import SqlWriter
from .vectara import VectaraWriter
from .weaviate import WeaviateWriter
from .nomicdb import NomicWriter

writer_map: t.Dict[str, t.Type[Writer]] = {
"astradb": AstraDBWriter,
@@ -43,6 +44,7 @@
"sql": SqlWriter,
"vectara": VectaraWriter,
"weaviate": WeaviateWriter,
"nomic": NomicWriter
}

__all__ = ["writer_map"]
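
With the entry registered above, the runner machinery can resolve the new destination by name. A minimal sketch of the lookup (the keyword arguments are placeholders; real configs are built as in the test script):

from unstructured_ingest.runner.writers import writer_map

# "nomic" now resolves to the NomicWriter class registered above.
writer_cls = writer_map["nomic"]
writer = writer_cls(connector_config=..., write_config=...)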
19 changes: 19 additions & 0 deletions unstructured_ingest/runner/writers/nomicdb.py
@@ -0,0 +1,19 @@
import typing as t
from dataclasses import dataclass

from unstructured_ingest.interfaces import BaseDestinationConnector
from unstructured_ingest.runner.writers.base_writer import Writer

if t.TYPE_CHECKING:
    # Note: the module is connector.nomicdb, not connector.nomic.
    from unstructured_ingest.connector.nomicdb import NomicWriteConfig, SimpleNomicConfig


@dataclass
class NomicWriter(Writer):
    write_config: "NomicWriteConfig"
    connector_config: "SimpleNomicConfig"

    def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
        from unstructured_ingest.connector.nomicdb import NomicDestinationConnector

        return NomicDestinationConnector
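
For context on the reviewer's v2 comments: in the v2 framework, a destination connector is typically split into a connection config and an uploader registered under unstructured_ingest/v2/processes/connectors. The sketch below only illustrates that shape; every class name, base class, and signature here is an assumption modeled on other v2 destination connectors, not a verified API.

# Hypothetical v2-style layout; names and signatures are assumptions only.
from dataclasses import dataclass


@dataclass
class NomicConnectionConfig:  # would subclass the framework's ConnectionConfig
    organisation_name: str
    dataset_name: str
    api_key: str  # would live in a secret AccessConfig in the real framework


@dataclass
class NomicUploader:  # would subclass the framework's Uploader
    connection_config: NomicConnectionConfig

    def precheck(self) -> None:
        # Would log in to Nomic and fail fast on bad credentials.
        ...

    def run(self, contents, **kwargs) -> None:
        # Would batch the staged element dicts and call AtlasDataset.add_data,
        # then build the index, mirroring write_dict in the v1 connector above.
        ...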