Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Add table loader for full-load operations #226

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
---
name: "Tests: DynamoDB"

on:
pull_request:
branches: ~
paths:
- '.github/workflows/dynamodb.yml'
- 'cratedb_toolkit/io/dynamodb/**'
- 'pyproject.toml'
push:
branches: [ main ]
paths:
- '.github/workflows/dynamodb.yml'
- 'cratedb_toolkit/io/dynamodb/**'
- 'pyproject.toml'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job every second night, after the CrateDB nightly has been published.
# It only runs every second night because the free capacity
# for Codecov uploads is limited.
schedule:
- cron: '0 3 */2 * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:

tests:

runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
# https://github.com/aio-libs/yarl/pull/942
python-version: ["3.8", "3.11"]
localstack-version: ["3.6"]

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
LOCALSTACK_VERSION: ${{ matrix.localstack-version }}
# Do not tear down Testcontainers
TC_KEEPALIVE: true

name: "
Python ${{ matrix.python-version }},
LocalStack ${{ matrix.localstack-version }},
OS ${{ matrix.os }}
"
steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

- name: Set up project
run: |

# `setuptools 64.0.0` adds support for editable install hooks (PEP 660).
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]

- name: Run linter and software tests
run: |
pytest -m dynamodb

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: dynamodb
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: false
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- MongoDB: Add capability to give type hints and add transformations
- Dependencies: Adjust code for lorrystream version 0.0.3
- Dependencies: Update to lorrystream 0.0.4 and commons-codec 0.0.7
- DynamoDB: Add table loader for full-load operations

## 2024/07/25 v0.0.16
- `ctk load table`: Added support for MongoDB Change Streams
Expand Down
10 changes: 9 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,15 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
source_url = resource.url
target_url = self.address.dburi
source_url_obj = URL(source_url)
if source_url.startswith("influxdb"):
if source_url.startswith("dynamodb"):
from cratedb_toolkit.io.dynamodb.api import dynamodb_copy

if not dynamodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)

elif source_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http://"
Expand Down
Empty file.
26 changes: 26 additions & 0 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import boto3
from yarl import URL


class DynamoDBAdapter:
    """
    Provide access to DynamoDB through a boto3 session derived from a URL.

    The URL's user and password are used as AWS access key id and secret
    access key, and its `region` query parameter selects the AWS region.
    When a host is given and it is not `aws`, it is used as a custom HTTP
    endpoint instead of the default AWS endpoint.
    """

    def __init__(self, dynamodb_url: URL, echo: bool = False):
        """
        Create boto3 session, resource, and client objects.

        :param dynamodb_url: Connection URL, e.g.
            `dynamodb://KEY:SECRET@localhost:4566/ProductCatalog?region=us-east-1`.
        :param echo: Currently not used by this class.
        """
        self.session = boto3.Session(
            aws_access_key_id=dynamodb_url.user,
            aws_secret_access_key=dynamodb_url.password,
            region_name=dynamodb_url.query.get("region"),
        )
        endpoint_url = None
        if dynamodb_url.host and dynamodb_url.host.lower() != "aws":
            endpoint_url = f"http://{dynamodb_url.host}"
            # Only append the port when the URL defines one, to avoid
            # producing an invalid `http://host:None` endpoint.
            if dynamodb_url.port is not None:
                endpoint_url += f":{dynamodb_url.port}"
        self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
        self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

    def scan(self, table_name: str):
        """
        Return all items from DynamoDB table.

        A single `Scan` request only returns one page of results, and signals
        that more data is available through `LastEvaluatedKey`. This method
        follows that key using `ExclusiveStartKey` until the table is exhausted,
        and accumulates all pages in memory.

        :param table_name: Name of the DynamoDB table to scan.
        :return: Dictionary shaped like a `Scan` response, where `Items`
            holds the items of all pages, `Count` and `ScannedCount` are
            totals, and `LastEvaluatedKey` is removed.
        """
        response = self.dynamodb_client.scan(TableName=table_name)

        # Use the first response as a template to retain its other keys.
        result = dict(response)
        items = list(response.get("Items", []))
        scanned_count = response.get("ScannedCount", 0)

        while response.get("LastEvaluatedKey"):
            response = self.dynamodb_client.scan(
                TableName=table_name,
                ExclusiveStartKey=response["LastEvaluatedKey"],
            )
            items.extend(response.get("Items", []))
            scanned_count += response.get("ScannedCount", 0)

        result.pop("LastEvaluatedKey", None)
        result["Items"] = items
        result["Count"] = len(items)
        result["ScannedCount"] = scanned_count
        return result

    def count_records(self, table_name: str):
        """
        Return the item count reported by DynamoDB for the given table.

        NOTE(review): per the boto3 documentation, `item_count` is an
        approximate value that is only refreshed periodically -- confirm
        before relying on it as an exact number.
        """
        table = self.dynamodb_resource.Table(table_name)
        return table.item_count
40 changes: 40 additions & 0 deletions cratedb_toolkit/io/dynamodb/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging

from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad

logger = logging.getLogger(__name__)


def dynamodb_copy(source_url, target_url, progress: bool = False):
    """
    Copy a DynamoDB table into a CrateDB table using a full-load operation.

    :param source_url: DynamoDB URL, handed to `DynamoDBFullLoad` as `dynamodb_url`.
    :param target_url: CrateDB URL, handed to `DynamoDBFullLoad` as `cratedb_url`.
    :param progress: Forwarded to `DynamoDBFullLoad` unchanged.
    :return: Always `True` once the loader's `start()` has returned.
        Exceptions raised by the loader are not handled here.

    Synopsis
    --------
    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1

    NOTE(review): The following address forms were also listed here. The loader
    appears to use the whole URL path as table name and the `region` query
    parameter as region, so please confirm whether they are supported.

    ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/us-east-1/ProductCatalog
    ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog
    ctk load table dynamodb://arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog
    arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo

    Resources
    ---------
    https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html

    Backlog
    -------
    Currently, it is not directly possible to address DynamoDB tables by ARN, i.e. for using a different AccountID.
    - https://github.com/boto/boto3/issues/2658
    - https://stackoverflow.com/questions/71019941/how-to-point-to-the-arn-of-a-dynamodb-table-instead-of-using-the-name-when-using
    - https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/configure-cross-account-access-to-amazon-dynamodb.html
    """
    logger.info("Invoking DynamoDBFullLoad")
    loader = DynamoDBFullLoad(dynamodb_url=source_url, cratedb_url=target_url, progress=progress)
    loader.start()
    return True
35 changes: 35 additions & 0 deletions cratedb_toolkit/io/dynamodb/backlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# DynamoDB Backlog

## Iteration +1
- Pagination / Batch Getting.
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination

- Use `batch_get_item`.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html

- Scan by query instead of full.


## Iteration +2

### Resumption on errors?
Another variant to scan the table, probably for resuming on errors?
```python
key = None
while True:
    if key is None:
        response = table.scan()
    else:
        response = table.scan(ExclusiveStartKey=key)
    # Process `response["Items"]` here.
    key = response.get("LastEvaluatedKey", None)
    if key is None:
        # No more pages available.
        break
```

### Item transformations?
That's another item transformation idea picked up from an example program.
Please advise if this is sensible in all situations, or if it's just a
special case.

```python
if 'id' in item and not isinstance(item['id'], str):
item['id'] = str(item['id'])
```
97 changes: 97 additions & 0 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from tqdm import tqdm
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class DynamoDBFullLoad:
    """
    Copy DynamoDB table into CrateDB table.
    """

    def __init__(
        self,
        dynamodb_url: str,
        cratedb_url: str,
        progress: bool = False,
    ):
        """
        Set up source and target adapters.

        :param dynamodb_url: URL of the DynamoDB source. The URL path, without
            leading slashes, is used as the DynamoDB table name.
        :param cratedb_url: URL of the CrateDB target, decoded into an
            SQLAlchemy URL and a table address.
        :param progress: Whether to display a progress bar while copying.
        """
        cratedb_address = DatabaseAddress.from_string(cratedb_url)
        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
        cratedb_table = cratedb_table_address.fullname

        self.dynamodb_url = URL(dynamodb_url)
        self.dynamodb_adapter = DynamoDBAdapter(self.dynamodb_url)
        self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
        self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
        self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table)

        self.progress = progress

    def start(self):
        """
        Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.

        Statements failing with `ProgrammingError` are logged and skipped,
        the remaining statements are still submitted.
        """
        records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
        logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
        with self.cratedb_adapter.engine.connect() as connection:
            # Create the target table when it does not exist yet.
            if not self.cratedb_adapter.table_exists(self.cratedb_table):
                connection.execute(sa.text(self.translator.sql_ddl))
                connection.commit()
            records_target = self.cratedb_adapter.count_records(self.cratedb_table)
            logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")

            # Review note (@amotl, 2024-08-19, /cc @wierdvanderhaar): Another
            # variant to scan the table, maybe for resuming on errors, is a loop
            # driven by `LastEvaluatedKey` / `ExclusiveStartKey`. See the
            # "Resumption on errors?" section of the DynamoDB backlog.
            result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)

            records_out = 0
            # Honor the `progress` flag, and make sure the progress bar
            # is closed even when submitting a statement raises.
            with tqdm(total=records_in, disable=not self.progress) as progress_bar:
                for sql in self.items_to_sql(result["Items"]):
                    if sql:
                        try:
                            connection.execute(sa.text(sql))
                            records_out += 1
                        except sa.exc.ProgrammingError as ex:
                            logger.warning(f"Running query failed: {ex}")
                    progress_bar.update()
            connection.commit()
            logger.info(f"Number of records written: {records_out}")

            # Only claim that nothing was copied when that is actually the case.
            if records_out == 0:
                logger.warning("No data has been copied")
            elif records_out < records_in:
                logger.warning(
                    f"Fewer records written ({records_out}) than reported by source ({records_in})"
                )

    def items_to_sql(self, items):
        """
        Convert data for record items to INSERT statements.

        :param items: Iterable of DynamoDB items.
        :return: Generator yielding one translated SQL statement per item.
        """
        for item in items:
            yield self.translator.to_sql(item)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's another item transformation idea picked up from an example program. Please advise if this is sensible in all situations, or if it's just a special case.

if 'id' in item and not isinstance(item['id'], str):
    item['id'] = str(item['id'])

/cc @wierdvanderhaar



class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB):
    """
    Translate DynamoDB items into SQL statements for full-load operations.
    """

    @property
    def sql_ddl(self):
        """
        Define SQL DDL statement for creating the table in CrateDB that stores
        the loaded items within a single dynamic object column.
        """
        ddl = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
        return ddl

    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
        """
        Produce SQL INSERT statement from a single DynamoDB item.

        :param record: The item, as handed to `image_to_values`.
        :return: SQL statement inserting the item into the data column.
        """
        # NOTE(review): The value is interpolated into the SQL text. Presumably,
        # `image_to_values` yields a properly escaped literal -- please confirm.
        payload = self.image_to_values(record)
        return f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{payload}');"

    @staticmethod
    def quote_table_name(name: str):
        """
        Return the table name unchanged.
        """
        # TODO @ Upstream: Quoting table names should be the responsibility of the caller.
        return name
2 changes: 2 additions & 0 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import logging
import os
import sys
import typing as t

import sqlalchemy as sa
from commons_codec.exception import UnknownOperationError
Expand Down Expand Up @@ -77,6 +78,7 @@

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB]
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
Expand Down
41 changes: 41 additions & 0 deletions cratedb_toolkit/testing/testcontainers/localstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os

from testcontainers.localstack import LocalStackContainer

from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer


class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer):
    """
    A Testcontainer for LocalStack, configurable by environment variables.

    `LOCALSTACK_VERSION` selects the tag of the `localstack/localstack` image
    used by default, which is useful within a test matrix. It defaults to
    `latest`, and is read once, when this class is defined.

    `TC_KEEPALIVE` signals not to shut down the container after running the
    test cases, in order to speed up subsequent invocations.
    NOTE(review): Presumably handled by `KeepaliveContainer` -- please confirm.
    """

    LOCALSTACK_VERSION = os.getenv("LOCALSTACK_VERSION", "latest")

    def __init__(
        self,
        image: str = f"localstack/localstack:{LOCALSTACK_VERSION}",
        **kwargs,
    ) -> None:
        """
        Create the container and assign it a fixed name.

        :param image: Container image to use.
        :param kwargs: Passed through to the parent constructor.
        """
        super().__init__(image=image, **kwargs)
        self.with_name("testcontainers-localstack")
6 changes: 4 additions & 2 deletions cratedb_toolkit/testing/testcontainers/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def start(self):
define the `CRATEDB_KEEPALIVE` or `TC_KEEPALIVE` environment variables.
"""

self._configure()
if hasattr(self, "_configure"):
self._configure()

if self._name is None:
raise ValueError(
Expand Down Expand Up @@ -110,7 +111,8 @@ def start(self):
logger.info(f"Starting container: {container_id} ({container_name})")
self._container.start()

self._connect()
if hasattr(self, "_connect"):
self._connect()
return self

def stop(self, **kwargs):
Expand Down
Loading