-
Notifications
You must be signed in to change notification settings - Fork 17
/
tap_fixerio.py
116 lines (91 loc) · 3.45 KB
/
tap_fixerio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python3
import json
import sys
import argparse
import time
import requests
import singer
import backoff
from datetime import date, datetime, timedelta
REQUIRED_CONFIG_KEYS = [
"access_key",
]
base_url = 'https://data.fixer.io/api/'
logger = singer.get_logger()
session = requests.Session()
DATE_FORMAT='%Y-%m-%d'
def parse_response(r):
flattened = r['rates']
flattened[r['base']] = 1.0
flattened['date'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.strptime(r['date'], DATE_FORMAT))
return flattened
schema = {'type': 'object',
'properties':
{'date': {'type': 'string',
'format': 'date-time'}},
'additionalProperties': True}
def giveup(error):
logger.error(error.response.text)
response = error.response
return not (response.status_code == 429 or
response.status_code >= 500)
@backoff.on_exception(backoff.constant,
(requests.exceptions.RequestException),
jitter=backoff.random_jitter,
max_tries=5,
giveup=giveup,
interval=30)
def request(url, params):
response = requests.get(url=url, params=params)
response.raise_for_status()
return response
def do_sync(base, start_date, access_key, symbols=None):
logger.info('Replicating exchange rate data from fixer.io starting from {}'.format(start_date))
singer.write_schema('exchange_rate', schema, 'date')
state = {'start_date': start_date}
next_date = start_date
try:
while True:
if symbols:
response = request(base_url + next_date, {'base': base, 'access_key': access_key, 'symbols': ','.join(symbols)})
else:
response = request(base_url + next_date, {'base': base, 'access_key': access_key})
payload = response.json()
if datetime.strptime(next_date, DATE_FORMAT) > datetime.utcnow():
break
elif payload.get('error'):
raise RuntimeError(payload['error'])
else:
singer.write_records('exchange_rate', [parse_response(payload)])
state = {'start_date': next_date}
next_date = (datetime.strptime(next_date, DATE_FORMAT) + timedelta(days=1)).strftime(DATE_FORMAT)
except requests.exceptions.RequestException as e:
logger.fatal('Error on ' + e.request.url +
'; received status ' + str(e.response.status_code) +
': ' + e.response.text)
singer.write_state(state)
sys.exit(-1)
singer.write_state(state)
logger.info('Tap exiting normally')
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-c', '--config', help='Config file', required=False)
parser.add_argument(
'-s', '--state', help='State file', required=False)
#args = parser.parse_args()
args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
if args.config:
config = args.config
else:
config = {}
if args.state:
state = args.state
else:
state = {}
start_date = state.get('start_date',
config.get('start_date', datetime.utcnow().strftime(DATE_FORMAT)))
access_key = state.get('access_key', config.get('access_key'))
do_sync(config.get('base', 'USD'), start_date, access_key, symbols=config.get('symbols', None))
if __name__ == '__main__':
main()