DynamoDB: Add table loader for full-load operations #226
Merged
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
--- | ||
name: "Tests: DynamoDB" | ||
|
||
on: | ||
pull_request: | ||
branches: ~ | ||
paths: | ||
- '.github/workflows/dynamodb.yml' | ||
- 'cratedb_toolkit/io/dynamodb/**' | ||
- 'pyproject.toml' | ||
push: | ||
branches: [ main ] | ||
paths: | ||
- '.github/workflows/dynamodb.yml' | ||
- 'cratedb_toolkit/io/dynamodb/**' | ||
- 'pyproject.toml' | ||
|
||
# Allow job to be triggered manually. | ||
workflow_dispatch: | ||
|
||
# Run job each second night after CrateDB nightly has been published. | ||
# Running only each second night conserves the limited free capacity | ||
# for Codecov uploads. | ||
schedule: | ||
- cron: '0 3 */2 * *' | ||
|
||
# Cancel in-progress jobs when pushing to the same branch. | ||
concurrency: | ||
cancel-in-progress: true | ||
group: ${{ github.workflow }}-${{ github.ref }} | ||
|
||
jobs: | ||
|
||
tests: | ||
|
||
runs-on: ${{ matrix.os }} | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
os: ["ubuntu-latest"] | ||
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12. | ||
# https://github.com/aio-libs/yarl/pull/942 | ||
python-version: ["3.8", "3.11"] | ||
localstack-version: ["3.6"] | ||
|
||
env: | ||
OS: ${{ matrix.os }} | ||
PYTHON: ${{ matrix.python-version }} | ||
LOCALSTACK_VERSION: ${{ matrix.localstack-version }} | ||
# Do not tear down Testcontainers | ||
TC_KEEPALIVE: true | ||
|
||
name: " | ||
Python ${{ matrix.python-version }}, | ||
LocalStack ${{ matrix.localstack-version }}, | ||
OS ${{ matrix.os }} | ||
" | ||
steps: | ||
|
||
- name: Acquire sources | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
architecture: x64 | ||
cache: 'pip' | ||
cache-dependency-path: 'pyproject.toml' | ||
|
||
- name: Set up project | ||
run: | | ||
|
||
# `setuptools 64.0.0` adds support for editable install hooks (PEP 660). | ||
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400 | ||
pip install "setuptools>=64" --upgrade | ||
|
||
# Install package in editable mode. | ||
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop] | ||
|
||
- name: Run software tests | ||
run: | | ||
pytest -m dynamodb | ||
|
||
- name: Upload coverage to Codecov | ||
uses: codecov/codecov-action@v4 | ||
env: | ||
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} | ||
with: | ||
files: ./coverage.xml | ||
flags: dynamodb | ||
env_vars: OS,PYTHON | ||
name: codecov-umbrella | ||
fail_ci_if_error: false |
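A note on the schedule in this workflow: `*/2` in the day-of-month field of `cron: '0 3 */2 * *'` steps through the days of each month starting at day 1, so "each second night" holds within a month but not across every month boundary. The following is a minimal sketch in plain Python that models only that one field; the helper name `matching_days` is hypothetical and not part of the PR.

```python
import calendar


def matching_days(year: int, month: int, step: int = 2) -> list:
    """Days of the month matched by `*/step` in the cron day-of-month field."""
    last_day = calendar.monthrange(year, month)[1]
    # `*/step` expands over the field's range, which starts at day 1.
    return list(range(1, last_day + 1, step))


july = matching_days(2024, 7)
august = matching_days(2024, 8)
# In a 31-day month, the job matches day 31 and then day 1 of the next month.
print(july[-1], august[0])
```

In months with 31 days, the job therefore runs on two consecutive nights (the 31st and the 1st), which is worth knowing when budgeting Codecov uploads.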
Empty file.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import boto3 | ||
from yarl import URL | ||
|
||
|
||
class DynamoDBAdapter: | ||
def __init__(self, dynamodb_url: URL, echo: bool = False): | ||
self.session = boto3.Session( | ||
aws_access_key_id=dynamodb_url.user, | ||
aws_secret_access_key=dynamodb_url.password, | ||
region_name=dynamodb_url.query.get("region"), | ||
) | ||
endpoint_url = None | ||
if dynamodb_url.host and dynamodb_url.host.lower() != "aws": | ||
endpoint_url = f"http://{dynamodb_url.host}:{dynamodb_url.port}" | ||
self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url) | ||
self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url) | ||
|
||
def scan(self, table_name: str): | ||
""" | ||
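Return items from DynamoDB table. A single `Scan` call returns at most 1 MB of data, | ||
so larger tables need pagination via `LastEvaluatedKey`. | ||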
""" | ||
return self.dynamodb_client.scan(TableName=table_name) | ||
|
||
def count_records(self, table_name: str): | ||
# Note: `item_count` is an approximate value, which DynamoDB refreshes only periodically. | ||
table = self.dynamodb_resource.Table(table_name) | ||
return table.item_count |
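Regarding `scan` in this adapter: a single call returns one page only, and the response carries a `LastEvaluatedKey` when more data is available. The following is a minimal sketch of following those keys until the table is exhausted. The helper name `iter_scan_items` and the `fake_scan` stand-in are assumptions for illustration, so the example runs without boto3 or a DynamoDB endpoint.

```python
from typing import Any, Callable, Dict, Iterator


def iter_scan_items(scan: Callable[..., Dict[str, Any]], table_name: str) -> Iterator[Dict[str, Any]]:
    """Yield items from every page of a DynamoDB `Scan`."""
    kwargs: Dict[str, Any] = {"TableName": table_name}
    while True:
        response = scan(**kwargs)
        yield from response.get("Items", [])
        last_key = response.get("LastEvaluatedKey")
        # A missing `LastEvaluatedKey` signals the final page.
        if last_key is None:
            break
        kwargs["ExclusiveStartKey"] = last_key


# Stand-in for `dynamodb_client.scan`, serving two pages (illustration only).
def fake_scan(TableName, ExclusiveStartKey=None):
    if ExclusiveStartKey is None:
        return {"Items": [{"id": {"N": "1"}}], "LastEvaluatedKey": {"id": {"N": "1"}}}
    return {"Items": [{"id": {"N": "2"}}]}


items = list(iter_scan_items(fake_scan, "ProductCatalog"))
print(len(items))
```

With the adapter, the corresponding call would pass `self.dynamodb_client.scan` as the first argument. boto3 also provides a built-in paginator through `client.get_paginator("scan")`, which may be the simpler route.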
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
import logging | ||
|
||
from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def dynamodb_copy(source_url, target_url, progress: bool = False): | ||
""" | ||
|
||
Synopsis | ||
-------- | ||
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo | ||
ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/us-east-1/ProductCatalog | ||
ctk load table dynamodb://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@localhost:4566/arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog | ||
|
||
ctk load table dynamodb://arn:aws:dynamodb:us-east-1:000000000000:table/ProductCatalog | ||
arn:aws:dynamodb:us-east-1:841394475918:table/stream-demo | ||
|
||
ctk load table dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1 | ||
|
||
Resources | ||
--------- | ||
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html | ||
|
||
Backlog | ||
------- | ||
Currently, it is not directly possible to address DynamoDB tables by ARN, e.g. for using a different AccountID. | ||
- https://github.com/boto/boto3/issues/2658 | ||
- https://stackoverflow.com/questions/71019941/how-to-point-to-the-arn-of-a-dynamodb-table-instead-of-using-the-name-when-using | ||
- https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/configure-cross-account-access-to-amazon-dynamodb.html | ||
""" | ||
logger.info("Invoking DynamoDBFullLoad") | ||
ddb_full = DynamoDBFullLoad( | ||
dynamodb_url=source_url, | ||
cratedb_url=target_url, | ||
progress=progress, | ||
) | ||
ddb_full.start() | ||
return True |
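The synopsis lists several URL shapes, so it may help to see how one of them decomposes. The following is a minimal sketch that uses the standard library's `urllib.parse` instead of `yarl`, and mirrors what the adapter and `DynamoDBFullLoad` in this PR read from the URL. The function name `decode_dynamodb_url` and the dictionary keys are hypothetical.

```python
from urllib.parse import parse_qs, urlsplit


def decode_dynamodb_url(url: str) -> dict:
    """Split a `dynamodb://` URL into connection settings and table name."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    endpoint_url = None
    # Mirror the adapter: the host "aws" means "use the default AWS endpoint".
    if parts.hostname and parts.hostname.lower() != "aws":
        endpoint_url = f"http://{parts.hostname}:{parts.port}"
    return {
        "access_key_id": parts.username,
        "secret_access_key": parts.password,
        "region": query.get("region", [None])[0],
        "endpoint_url": endpoint_url,
        "table": parts.path.lstrip("/"),
    }


settings = decode_dynamodb_url(
    "dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/ProductCatalog?region=eu-central-1"
)
print(settings["table"], settings["region"], settings["endpoint_url"])
```

Under the same logic, the path-based form `.../us-east-1/ProductCatalog` from the synopsis would yield the table name `us-east-1/ProductCatalog` and no region, so the `?region=` variant appears to be the one the current code supports.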
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# DynamoDB Backlog | ||
|
||
## Iteration +1 | ||
- Pagination / Batch Getting. | ||
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination | ||
|
||
- Use `batch_get_item`. | ||
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html | ||
|
||
- Scan by query instead of full. | ||
|
||
|
||
## Iteration +2 | ||
|
||
### Resumption on errors? | ||
Another variant to scan the table, probably for resuming on errors? | ||
```python | ||
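key = None | ||
while True: | ||
if key is None: | ||
response = table.scan() | ||
else: | ||
response = table.scan(ExclusiveStartKey=key) | ||
key = response.get("LastEvaluatedKey", None) | ||
# Stop after the final page, signalled by a missing `LastEvaluatedKey`. | ||
if key is None: | ||
break | ||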
``` | ||
|
||
### Item transformations? | ||
That's another item transformation idea picked up from an example program. | ||
Please advise if this is sensible in all situations, or if it's just a | ||
special case. | ||
|
||
```python | ||
if 'id' in item and not isinstance(item['id'], str): | ||
item['id'] = str(item['id']) | ||
``` |
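To make the question about the `id` transformation easier to discuss, here is a minimal sketch of it as a standalone function. The name `coerce_id_to_str` is hypothetical, and the `Decimal` input assumes items were read through boto3's resource-level API, which returns DynamoDB numbers as `decimal.Decimal`.

```python
from decimal import Decimal
from typing import Any, Dict


def coerce_id_to_str(item: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `item` whose `id`, if present, is a string."""
    result = dict(item)
    if "id" in result and not isinstance(result["id"], str):
        result["id"] = str(result["id"])
    return result


original = {"id": Decimal("42"), "title": "Book"}
converted = coerce_id_to_str(original)
print(converted["id"])
```

One caveat: the low-level client `scan` used in this PR returns typed attribute values such as `{"N": "42"}`. Applying `str()` to those would stringify the wrapper dictionary rather than the number, so the transformation only looks sensible for already-deserialized items.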
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
# ruff: noqa: S608 | ||
import logging | ||
import typing as t | ||
|
||
import sqlalchemy as sa | ||
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB | ||
from tqdm import tqdm | ||
from yarl import URL | ||
|
||
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter | ||
from cratedb_toolkit.model import DatabaseAddress | ||
from cratedb_toolkit.util import DatabaseAdapter | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DynamoDBFullLoad: | ||
""" | ||
Copy DynamoDB table into CrateDB table. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
dynamodb_url: str, | ||
cratedb_url: str, | ||
progress: bool = False, | ||
): | ||
cratedb_address = DatabaseAddress.from_string(cratedb_url) | ||
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() | ||
cratedb_table = cratedb_table_address.fullname | ||
|
||
self.dynamodb_url = URL(dynamodb_url) | ||
self.dynamodb_adapter = DynamoDBAdapter(self.dynamodb_url) | ||
self.dynamodb_table = self.dynamodb_url.path.lstrip("/") | ||
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) | ||
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) | ||
self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table) | ||
|
||
self.progress = progress | ||
|
||
def start(self): | ||
""" | ||
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB. | ||
""" | ||
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table) | ||
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}") | ||
with self.cratedb_adapter.engine.connect() as connection: | ||
if not self.cratedb_adapter.table_exists(self.cratedb_table): | ||
connection.execute(sa.text(self.translator.sql_ddl)) | ||
connection.commit() | ||
records_target = self.cratedb_adapter.count_records(self.cratedb_table) | ||
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") | ||
progress_bar = tqdm(total=records_in) | ||
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table) | ||
records_out = 0 | ||
for sql in self.items_to_sql(result["Items"]): | ||
if sql: | ||
try: | ||
connection.execute(sa.text(sql)) | ||
records_out += 1 | ||
except sa.exc.ProgrammingError as ex: | ||
logger.warning(f"Running query failed: {ex}") | ||
progress_bar.update() | ||
progress_bar.close() | ||
connection.commit() | ||
logger.info(f"Number of records written: {records_out}") | ||
if records_out == 0: | ||
logger.warning("No data has been copied") | ||
|
||
def items_to_sql(self, items): | ||
""" | ||
Convert data for record items to INSERT statements. | ||
""" | ||
for item in items: | ||
yield self.translator.to_sql(item) | ||
Review comment: That's another item transformation idea picked up from an example program. Please advise if this is sensible in all situations, or if it's just a special case. `if 'id' in item and not isinstance(item['id'], str): item['id'] = str(item['id'])` /cc @wierdvanderhaar |
||
|
||
|
||
class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB): | ||
@property | ||
def sql_ddl(self): | ||
""" | ||
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. | ||
""" | ||
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" | ||
|
||
def to_sql(self, record: t.Dict[str, t.Any]) -> str: | ||
""" | ||
Produce INSERT SQL statement from DynamoDB item record. | ||
""" | ||
values_clause = self.image_to_values(record) | ||
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" | ||
return sql | ||
|
||
@staticmethod | ||
def quote_table_name(name: str): | ||
# TODO @ Upstream: Quoting table names should be the responsibility of the caller. | ||
return name |
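On the f-string statement in `to_sql` (the reason for `# ruff: noqa: S608`): a single quote inside the values clause would end the SQL string literal early. Whether `image_to_values` already guards against that is not visible in this diff. The following is a minimal sketch of the usual escaping, assuming the payload is JSON text; the function name `to_insert_sql` and the column name `data` are hypothetical.

```python
import json
from typing import Any, Dict


def to_insert_sql(table_name: str, column: str, record: Dict[str, Any]) -> str:
    """Render an INSERT statement with the record as an escaped JSON string literal."""
    payload = json.dumps(record)
    # Standard SQL escapes a single quote inside a string literal by doubling it.
    literal = payload.replace("'", "''")
    return f"INSERT INTO {table_name} ({column}) VALUES ('{literal}');"


sql = to_insert_sql('"testdrive"."demo"', "data", {"title": "O'Reilly"})
print(sql)
```

Passing bind parameters to `connection.execute(sa.text(...), parameters)` would avoid rendering literals altogether. That route is not explored here.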
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
# not use this file except in compliance with the License. You may obtain | ||
# a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations | ||
# under the License. | ||
import os | ||
|
||
from testcontainers.localstack import LocalStackContainer | ||
|
||
from cratedb_toolkit.testing.testcontainers.util import KeepaliveContainer | ||
|
||
|
||
class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer): | ||
""" | ||
A Testcontainer for LocalStack with improved configurability. | ||
|
||
It honors the `TC_KEEPALIVE` and `LOCALSTACK_VERSION` environment variables. | ||
|
||
Defining `TC_KEEPALIVE` will set a signal not to shut down the container | ||
after running the test cases, in order to speed up subsequent invocations. | ||
|
||
`LOCALSTACK_VERSION` will define the designated LocalStack version, which is | ||
useful when used within a test matrix. Its default value is `latest`. | ||
""" | ||
|
||
LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest") | ||
|
||
def __init__( | ||
self, | ||
image: str = f"localstack/localstack:{LOCALSTACK_VERSION}", | ||
**kwargs, | ||
) -> None: | ||
super().__init__(image=image, **kwargs) | ||
self.with_name("testcontainers-localstack") |
Another variant to scan the table, maybe for resuming on errors?
/cc @wierdvanderhaar