forked from DataTalksClub/data-engineering-zoomcamp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data-loading-parquet.py
86 lines (63 loc) · 2.5 KB
/
data-loading-parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#Cleaned up version of data-loading.ipynb
import argparse
import os
import subprocess
import sys
from time import time
from urllib.parse import quote, urlsplit

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine
def _file_name_from_url(url):
    """Return the local file name to save *url* under (its last path segment).

    Query strings and fragments (e.g. signed-URL tokens) are dropped so they
    neither end up in the file name nor interfere with the extension check.
    """
    path = urlsplit(url.strip()).path
    return path.rsplit('/', 1)[-1].strip()


def _detect_format(file_name):
    """Return ``'csv'`` or ``'parquet'`` for a supported file name, else ``None``.

    Matching is by substring (as in the original script), so compressed
    variants such as ``.csv.gz`` are still accepted; pandas infers the
    compression from the extension. ``.csv`` is tested first.
    """
    if '.csv' in file_name:
        return 'csv'
    if '.parquet' in file_name:
        return 'parquet'
    return None


def _build_db_url(user, password, host, port, db):
    """Build the SQLAlchemy PostgreSQL URL for the given connection settings.

    User and password are percent-encoded so characters that are significant
    inside a URL ('@', ':', '/', '%', ...) cannot corrupt it. ``None``
    credentials become empty strings rather than the literal text "None".
    """
    def encode(value):
        return quote('' if value is None else str(value), safe='')

    return f'postgresql://{encode(user)}:{encode(password)}@{host}:{port}/{db}'


def _read_source(file_name, file_format, batch_size=100000):
    """Open *file_name* for chunked reading.

    Returns ``(header_df, batches)``: ``header_df`` holds the first (up to)
    10 rows and is used only to create the table's columns; ``batches`` is an
    iterator of pandas DataFrames of at most *batch_size* rows each.
    *file_format* must be ``'csv'`` or ``'parquet'``.
    """
    if file_format == 'csv':
        header_df = pd.read_csv(file_name, nrows=10)
        batches = pd.read_csv(file_name, iterator=True, chunksize=batch_size)
        return header_df, batches

    parquet_file = pq.ParquetFile(file_name)
    header_df = next(parquet_file.iter_batches(batch_size=10)).to_pandas()
    # Convert each Arrow record batch lazily so only one batch is in memory.
    batches = (
        record_batch.to_pandas()
        for record_batch in parquet_file.iter_batches(batch_size=batch_size)
    )
    return header_df, batches


def main(params):
    """Download a .csv or .parquet file and load it into a Postgres table.

    *params* must provide ``user``, ``password``, ``host``, ``port``, ``db``,
    ``tb`` (destination table) and ``url`` (file to download), e.g. the
    namespace produced by argparse. The destination table is replaced: its
    columns are created from the file's first rows, then every batch is
    appended.

    Exits with a non-zero status if the file type is unsupported; raises
    ``subprocess.CalledProcessError`` if the download fails.
    """
    url = params.url
    file_name = _file_name_from_url(url)

    # Validate before downloading so an unsupported URL fails fast.
    file_format = _detect_format(file_name)
    if file_format is None:
        # A message passed to sys.exit is printed to stderr and the exit status is 1.
        sys.exit('Error. Only .csv or .parquet files allowed.')

    print(f'Downloading {file_name} ...')
    # Argument list, no shell: the URL is never interpreted by a shell.
    # -f: fail on HTTP errors instead of saving the error page; -L: follow redirects.
    subprocess.run(['curl', '-fL', url.strip(), '-o', file_name], check=True)
    print('\n')

    engine = create_engine(
        _build_db_url(params.user, params.password, params.host, params.port, params.db)
    )

    header_df, batches = _read_source(file_name, file_format)

    # Create (or replace) the table with the right columns but no rows.
    header_df.head(0).to_sql(name=params.tb, con=engine, if_exists='replace')

    t_start = time()
    count = 0
    for count, batch_df in enumerate(batches, start=1):
        print(f'inserting batch {count}...')
        b_start = time()
        batch_df.to_sql(name=params.tb, con=engine, if_exists='append')
        b_end = time()
        print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    t_end = time()
    print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')
def _build_arg_parser():
    """Return the command-line parser for this loader.

    ``--password`` is optional (it may be unnecessary, e.g. with trust
    authentication) and ``--port`` defaults to 5432; all other connection
    and source options are required.
    """
    parser = argparse.ArgumentParser(
        description='Load a .csv or .parquet file from a URL into a Postgres database.'
    )
    parser.add_argument('--user', required=True, help='Username for Postgres.')
    parser.add_argument('--password', help='Password for the Postgres user.')
    parser.add_argument('--host', required=True, help='Hostname for Postgres.')
    parser.add_argument('--port', type=int, default=5432,
                        help='Port for Postgres connection (default: 5432).')
    parser.add_argument('--db', required=True, help='Database name for Postgres.')
    parser.add_argument('--tb', required=True, help='Destination table name for Postgres.')
    parser.add_argument('--url', required=True, help='URL of the .csv or .parquet file to load.')
    return parser


if __name__ == '__main__':
    # Parse command-line arguments and run the loader.
    main(_build_arg_parser().parse_args())