Skip to content

Commit

Permalink
Merge pull request #1 from wayfair-incubator/first_pass
Browse files Browse the repository at this point in the history
First pass
  • Loading branch information
matsmaiwald committed Oct 22, 2021
2 parents abc4c56 + b9bb089 commit aa3e293
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 7 deletions.
4 changes: 3 additions & 1 deletion avro_to_bigquery/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
__version__ = "0.1.0"
__version__ = "0.2.0"

from avro_to_bigquery.converter import convert_schema # noqa
129 changes: 129 additions & 0 deletions avro_to_bigquery/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from google.cloud import bigquery # type: ignore

# Avro primitive and named types, keyed by the Avro "type" name.
_AVRO_BASE_TYPES = {
    "record": "RECORD",
    "string": "STRING",
    "int": "INTEGER",
    "boolean": "BOOL",
    "double": "FLOAT",
    "float": "FLOAT",
    "long": "INT64",
    "bytes": "BYTES",
    "enum": "STRING",
}

# Avro logical types, keyed by the Avro "logicalType" name.
_AVRO_LOGICAL_TYPES = {
    "decimal": "FLOAT",
    "uuid": "STRING",
    "date": "DATE",
    "time-millis": "TIME",
    "time-micros": "TIME",
    "timestamp-millis": "TIMESTAMP",
    "timestamp-micros": "TIMESTAMP",
}

# Single lookup table used by the converter: Avro type or logical type name
# -> BigQuery field type name.
AVRO_TO_BIGQUERY_TYPES = {**_AVRO_BASE_TYPES, **_AVRO_LOGICAL_TYPES}


def _convert_type(avro_type):
"""
Convert an Avro type to a BigQuery type
:param avro_type: The Avro type
:return: The BigQuery type
"""
mode = "NULLABLE"
fields = ()

if isinstance(avro_type, list):
# list types are unions, one of them should be null; get the real type
if len(avro_type) == 2:
if avro_type[0] == "null":
avro_type = avro_type[1]
elif avro_type[1] == "null":
avro_type = avro_type[0]
else:
raise ReferenceError(
"One of the union fields should have type `null`"
)
else:
raise ReferenceError(
"A Union type can only consist of two types, "
"one of them should be `null`"
)

if isinstance(avro_type, dict):
field_type, fields, mode = _convert_complex_type(avro_type)

else:
field_type = AVRO_TO_BIGQUERY_TYPES[avro_type]

return field_type, mode, fields


def _convert_complex_type(avro_type):
"""
Convert a Avro complex type to a BigQuery type
:param avro_type: The Avro type
:return: The BigQuery type
"""
fields = ()
mode = "NULLABLE"

if avro_type["type"] == "record":
field_type = "RECORD"
fields = tuple(map(lambda f: _convert_field(f), avro_type["fields"]))
elif avro_type["type"] == "array":
mode = "REPEATED"
if "logicalType" in avro_type["items"]:
field_type = AVRO_TO_BIGQUERY_TYPES[
avro_type["items"]["logicalType"]
]
elif isinstance(avro_type["items"], dict):
# complex array
if avro_type["items"]["type"] == "enum":
field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["items"]["type"]]
else:
field_type = "RECORD"
fields = tuple(
map(
lambda f: _convert_field(f),
avro_type["items"]["fields"],
)
)
else:
# simple array
field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["items"]]
elif avro_type["type"] == "enum":
field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["type"]]
elif "logicalType" in avro_type:
field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["logicalType"]]
else:
raise ReferenceError(f"Unknown complex type {avro_type['type']}")
return field_type, fields, mode


def _convert_field(avro_field):
    """
    Build the BigQuery schema field for a single Avro field

    :param avro_field: The Avro field definition (a dict)
    :return: The matching ``bigquery.SchemaField``
    """
    # a logicalType declared directly on the field takes precedence over
    # its "type" entry
    type_key = "logicalType" if "logicalType" in avro_field else "type"
    field_type, mode, fields = _convert_type(avro_field[type_key])

    name = avro_field.get("name")
    description = avro_field.get("doc")
    return bigquery.SchemaField(
        name,
        field_type,
        mode=mode,
        description=description,
        fields=fields,
    )


def convert_schema(avro_schema: dict) -> tuple:
    """
    Convert an Avro schema to a BigQuery schema

    :param avro_schema: The Avro record schema, as a dict with a ``fields``
        list
    :return: Tuple with one BigQuery schema field per top-level Avro field,
        in the order the fields are declared
    :raises KeyError: If the schema has no ``fields`` entry
    """
    return tuple(_convert_field(field) for field in avro_schema["fields"])
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tool.black]
line-length = 88
line-length = 80
target_version = ['py38']
exclude = '''
/(
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
google-cloud-bigquery==2.27.1
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ classifiers =
python_requires = >=3.6
install_requires =
typing-extensions~=3.7;python_version<"3.8"
google-cloud-bigquery==2.27.1
packages = find:
[options.package_data]
Expand Down
5 changes: 0 additions & 5 deletions tests/unit/test_basic.py

This file was deleted.

185 changes: 185 additions & 0 deletions tests/unit/test_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import pytest

from avro_to_bigquery import convert_schema


def test_convert_avro_schema_to_bigquery_schema():
    """Convert one schema mixing many Avro constructs and spot-check it."""
    # arrange: nested building blocks first, then the top-level record
    sub_sub_record = {
        "type": "record",
        "name": "sub_sub",
        "fields": [
            {"name": "sub_sub1", "type": "boolean"},
            {"name": "sub_sub2", "type": "float"},
        ],
    }
    sub_record = {
        "type": "record",
        "name": "sub",
        "fields": [
            {"name": "sub1", "type": "long"},
            {"name": "sub2", "type": "int"},
            {"name": "sub_sub", "type": sub_sub_record},
        ],
    }
    dimensions_array = {
        "type": "array",
        "default": [],
        "items": {
            "type": "record",
            "name": "dimensions",
            "fields": [
                {"name": "width", "type": "double"},
                {"name": "length", "type": "double"},
                {
                    "name": "timestamp",
                    "type": {
                        "type": "long",
                        "logicalType": "timestamp-millis",
                    },
                },
            ],
        },
    }
    date_array = {
        "type": "array",
        "items": {"type": "int", "logicalType": "date"},
    }
    enum_type = {
        "type": "enum",
        "name": "enum-test-field",
        "symbols": ["Test1", "Test2", "Test3"],
    }
    avro_schema = {
        "type": "record",
        "name": "Test_Schema",
        "namespace": "example.avro",
        "fields": [
            {"name": "full_name", "type": "string", "doc": "The name"},
            {"name": "age", "type": "int", "doc": "The age"},
            {
                "name": "check",
                "type": "boolean",
                "doc": "Just a boolean tester",
            },
            {"name": "length", "type": "float", "doc": "The length"},
            {"name": "shoe_size", "type": "long", "doc": "The shoe size"},
            {"name": "image", "type": "bytes"},
            {"name": "complex", "type": ["double", "null"]},
            {"name": "size", "type": "double"},
            {"name": "sub", "type": sub_record},
            {"name": "dimensions", "type": dimensions_array},
            {
                "name": "birthdate",
                "type": {"type": "int", "logicalType": "date"},
            },
            {
                "name": "timestamp",
                "type": {"type": "long", "logicalType": "time-micros"},
            },
            {
                "name": "nullable array",
                "doc": "A test array",
                "type": ["null", {"type": "array", "items": "int"}],
            },
            {"name": "logical type array", "type": ["null", date_array]},
            {"name": "enumeration", "type": ["null", enum_type]},
        ],
    }

    # act
    bq_schema = convert_schema(avro_schema)

    # assert: one output field per input field
    assert len(bq_schema) == 15

    # primitives, descriptions and the nullable union
    assert bq_schema[0].name == "full_name"
    assert bq_schema[1].field_type == "INTEGER"
    assert bq_schema[2].description == "Just a boolean tester"
    assert bq_schema[3].mode == "NULLABLE"
    assert bq_schema[4].name == "shoe_size"
    assert bq_schema[5].description is None
    assert bq_schema[6].field_type == "FLOAT"
    assert bq_schema[7].field_type == "FLOAT"

    # nested records
    sub = bq_schema[8]
    assert sub.field_type == "RECORD"
    assert sub.fields[0].name == "sub1"
    assert sub.fields[2].fields[1].name == "sub_sub2"

    # array of records
    dimensions = bq_schema[9]
    assert dimensions.field_type == "RECORD"
    assert dimensions.mode == "REPEATED"
    assert dimensions.fields[0].field_type == "FLOAT"
    assert dimensions.fields[2].field_type == "TIMESTAMP"

    # logical types
    assert bq_schema[10].field_type == "DATE"
    assert bq_schema[11].field_type == "TIME"

    # nullable arrays, plain and with logical-type items
    nullable_array = bq_schema[12]
    assert nullable_array.name == "nullable array"
    assert nullable_array.field_type == "INTEGER"
    assert nullable_array.mode == "REPEATED"

    logical_array = bq_schema[13]
    assert logical_array.name == "logical type array"
    assert logical_array.field_type == "DATE"
    assert logical_array.mode == "REPEATED"

    # nullable enum
    assert bq_schema[14].field_type == "STRING"


def test_incorrect_nullable_field():
    """A two-member union without ``null`` is rejected with ReferenceError."""
    # arrange
    avs = {
        "type": "record",
        "name": "Test_Schema",
        "namespace": "example.avro",
        "fields": [
            {
                "name": "test1",
                "type": ["double", "null"],
                "doc": "Correct nullable field",
            },
            {
                "name": "test2",
                "type": ["double", "string"],
                "doc": "Incorrect nullable field",
            },
        ],
    }

    # act
    with pytest.raises(ReferenceError) as exc_info:
        convert_schema(avs)

    # assert
    # exc_info.value is the exception instance, which does not support the
    # `in` operator; check its message text instead. This must run after the
    # `with` block, because nothing after the raising call inside it executes.
    assert "One of the union fields should have type `null`" in str(
        exc_info.value
    )


def test_incorrect_field_type():
    """convert_schema raises KeyError for an unknown Avro type name."""
    # arrange: one valid field followed by one with an unknown type name
    valid_field = {
        "name": "test1",
        "type": "string",
        "doc": "Correct field type",
    }
    invalid_field = {
        "name": "test2",
        "type": "something_useless",
        "doc": "Incorrect field type",
    }
    avro_schema = {
        "type": "record",
        "name": "Test_Schema",
        "namespace": "example.avro",
        "fields": [valid_field, invalid_field],
    }

    # act / assert: the conversion itself must raise
    with pytest.raises(KeyError):
        convert_schema(avro_schema)

0 comments on commit aa3e293

Please sign in to comment.