Skip to content

Commit

Permalink
Add BulkResponse wrapper for improved decoding of HTTP bulk responses
Browse files Browse the repository at this point in the history
CrateDB HTTP bulk responses include `rowcount=` items, each signalling
whether the corresponding bulk operation succeeded or failed.

- success means `rowcount=1`
- failure means `rowcount=-2`

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
  • Loading branch information
amotl committed Oct 3, 2024
1 parent 7cb2c68 commit 433dfdf
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 284 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased
"Threads may share the module, but not connections."
- Added ``error_trace`` to string representation of an Error to relay
server stacktraces into exception messages.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
66 changes: 66 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
    """
    Define the shape of a CrateDB bulk request response item.

    Each item of a bulk response is a mapping carrying a single
    `rowcount` key, e.g. `{"rowcount": 1}`.
    """

    # Row count reported for one bulk operation; CrateDB signals a failed
    # operation using `rowcount=-2`.
    rowcount: int


class BulkResponse:
    """
    Manage CrateDB bulk request responses.

    Accepts a list of bulk arguments (parameter list) and a list of bulk
    response items, and derives failure/success statistics from them.
    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

    All derived values are cached on first access, so they reflect the state
    of `records` and `results` at that time.
    """

    # CrateDB signals a failed bulk operation using `rowcount=-2`.
    _FAILED_ROWCOUNT = -2

    def __init__(
            self,
            records: t.Union[t.Iterable[t.Dict[str, t.Any]], None],
            results: t.Union[t.Iterable["BulkResultItem"], None]):
        """
        :param records: The bulk arguments submitted to CrateDB, or `None`.
        :param results: The bulk response items returned by CrateDB, one per
            record, or `None`.
        """
        # One-shot iterables (e.g. generators) are materialized once, because
        # the records are both iterated (`failed_records`) and measured with
        # `len()` (`record_count`). Lists and tuples are kept as given.
        self.records = self._materialize(records)
        self.results = self._materialize(results)

    @staticmethod
    def _materialize(items: t.Optional[t.Iterable[t.Any]]) -> t.Optional[t.Sequence[t.Any]]:
        """
        Return `items` unchanged when it is `None`, a list, or a tuple;
        otherwise collect it into a new list.
        """
        if items is None or isinstance(items, (list, tuple)):
            return items
        return list(items)

    @cached_property
    def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
        """
        Compute list of failed records.

        CrateDB signals failed inserts using `rowcount=-2`.
        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling

        Records and results are paired by position; when both differ in
        length, surplus items on either side are ignored. Returns an empty
        list when either records or results is `None`.
        """
        if self.records is None or self.results is None:
            return []
        return [
            record
            for record, status in zip(self.records, self.results)
            if status["rowcount"] == self._FAILED_ROWCOUNT
        ]

    @cached_property
    def record_count(self) -> int:
        """
        Compute bulk size / length of parameter list.

        Returns 0 when no records were given.
        """
        if not self.records:
            return 0
        return len(self.records)

    @cached_property
    def success_count(self) -> int:
        """
        Compute number of succeeding records within a batch.
        """
        return self.record_count - self.failed_count

    @cached_property
    def failed_count(self) -> int:
        """
        Compute number of failed records within a batch.
        """
        return len(self.failed_records)
70 changes: 70 additions & 0 deletions src/crate/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import sys
import unittest

from crate import client
from crate.client.exceptions import ProgrammingError
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from crate.testing.settings import crate_host


class BulkOperationTest(unittest.TestCase):
    """
    Integration tests for `cursor.executemany` bulk operations.

    Each test connects to the server at `crate_host` and creates its own
    `foobar` table; fixture handling is delegated to the baseline helpers
    invoked from `setUp` and `tearDown`.
    """

    def setUp(self):
        setUpCrateLayerBaseline(self)

    def tearDown(self):
        tearDownDropEntitiesBaseline(self)

    @unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
    def test_executemany_with_bulk_response_partial(self):
        """
        A batch insert with a duplicate primary key succeeds only partially,
        and `BulkResponse` reports the failed record and the counts.
        """

        # Import at runtime is on purpose, to permit skipping the test case.
        from crate.client.result import BulkResponse

        connection = client.connect(crate_host)
        # Release cursor and connection even when a statement or an assertion
        # fails, and do so before `tearDown` runs.
        try:
            cursor = connection.cursor()
            try:
                # Run SQL DDL.
                cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

                # Run a batch insert that only partially succeeds.
                invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
                result = cursor.executemany(
                    "INSERT INTO foobar (id, name) VALUES (?, ?)",
                    invalid_records,
                )

                # Verify CrateDB response.
                self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

                # Verify decoded response.
                bulk_response = BulkResponse(invalid_records, result)
                self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
                self.assertEqual(bulk_response.record_count, 2)
                self.assertEqual(bulk_response.success_count, 1)
                self.assertEqual(bulk_response.failed_count, 1)

                # Verify only the first record has been stored.
                cursor.execute("REFRESH TABLE foobar;")
                cursor.execute("SELECT * FROM foobar;")
                result = cursor.fetchall()
                self.assertEqual(result, [[1, "Hotzenplotz 1"]])
            finally:
                cursor.close()
        finally:
            connection.close()

    @unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
    def test_executemany_empty(self):
        """
        A batch insert with an empty parameter list raises `ProgrammingError`.
        """

        connection = client.connect(crate_host)
        # Release cursor and connection even when a statement or an assertion
        # fails, and do so before `tearDown` runs.
        try:
            cursor = connection.cursor()
            try:
                # Run SQL DDL.
                cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

                # Run a batch insert that is empty.
                with self.assertRaises(ProgrammingError) as cm:
                    cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
                self.assertEqual(
                    cm.exception.message,
                    "SQLParseException[The query contains a parameter placeholder $1, "
                    "but there are only 0 parameter values]")
            finally:
                cursor.close()
        finally:
            connection.close()
Loading

0 comments on commit 433dfdf

Please sign in to comment.