Skip to content

Commit

Permalink
Rerun Post Bundle Snapshots and log results of reprocesses (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertmitchellv authored Dec 4, 2023
1 parent 05272c9 commit cf4c85a
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ hs_err_pid*
.venv
.dev_venv
.python-version
.ipynb_checkpoints

# Azure Functions
*local.settings.json
Expand Down
101 changes: 101 additions & 0 deletions scripts/Synapse/ReRunECRfromPostBundle.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "962a47d0-b2c4-4516-84c0-b947280645fe",
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import lit, current_timestamp\n",
"from notebookutils import mssparkutils\n",
"\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# source and destination paths\n",
"storage_account = \"$STORAGE_ACCOUNT\"\n",
"ecr_post_bundle_file_path = f\"abfss://bundle-snapshots@{storage_account}.dfs.core.windows.net/post/ecr\"\n",
"ecr_rerun_file_path = f\"abfss://source-data@{storage_account}.dfs.core.windows.net//ecr-rerun\"\n",
"\n",
"# parquet log file: timestamp, filename, and destination path\n",
"timestamp_str = datetime.datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
"parquet_file_name = f\"copied_files_log_{timestamp_str}.parquet\"\n",
"delta_tables = f\"abfss://delta-tables@{storage_account}.dfs.core.windows.net/\"\n",
"parquet_file_path = f\"{delta_tables}/ecr-rerun-logs/{parquet_file_name}\"\n",
"\n",
"# dataframe to track moved files\n",
"copied_files_log = spark.createDataFrame([], schema=\"filename string, source_path string, dest_path string, timestamp string, file_exists_skip boolean, success boolean\")\n",
"\n",
"# outer try/except for acessing list of file\n",
"# inner try/except for issues copying files and marking success or failure\n",
"try:\n",
" # get list of files\n",
" files = mssparkutils.fs.ls(ecr_post_bundle_file_path)\n",
"\n",
" for file in files:\n",
" # initialize 'success' flag\n",
" success = True\n",
" try:\n",
" src_path = file.path\n",
" dest_path = f\"{ecr_rerun_file_path}/{file.name}\"\n",
"\n",
" # capture the timestamp before copying the file\n",
" copy_timestamp = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n",
"\n",
" # check if the file exists\n",
" file_exists = mssparkutils.fs.exists(dest_path)\n",
"\n",
" # copy the files if it doesn't exist\n",
" if not file_exists:\n",
" mssparkutils.fs.cp(src=src_path, dest=dest_path)\n",
" else:\n",
" # if the file already exists, set 'success' to false\n",
" success = False\n",
"\n",
" except Exception as e:\n",
" # if there's an error copying, set 'success' to false\n",
" success = False\n",
" print(f\"Error copying file {file.name}: {str(e)}\")\n",
"\n",
" # log the file copy\n",
" new_row = spark.createDataFrame([(file.name, src_path, dest_path, copy_timestamp, file_exists, success)])\n",
" copied_files_log = copied_files_log.union(new_row)\n",
"\n",
"except Exception as e:\n",
" print(f\"Error retrieving file list: {str(e)}\")\n",
" \n",
"# add current timestamp\n",
"copied_files_log = copied_files_log.withColumn(\"log_timestamp\", current_timestamp())\n",
"\n",
"# write log to parquet\n",
"copied_files_log.write.mode(\"append\").parquet(parquet_file_path)\n",
" \n",
"# inspect log of moved files\n",
"copied_files_log.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit cf4c85a

Please sign in to comment.