
Commit

update: save to parquet, upload to aws
alanceloth committed Mar 15, 2024
1 parent 7bb8afd commit e42da87
Showing 3 changed files with 160 additions and 12 deletions.
30 changes: 29 additions & 1 deletion README.md
@@ -8,7 +8,35 @@ This project utilizes DuckDB as a local database for development purposes. The r
Following this, the project leverages dbt (data build tool) to retrieve data from the AWS S3 Bucket, perform transformations, and load it into a PostgreSQL for production. Additionally, dbt is responsible for generating a local documentation covering all transformations, schemas, tests, and other relevant information within this layer of the project.
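As a rough illustration of this extract step (a sketch, not code from this repository), DuckDB can read the Parquet files directly from the S3 bucket once the `httpfs` extension and credentials are configured. The region below is a placeholder; the bucket name and `sales/` prefix match the upload path used in `src/app.py`:

```python
import duckdb

con = duckdb.connect("dev.duckdb")
con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;")
con.execute("SET s3_region = 'us-east-1';")  # placeholder region
con.execute("SET s3_access_key_id = '<AWS_ACCESS_KEY>';")
con.execute("SET s3_secret_access_key = '<AWS_SECRET>';")

# Load every Parquet file under the sales/ prefix into a local table for development.
con.execute("""
    CREATE OR REPLACE TABLE transactions AS
    SELECT * FROM read_parquet('s3://datawarehouse-duckdb-alanceloth/sales/*.parquet');
""")
```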

## TODO List

- :white_check_mark: **Create the pydantic GitHub repo with Branch Protection**: Establish the pydantic GitHub repository with Branch Protection; contract modifications require approval.
- :white_check_mark: **Create the pydantic contract**: Define the pydantic contract to govern data handling within the project.
- :white_check_mark: **Create the CI/CD for the contract**: Develop the CI/CD process in the contract GitHub repository.
- :white_check_mark: Add the contract as a submodule to the main repo.
- :white_check_mark: Configure the CI/CD using the GitHub Actions.
- :white_check_mark: Test the Workflow.
- :white_check_mark: **Add Sentry as the Observability tool**: Add Sentry to the project
- :white_check_mark: **Create the Streamlit page to upload CSV files**: Develop a Streamlit page to streamline the process of uploading CSV files.
- :white_check_mark: Create the Streamlit frontend to upload files
- :white_check_mark: Create the backend to process the uploaded CSV files and check whether the schema is correct using the pydantic contract
- :white_check_mark: Create the app.py to execute the application
- :white_check_mark: Test the upload
- :white_check_mark: **Transform the CSV files into Parquet files**: Implement the necessary procedures to transform CSV files into Parquet files.
- [ ] **Save the Parquet files into AWS S3 Bucket**: Set up mechanisms to save the Parquet files into the designated AWS S3 Bucket.
- [ ] **Establish the dbt project**: Initiate the creation of the dbt project for seamless data management.
- [ ] **Extract data from AWS S3 Bucket Parquet files into DuckDB using dbt**: Utilize dbt to extract and process data from AWS S3 Bucket Parquet files into DuckDB.
- [ ] **Create the transactions table from the appended files**: Develop the transactions table based on the processed files.
- [ ] **Create the transaction table within the dbt project**: Establish the transaction table structure within the dbt project.
- [ ] **Define the transaction table schema in the dbt project (including tests for each column)**: Specify the schema for the transaction table in the dbt project, including comprehensive tests for each column.
- [ ] **Export the transactions table to a Parquet file in the AWS S3 Bucket**: Implement procedures to export the transactions table to a Parquet file within the AWS S3 Bucket.
- [ ] **Initial testing and building of the dbt project**: Conduct initial testing and building phases for the dbt project.
- [ ] **Decompose the transaction table into Fact and Dimension tables**: Break down the transaction table into separate Fact and Dimension tables.
- [ ] **Create schemas for Fact and Dimension tables in the dbt project (including tests for each column)**: Develop schemas for Fact and Dimension tables within the dbt project, accompanied by thorough tests for each column.
- [ ] **Second round of testing and building for the dbt project**: Conduct a second round of testing and building for the dbt project.
- [ ] **Load the Fact and Dimension tables into PostgreSQL**: Implement processes to load the Fact and Dimension tables into PostgreSQL.
- [ ] **Update dbt documentation with comprehensive details related to the project**: Enhance the dbt documentation with detailed information regarding the entire project.
- [ ] **Publish the dbt documentation on GitHub Pages**: Make the dbt documentation accessible by publishing it on GitHub Pages.
- [ ] **Create a PowerBI Dashboard using the data**: Develop a PowerBI Dashboard utilizing the processed data.
- [ ] **Create a Jupyter Notebook using the data**: Generate a Jupyter Notebook incorporating the processed data.



15 changes: 9 additions & 6 deletions src/app.py
@@ -1,9 +1,8 @@
from frontend import CSVValidatorUI
from backend import process_csv, csv_to_sql
from backend import process_csv, df_to_sql, save_parquet_to_aws, csv_to_parquet
import logging
import sentry_sdk

import sentry_sdk
from datetime import datetime

sentry_sdk.init(
dsn="https://998b620bfe94c8f55bd0640e8ea26ee0@o4506680328323072.ingest.sentry.io/4506680337104896",
@@ -16,6 +15,7 @@
profiles_sample_rate=1.0,
)


def main():
ui = CSVValidatorUI()
ui.display_header()
@@ -31,10 +31,13 @@ def main():
logging.error("Error: %s", error)
sentry_sdk.capture_message("Error: %s", error)
elif ui.display_save_button():
csv_to_sql(df)
df_to_sql(df)
ui.display_success_message()
logging.info("Data saved in the database")
sentry_sdk.capture_message("Data saved in the database")
logging.info("Data saved in the PostgreSQL database")
sentry_sdk.capture_message("Data saved in the PostgreSQL database")
parquet_file = csv_to_parquet(upload_file)
save_parquet_to_aws(parquet_file, "datawarehouse-duckdb-alanceloth", f"sales/transactions{datetime.now()}.parquet")


if __name__ == "__main__":
main()
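# A possible variant (sketch, not from the repository): str(datetime.now()) in the S3 key
# above contains spaces and colons; a formatted timestamp keeps the key clean and
# lexicographically sortable:
#
#     object_key = f"sales/transactions_{datetime.now():%Y%m%d_%H%M%S}.parquet"
#     save_parquet_to_aws(parquet_file, "datawarehouse-duckdb-alanceloth", object_key)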
127 changes: 122 additions & 5 deletions src/backend.py
@@ -2,14 +2,25 @@
import os
import sys
from dotenv import load_dotenv
import duckdb as db
import boto3
from loguru import logger

root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(root_path)

from contract.contract.contract import Transaction

# Loguru initialization
log_directory = "../log"
if not os.path.exists(log_directory):
os.makedirs(log_directory)

log_file_path = os.path.join(log_directory, 'backend_log.log')

logger.add(log_file_path, rotation="500 MB", level="INFO")

logger.info(f"Process started at {pd.Timestamp.now()}")

load_dotenv("keys/.env")

@@ -19,38 +30,144 @@
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
POSTGRES_DB = os.getenv("POSTGRES_DB")

AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET = os.getenv("AWS_SECRET")


DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

def process_csv(uploaded_file):
"""
Processes a CSV file containing transaction data.
Args:
uploaded_file: The path to the uploaded CSV file.
Returns:
tuple: A tuple of three elements (DataFrame, success, error_message):
- If the CSV file is successfully processed: the DataFrame containing the data, True, and None.
- If the CSV file contains extra or missing columns: the DataFrame as read, False, and an error message explaining the issue.
- If an unexpected error occurs during processing: the DataFrame (or None if the file could not be read), False, and an error message describing the error.
Notes:
This function reads the uploaded CSV file into a DataFrame and validates its structure against the fields expected by the Transaction model.
If the CSV file contains extra columns or missing columns, the function returns an error message indicating the issue.
If the CSV file is successfully processed, it constructs Transaction objects from each row and returns a DataFrame containing the data.
Raises:
Nothing is propagated to the caller: row-level validation failures are raised internally as ValueError and, like any other exception, are caught and reported through the returned error message.
"""
df = None
try:
df = pd.read_csv(uploaded_file)

extra_cols = set(df.columns) - set(Transaction.model_fields.keys())
if extra_cols:
logger.error(f"CSV file contains extra columns: {', '.join(extra_cols)}")
return False, f"CSV file contains extra columns: {', '.join(extra_cols)}"

missing_cols = set(Transaction.model_fields.keys()) - set(df.columns)
if missing_cols:
logger.error(f"CSV file is missing columns: {', '.join(missing_cols)}")
return False, f"CSV file is missing columns: {', '.join(missing_cols)}"

#df = df.rename(columns=contract.model_fields)

for index, row in df.iterrows():
try:
_ = Transaction(**row.to_dict())
except Exception as e:
logger.error(f"Row {index + 1}: {e}")
raise ValueError(f"Row {index + 1}: {e}")

return df, True, None

except ValueError as ve:
logger.error(str(ve))
return df, False, str(ve)
except Exception as e:
logger.error(f"Unexpected Error: {str(e)}")
return df, False, f"Unexpected Error: {str(e)}"

def csv_to_sql(df):


def df_to_sql(df):
"""
Converts a DataFrame to a SQL table.
Args:
df (DataFrame): The DataFrame to be converted.
Returns:
tuple: A tuple containing two elements:
- A boolean indicating whether the conversion was successful (True) or not (False).
- If the conversion failed, a string describing the error encountered. If successful, None.
Notes:
This function converts the provided DataFrame to a SQL table named "transactions".
It replaces any existing table with the same name if it exists.
Raises:
Any exceptions raised during the conversion process.
"""
try:
df.to_sql(name="transactions", con=DATABASE_URL, if_exists="replace", index=False)
logger.info("DataFrame converted to SQL table successfully.")
return True, None
except Exception as e:
return False, str(e)
logger.error(f"Failed to convert DataFrame to SQL table: {str(e)}")
return False, str(e)
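# A possible variant (sketch): pandas also accepts an explicit SQLAlchemy engine here,
# and if_exists="append" would accumulate uploads instead of replacing the table:
#
#     from sqlalchemy import create_engine
#     engine = create_engine(DATABASE_URL)
#     df.to_sql(name="transactions", con=engine, if_exists="append", index=False)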


def csv_to_parquet(csv_path: str) -> str:
"""
Transforms a CSV file into a Parquet file using DuckDB.
Args:
csv_path (str): The path to the CSV file to be converted.
Returns:
str: The path to the generated Parquet file.
Notes:
This function uses DuckDB to read the CSV file and convert it into a Parquet file.
The Parquet file is written next to the source CSV (same base name, .parquet extension), and its path is returned.
Raises:
Any exceptions raised during the conversion process.
"""
try:
parquet_file = os.path.splitext(csv_path)[0] + ".parquet"
db.read_csv(csv_path).write_parquet(parquet_file)
logger.info(f"CSV file converted to Parquet: {parquet_file}")
return parquet_file
except Exception as e:
logger.error(f"Failed to convert CSV to Parquet: {str(e)}")
raise
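# An equivalent approach (sketch): the same conversion can be expressed as a single
# DuckDB COPY statement, streaming the CSV straight to Parquet:
#
#     db.sql(f"COPY (SELECT * FROM read_csv_auto('{csv_path}')) TO '{parquet_file}' (FORMAT PARQUET)")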


def save_parquet_to_aws(parquet_file: str, bucket_name: str, bucket_path: str):
"""
Saves a Parquet file to an Amazon S3 bucket.
Args:
parquet_file (str): The local path to the Parquet file to be uploaded.
bucket_name (str): The name of the S3 bucket where the file will be uploaded.
bucket_path (str): The key (path) under which the file will be stored in the S3 bucket.
Returns:
None
Raises:
FileNotFoundError: If the specified Parquet file does not exist.
Exception: If any other error occurs during the upload process.
"""
s3 = boto3.client('s3',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET)

try:
s3.upload_file(parquet_file, bucket_name, bucket_path)
logger.info(f"File {parquet_file} successfully sent to bucket {bucket_name}.")
except FileNotFoundError:
logger.error(f"File {parquet_file} not found.")
except Exception as e:
logger.error(f"An error occurred while sending the file to S3: {e}")
