
Destination DuckDB: ✨ Add MotherDuck support 🦆🦆 #29428

Merged: 55 commits merged into master from destination-duckdb/feat-motherduck-support on Sep 19, 2023
Changes from 42 commits

Commits (55)
80306e2
feat: (draft) motherduck support for destination-duckdb
aaronsteers Aug 15, 2023
a331b01
docs: instructions for motherduck connectivity
aaronsteers Aug 18, 2023
cb63810
chore: poetry init
aaronsteers Aug 25, 2023
a0b4a2a
chore: mark setup.py for future removal
aaronsteers Aug 25, 2023
74c68eb
fix: typo
aaronsteers Aug 25, 2023
29f3704
chore: add dev container config
aaronsteers Aug 25, 2023
ec275a5
chore: move duckdb dev container config to root
aaronsteers Aug 25, 2023
8a67d3c
chore: dev container opens in subdir
aaronsteers Aug 25, 2023
622e34d
fix: stringified dir name
aaronsteers Aug 26, 2023
e708c08
fix: api key injection
aaronsteers Aug 26, 2023
58b4ab8
chore: create symlink to devcontainer config while in the devcontainer
aaronsteers Aug 26, 2023
b48432e
chore: symlink the devcontainer config file directly
aaronsteers Aug 26, 2023
60a1166
chore: updated gitignore
aaronsteers Aug 26, 2023
401cd3b
fix: path declaration
aaronsteers Aug 26, 2023
be903ae
fix: data type for emitted_at col
aaronsteers Aug 26, 2023
6907b06
fix: motherduck token ingestion
aaronsteers Aug 26, 2023
6c69415
add motherduck tests (passing)
aaronsteers Aug 26, 2023
a4659b7
fix: don't echo path (could be sensitive)
aaronsteers Aug 26, 2023
8f62a79
chore: add back db path logging when we know it is a local path
aaronsteers Aug 26, 2023
0fdbe90
chore: update examples in json schema
aaronsteers Aug 26, 2023
9cb65d9
chore: remove redundant python install
aaronsteers Aug 26, 2023
742364d
chore: remove dind support for duckdb container
aaronsteers Aug 26, 2023
ef0e648
chore: use project-specific venv dir
aaronsteers Aug 26, 2023
967b8e2
fix: pytest path in web codespace
aaronsteers Aug 26, 2023
d5dd6d0
chore: update settings, add '.' to pytest args
aaronsteers Aug 26, 2023
7970318
docs: accept suggestion
aaronsteers Aug 27, 2023
0a6cfaa
chore: rename api_key setting
aaronsteers Aug 28, 2023
aa1d258
chore: replace ‘writeOnly’ annotation with with ‘airbyte_secret'
aaronsteers Aug 29, 2023
646ccdf
Merge branch 'master' into destination-duckdb/feat-motherduck-support
aaronsteers Sep 6, 2023
07294d9
Merge branch 'master' into destination-duckdb/feat-motherduck-support
bnchrch Sep 8, 2023
735d3a6
Update dockerfile
bnchrch Sep 8, 2023
6d09f3d
Fix format
bnchrch Sep 8, 2023
a315a3e
Remove setup.py and requirement.txt
bnchrch Sep 8, 2023
a994139
Roughly update airbyte-ci to build pyproject
bnchrch Sep 8, 2023
1e7e998
Add acceptance-test-config
bnchrch Sep 8, 2023
db58015
Eliminate secrets
bnchrch Sep 9, 2023
03f6efb
Format
bnchrch Sep 9, 2023
5c15c51
Pretty up pipelines and run format
bnchrch Sep 9, 2023
32b6c34
chore: match ci format expectations
aaronsteers Sep 11, 2023
52f31e6
chore: imporve devcontainer, add docker-in-docker
aaronsteers Sep 11, 2023
56d4186
chore: imporved devcontainer
aaronsteers Sep 11, 2023
044fbe8
Update airbyte-ci/connectors/connector_ops/connector_ops/utils.py
bnchrch Sep 13, 2023
a585701
Remove duplicate
bnchrch Sep 13, 2023
d65c853
Merge remote-tracking branch 'origin/master' into destination-duckdb/…
bnchrch Sep 16, 2023
07c2e5c
Bump version
bnchrch Sep 16, 2023
23b21ca
update tests
bnchrch Sep 16, 2023
65161fd
Update tests
bnchrch Sep 16, 2023
41a9981
Merge branch 'master' into destination-duckdb/feat-motherduck-support
aaronsteers Sep 18, 2023
2b31870
Merge branch 'master' into destination-duckdb/feat-motherduck-support
aaronsteers Sep 19, 2023
f290c2b
add titles to spec.json files
aaronsteers Sep 19, 2023
92897fd
docs: customize readme for oss vs cloud
aaronsteers Sep 19, 2023
811d601
Merge remote-tracking branch 'origin/master' into destination-duckdb/…
bnchrch Sep 19, 2023
611b915
Merge remote-tracking branch 'origin/master' into destination-duckdb/…
bnchrch Sep 19, 2023
e0ec605
enable duckdb in cloud
bnchrch Sep 19, 2023
1e7b3b7
docs: fix broken links
aaronsteers Sep 19, 2023
76 changes: 76 additions & 0 deletions .devcontainer/destination-duckdb/devcontainer.json
@@ -0,0 +1,76 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "DuckDB Destination Connector DevContainer (Python)",

// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:0-3.9",

// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
"ghcr.io/devcontainers-contrib/features/poetry:2": {},
"ghcr.io/devcontainers/features/docker-in-docker": {}
},
"overrideFeatureInstallOrder": [
// Deterministic order maximizes cache reuse
"ghcr.io/devcontainers-contrib/features/poetry",
"ghcr.io/devcontainers/features/docker-in-docker"
],

"workspaceFolder": "/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",

// Configure tool-specific properties.
"customizations": {
"vscode": {
"extensions": [
// Python extensions:
"charliermarsh.ruff",
"matangover.mypy",
"ms-python.black",
"ms-python.python",
"ms-python.vscode-pylance",

// Toml support
"tamasfe.even-better-toml",

// Yaml and JSON Schema support:
"redhat.vscode-yaml",

// Contributing:
"GitHub.vscode-pull-request-github"
],
"settings": {
"extensions.ignoreRecommendations": true,
"git.openRepositoryInParentFolders": "always",
"python.defaultInterpreterPath": ".venv/bin/python",
"python.interpreter.infoVisibility": "always",
"python.terminal.activateEnvironment": true,
"python.testing.pytestEnabled": true,
"python.testing.cwd": "/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
"python.testing.pytestArgs": [
"--rootdir=/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
"."
]
}
}
},
"containerEnv": {
"POETRY_VIRTUALENVS_IN_PROJECT": "true"
},

// Mark the root directory as 'safe' for git.
"initializeCommand": "git config --add safe.directory /workspaces/airbyte",

// Use 'postCreateCommand' to run commands after the container is created.
// Post-create tasks:
// 1. Create a symlink directory.
// 2. Create symlinks for the devcontainer.json and docs markdown file.
// 3. Install the Python/Poetry dependencies.
"postCreateCommand": "mkdir -p ./.symlinks && echo '*' > ./.symlinks/.gitignore && ln -sf /workspaces/airbyte/.devcontainer/destination-duckdb/devcontainer.json ./.symlinks/devcontainer.json && ln -sf /workspaces/airbyte/docs/integrations/destinations/duckdb.md ./.symlinks/duckdb-docs.md && poetry install"

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
airbyte-ci/connectors/connector_ops/connector_ops/utils.py
@@ -250,7 +250,8 @@ def metadata(self) -> Optional[dict]:
     def language(self) -> ConnectorLanguage:
         if Path(self.code_directory / self.technical_name.replace("-", "_") / "manifest.yaml").is_file():
             return ConnectorLanguage.LOW_CODE
-        if Path(self.code_directory / "setup.py").is_file():
+        if Path(self.code_directory / "setup.py").is_file() or Path(self.code_directory / "pyproject.toml").is_file():
             return ConnectorLanguage.PYTHON
         try:
             with open(self.code_directory / "Dockerfile") as dockerfile:
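For illustration only, a minimal, self-contained sketch of the detection order after this change, using a simplified stand-in for the real Connector class (the actual implementation lives in connector_ops/utils.py and also falls back to Dockerfile parsing for Java connectors):

from enum import Enum
from pathlib import Path
from typing import Optional


class ConnectorLanguage(str, Enum):
    LOW_CODE = "low-code"
    PYTHON = "python"
    JAVA = "java"


def detect_language(code_directory: Path, technical_name: str) -> Optional[ConnectorLanguage]:
    """Simplified stand-in for Connector.language as of this PR."""
    # Low-code connectors ship a manifest.yaml inside their package directory.
    if (code_directory / technical_name.replace("-", "_") / "manifest.yaml").is_file():
        return ConnectorLanguage.LOW_CODE
    # Python connectors may now be packaged with either setup.py or pyproject.toml (Poetry).
    if (code_directory / "setup.py").is_file() or (code_directory / "pyproject.toml").is_file():
        return ConnectorLanguage.PYTHON
    # The real implementation goes on to parse the Dockerfile to identify Java connectors.
    return None


# Run from the airbyte repo root, destination-duckdb is still detected as PYTHON
# because it now ships a pyproject.toml instead of setup.py.
print(detect_language(Path("airbyte-integrations/connectors/destination-duckdb"), "destination-duckdb"))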
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/connector_ops/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "connector_ops"
-version = "0.2.2"
+version = "0.2.3"
 description = "Packaged maintained by the connector operations team to perform CI for connectors"
 authors = ["Airbyte <contact@airbyte.io>"]

51 changes: 38 additions & 13 deletions airbyte-ci/connectors/pipelines/pipelines/actions/environments.py
@@ -283,6 +283,35 @@ async def find_local_dependencies_in_pyproject_toml(
     return local_dependency_paths


+def _install_python_dependencies_from_setup_py(
+    container: Container,
+    additional_dependency_groups: Optional[List] = None,
+) -> Container:
+    install_connector_package_cmd = ["python", "-m", "pip", "install", "."]
+    container = container.with_exec(install_connector_package_cmd)
+
+    if additional_dependency_groups:
+        # e.g. .[dev,tests]
+        group_string = f".[{','.join(additional_dependency_groups)}]"
+        group_install_cmd = ["python", "-m", "pip", "install", group_string]
+
+        container = container.with_exec(group_install_cmd)
+
+    return container
+
+
+def _install_python_dependencies_from_requirements_txt(container: Container) -> Container:
+    install_requirements_cmd = ["python", "-m", "pip", "install", "-r", "requirements.txt"]
+    return container.with_exec(install_requirements_cmd)
+
+
+def _install_python_dependencies_from_poetry(container: Container) -> Container:
+    pip_install_poetry_cmd = ["python", "-m", "pip", "install", "poetry"]
+    poetry_disable_virtual_env_cmd = ["poetry", "config", "virtualenvs.create", "false"]
+    poetry_install_no_venv_cmd = ["poetry", "install", "--no-root", "--no-dev"]
+    return container.with_exec(pip_install_poetry_cmd).with_exec(poetry_disable_virtual_env_cmd).with_exec(poetry_install_no_venv_cmd)
+
+
 async def with_installed_python_package(
     context: PipelineContext,
     python_environment: Container,
@@ -302,29 +331,25 @@ async def with_installed_python_package(
     Returns:
         Container: A python environment container with the python package installed.
     """
-    install_requirements_cmd = ["python", "-m", "pip", "install", "-r", "requirements.txt"]
-    install_connector_package_cmd = ["python", "-m", "pip", "install", "."]
-
     container = with_python_package(context, python_environment, package_source_code_path, exclude=exclude)

     local_dependencies = await find_local_python_dependencies(context, package_source_code_path)

     for dependency_directory in local_dependencies:
         container = container.with_mounted_directory("/" + dependency_directory, context.get_repo_dir(dependency_directory))

-    has_setup_py, has_requirements_txt = await check_path_in_workdir(container, "setup.py"), await check_path_in_workdir(
-        container, "requirements.txt"
-    )
+    has_setup_py = await check_path_in_workdir(container, "setup.py")
+    has_requirements_txt = await check_path_in_workdir(container, "requirements.txt")
+    has_pyproject_toml = await check_path_in_workdir(container, "pyproject.toml")
+
+    if has_pyproject_toml:
+        container = _install_python_dependencies_from_poetry(container)

     if has_setup_py:
-        container = container.with_exec(install_connector_package_cmd)
-    if has_requirements_txt:
-        container = container.with_exec(install_requirements_cmd)
+        container = _install_python_dependencies_from_setup_py(container, additional_dependency_groups)

-    if additional_dependency_groups:
-        container = container.with_exec(
-            install_connector_package_cmd[:-1] + [install_connector_package_cmd[-1] + f"[{','.join(additional_dependency_groups)}]"]
-        )
+    if has_requirements_txt:
+        container = _install_python_dependencies_from_requirements_txt(container)

     return container

Review thread on _install_python_dependencies_from_poetry:

@bnchrch (Contributor), Sep 9, 2023:

@alafanechere Having to install poetry at this level has left me with a thought for when we start removing dockerfiles from python connectors.

The thought is that we have to be really clear in our code and file structure that there is an order of operations.

  1. Set base image
  2. Install system dependencies
  3. Install language libraries

In step 1 we will have to detect

  1. Language (java, python, lowcode)
  2. Version of language

In step 2 we will have to

  1. Provide a method for users to override / add to the system deps in our base image (finalize_build.sh?)

In step 3 we will have to

  1. Detect (or enforce one method) the dependency system (poetry, setup, pipx?, requirements)
  2. And install based on that

And we will likely need to have step 3 generic enough to be used to

  1. Build connector images
  2. Install one-off python utility programs (qa-engine, metadata service, etc)

With all that in mind, I think we should start splitting environments.py into a structure based around these different types of interchangeable systems, like this:

actions/
    environments/
        utils.py
        system/
            ...
        python/
            pyproject_toml.py
            setup_py.py
            requirements_txt.py
        java/
            ...

Anyway, no action required. Just wanted to raise the thought, in case you objected to me moving in this direction.

@alafanechere (Contributor), Sep 11, 2023:

@bnchrch we're aligned. Just a detail about step 3. The main change I plan to make is:

  • Currently: we create a test environment in a python base image on top of which we install things like poetry, pytest, etc. I don't like this approach because it means tests are executed in a different environment from the one in which the actual connector code runs in the wild (its docker container).
  • Target: the build step will continue to install all main dependencies. We should install the dev dependencies to run tests, so we should just append something like pip install .[test] for setup.py, or poetry install --with test. This will ensure consistency: tests will be run in the same execution context as the actual "prod" environment.

We should indeed implement a switch-like statement to run different commands according to the detected package manager in use. (edit: I just saw that you implemented this below)

I'm happy to revisit the environments package (we can also reconsider its legitimacy). I originally named it environments to collect reusable containers with pre-installed tools. My target suggestion will discard the provisioning of a test environment.
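To make the review discussion above concrete, here is a hypothetical sketch of the "switch-like" selection of install commands by detected package manager. The function name and the plain command-list return type are illustrative only; the PR itself operates on Dagger Container objects as shown in the diff.

from typing import List, Optional


def build_install_commands(
    has_pyproject_toml: bool,
    has_setup_py: bool,
    has_requirements_txt: bool,
    additional_dependency_groups: Optional[List[str]] = None,
) -> List[List[str]]:
    """Return the pip/poetry commands to run, mirroring the order used in with_installed_python_package."""
    commands: List[List[str]] = []
    if has_pyproject_toml:
        # Poetry project: install poetry, disable its virtualenv, then install locked dependencies.
        commands += [
            ["python", "-m", "pip", "install", "poetry"],
            ["poetry", "config", "virtualenvs.create", "false"],
            ["poetry", "install", "--no-root", "--no-dev"],
        ]
    if has_setup_py:
        commands.append(["python", "-m", "pip", "install", "."])
        if additional_dependency_groups:
            # e.g. pip install ".[dev,tests]"
            commands.append(["python", "-m", "pip", "install", f".[{','.join(additional_dependency_groups)}]"])
    if has_requirements_txt:
        commands.append(["python", "-m", "pip", "install", "-r", "requirements.txt"])
    return commands


# Example: a Poetry-only connector such as destination-duckdb after this PR.
for cmd in build_install_commands(True, False, False, ["dev", "tests"]):
    print(" ".join(cmd))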
airbyte-integrations/connectors/destination-duckdb/.dockerignore
@@ -3,3 +3,6 @@
 !main.py
 !destination_duckdb
 !setup.py
+!pyproject.toml
+!poetry.lock
+!README.md
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/destination-duckdb/.gitignore
@@ -0,0 +1,2 @@
# Ignore symlinks created within the dev container
.symlinks
38 changes: 15 additions & 23 deletions airbyte-integrations/connectors/destination-duckdb/Dockerfile
@@ -1,42 +1,34 @@
-FROM python:3.9.11 as base
+FROM python:3.9.11
 # FROM python:3.9.11-alpine3.15 as base
 # switched from alpine as there were tons of errors (in case you want to switch back to alpine)
 # - https://stackoverflow.com/a/57485724/5246670
 # - numpy error: https://stackoverflow.com/a/22411624/5246670
 # - libstdc++ https://github.com/amancevice/docker-pandas/issues/12#issuecomment-717215043
 # - musl-dev linux-headers g++ because of: https://stackoverflow.com/a/40407099/5246670

-# build and load all requirements
-FROM base as builder
 WORKDIR /airbyte/integration_code

-# upgrade pip to the latest version
+# Upgrade system packages and install Poetry
 RUN apt-get update && apt-get -y upgrade \
-    && pip install --upgrade pip
+    && pip install --upgrade pip \
+    && pip install poetry

-COPY setup.py ./
-# install necessary packages to a temporary folder
-RUN pip install --prefix=/install .
-# build a clean environment
-FROM base
-# RUN conda install -c conda-forge python-duckdb
-WORKDIR /airbyte/integration_code
-
-# copy all loaded and built libraries to a pure basic image
-COPY --from=builder /install /usr/local
-# add default timezone settings
-COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
-RUN echo "Etc/UTC" > /etc/timezone
+# Copy only poetry.lock* in case it doesn't exist
+COPY pyproject.toml poetry.lock* ./
+RUN poetry config virtualenvs.create false \
+    && poetry install --no-root --no-dev

-#adding duckdb manually (outside of setup.py - lots of errors)
-RUN pip install duckdb
-
-# copy payload code only
+# Copy code
 COPY main.py ./
 COPY destination_duckdb ./destination_duckdb

+# Timezone setup
+COPY --from=python:3.9.11 /usr/share/zoneinfo/Etc/UTC /etc/localtime
+RUN echo "Etc/UTC" > /etc/timezone
+
+# Entry point
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.version=0.2.0
 LABEL io.airbyte.name=airbyte/destination-duckdb

A reviewer (Contributor) commented on the Poetry install step: @bnchrch I think we should add poetry to our base python image. Wdyt? I can add it in #30303
airbyte-integrations/connectors/destination-duckdb/acceptance-test-config.yml
@@ -0,0 +1,10 @@
connector_image: airbyte/destination-duckdb:dev
acceptance_tests:
spec:
tests:
- spec_path: integration_tests/spec.json
config_path: "integration_tests/config.json"
connection:
tests:
- config_path: "integration_tests/config.json"
status: "succeed"

This file was deleted.

airbyte-integrations/connectors/destination-duckdb/destination_duckdb/destination.py
@@ -25,6 +25,9 @@ def _get_destination_path(destination_path: str) -> str:
         Get a normalized version of the destination path.
         Automatically append /local/ to the start of the path
         """
+        if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
+            return destination_path
+
         if not destination_path.startswith("/local"):
             destination_path = os.path.join("/local", destination_path)

@@ -37,9 +40,11 @@ def _get_destination_path(destination_path: str) -> str:
         return destination_path

     def write(
-        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
+        self,
+        config: Mapping[str, Any],
+        configured_catalog: ConfiguredAirbyteCatalog,
+        input_messages: Iterable[AirbyteMessage],
     ) -> Iterable[AirbyteMessage]:
-
         """
         Reads the input stream of messages, config, and catalog to write data to the destination.

@@ -56,17 +61,19 @@ def write(
         streams = {s.stream.name for s in configured_catalog.streams}
         logger.info(f"Starting write to DuckDB with {len(streams)} streams")

-        path = config.get("destination_path")
+        path = str(config.get("destination_path"))
         path = self._get_destination_path(path)
-        # check if file exists
+
         logger.info(f"Opening DuckDB file at {path}")
+
+        # Get and register auth token if applicable
+        motherduck_api_key = str(config.get("motherduck_api_key"))
+        if motherduck_api_key:
+            os.environ["motherduck_token"] = motherduck_api_key
+
         con = duckdb.connect(database=path, read_only=False)

         # create the tables if needed
         # con.execute("BEGIN TRANSACTION")
         for configured_stream in configured_catalog.streams:
-
             name = configured_stream.stream.name
             table_name = f"_airbyte_raw_{name}"
             if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
@@ -82,7 +89,7 @@ def write(
                 query = f"""
                 CREATE TABLE IF NOT EXISTS {table_name} (
                     _airbyte_ab_id TEXT PRIMARY KEY,
-                    _airbyte_emitted_at JSON,
+                    _airbyte_emitted_at DATETIME,
                     _airbyte_data JSON
                 )
                 """
@@ -92,20 +99,16 @@ def write(
         buffer = defaultdict(list)

         for message in input_messages:
-
             if message.type == Type.STATE:
                 # flush the buffer
                 for stream_name in buffer.keys():
-
                     logger.info(f"flushing buffer for state: {message}")
                     query = """
-                    INSERT INTO {table_name}
+                    INSERT INTO {table_name} (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
                     VALUES (?,?,?)
                     """.format(
                         table_name=f"_airbyte_raw_{stream_name}"
                     )
-                    logger.info(f"query: {query}")
-
                     con.executemany(query, buffer[stream_name])

                     con.commit()
@@ -120,13 +123,18 @@ def write(
                     continue

                 # add to buffer
-                buffer[stream].append((str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps(data)))
+                buffer[stream].append(
+                    (
+                        str(uuid.uuid4()),
+                        datetime.datetime.now().isoformat(),
+                        json.dumps(data),
+                    )
+                )
             else:
                 logger.info(f"Message type {message.type} not supported, skipping")

         # flush any remaining messages
         for stream_name in buffer.keys():
-
             query = """
                 INSERT INTO {table_name}
                 VALUES (?,?,?)
@@ -150,11 +158,16 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
         :return: AirbyteConnectionStatus indicating a Success or Failure
         """
         try:
-            # parse the destination path
-            param_path = config.get("destination_path")
-            path = self._get_destination_path(param_path)
+            path = config.get("destination_path")
+            path = self._get_destination_path(path)
+
+            if path.startswith("/local"):
+                logger.info(f"Using DuckDB file at {path}")
+                os.makedirs(os.path.dirname(path), exist_ok=True)
+
+            if "motherduck_api_key" in config:
+                os.environ["motherduck_token"] = config["motherduck_api_key"]

-            os.makedirs(os.path.dirname(path), exist_ok=True)
             con = duckdb.connect(database=path, read_only=False)
             con.execute("SELECT 1;")
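As a quick illustration of the new path and token handling, here is a standalone sketch that mirrors the connector logic above. The database name and token values are made up, and the duckdb.connect call is left out so the snippet has no external dependencies; note it checks the config value before stringifying it, which avoids treating a missing key as the literal string "None".

import os
from typing import Any, Mapping


def get_destination_path(destination_path: str) -> str:
    """Mirror of _get_destination_path: MotherDuck URIs pass through, local paths get a /local prefix."""
    if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
        return destination_path
    if not destination_path.startswith("/local"):
        destination_path = os.path.join("/local", destination_path)
    return destination_path


def prepare_connection(config: Mapping[str, Any]) -> str:
    path = str(config.get("destination_path"))
    path = get_destination_path(path)
    # The connector hands the MotherDuck token to duckdb via an environment variable.
    motherduck_api_key = config.get("motherduck_api_key")
    if motherduck_api_key:
        os.environ["motherduck_token"] = str(motherduck_api_key)
    return path  # the connector then calls duckdb.connect(database=path, read_only=False)


print(prepare_connection({"destination_path": "destination.duckdb"}))  # -> /local/destination.duckdb
print(prepare_connection({"destination_path": "md:my_db", "motherduck_api_key": "fake-token"}))  # -> md:my_db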
spec.json (destination-duckdb)
@@ -11,10 +11,15 @@
   "required": ["destination_path"],
   "additionalProperties": true,
   "properties": {
+    "motherduck_api_key": {
+      "type": "string",
+      "description": "API key to use for authentication to a MotherDuck database.",
+      "airbyte_secret": true
+    },
     "destination_path": {
       "type": "string",
       "description": "Path to the .duckdb file. The file will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/duckdb\">docs</a>",
-      "example": "/local/destination.duckdb"
+      "examples": ["/local/destination.duckdb", "md:", "motherduck:"]
     },
     "schema": {
       "type": "string",
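For reference, a hypothetical config instance that the updated spec would accept; all values are placeholders and only destination_path is required.

import json

# Hypothetical user config for the MotherDuck case.
config = {
    "destination_path": "md:my_database",  # "md:" / "motherduck:" prefixes now bypass the /local mount
    "motherduck_api_key": "<your-motherduck-token>",  # marked airbyte_secret in the spec
    "schema": "main",  # placeholder; the spec also exposes a schema option
}

required = ["destination_path"]
assert all(key in config for key in required)
print(json.dumps(config, indent=2))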