Test INSPECT, DEID & REID #698

Workflow file for this run

name: Test INSPECT, DEID & REID
on:
pull_request:
types: [ opened, reopened, synchronize ]
schedule:
- cron: '30 9 * * *'
workflow_dispatch:
env:
PROJECT_ID: "dlp-dataflow-deid-ci-392604"
DATASET_ID: "demo_dataset"
PARQUET_DATASET_ID: "parquet_results"
ORC_DATASET_ID: "orc_results"
REGION: "us-central1"
GCS_BUCKET: "dlp-dataflow-deid-ci-392604-demo-data"
GCS_NOTIFICATION_TOPIC: "projects/dlp-dataflow-deid-ci-392604/topics/dlp-dataflow-deid-ci-gcs-notification-topic"
SAMPLE_DATA_DIR: "sample_data_for_ci_workflow"
INPUT_FILE_NAME: "tiny_csv"
INPUT_STREAMING_WRITE_FILE_NAME: "streaming_write"
INPUT_PARQUET_FILE_NAME: "tiny_parquet"
INPUT_ORC_FILE_NAME: "tiny_orc"
INSPECT_TEMPLATE_PATH: "projects/dlp-dataflow-deid-ci-392604/locations/global/inspectTemplates/dlp-demo-inspect-latest-1689137435622"
DEID_TEMPLATE_PATH: "projects/dlp-dataflow-deid-ci-392604/locations/global/deidentifyTemplates/dlp-demo-deid-latest-1689137435622"
PARQUET_DEID_TEMPLATE_PATH: "projects/dlp-dataflow-deid-ci-392604/locations/global/deidentifyTemplates/parquet-dlp-demo-deid-latest-1689137435622"
REID_TEMPLATE_PATH: "projects/dlp-dataflow-deid-ci-392604/locations/global/deidentifyTemplates/dlp-demo-reid-latest-1689137435622"
SERVICE_ACCOUNT_EMAIL: "demo-service-account@dlp-dataflow-deid-ci-392604.iam.gserviceaccount.com"
PUBSUB_TOPIC_NAME: "demo-topic"
INSPECTION_TABLE_ID: "dlp_inspection_result"
NUM_INSPECTION_RECORDS_THRESHOLD: "50"
PARQUET_INSPECTION_RECORDS_THRESHOLD: "30"
ORC_INSPECTION_RECORDS_THRESHOLD: "60"
REIDENTIFICATION_QUERY_FILE: "reid_query.sql"
OUTPUT_GCS_BUCKET: "deidentified_files"
ORC_DEID_RECORDS_COUNT: "20"
jobs:
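# Build the project, run unit tests and upload coverage reports to Codecov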
build-and-unit-test:
permissions:
pull-requests: write
contents: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run gradle build
run: ./gradlew clean build
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
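# Generate a run-specific UUID and derive Dataflow job names, dataset IDs and the output bucket name from it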
generate-uuid:
needs: build-and-unit-test
runs-on: ubuntu-latest
timeout-minutes: 5
outputs:
output1: ${{ steps.gen-uuid.outputs.uuid }}
output2: ${{ steps.gen-uuid.outputs.inspect_job_name }}
output3: ${{ steps.gen-uuid.outputs.deid_job_name }}
output4: ${{ steps.gen-uuid.outputs.reid_job_name }}
output5: ${{ steps.gen-uuid.outputs.dataset_id }}
output6: ${{ steps.gen-uuid.outputs.parquet_inspect_job_name }}
output7: ${{ steps.gen-uuid.outputs.parquet_deid_job_name }}
output8: ${{ steps.gen-uuid.outputs.parquet_dataset_id }}
output9: ${{ steps.gen-uuid.outputs.output_gcs_bucket }}
output10: ${{ steps.gen-uuid.outputs.orc_inspect_job_name }}
output11: ${{ steps.gen-uuid.outputs.orc_deid_bq_job_name }}
output12: ${{ steps.gen-uuid.outputs.orc_dataset_id }}
output13: ${{ steps.gen-uuid.outputs.orc_deid_gcs_job_name }}
steps:
- name: Generate UUID for workflow
id: gen-uuid
run: |
new_uuid=$(uuidgen)
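# BigQuery dataset IDs allow only letters, numbers and underscores, so keep an underscore variant of the UUID for dataset names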
modified_uuid=$(echo "$new_uuid" | tr '-' '_')
echo "uuid=$new_uuid" >> "$GITHUB_OUTPUT"
echo "inspect_job_name=inspect-$new_uuid" >> "$GITHUB_OUTPUT"
echo "deid_job_name=deid-$new_uuid" >> "$GITHUB_OUTPUT"
echo "reid_job_name=reid-$new_uuid" >> "$GITHUB_OUTPUT"
echo "dataset_id=${{ env.DATASET_ID }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "parquet_inspect_job_name=parquet-inspect-$new_uuid" >> "$GITHUB_OUTPUT"
echo "parquet_deid_job_name=parquet-deid-$new_uuid" >> "$GITHUB_OUTPUT"
echo "parquet_dataset_id=${{ env.PARQUET_DATASET_ID }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "output_gcs_bucket=${{ env.OUTPUT_GCS_BUCKET }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "orc_inspect_job_name=orc-inspect-$new_uuid" >> "$GITHUB_OUTPUT"
echo "orc_deid_bq_job_name=orc-deid-bq-$new_uuid" >> "$GITHUB_OUTPUT"
echo "orc_dataset_id=${{ env.ORC_DATASET_ID }}_$modified_uuid" >> "$GITHUB_OUTPUT"
echo "orc_deid_gcs_job_name=orc-deid-gcs-$new_uuid" >> "$GITHUB_OUTPUT"
create-dataset:
needs:
- generate-uuid
runs-on:
- self-hosted
- CI
timeout-minutes: 5
steps:
- name: Create BQ dataset
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
bq --location=US mk -d --description "GitHub CI workflow dataset" ${{ env.DATASET_ID }}
bq --location=US mk -d --description "GitHub CI workflow dataset to store parquet results" ${{ env.PARQUET_DATASET_ID }}
bq --location=US mk -d --description "GitHub CI workflow dataset to store orc results" ${{ env.ORC_DATASET_ID }}
create-output-bucket:
needs:
- generate-uuid
runs-on:
- self-hosted
- CI
timeout-minutes: 5
steps:
- name: Create Output GCS Bucket
env:
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
gcloud storage buckets create gs://${{ env.OUTPUT_GCS_BUCKET }} --location=${{env.REGION}}
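# Inspect CSV files, including a file added at runtime via the GCS notification topic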
inspection:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
INSPECT_JOB_NAME: ${{ needs.generate-uuid.outputs.output2 }}
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--streaming --enableStreamingEngine \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 \
--maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}*.csv \
--dataset=${{env.DATASET_ID}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.INSPECT_JOB_NAME}} \
--gcsNotificationTopic=${{env.GCS_NOTIFICATION_TOPIC}}"
sleep 30s
gsutil cp gs://${{env.GCS_BUCKET}}/temp/csv/${{env.INPUT_FILE_NAME}}_pub_sub.csv gs://${{env.GCS_BUCKET}}
- name: Verify BQ table
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
not_verified=true
table_count=0
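# Poll BigQuery until the inspection results table appears; the job-level timeout-minutes bounds this loop if verification never succeeds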
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}`.__TABLES__ WHERE table_id="${{env.INSPECTION_TABLE_ID}}"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
- name: Verify distinct rows
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(*) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INSPECTION_TABLE_ID}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" -gt ${{env.NUM_INSPECTION_RECORDS_THRESHOLD}} ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
done
echo "Verified number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
# Inspect only existing parquet files
inspect-parquet-data:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
PARQUET_INSPECT_JOB_NAME: ${{ needs.generate-uuid.outputs.output6 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_PARQUET_FILE_NAME}}*.parquet \
--dataset=${{env.PARQUET_DATASET_ID}} \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.PARQUET_INSPECT_JOB_NAME}}"
- name: Verify BQ table
env:
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.PARQUET_DATASET_ID}}`.__TABLES__ WHERE table_id="${{env.INSPECTION_TABLE_ID}}"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
- name: Verify distinct rows
env:
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(*) FROM `${{env.PROJECT_ID}}.${{env.PARQUET_DATASET_ID}}.${{env.INSPECTION_TABLE_ID}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == ${{env.PARQUET_INSPECTION_RECORDS_THRESHOLD}} ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
done
echo "Verified number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
# Inspect only existing orc files
inspect-orc-data:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
ORC_INSPECT_JOB_NAME: ${{ needs.generate-uuid.outputs.output10 }}
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_ORC_FILE_NAME}}.orc \
--dataset=${{env.ORC_DATASET_ID}} \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.ORC_INSPECT_JOB_NAME}}"
- name: Verify BQ table
env:
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.ORC_DATASET_ID}}`.__TABLES__ WHERE table_id="${{env.INSPECTION_TABLE_ID}}"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in ORC BQ dataset with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
done
echo "Verified number of tables in ORC BQ dataset with id ${{env.INSPECTION_TABLE_ID}}: $table_count ."
- name: Verify distinct rows
env:
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(*) FROM `${{env.PROJECT_ID}}.${{env.ORC_DATASET_ID}}.${{env.INSPECTION_TABLE_ID}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == ${{env.ORC_INSPECTION_RECORDS_THRESHOLD}} ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
done
echo "Verified number of rows in ${{env.INSPECTION_TABLE_ID}}: $row_count."
de-identification:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
DEID_JOB_NAME: ${{ needs.generate-uuid.outputs.output3 }}
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--streaming --enableStreamingEngine \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=2 \
--maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}*.csv \
--dataset=${{env.DATASET_ID}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.DEID_JOB_NAME}} \
--gcsNotificationTopic=${{env.GCS_NOTIFICATION_TOPIC}}"
sleep 30s
if gsutil stat gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}_pub_sub.csv; then
echo "Pub Sub CSV File exists hence need not copy anymore"
else
gsutil cp gs://${{env.GCS_BUCKET}}/temp/csv/${{env.INPUT_FILE_NAME}}_pub_sub.csv gs://${{env.GCS_BUCKET}}
fi
- name: Verify BQ tables
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}`.__TABLES__ WHERE table_id LIKE "${{env.INPUT_FILE_NAME}}%"' | wc -l ) -1))
if [[ "$table_count" == "2" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INPUT_FILE_NAME}}*: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INPUT_FILE_NAME}}*: $table_count ."
- name: Verify distinct rows of existing file
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
rc_orig=$(($(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}.csv | wc -l ) -1))
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(DISTINCT(ID)) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INPUT_FILE_NAME}}: $row_count."
done
echo "# records in input CSV file are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_FILE_NAME}}: $row_count."
- name: Verify distinct rows of newly added file
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
rc_orig=$(($(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}_pub_sub.csv | wc -l ) -1))
not_verified=true
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(DISTINCT(ID)) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}}_pub_sub`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INPUT_FILE_NAME}}_pub_sub: $row_count."
done
echo "# records in input CSV file are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_FILE_NAME}}_pub_sub: $row_count."
# Deidentify only existing orc files and store results in BigQuery dataset
deidentify-orc-data-bq:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
ORC_DEID_BQ_JOB_NAME: ${{ needs.generate-uuid.outputs.output11 }}
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_ORC_FILE_NAME}}.orc \
--dataset=${{env.ORC_DATASET_ID}} \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.PARQUET_DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.ORC_DEID_BQ_JOB_NAME}}"
- name: Verify BQ tables
env:
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.ORC_DATASET_ID}}`.__TABLES__ WHERE table_id LIKE "${{env.INPUT_ORC_FILE_NAME}}%"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INPUT_ORC_FILE_NAME}}: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INPUT_ORC_FILE_NAME}}: $table_count ."
- name: Verify distinct rows of existing file
env:
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
rc_orig=${{ env.ORC_DEID_RECORDS_COUNT }}
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(DISTINCT(ID)) FROM `${{env.PROJECT_ID}}.${{env.ORC_DATASET_ID}}.${{env.INPUT_ORC_FILE_NAME}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INPUT_ORC_FILE_NAME}}: $row_count."
done
echo "# records in input ORC file are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_ORC_FILE_NAME}}: $row_count."
deidentify-orc-data-gcs:
needs:
- generate-uuid
- create-output-bucket
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
ORC_DEID_GCS_JOB_NAME: ${{ needs.generate-uuid.outputs.output13 }}
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=2 \
--maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_ORC_FILE_NAME}}.orc \
--outputBucket=gs://${{env.OUTPUT_GCS_BUCKET}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.PARQUET_DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--jobName=${{env.ORC_DEID_GCS_JOB_NAME}}-gcs \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}}"
- name: Verify output file
env:
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
not_verified=true
file_count=0
while $not_verified; do
file_count=$(gcloud storage ls gs://${{env.OUTPUT_GCS_BUCKET}}/${{env.INPUT_ORC_FILE_NAME}}*.orc | wc -l)
if [[ "$file_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got output files in GCS with name ${{env.INPUT_ORC_FILE_NAME}}*: $file_count ."
done
echo "Verified output files in GCS with name ${{env.INPUT_ORC_FILE_NAME}}*: $file_count ."
# Deidentify only existing parquet files
deidentify-parquet-data:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
PARQUET_DEID_JOB_NAME: ${{ needs.generate-uuid.outputs.output7 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_PARQUET_FILE_NAME}}.parquet \
--dataset=${{env.PARQUET_DATASET_ID}} \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.PARQUET_DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.PARQUET_DEID_JOB_NAME}}"
- name: Verify BQ tables
env:
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.PARQUET_DATASET_ID}}`.__TABLES__ WHERE table_id LIKE "${{env.INPUT_PARQUET_FILE_NAME}}%"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INPUT_PARQUET_FILE_NAME}}*: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INPUT_PARQUET_FILE_NAME}}*: $table_count ."
- name: Verify distinct rows of existing file
env:
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
run: |
rc_orig=$(($(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.SAMPLE_DATA_DIR}}/${{env.INPUT_FILE_NAME}}.csv | wc -l ) -1))
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(DISTINCT(ID)) FROM `${{env.PROJECT_ID}}.${{env.PARQUET_DATASET_ID}}.${{env.INPUT_PARQUET_FILE_NAME}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INPUT_PARQUET_FILE_NAME}}: $row_count."
done
echo "# records in input Parquet file are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_PARQUET_FILE_NAME}}: $row_count."
de-identification-streaming-write:
needs:
- generate-uuid
- create-dataset
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline for streaming write
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 \
--maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.INPUT_STREAMING_WRITE_FILE_NAME}}.csv \
--dataset=${{env.DATASET_ID}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--useStorageWriteApi \
--storageWriteApiTriggeringFrequencySec=2 \
--numStorageWriteApiStreams=2"
- name: Verify BQ table for streaming write
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}`.__TABLES__ WHERE table_id LIKE "${{env.INPUT_STREAMING_WRITE_FILE_NAME}}%"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of tables in BQ with id ${{env.INPUT_STREAMING_WRITE_FILE_NAME}}: $table_count ."
done
echo "Verified number of tables in BQ with id ${{env.INPUT_STREAMING_WRITE_FILE_NAME}}: $table_count ."
- name: Verify distinct rows of streaming write file
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
rc_orig=$(($(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.INPUT_STREAMING_WRITE_FILE_NAME}}.csv | wc -l ) -1))
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(DISTINCT(ID)) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_STREAMING_WRITE_FILE_NAME}}`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got number of rows in ${{env.INPUT_STREAMING_WRITE_FILE_NAME}}: $row_count."
done
echo "# records in input CSV file are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_STREAMING_WRITE_FILE_NAME}}: $row_count."
de-identification-with-gcs-output:
needs:
- generate-uuid
- create-output-bucket
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run DLP Pipeline
env:
DEID_JOB_NAME: ${{ needs.generate-uuid.outputs.output3 }}
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=2 \
--maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}.csv \
--outputBucket=gs://${{env.OUTPUT_GCS_BUCKET}} \
--workerMachineType=n1-highmem-4 \
--inspectTemplateName=${{env.INSPECT_TEMPLATE_PATH}} \
--deidentifyTemplateName=${{env.DEID_TEMPLATE_PATH}} \
--batchSize=200000 \
--DLPMethod=DEID \
--jobName=${{env.DEID_JOB_NAME}}-gcs \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}}"
- name: Verify output file
env:
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
not_verified=true
file_count=0
while $not_verified; do
file_count=$(gcloud storage ls gs://${{env.OUTPUT_GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}*.csv | wc -l)
if [[ "$file_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
echo "Got output files in GCS with name ${{env.INPUT_FILE_NAME}}*: $file_count ."
done
echo "Verified output files in GCS with name ${{env.INPUT_FILE_NAME}}*: $file_count ."
- name: Verify rows in output file
env:
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
output_file_name=$(gcloud storage ls gs://${{env.OUTPUT_GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}*.csv)
rc_orig=$(($(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}.csv | wc -l ) -1))
rc_output=$(($(gcloud storage cat $output_file_name | wc -l )))
if [[ "$rc_output" == "$rc_orig" ]]; then
echo "PASSED";
fi
echo "Verified number of rows in output file : $output_file_name"
re-identification:
needs:
- generate-uuid
- create-dataset
- de-identification
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v3
with:
distribution: 'zulu'
java-version: 17
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Store query in GCS bucket
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
export QUERY="SELECT ID, Card_Number, Card_Holders_Name FROM \`${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}}\`"
cat << EOF | gsutil cp - gs://${GCS_BUCKET}/${{env.REIDENTIFICATION_QUERY_FILE}}
${QUERY}
EOF
- name: Create a PubSub topic
run: |
if [[ $(gcloud pubsub topics list --filter="name:${{env.PUBSUB_TOPIC_NAME}}") ]]; then
echo "Topic already created!"
else
gcloud pubsub topics create ${{env.PUBSUB_TOPIC_NAME}}
echo "Created a new topic!"
fi
- name: Run DLP Pipeline
env:
REID_JOB_NAME: ${{ needs.generate-uuid.outputs.output4 }}
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs=" \
--region=${{env.REGION}} \
--project=${{env.PROJECT_ID}} \
--tempLocation=gs://${{env.GCS_BUCKET}}/temp \
--numWorkers=1 \
--maxNumWorkers=2 \
--runner=DataflowRunner \
--tableRef=${{env.PROJECT_ID}}:${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}} \
--dataset=${{env.DATASET_ID}} \
--topic=projects/${{env.PROJECT_ID}}/topics/${{env.PUBSUB_TOPIC_NAME}} \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--workerMachineType=n1-highmem-4 \
--deidentifyTemplateName=${{env.REID_TEMPLATE_PATH}} \
--DLPMethod=REID \
--keyRange=1024 \
--queryPath=gs://${GCS_BUCKET}/${{env.REIDENTIFICATION_QUERY_FILE}} \
--serviceAccount=${{env.SERVICE_ACCOUNT_EMAIL}} \
--jobName=${{env.REID_JOB_NAME}}"
- name: Verify BQ table
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
not_verified=true
table_count=0
while $not_verified; do
table_count=$(($(bq query --use_legacy_sql=false --format csv 'SELECT * FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}`.__TABLES__ WHERE table_id="${{env.INPUT_FILE_NAME}}_re_id"' | wc -l ) -1))
if [[ "$table_count" == "1" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
done
echo "Verified number of tables in BQ with id ${{env.INPUT_FILE_NAME}}_re_id: $table_count ."
- name: Verify distinct rows
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
run: |
rc_orig=$(($(bq query --nouse_legacy_sql --format=csv "$(gcloud storage cat gs://${{env.GCS_BUCKET}}/${{env.REIDENTIFICATION_QUERY_FILE}})" | wc -l) - 1))
not_verified=true
row_count=0
while $not_verified; do
row_count_json=$(bq query --use_legacy_sql=false --format json 'SELECT COUNT(ID) FROM `${{env.PROJECT_ID}}.${{env.DATASET_ID}}.${{env.INPUT_FILE_NAME}}_re_id`')
row_count=$(echo "$row_count_json" | jq -r '.[].f0_')
if [[ "$row_count" == "$rc_orig" ]]; then
echo "PASSED";
not_verified=false;
else
sleep 30s
fi
done
echo "# records in input query are: $rc_orig."
echo "Verified number of rows in ${{env.INPUT_FILE_NAME}}_re_id: $row_count."
clean-up:
if: "!cancelled()"
needs:
- generate-uuid
- inspection
- inspect-parquet-data
- inspect-orc-data
- de-identification
- deidentify-parquet-data
- deidentify-orc-data-bq
- deidentify-orc-data-gcs
- de-identification-streaming-write
- de-identification-with-gcs-output
- re-identification
runs-on:
- self-hosted
- CI
timeout-minutes: 30
steps:
- name: Clean-up BQ dataset and GCS Bucket
if: "!cancelled()"
env:
DATASET_ID: ${{ needs.generate-uuid.outputs.output5 }}
PARQUET_DATASET_ID: ${{ needs.generate-uuid.outputs.output8 }}
ORC_DATASET_ID: ${{ needs.generate-uuid.outputs.output12 }}
run: |
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.DATASET_ID}}
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.PARQUET_DATASET_ID}}
bq rm -r -f -d ${{env.PROJECT_ID}}:${{env.ORC_DATASET_ID}}
- name: Clean up pub_sub file
run: |
if gsutil rm -f gs://${{env.GCS_BUCKET}}/${{env.INPUT_FILE_NAME}}_pub_sub.csv; then
echo "Cleared pub_sub file!"
else
echo "pub_sub file not present in storage bucket."
fi
- name: Clean up gcs bucket
env:
OUTPUT_GCS_BUCKET: ${{ needs.generate-uuid.outputs.output9 }}
run: |
gcloud storage rm --recursive gs://${{env.OUTPUT_GCS_BUCKET}}
- name: Cancel Inspection Job
if: "!cancelled()"
env:
INSPECT_JOB_NAME: ${{ needs.generate-uuid.outputs.output2 }}
run: |
inspect_job_data=$(gcloud dataflow jobs list --project ${{env.PROJECT_ID}} --status active --format json --filter="name=${{env.INSPECT_JOB_NAME}}")
inspect_job_id=$(echo "$inspect_job_data" | jq -r '.[].id')
if [[ "$inspect_job_id" == "" ]]; then
echo "No job found with name: ${{env.INSPECT_JOB_NAME}}."
else
gcloud dataflow jobs cancel $inspect_job_id --project ${{env.PROJECT_ID}}
fi
- name: Cancel De-identification Job
if: "!cancelled()"
env:
DEID_JOB_NAME: ${{ needs.generate-uuid.outputs.output3 }}
run: |
deid_job_data=$(gcloud dataflow jobs list --project ${{env.PROJECT_ID}} --status active --format json --filter="name=${{env.DEID_JOB_NAME}}")
deid_job_id=$(echo "$deid_job_data" | jq -r '.[].id')
if [[ "$deid_job_id" == "" ]]; then
echo "No job found with name: ${{env.DEID_JOB_NAME}}."
else
gcloud dataflow jobs cancel $deid_job_id --project ${{env.PROJECT_ID}}
fi