Skip to content

Commit

Permalink
Support delta write + Add unity plugin (#1)
Browse files Browse the repository at this point in the history
* Start

* Add tests for delta write, modify macros

* Update delta file

* Fix tests

* Define delta extra

* Add support for installing extensions from custom repositories, modify init db to create secrets prior to attaching catalogs, add UC client to delta plugin to dynamically create schemas/tables if they don't exist

* fix secret creation to use the __default_ prefix when no name is specified

* fix tests, add date as supported type to convert

* run linter and run add unitycatalog to dev reqs

* fix tests

* fix delta write test

* clean up tests

* fixed all tests

* rollback some accidently edited files

* clean up

* added unitycatalog as delta-plugin option, rewrite delta plugin

* fix credential tests

* Add TODOs

* add correct type hinting for schema conversion

* update type mapping

* remove redundant mock

* create new unity plugin, separate unity from delta

* refactoring

* Add tenacity to add retries for executing target config compiled code

* Add tenacity to add retries for executing target config compiled code

* Overwrite ref macro to support unity table reads

* fix ref to fix failing tests

* refactor ref macro

* wait incrementally, get catalog from profile.yml

* remove None check for uc client

* add tests for the unity plugin

* fix Mock unity catalog server step in pipeline

* fix Mock unity catalog server step in pipeline

* fix Mock unity catalog server step in pipeline

* fix Mock unity catalog server step in pipeline

* fix Mock unity catalog server step in pipeline

* change access for test-unity.sh

* update tox.ini

* fix tests

* move decorator to utils, make handle more robust for concurrent operations by a retry decorator

* make handle more robust for concurrent operations by a retry decorator

* remove secrets creation for each cursor init

* fix test

* remove threads from unity test

* clean up tests

* add unity catalog load support

* clean up

* remove catalog from full_table_name

* clean up unity plugin

* fix the catalog base url to add the http path to the host_and_port

* add link to git issue

* update doc

---------

Co-authored-by: Frank Mbonu <fcm073@gmail.com>
  • Loading branch information
dan1elt0m and chidifrank authored Aug 27, 2024
1 parent 8c95984 commit ff9d4c4
Show file tree
Hide file tree
Showing 31 changed files with 1,157 additions and 119 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,51 @@ jobs:
name: buenavista_results_${{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv
path: buenavista_results.csv

unity:
name: Unity Catalog functional test / python ${{ matrix.python-version }}

runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
python-version: [ '3.9' ]

env:
TOXENV: "unity"
PYTEST_ADDOPTS: "-v --color=yes --csv unity_results.csv"

steps:
- name: Check out the repository
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install python dependencies
run: |
python -m pip install tox
python -m pip --version
tox --version
- name: Mock Unity catalog server and run tox
run: ./scripts/test-unity.sh

- name: Get current date
if: always()
id: date
run: echo "date=$(date +'%Y-%m-%dT%H_%M_%S')" >> $GITHUB_OUTPUT #no colons allowed for artifacts

- uses: actions/upload-artifact@v3
if: always()
with:
name: unity_results{{ matrix.python-version }}-${{ steps.date.outputs.date }}.csv
path: unity_results.csv

fsspec:
name: fsspec test / python ${{ matrix.python-version }}

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ target/
.idea/
.vscode/
.env
.venv/*
2 changes: 2 additions & 0 deletions .mock-uc-server-stats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
configured_endpoints: 25
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/eventual%2Funitycatalog-afb7536b3c70b699dd3090dc47209959c1591beae2945ee5fe14e1b53139fe83.yml
17 changes: 11 additions & 6 deletions dbt/adapters/duckdb/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from urllib.parse import urlparse

from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.duckdb.secrets import DEFAULT_SECRET_PREFIX
from dbt.adapters.duckdb.secrets import Secret


Expand Down Expand Up @@ -77,6 +75,12 @@ class Retries(dbtClassMixin):
retryable_exceptions: List[str] = field(default_factory=lambda: ["IOException"])


@dataclass
class Extension(dbtClassMixin):
name: str
repository: Optional[str] = None


@dataclass
class DuckDBCredentials(Credentials):
database: str = "main"
Expand All @@ -88,7 +92,7 @@ class DuckDBCredentials(Credentials):
config_options: Optional[Dict[str, Any]] = None

# any DuckDB extensions we want to install and load (httpfs, parquet, etc.)
extensions: Optional[Tuple[str, ...]] = None
extensions: Optional[List[Extension]] = None

# any additional pragmas we want to configure on our DuckDB connections;
# a list of the built-in pragmas can be found here:
Expand Down Expand Up @@ -177,11 +181,12 @@ def __post_init__(self):
if self.secrets:
self._secrets = [
Secret.create(
secret_type=secret.pop("type"),
name=secret.pop("name", f"{DEFAULT_SECRET_PREFIX}{num + 1}"),
secret_type=secret_type,
name=secret.pop("name", f"__default_{secret_type}"),
**secret,
)
for num, secret in enumerate(self.secrets)
for secret in self.secrets
if (secret_type := secret.get("type"))
]

def secrets_sql(self) -> List[str]:
Expand Down
18 changes: 13 additions & 5 deletions dbt/adapters/duckdb/environments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,19 @@ def initialize_db(
# install any extensions on the connection
if creds.extensions is not None:
for extension in creds.extensions:
conn.install_extension(extension)
conn.load_extension(extension)
if extension.repository:
conn.execute(f"SET custom_extension_repository = '{extension.repository}'")
else:
conn.execute(
"SET custom_extension_repository = 'http://extensions.duckdb.org'"
)
conn.install_extension(extension.name)
conn.load_extension(extension.name)

# install any secrets on the connection
if creds.secrets:
for sql in creds.secrets_sql():
conn.execute(sql)

# Attach any fsspec filesystems on the database
if creds.filesystems:
Expand Down Expand Up @@ -207,9 +218,6 @@ def initialize_cursor(
# to the correct type
cursor.execute(f"SET {key} = '{value}'")

for sql in creds.secrets_sql():
cursor.execute(sql)

# update cursor if something is lost in the copy
# of the parent connection
if plugins:
Expand Down
69 changes: 64 additions & 5 deletions dbt/adapters/duckdb/environments/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import threading

import pyarrow
from dbt_common.exceptions import DbtRuntimeError
from duckdb import CatalogException

from . import Environment
from .. import credentials
from .. import utils
from ..utils import get_retry_decorator
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.contracts.connection import Connection

Expand All @@ -29,6 +32,7 @@ def execute(self, sql, bindings=None):

class DuckDBConnectionWrapper:
def __init__(self, cursor, env):
self._conn = env.conn
self._cursor = DuckDBCursorWrapper(cursor)
self._env = env

Expand Down Expand Up @@ -98,7 +102,8 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
handle = self.handle()
cursor = handle.cursor()

if source_config.schema:
# Schema creation is currently not supported by the uc_catalog duckdb extension
if source_config.schema and plugin_name != "unity":
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {source_config.schema}")

save_mode = source_config.get("save_mode", "overwrite")
Expand Down Expand Up @@ -133,13 +138,49 @@ def load_source(self, plugin_name: str, source_config: utils.SourceConfig):
# save to df instance to register on each cursor creation
self._REGISTERED_DF[df_name] = df

cursor.execute(
f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}"
)
# CREATE OR REPLACE table creation is currently not supported by the uc_catalog duckdb extension
if plugin_name != "unity":
cursor.execute(
f"CREATE OR REPLACE {materialization} {source_table_name} AS SELECT * FROM {df_name}"
)

cursor.close()
handle.close()

def get_arrow_dataframe(
self, compiled_code: str, retries: int, wait_time: float
) -> pyarrow.lib.Table:
"""Get the arrow dataframe from the compiled code.
:param compiled_code: Compiled code
:param retries: Number of retries
:param wait_time: Wait time between retries
:returns: Arrow dataframe
"""

@get_retry_decorator(retries, wait_time, CatalogException)
def execute_query():
try:
# Get the handle and cursor
handle = self.handle()
cursor = handle.cursor()

# Execute the compiled code
df = cursor.sql(compiled_code).arrow()

return df
except CatalogException as e:
# Reset the connection to refresh the catalog
self.conn = None

# Raise the exception to retry the operation
raise CatalogException(
f"{str(e)}: failed to execute compiled code {compiled_code}"
)

return execute_query()

def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) -> None:
if plugin_name not in self._plugins:
if plugin_name.startswith("glue|"):
Expand All @@ -155,7 +196,25 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) ->
+ ",".join(self._plugins.keys())
)
plugin = self._plugins[plugin_name]
plugin.store(target_config)

handle = self.handle()
cursor = handle.cursor()

# Get the number of retries and the wait time for a dbt model
retries = int(target_config.config.get("retries", 20))
wait_time = float(target_config.config.get("wait_time", 0.05))

# Get the arrow dataframe
df = self.get_arrow_dataframe(
compiled_code=target_config.config.model.compiled_code,
retries=retries,
wait_time=wait_time,
)

plugin.store(target_config, df)

cursor.close()
handle.close()

def close(self):
if self.conn:
Expand Down
3 changes: 2 additions & 1 deletion dbt/adapters/duckdb/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(
"""
self.name = name
self.creds = credentials
self.plugin_config = plugin_config
self.initialize(plugin_config)

def initialize(self, plugin_config: Dict[str, Any]):
Expand Down Expand Up @@ -134,7 +135,7 @@ def load(self, source_config: SourceConfig):
"""
raise NotImplementedError(f"load method not implemented for {self.name}")

def store(self, target_config: TargetConfig):
def store(self, target_config: TargetConfig, df=None):
raise NotImplementedError(f"store method not implemented for {self.name}")

def configure_cursor(self, cursor):
Expand Down
Loading

0 comments on commit ff9d4c4

Please sign in to comment.