Skip to content

Commit

Permalink
Revert "Adjust the query to keep the destination rather than replace" (
Browse files Browse the repository at this point in the history
…#1014)

* Revert "Adjust the query to keep the destination rather than replace. (#1013)"

This reverts commit 836a6a2.

* Add a check to ensure schemas haven't changed.

Signed-off-by: Caleb Brown <calebbrown@google.com>

---------

Signed-off-by: Caleb Brown <calebbrown@google.com>
  • Loading branch information
calebbrown authored Feb 16, 2024
1 parent c19777b commit ec812cd
Showing 1 changed file with 43 additions and 15 deletions.
58 changes: 43 additions & 15 deletions scripts/bq_load.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ for bucket_prefix in `gsutil ls "$RESULT_BUCKET"`; do
--max_bad_records=10000 \
"$table_name" "$bucket_prefix*" "$SCHEMA_FILE"

# Guard clause on the exit status of the command that ran immediately before
# this point (presumably the shard load above -- confirm against the full
# script). Any non-zero status stops the whole run; otherwise report success
# for this shard table and carry on.
if [ $? -ne 0 ]; then
  echo "Loading \`$PROJECT_ID.$LOAD_DATASET.$table_name\` failed. Aborting"
  exit 1
fi
echo "Loading \`$PROJECT_ID.$LOAD_DATASET.$table_name\` done."

# Construct a UNION query for joining the prefix shards together
subquery="SELECT * FROM \`$PROJECT_ID.$LOAD_DATASET.$table_name\`"
if [ -n "$union" ]; then
Expand All @@ -62,23 +69,44 @@ for bucket_prefix in `gsutil ls "$RESULT_BUCKET"`; do
union="$union$subquery"
done

# Check that the destination table schema has not changed.
#
# Each half of the UNION DISTINCT reads INFORMATION_SCHEMA.COLUMNS for one
# table (the destination table, then the freshly loaded shard table named by
# $table_name) and serialises its columns -- mode, name and type, ordered by
# ordinal_position -- into a single JSON string. UNION DISTINCT collapses
# identical strings, so exactly one row remains when both schemas agree. The
# final SELECT calls ERROR() when the row count is anything other than 1,
# which makes the query itself fail.
#
# Reads: PROJECT_ID, DEST_DATASET, DEST_TABLE, LOAD_DATASET, table_name.
# Exits the script with status 1 if the query does not succeed.
#
# NOTE(review): if the destination table has no rows in
# INFORMATION_SCHEMA.COLUMNS (e.g. it has not been created yet) the two halves
# presumably differ and this check aborts -- confirm that is the intended
# behaviour for a first run.
schema_check_query="
WITH unique_schemas AS (SELECT
TO_JSON_STRING(
ARRAY_AGG(STRUCT(
IF(is_nullable = 'YES', 'NULLABLE', 'REQUIRED') AS mode, column_name AS name, data_type AS type)
ORDER BY ordinal_position), TRUE) AS schema
FROM \`$PROJECT_ID.$DEST_DATASET\`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = \"$DEST_TABLE\"
UNION DISTINCT
SELECT
TO_JSON_STRING(
ARRAY_AGG(STRUCT(
IF(is_nullable = 'YES', 'NULLABLE', 'REQUIRED') AS mode, column_name AS name, data_type AS type)
ORDER BY ordinal_position), TRUE) AS schema
FROM \`$PROJECT_ID.$LOAD_DATASET\`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = \"$table_name\")
SELECT IF(c = 1, \"OK\", ERROR(\"Schemas have changed.\")) FROM (SELECT COUNT(*) AS c FROM unique_schemas);"

echo "## Checking \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\` schema."
echo "Executing query: '$schema_check_query'"

# Test the command directly instead of inspecting \$? in a separate statement:
# the abort message is then printed even when errexit is enabled, and nothing
# can slip in between the command and the check and clobber its status.
# Any non-zero exit from bq (not only a schema mismatch) takes the abort path.
if bq query --headless --nouse_legacy_sql --project_id="$PROJECT_ID" "$schema_check_query"; then
  echo "Schemas match. Continuing."
else
  echo "Schemas do not match. Aborting."
  exit 1
fi

# Query to populate the destination table.
#
# If the table does not exist it will be created. Keeping the table ensures
# that breaking schema changes are not accidentally propagated to the
# destination table.
#
# A transaction is used to keep the update atomic. It will also rollback the
# TRUNCATE if the INSERT fails, such as when the schema has changed.
#
# The rows inserted are whatever the per-shard SELECT statements accumulated
# in $union return.
#
# NOTE(review): this span defines, echoes and executes BOTH this transactional
# `query` and the `replace_query` below, while the commit message on this page
# says the transactional variant is being reverted. Confirm which of the two
# is meant to be live in the checked-out script before relying on either.
query="
CREATE TABLE IF NOT EXISTS \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\` LIKE \`$PROJECT_ID.$LOAD_DATASET.$table_name\`
PARTITION BY TIMESTAMP_TRUNC(CreatedTimestamp, DAY) OPTIONS(expiration_timestamp=NULL);
BEGIN TRANSACTION;
TRUNCATE TABLE \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\`;
INSERT INTO \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\` $union;
COMMIT TRANSACTION;"
# If the table exists it will be replaced.
#
# Single-statement alternative: CREATE OR REPLACE the destination table LIKE
# the last loaded shard table, partitioned by day on CreatedTimestamp with no
# expiration, and filled from $union in the same statement.
replace_query="CREATE OR REPLACE TABLE \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\` LIKE \`$PROJECT_ID.$LOAD_DATASET.$table_name\` PARTITION BY TIMESTAMP_TRUNC(CreatedTimestamp, DAY) OPTIONS(expiration_timestamp=NULL) AS $union;"

# Log the target table and the exact SQL text before it is submitted.
echo "## Updating \`$PROJECT_ID.$DEST_DATASET.$DEST_TABLE\` from shards."
echo "Executing query: '$query'"
echo "Executing query: '$replace_query'"

# Submit the SQL with standard (non-legacy) SQL. The exit status of these
# commands is not checked anywhere in this span.
bq query --headless --nouse_legacy_sql --project_id="$PROJECT_ID" "$query"
bq query --headless --nouse_legacy_sql --project_id="$PROJECT_ID" "$replace_query"

0 comments on commit ec812cd

Please sign in to comment.