-
Notifications
You must be signed in to change notification settings - Fork 1
/
update_database.py
executable file
·181 lines (148 loc) · 5.07 KB
/
update_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Update the database with a CSV file
Usage:
update_database.py --file <path-to-csv> --table <table> --region <region> [--purge] [--drop]
update_database.py (-h | --help)
update_database.py --version
Options:
-h, --help Show this screen.
--version Show version.
-f, --file <path-to-csv> Path to the CSV file.
-t, --table <table> Name of the database table to import the data into.
-r, --region <region> Name of the region of this data.
-p, --purge Purge the database before import the CSV.
-d, --drop Drop the database table before import the CSV.
"""
import datetime
import csv
import sys
import os
import traceback
import psycopg2
from docopt import docopt
from dotenv import load_dotenv, find_dotenv
# Select which dotenv file to load: when NODE_ENV is set (e.g. "production"),
# load the matching ".env.<NODE_ENV>" file; otherwise fall back to the nearest
# default ".env" found by find_dotenv().
node_env = os.getenv('NODE_ENV')
print("NODE_ENV", node_env)
if node_env:
    load_dotenv(f".env.{node_env}")
else:
    load_dotenv(find_dotenv())

# Parse CLI arguments from the module docstring (docopt reads __doc__).
arguments = docopt(__doc__, version='Update database with CSV file 1.0')

# Fail fast on invalid input before any database work is attempted.
if arguments['--table'] not in ('calendar', 'station'):
    raise ValueError("--table must be either 'calendar' or 'station'")
if not os.path.exists(arguments['--file']):
    raise FileNotFoundError(f"--file must be an existing file, {arguments['--file']} not found")
def create_table(cur, table, drop=False, purge=False):
    """Create the target table and its indexes if they do not already exist.

    Args:
        cur: database cursor used to execute the DDL statements.
        table: table name, either 'calendar' or 'station'. Any other value
            creates nothing (callers validate the name beforehand), though
            drop/purge would still run against it.
        drop: if True, DROP the table first (destroys existing data).
        purge: if True, TRUNCATE the table after (re)creation.
    """
    if drop:
        cur.execute(f"DROP TABLE IF EXISTS {table};")
    if table == 'calendar':
        query = f"""
        CREATE TABLE IF NOT EXISTS {table} (
        region text,
        zip text,
        area text,
        station text,
        waste_type text,
        col_date timestamptz,
        description text,
        PRIMARY KEY (region, area, waste_type, col_date)
        );
        """
        cur.execute(query)
        # Secondary indexes used by region- and waste-type-filtered queries.
        cur.execute("CREATE INDEX IF NOT EXISTS idx_region on calendar(region);")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_waste_type on calendar(waste_type);")
    elif table == 'station':
        query = f"""
        CREATE TABLE IF NOT EXISTS {table} (
        region text,
        zip text,
        name text,
        oil boolean,
        metal boolean,
        glass boolean,
        textile boolean,
        description text,
        PRIMARY KEY (region, name)
        );
        """
        cur.execute(query)
        cur.execute("CREATE INDEX IF NOT EXISTS idx_station_region on station(region);")
    if purge:
        cur.execute(f"TRUNCATE {table};")
def load_csv(cur, path, table):
    """Bulk-load a CSV file into *table* via a temporary staging table.

    The CSV is COPYed into a TEMP table that mirrors the target table's
    columns, then deduplicated on the target's primary-key columns with
    SELECT DISTINCT ON before insertion. The temp table is dropped at
    commit (ON COMMIT DROP), so the caller must commit the transaction.

    Args:
        cur: database cursor (must support copy_expert, i.e. psycopg2).
        path: path to a CSV file with a header row, comma-delimited.
        table: target table name, 'calendar' or 'station'.

    Raises:
        ValueError: if *table* is not one of the supported tables.
    """
    print(f"Load csv file {path} to {table}")
    tmp_table = f"""
    CREATE TEMP TABLE tmp_table
    ON COMMIT DROP
    AS
    SELECT *
    FROM {table}
    WITH NO DATA;
    """
    cur.execute(tmp_table)
    # Explicit encoding so the load does not depend on the platform locale.
    with open(path, 'r', encoding='utf-8') as f:
        copy_sql = """
        COPY tmp_table FROM STDIN WITH
        CSV
        HEADER
        DELIMITER AS ','
        QUOTE '"'
        """
        cur.copy_expert(sql=copy_sql, file=f)
    # Deduplicate on the primary-key columns of the target table.
    if table == 'calendar':
        insert_sql = """
        INSERT INTO calendar
        SELECT DISTINCT ON (region, area, waste_type, col_date) *
        FROM tmp_table
        """
    elif table == 'station':
        insert_sql = """
        INSERT INTO station
        SELECT DISTINCT ON (region, name) *
        FROM tmp_table
        """
    else:
        # Previously this path crashed with an unbound-variable NameError;
        # make the contract explicit instead.
        raise ValueError(f"unsupported table: {table}")
    cur.execute(insert_sql)
def cleanup_table(cur, table, region):
    """Delete every existing row of *region* from *table*.

    The region value is passed as a bound query parameter rather than
    interpolated into the SQL string, which avoids SQL injection and
    breakage on regions containing a quote character. The table name is
    still interpolated because identifiers cannot be parameterized;
    callers restrict it to a fixed whitelist.
    """
    cur.execute(f"DELETE from {table} where region = %s", (region,))
def reindex(cur, table):
    """Rebuild the secondary indexes that belong to *table*.

    Tables other than 'calendar' and 'station' are silently ignored.
    """
    indexes_by_table = {
        'calendar': ('idx_region', 'idx_waste_type'),
        'station': ('idx_station_region',),
    }
    for index_name in indexes_by_table.get(table, ()):
        cur.execute(f"REINDEX INDEX {index_name};")
# Entry point: connect to Postgres, ensure the table exists, replace this
# region's rows with the CSV contents, then rebuild the indexes. Any failure
# is reported on stderr and exits with status 1.
try:
    DB_URL = os.getenv('DATABASE_URL')
    if not DB_URL:
        raise Exception("DATABASE_URL not provided")
    print("Import database")
    # read database connection url from the environ variable we just set.
    conn = None
    try:
        conn = psycopg2.connect(DB_URL)
        cur = conn.cursor()
        # print database info
        print('PostgreSQL database version:')
        cur.execute('SELECT version()')
        db_version = cur.fetchone()
        print(db_version)
        # create table (honoring --drop / --purge flags)
        create_table(cur, arguments['--table'], purge=arguments['--purge'], drop=arguments['--drop'])
        # delete existing entries for this region so the CSV fully replaces them
        cleanup_table(cur, arguments['--table'], arguments['--region'])
        # load csvs in temp table; the temp table is ON COMMIT DROP, so the
        # commit below both persists the insert and discards the staging table
        load_csv(cur, arguments['--file'], arguments['--table'])
        # commit changes
        conn.commit()
        # reindex both indices (after commit; REINDEX results are not part of
        # the data transaction above)
        reindex(cur, arguments['--table'])
        cur.close()
    finally:
        # close the communication with the database server by calling the close()
        # (runs on success and on error; an uncommitted transaction is rolled back)
        if conn is not None:
            conn.close()
            print('Database connection closed.')
except Exception as e:
    print("Error: %s" % e, file=sys.stderr)
    print(traceback.format_exc(), file=sys.stderr)
    sys.exit(1)