Add Cosmos DB storage/cache option (#1431)
* added cosmosdb constructor and database methods

* added rest of abstract method headers

* added cosmos db container methods

* implemented has and delete methods

* finished implementing abstract class methods

* integrated class into storage factory

* integrated cosmosdb class into cache factory

* added support for new config file fields

* replaced primary key cosmosdb initialization with connection strings

* modified cosmosdb setter to require json

* Fix non-default emitters

* Format

* Ruff

* ruff

* first successful run of cosmosdb indexing

* removed extraneous container_name setting

* require base_dir to be typed as str

* reverted merged changes from closed branch

* removed nested try statement

* re-added initial non-parquet emitter fix

* added basic support for parquet emitter using internal conversions

* merged with main and resolved conflicts

* fixed more merge conflicts

* added cosmosdb functionality to query pipeline

* tested query for cosmosdb

* collapsed cosmosdb schema to use minimal containers and databases

* simplified create_database and create_container functions

* ruff fixes and semversioner

* spellcheck and ci fixes

* updated pyproject toml and lock file

* apply fixes after merge from main

* add temporary comments

* refactor cache factory

* refactored storage factory

* minor formatting

* update dictionary

* fix spellcheck typo

* fix default value

* fix pydantic model defaults

* update pydantic models

* fix init_content

* cleanup how factory passes parameters to file storage

* remove unnecessary output file type

* update pydantic model

* cleanup code

* implemented clear method

* fix merge from main

* add test stub for cosmosdb

* regenerate lock file

* modified set method to collapse parquet rows

* modified get method to collapse parquet rows

* updated has and delete methods and docstrings to adhere to new schema

* added prefix helper function

* replaced delimiter for prefixed id

* verified empty tests are passing

* fix merges from main

* add find test

* update cicd step name

* tested querying for new schema

* resolved errors from merge conflicts

* refactored set method to handle cache in new schema

* refactored get method to handle cache in new schema

* force unique ids to be written to cosmos for nodes

* found bug with has and delete methods

* modified has and delete to work with cache in new schema

* fix the merge from main

* minor typo fixes

* update lock file

* spellcheck fix

* fix init function signature

* minor formatting updates

* remove https protocol

* change localhost to 127.0.0.1 address

* update pytest to use back engine

* verified cache tests

* improved speed of has function

* resolved pytest error with find function

* added test for child method

* make container_name variable private as _container_name

* minor variable name fix

* cleanup cosmos pytest and make the cosmosdb storage class operations more efficient

* update cicd to use different cosmosdb emulator

* test with http protocol

* added pytest for clear()

* add longer timeout for cosmosdb emulator startup

* revert http connection back to https

* add comments to cicd code for future dev usage

* set container and database clients to None upon deletion

* ruff changes

* add comments to cicd code

* removed unneeded None statements and ruff fixes

* more ruff fixes

* Update test_run.py

* remove unnecessary call to delete container

* ruff format updates

* Reverted test_run.py

* fix ruff formatter errors

* cleanup variable names to be more consistent

* remove extra semversioner file

* revert pydantic model changes

* revert pydantic model change

* revert pydantic model change

* re-enable inline formatting rule

* update documentation in dev guide

---------

Co-authored-by: Alonso Guevara <alonsog@microsoft.com>
Co-authored-by: Josh Bradley <joshbradley@microsoft.com>
3 people authored Dec 19, 2024
1 parent c1c09ba commit 8368b12
Showing 30 changed files with 925 additions and 302 deletions.
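Several of the commit notes above mention a "prefix helper" and a changed delimiter for prefixed ids, used to namespace items inside a single Cosmos DB container. A rough, hypothetical sketch of that idea (function names and the delimiter are illustrative, not the actual graphrag implementation):

```python
def get_prefixed_id(prefix: str, key: str, delimiter: str = ":") -> str:
    """Build a container-unique id by namespacing the key under a prefix.

    Hypothetical helper: the real code's prefix and delimiter may differ.
    """
    return f"{prefix}{delimiter}{key}"


def split_prefixed_id(item_id: str, delimiter: str = ":") -> tuple[str, str]:
    """Recover the prefix and original key from a prefixed id."""
    prefix, _, key = item_id.partition(delimiter)
    return prefix, key
```

This lets multiple logical files or cache namespaces share one container while keeping ids unique per item.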
15 changes: 13 additions & 2 deletions .github/workflows/python-integration-tests.yml
@@ -23,7 +23,7 @@ permissions:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
-  # Only run the for the latest commit
+  # only run the for the latest commit
cancel-in-progress: true

env:
@@ -37,7 +37,7 @@ jobs:
matrix:
python-version: ["3.10"]
os: [ubuntu-latest, windows-latest]
-      fail-fast: false # Continue running all jobs even if one fails
+      fail-fast: false # continue running all jobs even if one fails
env:
DEBUG: 1

@@ -84,6 +84,17 @@ jobs:
id: azuright
uses: potatoqualitee/azuright@v1.1

# For more information on installation/setup of Azure Cosmos DB Emulator
# https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-develop-emulator?tabs=docker-linux%2Cpython&pivots=api-nosql
# Note: the emulator is only available on Windows runners. It can take longer than the default timeout to start up, so we increase it.
# If a job fails due to timeout, restarting the cicd job usually resolves the problem.
- name: Install Azure Cosmos DB Emulator
if: runner.os == 'Windows'
run: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator -Timeout 500
- name: Integration Test
run: |
poetry run poe test_integration
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241121202210026640.json
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Implement cosmosdb storage option for cache and output"
}
1 change: 0 additions & 1 deletion DEVELOPING.md
@@ -45,7 +45,6 @@ graphrag
├── config # configuration management
├── index # indexing engine
| └─ run/run.py # main entrypoint to build an index
├── llm # generic llm interfaces
├── logger # logger module supporting several options
│   └─ factory.py # └─ main entrypoint to create a logger
├── model # data model definitions associated with the knowledge graph
1 change: 1 addition & 0 deletions dictionary.txt
@@ -28,6 +28,7 @@ ints

# Azure
abfs
cosmosdb
Hnsw
odata

3 changes: 3 additions & 0 deletions graphrag/cache/factory.py
@@ -9,6 +9,7 @@

from graphrag.config.enums import CacheType
from graphrag.storage.blob_pipeline_storage import BlobPipelineStorage
from graphrag.storage.cosmosdb_pipeline_storage import create_cosmosdb_storage
from graphrag.storage.file_pipeline_storage import FilePipelineStorage

if TYPE_CHECKING:
@@ -50,6 +51,8 @@ def create_cache(
)
case CacheType.blob:
return JsonPipelineCache(BlobPipelineStorage(**kwargs))
case CacheType.cosmosdb:
return JsonPipelineCache(create_cosmosdb_storage(**kwargs))
case _:
if cache_type in cls.cache_types:
return cls.cache_types[cache_type](**kwargs)
3 changes: 3 additions & 0 deletions graphrag/config/create_graphrag_config.py
@@ -362,6 +362,7 @@ def hydrate_parallelization_params(
storage_account_blob_url=reader.str(Fragment.storage_account_blob_url),
container_name=reader.str(Fragment.container_name),
base_dir=reader.str(Fragment.base_dir) or defs.CACHE_BASE_DIR,
cosmosdb_account_url=reader.str(Fragment.cosmosdb_account_url),
)
with (
reader.envvar_prefix(Section.reporting),
@@ -383,6 +384,7 @@
storage_account_blob_url=reader.str(Fragment.storage_account_blob_url),
container_name=reader.str(Fragment.container_name),
base_dir=reader.str(Fragment.base_dir) or defs.STORAGE_BASE_DIR,
cosmosdb_account_url=reader.str(Fragment.cosmosdb_account_url),
)

with (
@@ -667,6 +669,7 @@ class Fragment(str, Enum):
concurrent_requests = "CONCURRENT_REQUESTS"
conn_string = "CONNECTION_STRING"
container_name = "CONTAINER_NAME"
cosmosdb_account_url = "COSMOSDB_ACCOUNT_URL"
deployment_name = "DEPLOYMENT_NAME"
description = "DESCRIPTION"
enabled = "ENABLED"
4 changes: 4 additions & 0 deletions graphrag/config/enums.py
@@ -19,6 +19,8 @@ class CacheType(str, Enum):
"""The none cache configuration type."""
blob = "blob"
"""The blob cache configuration type."""
cosmosdb = "cosmosdb"
"""The cosmosdb cache configuration type"""

def __repr__(self):
"""Get a string representation."""
@@ -60,6 +62,8 @@ class StorageType(str, Enum):
"""The memory storage type."""
blob = "blob"
"""The blob storage type."""
cosmosdb = "cosmosdb"
"""The cosmosdb storage type"""

def __repr__(self):
"""Get a string representation."""
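Because `CacheType` and `StorageType` subclass `str`, the plain `"cosmosdb"` string from a settings file maps directly onto the new enum member. A small illustration with a simplified enum mirroring the definitions above:

```python
from enum import Enum


class StorageType(str, Enum):
    file = "file"
    memory = "memory"
    blob = "blob"
    cosmosdb = "cosmosdb"


# A raw string from a config file resolves to the enum member...
assert StorageType("cosmosdb") is StorageType.cosmosdb
# ...and members compare equal to their string values, so existing
# string-based comparisons keep working.
assert StorageType.cosmosdb == "cosmosdb"
```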
4 changes: 2 additions & 2 deletions graphrag/config/init_content.py
@@ -63,15 +63,15 @@
## connection_string and container_name must be provided
cache:
-  type: {defs.CACHE_TYPE.value} # or blob
+  type: {defs.CACHE_TYPE.value} # one of [blob, cosmosdb, file]
base_dir: "{defs.CACHE_BASE_DIR}"
reporting:
type: {defs.REPORTING_TYPE.value} # or console, blob
base_dir: "{defs.REPORTING_BASE_DIR}"
storage:
-  type: {defs.STORAGE_TYPE.value} # or blob
+  type: {defs.STORAGE_TYPE.value} # one of [blob, cosmosdb, file]
base_dir: "{defs.STORAGE_BASE_DIR}"
## only turn this on if running `graphrag index` with custom settings
1 change: 1 addition & 0 deletions graphrag/config/input_models/cache_config_input.py
@@ -16,3 +16,4 @@ class CacheConfigInput(TypedDict):
connection_string: NotRequired[str | None]
container_name: NotRequired[str | None]
storage_account_blob_url: NotRequired[str | None]
cosmosdb_account_url: NotRequired[str | None]
1 change: 1 addition & 0 deletions graphrag/config/input_models/storage_config_input.py
@@ -16,3 +16,4 @@ class StorageConfigInput(TypedDict):
connection_string: NotRequired[str | None]
container_name: NotRequired[str | None]
storage_account_blob_url: NotRequired[str | None]
cosmosdb_account_url: NotRequired[str | None]
3 changes: 3 additions & 0 deletions graphrag/config/models/cache_config.py
@@ -27,3 +27,6 @@ class CacheConfig(BaseModel):
storage_account_blob_url: str | None = Field(
description="The storage account blob url to use.", default=None
)
cosmosdb_account_url: str | None = Field(
description="The cosmosdb account url to use.", default=None
)
3 changes: 3 additions & 0 deletions graphrag/config/models/storage_config.py
@@ -28,3 +28,6 @@ class StorageConfig(BaseModel):
storage_account_blob_url: str | None = Field(
description="The storage account blob url to use.", default=None
)
cosmosdb_account_url: str | None = Field(
description="The cosmosdb account url to use.", default=None
)
28 changes: 27 additions & 1 deletion graphrag/index/config/cache.py
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

-"""A module containing 'PipelineCacheConfig', 'PipelineFileCacheConfig', 'PipelineMemoryCacheConfig', 'PipelineBlobCacheConfig' models."""
+"""A module containing 'PipelineCacheConfig', 'PipelineFileCacheConfig', 'PipelineMemoryCacheConfig', 'PipelineBlobCacheConfig', 'PipelineCosmosDBCacheConfig' models."""

from __future__ import annotations

@@ -71,9 +71,35 @@ class PipelineBlobCacheConfig(PipelineCacheConfig[Literal[CacheType.blob]]):
"""The storage account blob url for cache"""


class PipelineCosmosDBCacheConfig(PipelineCacheConfig[Literal[CacheType.cosmosdb]]):
"""Represents the cosmosdb cache configuration for the pipeline."""

type: Literal[CacheType.cosmosdb] = CacheType.cosmosdb
"""The type of cache."""

base_dir: str | None = Field(
description="The cosmosdb database name for the cache.", default=None
)
"""The cosmosdb database name for the cache."""

container_name: str = Field(description="The container name for cache.", default="")
"""The container name for cache."""

connection_string: str | None = Field(
description="The cosmosdb primary key for the cache.", default=None
)
"""The cosmosdb primary key for the cache."""

cosmosdb_account_url: str | None = Field(
description="The cosmosdb account url for cache", default=None
)
"""The cosmosdb account url for cache"""


PipelineCacheConfigTypes = (
PipelineFileCacheConfig
| PipelineMemoryCacheConfig
| PipelineBlobCacheConfig
| PipelineNoneCacheConfig
| PipelineCosmosDBCacheConfig
)
36 changes: 34 additions & 2 deletions graphrag/index/config/storage.py
@@ -1,7 +1,7 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

-"""A module containing 'PipelineStorageConfig', 'PipelineFileStorageConfig' and 'PipelineMemoryStorageConfig' models."""
+"""A module containing 'PipelineStorageConfig', 'PipelineFileStorageConfig','PipelineMemoryStorageConfig', 'PipelineBlobStorageConfig', and 'PipelineCosmosDBStorageConfig' models."""

from __future__ import annotations

@@ -66,6 +66,38 @@ class PipelineBlobStorageConfig(PipelineStorageConfig[Literal[StorageType.blob]]
"""The storage account blob url."""


class PipelineCosmosDBStorageConfig(
PipelineStorageConfig[Literal[StorageType.cosmosdb]]
):
"""Represents the cosmosdb storage configuration for the pipeline."""

type: Literal[StorageType.cosmosdb] = StorageType.cosmosdb
"""The type of storage."""

connection_string: str | None = Field(
description="The cosmosdb storage primary key for the storage.", default=None
)
"""The cosmosdb storage primary key for the storage."""

container_name: str = Field(
description="The container name for storage", default=""
)
"""The container name for storage."""

base_dir: str | None = Field(
description="The base directory for the storage.", default=None
)
"""The base directory for the storage."""

cosmosdb_account_url: str | None = Field(
description="The cosmosdb account url.", default=None
)
"""The cosmosdb account url."""


PipelineStorageConfigTypes = (
-    PipelineFileStorageConfig | PipelineMemoryStorageConfig | PipelineBlobStorageConfig
+    PipelineFileStorageConfig
+    | PipelineMemoryStorageConfig
+    | PipelineBlobStorageConfig
+    | PipelineCosmosDBStorageConfig
)
42 changes: 42 additions & 0 deletions graphrag/index/create_pipeline_config.py
@@ -20,6 +20,7 @@
from graphrag.index.config.cache import (
PipelineBlobCacheConfig,
PipelineCacheConfigTypes,
PipelineCosmosDBCacheConfig,
PipelineFileCacheConfig,
PipelineMemoryCacheConfig,
PipelineNoneCacheConfig,
@@ -44,6 +45,7 @@
)
from graphrag.index.config.storage import (
PipelineBlobStorageConfig,
PipelineCosmosDBStorageConfig,
PipelineFileStorageConfig,
PipelineMemoryStorageConfig,
PipelineStorageConfigTypes,
@@ -420,6 +422,26 @@ def _get_storage_config(
base_dir=storage_settings.base_dir,
storage_account_blob_url=storage_account_blob_url,
)
case StorageType.cosmosdb:
cosmosdb_account_url = storage_settings.cosmosdb_account_url
connection_string = storage_settings.connection_string
base_dir = storage_settings.base_dir
container_name = storage_settings.container_name
if cosmosdb_account_url is None:
msg = "CosmosDB account url must be provided for cosmosdb storage."
raise ValueError(msg)
if base_dir is None:
msg = "Base directory must be provided for cosmosdb storage."
raise ValueError(msg)
if container_name is None:
msg = "Container name must be provided for cosmosdb storage."
raise ValueError(msg)
return PipelineCosmosDBStorageConfig(
cosmosdb_account_url=cosmosdb_account_url,
connection_string=connection_string,
base_dir=storage_settings.base_dir,
container_name=container_name,
)
case _:
# relative to the root_dir
base_dir = storage_settings.base_dir
@@ -457,6 +479,26 @@ def _get_cache_config(
base_dir=settings.cache.base_dir,
storage_account_blob_url=storage_account_blob_url,
)
case CacheType.cosmosdb:
cosmosdb_account_url = settings.cache.cosmosdb_account_url
connection_string = settings.cache.connection_string
base_dir = settings.cache.base_dir
container_name = settings.cache.container_name
if base_dir is None:
msg = "Base directory must be provided for cosmosdb cache."
raise ValueError(msg)
if container_name is None:
msg = "Container name must be provided for cosmosdb cache."
raise ValueError(msg)
if connection_string is None and cosmosdb_account_url is None:
msg = "Connection string or cosmosDB account url must be provided for cosmosdb cache."
raise ValueError(msg)
return PipelineCosmosDBCacheConfig(
cosmosdb_account_url=cosmosdb_account_url,
connection_string=connection_string,
base_dir=base_dir,
container_name=container_name,
)
case _:
# relative to root dir
return PipelineFileCacheConfig(base_dir="./cache")
6 changes: 5 additions & 1 deletion graphrag/index/run/workflow.py
@@ -95,8 +95,12 @@ async def _process_workflow(
return None

context.stats.workflows[workflow_name] = {"overall": 0.0}

await _inject_workflow_data_dependencies(
-        workflow, workflow_dependencies, dataset, context.storage
+        workflow,
+        workflow_dependencies,
+        dataset,
+        context.storage,
)

workflow_start_time = time.time()
3 changes: 1 addition & 2 deletions graphrag/query/input/loaders/utils.py
@@ -165,8 +165,7 @@ def to_optional_float(data: pd.Series, column_name: str | None) -> float | None:
if value is None:
return None
if not isinstance(value, float):
-            msg = f"value is not a float: {value} ({type(value)})"
-            raise ValueError(msg)
+            return float(value)
else:
msg = f"Column {column_name} not found in data"
raise ValueError(msg)