From b516b6095d7e755055960aab627e8e59759d2011 Mon Sep 17 00:00:00 2001 From: Stephanie Hong Date: Mon, 30 Jan 2023 22:47:51 -0500 Subject: [PATCH] shared code logic library --- pipeline_logic/v2/shared-logic/src/setup.cfg | 5 + pipeline_logic/v2/shared-logic/src/setup.py | 19 + .../src/source_cdm_utils/__init__.py | 0 .../src/source_cdm_utils/clean.py | 46 ++ .../src/source_cdm_utils/collision_check.py | 46 ++ .../src/source_cdm_utils/manifest.py | 36 ++ .../src/source_cdm_utils/parse.py | 171 ++++++ .../src/source_cdm_utils/person_ids.py | 73 +++ .../src/source_cdm_utils/pre_clean.py | 458 +++++++++++++++ .../src/source_cdm_utils/schema.py | 555 ++++++++++++++++++ .../src/source_cdm_utils/status.py | 30 + .../src/source_cdm_utils/unzip.py | 86 +++ 12 files changed, 1525 insertions(+) create mode 100644 pipeline_logic/v2/shared-logic/src/setup.cfg create mode 100644 pipeline_logic/v2/shared-logic/src/setup.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/__init__.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/clean.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/collision_check.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/manifest.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/parse.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/person_ids.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/pre_clean.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/schema.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/status.py create mode 100644 pipeline_logic/v2/shared-logic/src/source_cdm_utils/unzip.py diff --git a/pipeline_logic/v2/shared-logic/src/setup.cfg b/pipeline_logic/v2/shared-logic/src/setup.cfg new file mode 100644 index 0000000..dc7aa4b --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/setup.cfg @@ -0,0 +1,5 @@ +[pep8] +max-line-length = 120 + +[flake8] +max-line-length = 120 diff --git a/pipeline_logic/v2/shared-logic/src/setup.py b/pipeline_logic/v2/shared-logic/src/setup.py new file mode 100644 index 0000000..065fec6 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/setup.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +"""Library setup script.""" + +import os +from setuptools import find_packages, setup + +setup( + name=os.environ['PKG_NAME'], + version=os.environ['PKG_VERSION'], + + description='My Python library project', + + author="UNITE", + + packages=find_packages(exclude=['contrib', 'docs', 'test']), + + # Please instead specify your dependencies in conda_recipe/meta.yml + install_requires=[], +) diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/__init__.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/clean.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/clean.py new file mode 100644 index 0000000..a28a2ff --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/clean.py @@ -0,0 +1,46 @@ +from pyspark.sql import functions as F + + +empty_payer_plan_cols = [ + "payer_source_concept_id", + "plan_concept_id", + "plan_source_concept_id", + "sponsor_concept_id", + "sponsor_source_concept_id", + "stop_reason_concept_id", + "stop_reason_source_concept_id" +] + + +def conceptualize(domain, df, concept): + concept = concept.select("concept_id", "concept_name") + + concept_id_columns = 
[col for col in df.columns if col.endswith("_concept_id")] + # _concept_id columns should be Integer + df = df.select( + *[col for col in df.columns if col not in concept_id_columns] + + [F.col(col).cast("integer").alias(col) for col in concept_id_columns] + ) + + for col in concept_id_columns: + new_df = df + + if col in empty_payer_plan_cols: + # Create an empty *_concept_name column for these cols to prevent an OOM during the join while keeping the schema consistent + new_df = new_df.withColumn("concept_name", F.lit(None).cast("string")) + else: + new_df = new_df.join(concept, [new_df[col] == concept["concept_id"]], "left_outer").drop("concept_id") + + concept_type = col[:col.index("_concept_id")] + new_df = new_df.withColumnRenamed("concept_name", concept_type+"_concept_name") + + df = new_df + + # occurs in observation, measurement + if "value_as_number" in df.columns: + df = df.withColumn("value_as_number", df.value_as_number.cast("double")) + + if "person_id" in df.columns: + df = df.filter(df.person_id.isNotNull()) + + return df diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/collision_check.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/collision_check.py new file mode 100644 index 0000000..e771163 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/collision_check.py @@ -0,0 +1,46 @@ +from pyspark.sql import functions as F, types as T +from pyspark.sql.window import Window as W + + +def new_duplicate_rows_with_collision_bits(omop_domain, lookup_df, ctx, pk_col, full_hash_col): + + # Extract all duplicate rows from domain table + # Keep two columns: 51 bit hash (which caused collision) and full hash (to differentiate collisions) + w = W.partitionBy(pk_col) + duplicates_df = omop_domain.dataframe().select('*', F.count(pk_col).over(w).alias('dupeCount'))\ + .where('dupeCount > 1')\ + .drop('dupeCount') + duplicates_df = duplicates_df.select(pk_col, full_hash_col) + + if ctx.is_incremental: + # Count how many rows in the lookup table exist for the collided hash value + cache = lookup_df.dataframe('previous', schema=T.StructType([ + T.StructField(pk_col, T.LongType(), True), + T.StructField(full_hash_col, T.StringType(), True), + T.StructField("collision_bits", T.IntegerType(), True) + ])) + cache_count = cache.groupby(pk_col).count() + + # Keep only the rows in duplicates_df that are not currently in lookup table + cond = [pk_col, full_hash_col] + duplicates_df = duplicates_df.join(cache, cond, 'left_anti') + + # Create counter for rows in duplicates_df + # Subtract 1 because the default collision resolution bit value is 0 + w2 = W.partitionBy(pk_col).orderBy(pk_col) + duplicates_df = duplicates_df.withColumn('row_num', F.row_number().over(w2)) + duplicates_df = duplicates_df.withColumn('row_num', (F.col('row_num') - 1)) + + # If there are already entries in the lookup table for the given primary key, + # then add the number of existing entries to the row number counter + if ctx.is_incremental: + duplicates_df = duplicates_df.join(cache_count, pk_col, 'left') + duplicates_df = duplicates_df.fillna(0, subset=['count']) + duplicates_df = duplicates_df.withColumn('row_num', (F.col('row_num') + F.col('count').cast(T.IntegerType()))) + + duplicates_df = duplicates_df.withColumnRenamed('row_num', 'collision_bits') + + # Remove 'count' column for incremental transforms: + duplicates_df = duplicates_df.select(pk_col, full_hash_col, 'collision_bits') + + return duplicates_df diff --git 
a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/manifest.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/manifest.py new file mode 100644 index 0000000..88c9212 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/manifest.py @@ -0,0 +1,36 @@ +from datetime import datetime +from pyspark.sql import functions as F +from source_cdm_utils import schema + + +def manifest(ctx, manifest_schema, manifest_df, site_id_df, data_partner_ids, omop_vocab, control_map): + # Handle empty manifest + if manifest_df.count() == 0: + schema_struct = schema.schema_dict_to_struct(manifest_schema, False) + data = [["[No manifest provided]" for _ in manifest_schema]] + processed_df = ctx.spark_session.createDataFrame(data, schema_struct) + + # Add CDM + site_id_df = site_id_df.join(data_partner_ids, "data_partner_id", "left") + try: + cdm = site_id_df.head().source_cdm + processed_df = processed_df.withColumn("CDM_NAME", F.lit(cdm)) + except IndexError: + pass + else: + processed_df = manifest_df + + curr_date = datetime.date(datetime.now()) + processed_df = processed_df.withColumn("CONTRIBUTION_DATE", F.lit(curr_date).cast("date")) + + omop_vocab = omop_vocab.where(omop_vocab["vocabulary_id"] == "None").where(omop_vocab["vocabulary_name"] == "OMOP Standardized Vocabularies") + vocabulary_version = omop_vocab.head().vocabulary_version + processed_df = processed_df.withColumn("N3C_VOCAB_VERSION", F.lit(vocabulary_version).cast("string")) + + # Compute approximate expected person count using the CONTROL_MAP file + # Approximate xpected person count = (# rows in CONTROL_MAP) / 2 * 3 + control_map_total_count = control_map.count() + approx_expected_person_count = int((control_map_total_count / 2) * 3) + processed_df = processed_df.withColumn("APPROX_EXPECTED_PERSON_COUNT", F.lit(approx_expected_person_count)) + + return processed_df diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/parse.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/parse.py new file mode 100644 index 0000000..d24c7d9 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/parse.py @@ -0,0 +1,171 @@ +import csv +from pyspark.sql import Row, functions as F, types as T + +header_col = "__is_header__" +errorCols = ["row_number", "error_type", "error_details"] +ErrorRow = Row(header_col, *errorCols) + + +def required_parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols): + parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols) + + +def optional_parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols): + parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols) + + +def cached_parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols): + regexPattern = "(?i).*" + domain + "\\.csv" + fs = payload_input.filesystem() + files_df = fs.files(regex=regexPattern) + + if files_df.count() > 0: + parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols) + else: + clean_output.abort() + error_output.abort() + + +def metadata_parse(payload_input, filename, clean_output, error_output, all_cols, required_cols): + regex = "(?i).*" + filename + "\\.csv" + clean_df, error_df = parse_csv(payload_input, regex, all_cols, required_cols) + + clean_output.write_dataframe(clean_df) + error_output.write_dataframe(error_df) + + +# API above, functionality below + + 
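+# Note on the entry points above:
+#   - required_parse / optional_parse: both delegate directly to parse() for the
+#     "<domain>.csv" file in the unzipped payload.
+#   - cached_parse: parses only when a file matching "<domain>.csv" exists in the
+#     payload; otherwise it calls abort() on both outputs instead of writing.
+#   - metadata_parse: parses an arbitrary "<filename>.csv" and, unlike parse(),
+#     does not attach the originating payload filename as a column.
+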
+def parse(filename_input, payload_input, domain, clean_output, error_output, all_cols, required_cols): + regex = "(?i).*" + domain + "\\.csv" + clean_df, error_df = parse_csv(payload_input, regex, all_cols, required_cols) + + payload_filename = filename_input.dataframe().where(F.col("newest_payload") == True).head().payload # noqa + clean_df = clean_df.withColumn("payload", F.lit(payload_filename)) + + clean_output.write_dataframe(clean_df) + error_output.write_dataframe(error_df) + + +def parse_csv(payload_input, regex, all_cols, required_cols): + # Get the correct file from the unzipped payload + files_df = payload_input.filesystem().files(regex=regex) + + # Parse the CSV into clean rows and error rows + parser = CsvParser(payload_input, all_cols, required_cols) + result_rdd = files_df.rdd.flatMap(parser) + + # The idea behind caching here was that it would make sure Spark didn't parse the CSV twice, once for the clean + # rows and once for the error rows. However some CSVs are greater than 100G, and this line always caused an OOM + # for those. After removing this line, we could parse the 100G even with a small profile. + # result_rdd = result_rdd.cache() + + # Separate into good and bad rows + clean_df = rddToDf(result_rdd, "clean", T.StructType([T.StructField(col, T.StringType()) for col in all_cols])) + error_df = rddToDf(result_rdd, "error", T.StructType([T.StructField(col, T.StringType()) for col in errorCols])) + + # Return + return (clean_df, error_df) + + +def rddToDf(inputRdd, rowType, schema): + # Filter by type and get the rows + resultRdd = inputRdd.filter(lambda row: row[0] == rowType) + resultRdd = resultRdd.map(lambda row: row[1]) + + if resultRdd.isEmpty(): + # Convert to DF using the given schema + resultDf = resultRdd.toDF(schema) + else: + # Convert to DF using the RDD Rows' schema + resultDf = resultRdd.toDF() + + # Drop the header row - get only the data rows. This is needed to ensure we get the right schema. 
+ resultDf = resultDf.filter(resultDf[header_col] == False).drop(header_col) + + return resultDf + + +class CsvParser(): + def __init__(self, rawInput, all_cols, required_cols): + self.rawInput = rawInput + self.all_cols = all_cols + self.required_cols = required_cols + + def __call__(self, csvFilePath): + try: + dialect = self.determineDialect(csvFilePath) + except Exception as e: + yield ("error", ErrorRow(False, "0", "Could not determine the CSV dialect", repr(e))) + return + + with self.rawInput.filesystem().open(csvFilePath.path, errors='ignore') as csvFile: + csvReader = csv.reader(csvFile, dialect=dialect) + yield from self.parseHeader(csvReader) + yield from self.parseFile(csvReader) + + def determineDialect(self, csvFilePath): + with self.rawInput.filesystem().open(csvFilePath.path, errors='ignore') as csvFile: + dialect = csv.Sniffer().sniff(csvFile.readline(), delimiters=",|") + + return dialect + + def parseHeader(self, csvReader): + header = next(csvReader) + header = [x.strip().strip("\ufeff").upper() for x in header] + header = [*filter(lambda col: col, header)] # Remove empty headers + + self.CleanRow = Row(header_col, *header) + self.expected_num_fields = len(header) + + yield ("clean", self.CleanRow(True, *header)) + yield ("error", ErrorRow(True, "", "", "")) + + warningDetails = { + "all columns": self.all_cols, + "required columns": self.required_cols, + "header": header + } + + # Throw warning for every column in the required schema but not in the header + for col in self.required_cols: + if col not in header: + message = f"Header did not contain required column `{col}`" + yield ("error", ErrorRow(False, "0", message, warningDetails)) + + # Throw warning for every column in the header but not in the schema + for col in header: + if col not in self.all_cols: + message = f"Header contained unexpected extra column `{col}`" + yield ("error", ErrorRow(False, "0", message, warningDetails)) + + def parseFile(self, csvReader): + i = 0 + while True: + i += 1 + + nextError = False + try: + row = next(csvReader) + except StopIteration: + break + except Exception as e: + nextError = [str(i), "Unparsable row", repr(e)] + + # Hit an error parsing + if nextError: + yield ("error", ErrorRow(False, *nextError)) + + # Properly formatted row + elif len(row) == self.expected_num_fields: + yield ("clean", self.CleanRow(False, *row)) + + # Ignore empty rows/extra newlines + elif not row: + continue + + # Improperly formatted row + else: + message = f"Incorrect number of fields. Expected {str(self.expected_num_fields)} but found {str(len(row))}." 
+ yield ("error", ErrorRow(False, str(i), message, str(row))) diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/person_ids.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/person_ids.py new file mode 100644 index 0000000..df37d7d --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/person_ids.py @@ -0,0 +1,73 @@ +from pyspark.sql import functions as F, Window as W +from transforms.verbs import dataframes as D + + +def person_id_pre_clean(ctx, person, **input_dfs): + pairs_to_union = [] + persons_to_union = [] + + for (alias, df) in input_dfs.items(): + # Get person_ids for "missing persons" check + persons_to_union.append(df.select("person_id", "data_partner_id")) + + # If the domain has a person_id and visit_occurrence_id column + if alias in domains_to_filter_multiple_persons_per_visit: + # Get person_id + visit_occurrence_id pairs for "multiple persons per visit" check + pairs_to_union.append(df.select("person_id", "visit_occurrence_id", "data_partner_id")) + + distinct_pairs = D.union_many(*pairs_to_union).distinct() + all_persons = D.union_many(*persons_to_union).distinct() + + # DETERMINE ROWS TO REMOVE + CREATE UNIFORM SCHEMA + # Rows with multiple persons per visit + multiple_persons_df = get_rows_with_multiple_persons_per_visit(distinct_pairs) \ + .select("person_id", + "visit_occurrence_id", + "data_partner_id", + "removal_reason") + + # Rows with person_ids not found in the person table + # NOTE that this will report 'null' as a missing person if null person_ids exist. + missing_persons_df = get_rows_with_missing_persons(all_persons, person) \ + .select("person_id", + "data_partner_id", + "removal_reason") \ + .withColumn("visit_occurrence_id", F.lit(None)) + + bad_person_ids = multiple_persons_df.unionByName(missing_persons_df) + + return bad_person_ids + + +domains_to_filter_multiple_persons_per_visit = [ + "condition_occurrence", + "procedure_occurrence", + "measurement", + "observation", + "visit_occurrence", + "drug_exposure", + "device_exposure" +] + + +# Return rows that have a visit_occurrence_id associated with more than 1 person_id +def get_rows_with_multiple_persons_per_visit(domain_df): + w = W.partitionBy("visit_occurrence_id") + # collect all rows that have a visit_occurrence_id associated with more than 1 person_id + bad_rows_df = domain_df.filter(F.col('visit_occurrence_id').isNotNull()) \ + .withColumn('distinct_person_count', F.size(F.collect_set("person_id").over(w))) + bad_rows_df = bad_rows_df \ + .filter(bad_rows_df.distinct_person_count > 1) \ + .drop('distinct_person_count') \ + .withColumn('removal_reason', F.lit('MULTIPLE_PERSONS_PER_VISIT_OCCURRENCE')) + + return bad_rows_df + + +# Return rows that have a person_id not found in the person domain +# REQUIRES that both {all_persons_df} and {person_domain_df} have person_id column +# NOTE that this will report 'null' as a missing person if null person_ids exist. 
+def get_rows_with_missing_persons(all_persons_df, person_domain_df): + bad_rows_df = all_persons_df.join(person_domain_df, on='person_id', how='left_anti') + + return bad_rows_df.withColumn('removal_reason', F.lit('PERSON_ID_NOT_IN_PERSON_DOMAIN')) diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/pre_clean.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/pre_clean.py new file mode 100644 index 0000000..6533880 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/pre_clean.py @@ -0,0 +1,458 @@ +""" +This file is the heart of the cleaning steps, but has become quite bloated over time. Should be re-written. +- tschwab, July 2022 + +updated by mchoudhury Oct 2022 +""" + + +from pyspark.sql import functions as F, Window as W +from transforms.verbs import dataframes as D + +DOMAIN_PKEYS = { + "care_site": "care_site_id", + "condition_era": "condition_era_id", + "condition_occurrence": "condition_occurrence_id", + "control_map": "control_map_id", + "death": "person_id", + "dose_era": "dose_era_id", + "drug_era": "drug_era_id", + "drug_exposure": "drug_exposure_id", + "device_exposure": "device_exposure_id", + "location": "location_id", + "measurement": "measurement_id", + "note": "note_id", + "note_nlp": "note_nlp_id", + "observation": "observation_id", + "observation_period": "observation_period_id", + "payer_plan_period": "payer_plan_period_id", + "person": "person_id", + "procedure_occurrence": "procedure_occurrence_id", + "provider": "provider_id", + "visit_detail": "visit_detail_id", + "visit_occurrence": "visit_occurrence_id" +} + +possible_phi_cols = { + "condition_occurrence": [ + "condition_source_value", + "condition_status_source_value" + ], + "death": [ + "cause_source_value" + ], + "measurement": [ + "value_source_value", + "measurement_source_value" + ], + "observation": [ + "value_as_string", + "observation_source_value", + ], + "person": [ + "gender_source_value", + "race_source_value", + "ethnicity_source_value", + ], + "procedure_occurrence": [ + "procedure_source_value" + ], + "provider": [ + "specialty_source_value", + "gender_source_value" + ], + "visit_occurrence": [ + "visit_source_value", + "admitting_source_value", + "discharge_to_source_value", + ] +} + + +def clean_free_text_cols(input_df, domain, p_key, ctx): + domain = domain.lower() + phi_cols = possible_phi_cols.get(domain, []) + final_domain_df = input_df + final_nulled_rows_df = None + + # Only apply this cleaning logic to domains with possible PHI columns + if phi_cols: + # Reduce the domain to only 2 identifying columns and the possible phi columns + reduced_input_df = input_df.select( + p_key, + "data_partner_id", + *phi_cols) + + # For efficiency, use lists to store dataframes that need to be unioned/joined after the 'for' loop + nulled_rows_dfs = [] + preserve_rows_dfs = [] + # TODO Improvement: partitioning in a for loop like this (especially if it happens multiple times) causes lots of shuffling + for col in phi_cols: + # Reduce to only 2 identifying columns and the particular column we are investigating. + # We can filter to non-null values because we will never mark a null column as possible PHI. 
+ df = reduced_input_df \ + .select(p_key, + "data_partner_id", + col) \ + .where(F.col(col).isNotNull()) + + # (1) Create flag for records >= 60 characters in length + df = df.withColumn("length_flag", F.length(df[col]) >= 60) + # (2) Create flag for any non-numeric values with a frequency <= 2 + # Get value frequencies + # w = W.partitionBy(col) + df = df.withColumn('free_text_val_freq', F.lit(3)) # F.count('*').over(w)) + # Define conditions + freq_cond = df['free_text_val_freq'] <= 2 + non_numeric_cond = df[col].rlike(".*[a-zA-Z]+.*") # entry contains any letters + # Create flags + df = df \ + .withColumn("non_numeric_flag", non_numeric_cond) \ + .withColumn("non_numeric_infrequent_flag", freq_cond & non_numeric_cond) \ + .drop('free_text_val_freq') + + # (3) Create flag for records that match a PHI regex expression for personally-identifying prefixes + regex_check = "Mr\.|Mrs\.|\bMiss\b|Dr\.|, M\.?D\.?" + df = df.withColumn("regex_flag", + F.when(~df.non_numeric_flag, F.lit(False)) # All characters are numeric + .otherwise(df[col].rlike(regex_check))) + + # Log the rows that were flagged for this column (ie: BAD values) + nulled_rows_df = df \ + .where( + (df["length_flag"] == True) | + (df["non_numeric_infrequent_flag"] == True) | + (df["regex_flag"] == True)) \ + .withColumn("column", F.lit(col)) \ + .withColumn("nulled_value", F.col(col)) \ + .select( + "data_partner_id", + "column", + "nulled_value", + "length_flag", + "non_numeric_infrequent_flag", + "regex_flag" + ) + nulled_rows_dfs.append(nulled_rows_df) + + # Log the rows that were kept for this column (ie: GOOD values) + preserve_rows_df = df \ + .where( + (df["length_flag"] == False) & + (df["non_numeric_infrequent_flag"] == False) & + (df["regex_flag"] == False)) \ + .select( + p_key, + col + ) + preserve_rows_dfs.append(preserve_rows_df) + + # Union all nulled rows + final_nulled_rows_df = D.union_many(*nulled_rows_dfs, spark_session=ctx.spark_session) + + # Update input_df to return + # Remove all possible PHI columns from the dataframe + final_domain_df = input_df \ + .drop(*phi_cols) + # Join back in the possible PHI columns based ONLY on the preserved rows + for df in preserve_rows_dfs: + final_domain_df = final_domain_df \ + .join(df, + on=p_key, + how='left') + + return (final_domain_df, final_nulled_rows_df) + + +# Utility function that converts a dataset into a dictionary of {value}:{removal_reason} grouped by {col_name} to +# use as input parameter for get_bad_rows_by_column_val. 
+# REQUIRES bad_vals_df schema {col_name:string, value:int or string, removal_reason:string} +# RETURNS dictionary of form {col_name : {value : removal_reason, ...}, ...} +def get_bad_vals_dict(bad_vals_df): + bad_vals_df = bad_vals_df \ + .groupBy('col_name').agg( + F.map_from_entries( + F.collect_list( + F.struct('value', 'removal_reason'))).alias('val_reason_dict')) + # Create dictionary + bad_vals_dict = bad_vals_df.rdd.map(lambda x: (x[0], x[1])).collectAsMap() # noqa + return bad_vals_dict + + +# Utility function that identifies bad rows is {domain_df} if {col_name}'s value is in {bad_vals_dict}.keys() +# Expects bad_vals_dict to be a dict of form {value : removal_reason} +# Returns filtered {bad_rows_df}, which has the same schema as domain_df + 'removal_reason' column +def get_bad_rows_by_column_val(domain_df, col_name, bad_vals_dict): + # Determine bad rows + bad_rows_df = domain_df \ + .filter(F.col(col_name).isin(*bad_vals_dict.keys())) \ + .withColumn('removal_reason', get_value(bad_vals_dict)(F.col(col_name))) + + return bad_rows_df + + +def get_value(dictionary): + def f(x): + return dictionary.get(x) + return F.udf(f) + + +def clear_person_source_value(df): + df = df.withColumn("person_source_value", F.lit(None).cast("string")) + return df + + +# Per NCATS request, for any records in the person table with a race_concept_id = 8657 ("American Indian or Alaska Native"), +# re-map to a race_concept_id = 45878142 ("Other"). +# Also, for these rows, replace race_source_value with "Other" and race_source_concept_id with 0. +# tschwab update August 2022 - NCATS asked us to undo this hiding logic +def filter_anai_records(person_df): + return person_df + + anai_race_concept_id = 8657 # Standard concept: concept_name = "American Indian or Alaska Native", domain_id = "Race" + other_standard_concept_id = 45878142 # Standard concept: concept_name = "Other", domain_id = "Meas Value" + + person_df = person_df.withColumn( + "anai_flag", + F.when(F.col("race_concept_id") == F.lit(anai_race_concept_id), F.lit(1))\ + .otherwise(F.lit(0)) + ).withColumn( + "race_concept_id", + F.when(F.col("anai_flag") == F.lit(1), F.lit(other_standard_concept_id))\ + .otherwise(F.col("race_concept_id")) + ).withColumn( + "race_source_concept_id", + F.when(F.col("anai_flag") == F.lit(1), F.lit(0))\ + .otherwise(F.col("race_source_concept_id")) + ).withColumn( + "race_source_value", + F.when(F.col("anai_flag") == F.lit(1), F.lit("Other"))\ + .otherwise(F.col("race_source_value")) + ) + + # Handle records with AN/AI source values that aren't mapped to the AN/AI concept_id + person_df = person_df.withColumn( + "race_source_value", + F.when(F.col("race_source_value").rlike("(?i)(.*indian.*native.*)|(.*indian.*alask.*)|(.*amer.*indian.*)|(.*choctaw.*indian.*)"), F.lit(None))\ + .otherwise(F.col("race_source_value")) + ) + return person_df.drop("anai_flag") + + +def do_pre_clean( + domain, + processed, nulled_rows, removed_rows, + foundry_df, removed_person_ids, + ahrq_xwalk, tribal_zips, loincs_to_remove, ctx +): + + # Convert transform inputs to dataframes + foundry_df, removed_person_ids, ahrq_xwalk, tribal_zips, loincs_to_remove = \ + foundry_df.dataframe(), removed_person_ids.dataframe(), ahrq_xwalk.dataframe(), \ + tribal_zips.dataframe(), loincs_to_remove.dataframe() + + # Solve death dupes - deaths have no primary key + if domain.lower() == "death": + p_key = "real_primary_key" + foundry_df = foundry_df.withColumn(p_key, F.monotonically_increasing_id()) + else: + p_key = DOMAIN_PKEYS[domain] + + bad_rows_dfs = 
[] + + # --------------------ALL domains + # -----CLEAN FREE TEXT + # Clean free text columns to address PHI and site-identifying information. + foundry_df, nulled_rows_df = clean_free_text_cols(foundry_df, domain, p_key, ctx) + # Log the rows that were nulled if this domain has any free text columns/any rows were flagged + if nulled_rows_df: + nulled_rows.write_dataframe(nulled_rows_df) + + # --------------------DOMAINS with PERSON_ID + # -----BAD PERSON_IDS + if "person_id" in foundry_df.columns: + # Remove person ids based on lds_clean_removed_person_ids dataset + # Keep only one row per person_id + removed_person_ids_distinct = removed_person_ids \ + .dropDuplicates(["person_id"]) \ + .where(F.col("person_id").isNotNull()) \ + .withColumn("col_name", F.lit("person_id")) \ + .withColumn("value", F.col("person_id")) \ + .select("col_name", + "value", + "removal_reason") + + column_values_dict = get_bad_vals_dict(removed_person_ids_distinct) + for (col_name, val_reason_dict) in column_values_dict.items(): + bad_person_id_rows = get_bad_rows_by_column_val(foundry_df, + col_name, + val_reason_dict) + if bad_person_id_rows: + bad_rows_dfs.append(bad_person_id_rows) + + # Remove rows with null person_ids + null_person_id_rows = foundry_df \ + .where(F.col("person_id").isNull()) \ + .withColumn("removal_reason", F.lit("NULL_PERSON_ID")) + if null_person_id_rows: + bad_rows_dfs.append(null_person_id_rows) + + # --------------------PERSON domain + if domain.lower() == "person": + # Null person_source_value column, which can contain site names + foundry_df = clear_person_source_value(foundry_df) + # Handle records with AN/AI race_concept_id (map to "Other" instead) + foundry_df = filter_anai_records(foundry_df) + + # --------------------ANY domain -- removing rows based on LOINCS_TO_REMOVE input df + # Filter to current domain and update col_name based on current domain + # ie: '_concept_id' --> 'measurement_concept_id' + bad_loincs = loincs_to_remove + bad_loincs = bad_loincs.filter(F.array_contains(F.col("domains"), domain.lower())) + bad_loincs = bad_loincs \ + .select('col_name', 'value', 'removal_reason') \ + .withColumn('col_name', F.concat(F.lit(domain.lower()), F.col('col_name'))) + + if bad_loincs: + # Grouping by 'column', create a dictionary of values:removal_reasons + bad_loincs = bad_loincs \ + .groupBy('col_name').agg( + F.map_from_entries( + F.collect_list( + F.struct('value', 'removal_reason'))).alias('val_reason_dict')) + # Create a dictionary of form {col_name : {value : removal_reason, ...}, ...} + column_values_dict = bad_loincs.rdd.map(lambda x: (x[0], x[1])).collectAsMap() # noqa + + # Get bad rows by searching each col_name for values + for (col_name, val_reason_dict) in column_values_dict.items(): + bad_loinc_rows = get_bad_rows_by_column_val(foundry_df, + col_name, + val_reason_dict) + if bad_loinc_rows: + bad_rows_dfs.append(bad_loinc_rows) + + # --------------------CONDITION_OCCURRENCE or OBSERVATION domains + if domain.lower() in ["condition_occurrence", "observation"]: + # Remove records containing AHRQ codes + concept_col_dict = { + "condition_occurrence": "condition_concept_id", + "observation": "observation_concept_id" + } + source_col_dict = { + "condition_occurrence": "condition_source_value", + "observation": "observation_source_value" + } + + ahrq_concept_ids = ahrq_xwalk.select("standard_concept_id").distinct() + ahrq_source_values = ahrq_xwalk.select("icd10_source_code").distinct() + + # -----AHRQ_CONCEPT_ID + # create dictionary of form {value : 
removal_reason} + ahrq_concept_id_dict = ahrq_concept_ids.rdd.map(lambda x: (x[0], 'AHRQ_CONCEPT_ID')).collectAsMap() # noqa + ahrq_concept_id_rows = \ + get_bad_rows_by_column_val(foundry_df, + concept_col_dict[domain.lower()], + ahrq_concept_id_dict) + if ahrq_concept_id_rows: + bad_rows_dfs.append(ahrq_concept_id_rows) + + # -----AHRQ_SOURCE_VALUE + # create dictionary of form {value : removal_reason} + ahrq_source_values_dict = ahrq_source_values.rdd.map(lambda x: (x[0], 'AHRQ_SOURCE_VALUE')).collectAsMap() # noqa + ahrq_source_value_rows = \ + get_bad_rows_by_column_val(foundry_df, + source_col_dict[domain.lower()], + ahrq_source_values_dict) + if ahrq_source_value_rows: + bad_rows_dfs.append(ahrq_source_value_rows) + + # --------------------CARE_SITE domain + if domain.lower() == "care_site": + foundry_df = foundry_df.withColumn("care_site_name", F.lit(None).cast('string')) + + # --------------------LOCATION domain + if domain.lower() == "location": + # -----TRIBAL_ZIP_CODE + tribal_zips = tribal_zips.select('Zip_Code') + # create dictionary of form {value : removal_reason} + tribal_zips_dict = tribal_zips.rdd.map(lambda x: (x[0], 'TRIBAL_ZIP_CODE')).collectAsMap() # noqa + tribal_zip_rows = \ + get_bad_rows_by_column_val(foundry_df, + 'zip', + tribal_zips_dict) + # add to list of bad_rows_dfs if not empty + if tribal_zip_rows: + bad_rows_dfs.append(tribal_zip_rows) + + # Clean up the many zip formats + zipcol = F.trim(F.col("zip")) + foundry_df = foundry_df.withColumn( + "zip", + + # Standard 5 digit + F.when(zipcol.rlike("^\\d{5}$"), zipcol) + + # 123XX format + .when(zipcol.rlike("^\\d{3}XX$"), F.substring(zipcol, 1, 3)) + + # Standard 3 digit + .when(zipcol.rlike("^\\d{3}$"), zipcol) + + # 9 digit with a dash + .when(zipcol.rlike("^\\d{5}-\\d{4}$"), F.substring(zipcol, 1, 5)) + + # 9 digit with no dash + .when(zipcol.rlike("^\\d{9}$"), F.substring(zipcol, 1, 5)) + + # Null everything else. There are other formats, such as Canadian, the string "Unknown", + # the string "OT", 5 digits then a dash then less than 4 digits, + # and every number of digits from 2-8. 
+ .otherwise(F.lit(None).cast('string')) + ) + + # --------------------VISIT_DETAIL domain + if domain.lower() == "visit_detail": + # Null out the source value + foundry_df = foundry_df.withColumn("visit_detail_source_value", F.lit(None).cast("string")) + + # --------------------NOTE_NLP domain + if domain.lower() == "note_nlp": + # Null out the time values - they are the time of the NLP processing, not the time of the EHR data + foundry_df = foundry_df.withColumn("nlp_date", F.lit(None).cast("date")) + foundry_df = foundry_df.withColumn("nlp_datetime", F.lit(None).cast("timestamp")) + + # Split + foundry_df = foundry_df.withColumn("term_modifiers_split", F.split("term_modifiers", "[,;]")) + + foundry_df = foundry_df.withColumn("term_modifier_certainty", F.filter("term_modifiers_split", lambda col: col.startswith("certainty="))) + foundry_df = foundry_df.withColumn("term_modifier_subject", F.filter("term_modifiers_split", lambda col: col.startswith("subject=") | col.startswith("experiencer="))) + foundry_df = foundry_df.withColumn("term_modifier_status", F.filter("term_modifiers_split", lambda col: col.startswith("status="))) + + foundry_df = foundry_df.withColumn("term_modifier_certainty", F.split(F.col("term_modifier_certainty").getItem(0), "=").getItem(1)) + foundry_df = foundry_df.withColumn("term_modifier_subject", F.split(F.col("term_modifier_subject").getItem(0), "=").getItem(1)) + foundry_df = foundry_df.withColumn("term_modifier_status", F.split(F.col("term_modifier_status").getItem(0), "=").getItem(1)) + + foundry_df = foundry_df.drop("term_modifiers_split") + + # --------------------DEVICE_EXPOSURE domain + if domain.lower() == "device_exposure": + foundry_df = foundry_df.withColumn("device_source_value", F.lit(None).cast("string")) + + # --------------------ALL domains + # Remove bad rows + if len(bad_rows_dfs) > 0: + removed_rows_df = D.union_many(*bad_rows_dfs, spark_session=ctx.spark_session) + # Remove dupes -- some rows may be caught by more than one check, but we only want to record it once + removed_rows_df = removed_rows_df.dropDuplicates([p_key]) + foundry_df = foundry_df.join(removed_rows_df.select(p_key), + on=p_key, + how="left_anti") + removed_rows.write_dataframe(removed_rows_df) + + else: + removed_rows.write_dataframe(ctx.spark_session.createDataFrame([], schema=foundry_df.schema)) + + if domain.lower() == "death": + foundry_df = foundry_df.drop(p_key) + + processed.write_dataframe(foundry_df) diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/schema.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/schema.py new file mode 100644 index 0000000..a823053 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/schema.py @@ -0,0 +1,555 @@ +from pyspark.sql import functions as F, types as T + + +# Used in step 2 of all CDMs +def apply_schema(df, schema_dict, include_payload=True): + # Convert empty strings to null values + exprs = [F.when(F.col(x) != "", F.col(x)).otherwise(None).alias(x) for x in df.columns] + df = df.select(*exprs) + + # Make sure all column names are uppercase + for original_col_name in df.columns: + df = df.withColumnRenamed(original_col_name, original_col_name.upper()) + input_cols = df.columns + + # Iterate through all columns in OMOP domain schema + # Cast those that are present to proper types, or create empty columns + for col_name, col_type in schema_dict.items(): + if col_name in input_cols: + if col_type == T.DateType() or col_type == T.TimestampType(): + # Handle dates/datetimes in Unix timestamp 
format or in regular string timestamp format + df = df.withColumn(col_name, F.coalesce(F.from_unixtime(df[col_name]), df[col_name])) + + # Handle Oracle format (site 117) + df = df.withColumn( + col_name, + F.when( + df[col_name].rlike("^\\d{2}-[A-Za-z]{3}-\\d{2} \\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3,9} (AM|PM)$"), + F.from_unixtime(F.unix_timestamp(df[col_name], format='dd-MMM-yy hh.mm.ss.SSS a')) + ).otherwise(df[col_name]) + ) + + # Handle site 84's format + df = df.withColumn( + col_name, + F.when( + df[col_name].rlike("^\\d{1,2}\\/\\d{1,2}\\/\\d{4} \\d{1,2}:\\d{2}:\\d{2} (AM|PM)$"), + F.from_unixtime(F.unix_timestamp(df[col_name], format='MM/dd/yy hh:mm:ss a')) + ).otherwise(df[col_name]) + ) + + df = df.withColumn(col_name, df[col_name].cast(col_type)) + + else: + # Populate column with null if it isn't in input table + df = df.withColumn(col_name, F.lit(None).cast(col_type)) + + # Reorder columns according to schema. Make sure we pass-through the payload name column + col_list = list(schema_dict.keys()) + if include_payload: + col_list.append("PAYLOAD") + df = df.select(col_list) + + # Rename columns to lowercase + for original_col_name in df.columns: + df = df.withColumnRenamed(original_col_name, original_col_name.lower()) + + return df + + +# Create column to store an id for each site, to be used in SQL code for primary key generation +def add_site_id_col(df, site_id_df): + site_id_df = site_id_df.dataframe() + site_id = site_id_df.head().data_partner_id # Noqa + + df = df.withColumn("data_partner_id", F.lit(site_id)) + df = df.withColumn("data_partner_id", df["data_partner_id"].cast(T.IntegerType())) + return df + + +# Helper functions + +def schema_dict_to_struct(schema_dict, all_string_type): + field_list = [] + for col_name, col_type in schema_dict.items(): + if all_string_type: + field_list.append(T.StructField(col_name, T.StringType())) + else: + field_list.append(T.StructField(col_name, col_type)) + + struct_schema = T.StructType(field_list) + return struct_schema + + +def schema_dict_all_string_type(schema_dict, all_lowercase=False, add_payload=False): + result = {} + + for col_name in schema_dict.keys(): + if all_lowercase: + col_name = col_name.lower() + result[col_name] = T.StringType() + + if add_payload: + result["payload"] = T.StringType() + + return result + + +omop_complete = { + "care_site": { + "CARE_SITE_ID": T.LongType(), + "CARE_SITE_NAME": T.StringType(), + "PLACE_OF_SERVICE_CONCEPT_ID": T.IntegerType(), + "LOCATION_ID": T.LongType(), + "CARE_SITE_SOURCE_VALUE": T.StringType(), + "PLACE_OF_SERVICE_SOURCE_VALUE": T.StringType(), + }, + "condition_era": { + "CONDITION_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "CONDITION_CONCEPT_ID": T.IntegerType(), + "CONDITION_ERA_START_DATE": T.DateType(), + "CONDITION_ERA_END_DATE": T.DateType(), + "CONDITION_OCCURRENCE_COUNT": T.IntegerType(), + }, + "condition_occurrence": { + "CONDITION_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "CONDITION_CONCEPT_ID": T.IntegerType(), + "CONDITION_START_DATE": T.DateType(), + "CONDITION_START_DATETIME": T.TimestampType(), + "CONDITION_END_DATE": T.DateType(), + "CONDITION_END_DATETIME": T.TimestampType(), + "CONDITION_TYPE_CONCEPT_ID": T.IntegerType(), + "STOP_REASON": T.StringType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "CONDITION_SOURCE_VALUE": T.StringType(), + "CONDITION_SOURCE_CONCEPT_ID": T.IntegerType(), + "CONDITION_STATUS_SOURCE_VALUE": T.StringType(), + "CONDITION_STATUS_CONCEPT_ID": 
T.IntegerType(), + }, + "control_map": { + "CONTROL_MAP_ID": T.LongType(), + "CASE_PERSON_ID": T.LongType(), + "BUDDY_NUM": T.IntegerType(), + "CONTROL_PERSON_ID": T.LongType(), + "CASE_AGE": T.IntegerType(), + "CASE_SEX": T.StringType(), + "CASE_RACE": T.StringType(), + "CASE_ETHN": T.StringType(), + "CONTROL_AGE": T.IntegerType(), + "CONTROL_SEX": T.StringType(), + "CONTROL_RACE": T.StringType(), + "CONTROL_ETHN": T.StringType() + }, + "death": { + "PERSON_ID": T.LongType(), + "DEATH_DATE": T.DateType(), + "DEATH_DATETIME": T.TimestampType(), + "DEATH_TYPE_CONCEPT_ID": T.IntegerType(), + "CAUSE_CONCEPT_ID": T.IntegerType(), + "CAUSE_SOURCE_VALUE": T.StringType(), + "CAUSE_SOURCE_CONCEPT_ID": T.IntegerType(), + }, + "device_exposure": { + "DEVICE_EXPOSURE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DEVICE_CONCEPT_ID": T.IntegerType(), + "DEVICE_EXPOSURE_START_DATE": T.DateType(), + "DEVICE_EXPOSURE_START_DATETIME": T.TimestampType(), + "DEVICE_EXPOSURE_END_DATE": T.DateType(), + "DEVICE_EXPOSURE_END_DATETIME": T.TimestampType(), + "DEVICE_TYPE_CONCEPT_ID": T.IntegerType(), + "UNIQUE_DEVICE_ID": T.StringType(), + "QUANTITY": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "DEVICE_SOURCE_VALUE": T.StringType(), + "DEVICE_SOURCE_CONCEPT_ID": T.IntegerType(), + }, + "dose_era": { + "DOSE_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "UNIT_CONCEPT_ID": T.IntegerType(), + "DOSE_VALUE": T.FloatType(), + "DOSE_ERA_START_DATE": T.DateType(), + "DOSE_ERA_END_DATE": T.DateType(), + }, + "drug_era": { + "DRUG_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "DRUG_ERA_START_DATE": T.DateType(), + "DRUG_ERA_END_DATE": T.DateType(), + "DRUG_EXPOSURE_COUNT": T.IntegerType(), + "GAP_DAYS": T.IntegerType(), + }, + "drug_exposure": { + "DRUG_EXPOSURE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "DRUG_EXPOSURE_START_DATE": T.DateType(), + "DRUG_EXPOSURE_START_DATETIME": T.TimestampType(), + "DRUG_EXPOSURE_END_DATE": T.DateType(), + "DRUG_EXPOSURE_END_DATETIME": T.TimestampType(), + "VERBATIM_END_DATE": T.DateType(), + "DRUG_TYPE_CONCEPT_ID": T.IntegerType(), + "STOP_REASON": T.StringType(), + "REFILLS": T.IntegerType(), + "QUANTITY": T.FloatType(), + "DAYS_SUPPLY": T.IntegerType(), + "SIG": T.StringType(), + "ROUTE_CONCEPT_ID": T.IntegerType(), + "LOT_NUMBER": T.StringType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "DRUG_SOURCE_VALUE": T.StringType(), + "DRUG_SOURCE_CONCEPT_ID": T.IntegerType(), + "ROUTE_SOURCE_VALUE": T.StringType(), + "DOSE_UNIT_SOURCE_VALUE": T.StringType(), + }, + "location": { + "LOCATION_ID": T.LongType(), + "ADDRESS_1": T.StringType(), + "ADDRESS_2": T.StringType(), + "CITY": T.StringType(), + "STATE": T.StringType(), + "ZIP": T.StringType(), + "COUNTY": T.StringType(), + "LOCATION_SOURCE_VALUE": T.StringType(), + }, + "measurement": { + "MEASUREMENT_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "MEASUREMENT_CONCEPT_ID": T.IntegerType(), + "MEASUREMENT_DATE": T.DateType(), + "MEASUREMENT_DATETIME": T.TimestampType(), + "MEASUREMENT_TIME": T.StringType(), + "MEASUREMENT_TYPE_CONCEPT_ID": T.IntegerType(), + "OPERATOR_CONCEPT_ID": T.IntegerType(), + "VALUE_AS_NUMBER": T.DoubleType(), + "VALUE_AS_CONCEPT_ID": T.IntegerType(), + "UNIT_CONCEPT_ID": T.IntegerType(), + "RANGE_LOW": T.FloatType(), + "RANGE_HIGH": 
T.FloatType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "MEASUREMENT_SOURCE_VALUE": T.StringType(), + "MEASUREMENT_SOURCE_CONCEPT_ID": T.IntegerType(), + "UNIT_SOURCE_VALUE": T.StringType(), + "VALUE_SOURCE_VALUE": T.StringType(), + }, + "note": { + "NOTE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "NOTE_DATE": T.DateType(), + "NOTE_DATETIME": T.TimestampType(), + "NOTE_TYPE_CONCEPT_ID": T.IntegerType(), + "NOTE_CLASS_CONCEPT_ID": T.IntegerType(), + "NOTE_TITLE": T.StringType(), + "NOTE_TEXT": T.StringType(), + "ENCODING_CONCEPT_ID": T.IntegerType(), + "LANGUAGE_CONCEPT_ID": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "NOTE_SOURCE_VALUE": T.StringType(), + }, + "note_nlp": { + "NOTE_NLP_ID": T.LongType(), + "NOTE_ID": T.LongType(), + "SECTION_CONCEPT_ID": T.IntegerType(), + "SNIPPET": T.StringType(), + "OFFSET": T.StringType(), + "LEXICAL_VARIANT": T.StringType(), + "NOTE_NLP_CONCEPT_ID": T.IntegerType(), + "NOTE_NLP_SOURCE_CONCEPT_ID": T.IntegerType(), + "NLP_SYSTEM": T.StringType(), + "NLP_DATE": T.DateType(), + "NLP_DATETIME": T.TimestampType(), + "TERM_EXISTS": T.BooleanType(), + "TERM_TEMPORAL": T.StringType(), + "TERM_MODIFIERS": T.StringType(), + }, + "observation": { + "OBSERVATION_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "OBSERVATION_CONCEPT_ID": T.IntegerType(), + "OBSERVATION_DATE": T.DateType(), + "OBSERVATION_DATETIME": T.TimestampType(), + "OBSERVATION_TYPE_CONCEPT_ID": T.IntegerType(), + "VALUE_AS_NUMBER": T.DoubleType(), + "VALUE_AS_STRING": T.StringType(), + "VALUE_AS_CONCEPT_ID": T.IntegerType(), + "QUALIFIER_CONCEPT_ID": T.IntegerType(), + "UNIT_CONCEPT_ID": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "OBSERVATION_SOURCE_VALUE": T.StringType(), + "OBSERVATION_SOURCE_CONCEPT_ID": T.IntegerType(), + "UNIT_SOURCE_VALUE": T.StringType(), + "QUALIFIER_SOURCE_VALUE": T.StringType(), + }, + "observation_period": { + "OBSERVATION_PERIOD_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "OBSERVATION_PERIOD_START_DATE": T.DateType(), + "OBSERVATION_PERIOD_END_DATE": T.DateType(), + "PERIOD_TYPE_CONCEPT_ID": T.IntegerType(), + }, + 'payer_plan_period': { + 'PAYER_PLAN_PERIOD_ID': T.LongType(), + 'PERSON_ID': T.LongType(), + 'PAYER_PLAN_PERIOD_START_DATE': T.DateType(), + 'PAYER_PLAN_PERIOD_END_DATE': T.DateType(), + 'PAYER_CONCEPT_ID': T.IntegerType(), + 'PAYER_SOURCE_VALUE': T.StringType(), + 'PAYER_SOURCE_CONCEPT_ID': T.IntegerType(), + 'PLAN_CONCEPT_ID': T.IntegerType(), + 'PLAN_SOURCE_VALUE': T.StringType(), + 'PLAN_SOURCE_CONCEPT_ID': T.IntegerType(), + 'SPONSOR_CONCEPT_ID': T.IntegerType(), + 'SPONSOR_SOURCE_VALUE': T.StringType(), + 'SPONSOR_SOURCE_CONCEPT_ID': T.IntegerType(), + 'FAMILY_SOURCE_VALUE': T.StringType(), + 'STOP_REASON_CONCEPT_ID': T.IntegerType(), + 'STOP_REASON_SOURCE_VALUE': T.StringType(), + 'STOP_REASON_SOURCE_CONCEPT_ID': T.IntegerType(), + }, + "person": { + "PERSON_ID": T.LongType(), + "GENDER_CONCEPT_ID": T.IntegerType(), + "YEAR_OF_BIRTH": T.IntegerType(), + "MONTH_OF_BIRTH": T.IntegerType(), + "DAY_OF_BIRTH": T.IntegerType(), + "BIRTH_DATETIME": T.TimestampType(), + "RACE_CONCEPT_ID": T.IntegerType(), + "ETHNICITY_CONCEPT_ID": T.IntegerType(), + "LOCATION_ID": T.LongType(), + "PROVIDER_ID": T.LongType(), + "CARE_SITE_ID": T.LongType(), + "PERSON_SOURCE_VALUE": T.StringType(), + "GENDER_SOURCE_VALUE": 
T.StringType(), + "GENDER_SOURCE_CONCEPT_ID": T.IntegerType(), + "RACE_SOURCE_VALUE": T.StringType(), + "RACE_SOURCE_CONCEPT_ID": T.IntegerType(), + "ETHNICITY_SOURCE_VALUE": T.StringType(), + "ETHNICITY_SOURCE_CONCEPT_ID": T.IntegerType(), + }, + "procedure_occurrence": { + "PROCEDURE_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "PROCEDURE_CONCEPT_ID": T.IntegerType(), + "PROCEDURE_DATE": T.DateType(), + "PROCEDURE_DATETIME": T.TimestampType(), + "PROCEDURE_TYPE_CONCEPT_ID": T.IntegerType(), + "MODIFIER_CONCEPT_ID": T.IntegerType(), + "QUANTITY": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType(), + "VISIT_DETAIL_ID": T.LongType(), + "PROCEDURE_SOURCE_VALUE": T.StringType(), + "PROCEDURE_SOURCE_CONCEPT_ID": T.IntegerType(), + "MODIFIER_SOURCE_VALUE": T.StringType(), + }, + "provider": { + "PROVIDER_ID": T.LongType(), + "PROVIDER_NAME": T.StringType(), + "NPI": T.StringType(), + "DEA": T.StringType(), + "SPECIALTY_CONCEPT_ID": T.IntegerType(), + "CARE_SITE_ID": T.LongType(), + "YEAR_OF_BIRTH": T.IntegerType(), + "GENDER_CONCEPT_ID": T.IntegerType(), + "PROVIDER_SOURCE_VALUE": T.StringType(), + "SPECIALTY_SOURCE_VALUE": T.StringType(), + "SPECIALTY_SOURCE_CONCEPT_ID": T.IntegerType(), + "GENDER_SOURCE_VALUE": T.StringType(), + "GENDER_SOURCE_CONCEPT_ID": T.IntegerType(), + }, + "visit_detail": { + "VISIT_DETAIL_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "VISIT_DETAIL_CONCEPT_ID": T.IntegerType(), + "VISIT_DETAIL_START_DATE": T.DateType(), + "VISIT_DETAIL_START_DATETIME": T.TimestampType(), + "VISIT_DETAIL_END_DATE": T.DateType(), + "VISIT_DETAIL_END_DATETIME": T.TimestampType(), + "VISIT_DETAIL_TYPE_CONCEPT_ID": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "CARE_SITE_ID": T.LongType(), + "VISIT_DETAIL_SOURCE_VALUE": T.StringType(), + "VISIT_DETAIL_SOURCE_CONCEPT_ID": T.IntegerType(), + "ADMITTING_SOURCE_VALUE": T.StringType(), + "ADMITTING_SOURCE_CONCEPT_ID": T.IntegerType(), + "DISCHARGE_TO_SOURCE_VALUE": T.StringType(), + "DISCHARGE_TO_CONCEPT_ID": T.IntegerType(), + "PRECEDING_VISIT_DETAIL_ID": T.LongType(), + "VISIT_DETAIL_PARENT_ID": T.LongType(), + "VISIT_OCCURRENCE_ID": T.LongType() + }, + "visit_occurrence": { + "VISIT_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "VISIT_CONCEPT_ID": T.IntegerType(), + "VISIT_START_DATE": T.DateType(), + "VISIT_START_DATETIME": T.TimestampType(), + "VISIT_END_DATE": T.DateType(), + "VISIT_END_DATETIME": T.TimestampType(), + "VISIT_TYPE_CONCEPT_ID": T.IntegerType(), + "PROVIDER_ID": T.LongType(), + "CARE_SITE_ID": T.LongType(), + "VISIT_SOURCE_VALUE": T.StringType(), + "VISIT_SOURCE_CONCEPT_ID": T.IntegerType(), + "ADMITTING_SOURCE_CONCEPT_ID": T.IntegerType(), + "ADMITTING_SOURCE_VALUE": T.StringType(), + "DISCHARGE_TO_CONCEPT_ID": T.IntegerType(), + "DISCHARGE_TO_SOURCE_VALUE": T.StringType(), + "PRECEDING_VISIT_OCCURRENCE_ID": T.LongType(), + }, +} + + +omop_required = { + "care_site": { + "CARE_SITE_ID": T.LongType(), + }, + "condition_era": { + "CONDITION_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "CONDITION_CONCEPT_ID": T.IntegerType(), + "CONDITION_ERA_START_DATE": T.DateType(), + "CONDITION_ERA_END_DATE": T.DateType(), + }, + "condition_occurrence": { + "CONDITION_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "CONDITION_CONCEPT_ID": T.IntegerType(), + "CONDITION_START_DATE": T.DateType(), + "CONDITION_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "control_map": { + "CONTROL_MAP_ID": T.LongType(), + "CASE_PERSON_ID": T.LongType(), + "BUDDY_NUM": 
T.IntegerType() + }, + "death": { + "PERSON_ID": T.LongType(), + "DEATH_DATE": T.DateType(), + }, + "device_exposure": { + "DEVICE_EXPOSURE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DEVICE_CONCEPT_ID": T.IntegerType(), + "DEVICE_EXPOSURE_START_DATE": T.DateType(), + "DEVICE_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "dose_era": { + "DOSE_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "UNIT_CONCEPT_ID": T.IntegerType(), + "DOSE_VALUE": T.FloatType(), + "DOSE_ERA_START_DATE": T.DateType(), + "DOSE_ERA_END_DATE": T.DateType(), + }, + "drug_era": { + "DRUG_ERA_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "DRUG_ERA_START_DATE": T.DateType(), + "DRUG_ERA_END_DATE": T.DateType(), + }, + "drug_exposure": { + "DRUG_EXPOSURE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "DRUG_CONCEPT_ID": T.IntegerType(), + "DRUG_EXPOSURE_START_DATE": T.DateType(), + "DRUG_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "location": { + "LOCATION_ID": T.LongType(), + }, + "measurement": { + "MEASUREMENT_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "MEASUREMENT_CONCEPT_ID": T.IntegerType(), + "MEASUREMENT_DATE": T.DateType(), + "MEASUREMENT_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "note": { + # "NOTE_ID": T.LongType(), + # "PERSON_ID": T.LongType(), + # "NOTE_DATE": T.DateType(), + # "NOTE_TYPE_CONCEPT_ID": T.IntegerType(), + # "NOTE_CLASS_TYPE_ID": T.IntegerType(), + # "NOTE_TEXT": T.StringType(), + # "ENCODING_CONCEPT_ID": T.IntegerType(), + # "LANGUAGE_CONCEPT_ID": T.IntegerType(), + }, + "note_nlp": { + # "NOTE_NLP_ID": T.LongType(), + # "NOTE_ID": T.LongType(), + # "LEXICAL_VARIANT": T.StringType(), + # "NLP_DATE": T.DateType(), + }, + "observation": { + "OBSERVATION_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "OBSERVATION_CONCEPT_ID": T.IntegerType(), + "OBSERVATION_DATE": T.DateType(), + "OBSERVATION_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "observation_period": { + "OBSERVATION_PERIOD_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "OBSERVATION_PERIOD_START_DATE": T.DateType(), + "PERIOD_TYPE_CONCEPT_ID": T.IntegerType(), + }, + 'payer_plan_period': { + 'PAYER_PLAN_PERIOD_ID': T.LongType(), + 'PERSON_ID': T.LongType(), + 'PAYER_PLAN_PERIOD_START_DATE': T.DateType(), + 'PAYER_PLAN_PERIOD_END_DATE': T.DateType(), + 'PAYER_CONCEPT_ID': T.IntegerType(), + }, + "person": { + "PERSON_ID": T.LongType(), + "GENDER_CONCEPT_ID": T.IntegerType(), + "YEAR_OF_BIRTH": T.IntegerType(), + "ETHNICITY_CONCEPT_ID": T.IntegerType(), + }, + "procedure_occurrence": { + "PROCEDURE_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "PROCEDURE_CONCEPT_ID": T.IntegerType(), + "PROCEDURE_DATE": T.DateType(), + "PROCEDURE_TYPE_CONCEPT_ID": T.IntegerType(), + }, + "provider": { + "PROVIDER_ID": T.LongType(), + }, + "visit_detail": { + "VISIT_DETAIL_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "VISIT_DETAIL_CONCEPT_ID": T.IntegerType(), + "VISIT_DETAIL_START_DATE": T.DateType(), + "VISIT_DETAIL_END_DATE": T.DateType(), + "VISIT_DETAIL_TYPE_CONCEPT_ID": T.IntegerType(), + "VISIT_OCCURRENCE_ID": T.LongType() + }, + "visit_occurrence": { + "VISIT_OCCURRENCE_ID": T.LongType(), + "PERSON_ID": T.LongType(), + "VISIT_CONCEPT_ID": T.IntegerType(), + "VISIT_START_DATE": T.DateType(), + "VISIT_TYPE_CONCEPT_ID": T.IntegerType(), + } +} diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/status.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/status.py new file mode 100644 index 0000000..af5de59 --- /dev/null 
+++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/status.py @@ -0,0 +1,30 @@ +from pyspark.sql import types as T, functions as F + +schema = T.StructType([ + T.StructField('parsed_payload', T.StringType()), + T.StructField('unreleased_payload', T.StringType()), + T.StructField('data_partner_id', T.IntegerType()) +]) + + +def payload_status(ctx, start_df, end_df, site_id): + # start_df tells us the payload currently being processed in the transformation pipeline + try: + start_payload = start_df.where(F.col("newest_payload") == True).head().payload # noqa + except AttributeError: + start_payload = "Payload info not available" + + # end_df tells us the payload that has successfully made it through the pipeline + try: + end_payload = end_df.select("payload").distinct().head().payload # noqa + except AttributeError: + end_payload = "Payload info not available" + + data_partner_id = int(site_id.head().data_partner_id) # noqa + + result = ctx.spark_session.createDataFrame( + data=[(start_payload, end_payload, data_partner_id)], + schema=schema + ) + + return result diff --git a/pipeline_logic/v2/shared-logic/src/source_cdm_utils/unzip.py b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/unzip.py new file mode 100644 index 0000000..ac6f013 --- /dev/null +++ b/pipeline_logic/v2/shared-logic/src/source_cdm_utils/unzip.py @@ -0,0 +1,86 @@ +''' + +The get_newest_payload() and extract_filenames() functions are not great and should be re-written someday. +- tschwab + +''' + +import os +import tempfile +import shutil +from zipfile import ZipFile +from pyspark.sql import functions as F, types as T + +# Read and write 100 MB chunks +CHUNK_SIZE = 100 * 1024 * 1024 + + +def unzipLatest(foundryZip, regex, foundryOutput): + fs = foundryZip.filesystem() + files_df = fs.files(regex=regex) + newest_file = get_newest_payload(files_df) + unzip(foundryZip, foundryOutput, newest_file) + + +def unzip(foundryInput, foundryOutput, filename): + inputFS = foundryInput.filesystem() + outputFS = foundryOutput.filesystem() + + # Create a temp file to pass to zip library, because it needs to be able to .seek() + with tempfile.NamedTemporaryFile() as temp: + # Copy contents of file from Foundry into temp file + with inputFS.open(filename, 'rb') as newest: + shutil.copyfileobj(newest, temp) + temp.flush() + + # For each file in the zip, unzip and add it to output dataset + zipObj = ZipFile(temp.name) + for filename in zipObj.namelist(): + with outputFS.open(filename, 'wb') as out: + input_file = zipObj.open(filename) + data = input_file.read(CHUNK_SIZE) + while data: + out.write(data) + data = input_file.read(CHUNK_SIZE) + + +def extract_filenames(ctx, zip_file, regex, payload_filename): + # Get the paths and determine the newest one + fs = zip_file.filesystem() + files_df = fs.files(regex=regex) + newest_file = get_newest_payload(files_df) + files_df = files_df.withColumn("newest_payload", F.when(F.col("path") == newest_file, F.lit(True)).otherwise(F.lit(False))) + + # Get just the filename, not the path + get_basename = F.udf(lambda x: os.path.basename(x), T.StringType()) + ctx.spark_session.udf.register("get_basename", get_basename) + files_df = files_df.withColumn("payload", get_basename(F.col("path"))) + + # Select the needed data and repartition to a single file + result = files_df.select("payload", "newest_payload") + result = result.coalesce(1) + + # Write the result + payload_filename.write_dataframe(result) + + +# Given a filesystem dataframe containing payload zip files, return the most recent payload 
name
+def get_newest_payload(files_df):
+    # Look for 8 digits at the end of the filepath representing the payload date
+    files_df = files_df.withColumn("payload_date", F.regexp_extract(F.col("path"), "(?i)(\\d{8})(.*)(\\.zip)$", 1))
+
+    # Handle either yyyyMMdd or MMddyyyy format by normalizing to yyyyMMdd
+    files_df = files_df.withColumn(
+        "processed_date",
+        F.when(F.regexp_extract(F.col("path"), "(202.)\\d{4}", 1) == F.lit(""), F.concat(F.col("payload_date").substr(5, 4), F.col("payload_date").substr(1, 4)))\
+        .otherwise(F.col("payload_date"))
+    )
+
+    # If a site submitted multiple files on the same day (e.g. "payload_20201015.zip" and "payload_20201015_1.zip"), extract the increment
+    files_df = files_df.withColumn("same_date_increment", F.regexp_extract(F.col("path"), "(?i)(\\d{8})(.*)(\\.zip)$", 2))
+
+    # Sort by processed payload date, then by increment, then by modified time, and grab the most recent payload
+    files_df = files_df.orderBy(["processed_date", "same_date_increment", "modified"], ascending=False)
+    newest_file_path = files_df.head().path
+
+    return newest_file_path
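
For reference, below is a minimal self-contained sketch of the payload-date normalization that get_newest_payload() applies to zip file names. The SparkSession setup, the file names, and the integer "modified" values are illustrative stand-ins for the Foundry files dataframe and are not part of the library itself.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Fake file listing: "modified" stands in for the modification-time column
# of the real files dataframe.
files = [
    ("site_123/payload_20201015.zip", 1),    # yyyyMMdd
    ("site_123/payload_20201015_1.zip", 2),  # same-day resubmission
    ("site_123/payload_10162020.zip", 3),    # MMddyyyy -> normalized to 20201016
]
df = spark.createDataFrame(files, ["path", "modified"])

# Same expressions as get_newest_payload()
df = df.withColumn("payload_date", F.regexp_extract(F.col("path"), "(?i)(\\d{8})(.*)(\\.zip)$", 1))
df = df.withColumn(
    "processed_date",
    F.when(F.regexp_extract(F.col("path"), "(202.)\\d{4}", 1) == F.lit(""),
           F.concat(F.col("payload_date").substr(5, 4), F.col("payload_date").substr(1, 4)))
    .otherwise(F.col("payload_date")))
df = df.withColumn("same_date_increment", F.regexp_extract(F.col("path"), "(?i)(\\d{8})(.*)(\\.zip)$", 2))

# Newest first: payload_10162020.zip (20201016), then payload_20201015_1.zip, then payload_20201015.zip
df.orderBy(["processed_date", "same_date_increment", "modified"], ascending=False).show(truncate=False)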