forked from blohre-usgs/pysgcn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
local_pipeline_run.py
107 lines (86 loc) · 3.54 KB
/
local_pipeline_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import requests
import json
from dotenv import load_dotenv, find_dotenv
from pysgcn import bis_pipeline
import pysppin
# Load environment variables (e.g. API keys used by pysgcn/pysppin) from the
# nearest .env file found by walking up from the current directory.
load_dotenv(find_dotenv())
# Change-history ledger handle passed through every pipeline stage.
# NOTE(review): a plain string here, not a real ledger object — presumably a
# local stand-in accepted by bis_pipeline; confirm against pysgcn.
ch_ledger = 'ledger'
# Root directory for the local cache; '' makes CacheManager build paths
# relative to the filesystem root ("/sppin") — TODO confirm this is intended.
cache_root = ''
def lambda_handler_4(event, context):
    """Run pipeline stage 4 (the final stage) for one queued message.

    ``event["body"]`` is a JSON document with at least ``download_uri``
    and ``payload``.  ``context`` is unused (kept for lambda-handler
    signature compatibility).
    """
    message_in = json.loads(event["body"])
    download_uri = message_in["download_uri"]
    cache_manager = CacheManager(download_uri)
    # Final stage: there is no downstream stage and no separate
    # final-result sink, so both callbacks are disabled.
    send_final_result = None
    send_to_stage = None
    bis_pipeline.process_4(download_uri, ch_ledger, send_final_result, send_to_stage, message_in["payload"], cache_manager)
def lambda_handler_3(event, context):
    """Run pipeline stage 3 for one queued message.

    ``event["body"]`` is a JSON document with ``run_id``, ``sb_item_id``,
    ``download_uri`` and ``payload``.  Stage 3 may either forward work to
    stage 4 (``send_to_stage``) or emit a finished species record
    (``send_final_result``), which is written to the local cache.
    """
    message_in = json.loads(event["body"])
    run_id = message_in["run_id"]
    sb_item_id = message_in["sb_item_id"]
    download_uri = message_in["download_uri"]
    cache_manager = CacheManager(download_uri)

    def send_to_stage(data, stage):
        # `stage` is ignored locally: the only possible next hop is stage 4.
        json_doc = {
            'run_id': run_id,
            'sb_item_id': sb_item_id,
            'download_uri': download_uri,
            'payload': data
        }
        lambda_handler_4({"body": json.dumps(json_doc)}, {})

    def send_final_result(data):
        # Persist the finished record keyed by its sppin key so later
        # stages/runs can look it up.
        species = data["data"]
        cache_manager.add_to_cache("final_res:{}".format(species["sppin_key"]), species)

    bis_pipeline.process_3(download_uri, ch_ledger, send_final_result, send_to_stage, message_in["payload"], cache_manager)
def lambda_handler_2(event, context):
    """Run pipeline stage 2 for one queued message.

    Parses the JSON ``event["body"]`` envelope, wires a ``send_to_stage``
    callback that hands work straight to the stage-3 handler (simulating a
    queue hop), invokes ``bis_pipeline.process_2`` and prints the number of
    species it reported.
    """
    message_in = json.loads(event["body"])
    download_uri = message_in["download_uri"]
    run_id = message_in["run_id"]
    sb_item_id = message_in["sb_item_id"]
    cache_manager = CacheManager(download_uri)
    # Stage 2 never emits final results directly.
    send_final_result = None

    def send_to_stage(data, stage):
        # Re-wrap the payload in the same envelope shape and call the next
        # handler synchronously; `stage` is ignored in this local driver.
        envelope = json.dumps({
            'run_id': run_id,
            'sb_item_id': sb_item_id,
            'download_uri': download_uri,
            'payload': data
        })
        lambda_handler_3({"body": envelope}, {})

    num_species = bis_pipeline.process_2(download_uri, ch_ledger, send_final_result, send_to_stage, message_in["payload"], cache_manager)
    print('Species count: ', num_species)
def lambda_handler(event, context):
    """Entry point: run pipeline stage 1 for a raw (non-JSON-wrapped) event.

    Unlike the later handlers, ``event`` here is already a dict with
    ``run_id``, ``sb_item_id`` and ``download_uri``.  Work items produced by
    stage 1 are forwarded synchronously to the stage-2 handler.
    """
    run_id = event["run_id"]
    sb_item_id = event["sb_item_id"]
    download_uri = event["download_uri"]
    cache_manager = CacheManager(download_uri)

    def send_to_stage(data, stage):
        # Wrap the payload in the JSON envelope the downstream handlers expect.
        json_doc = {
            'run_id': run_id,
            'sb_item_id': sb_item_id,
            'download_uri': download_uri,
            'payload': data
        }
        lambda_handler_2({"body": json.dumps(json_doc)}, {})

    # Stage 1 never emits final results directly.
    send_final_result = None
    num_process_files = bis_pipeline.process_1(download_uri, ch_ledger, send_final_result, send_to_stage, sb_item_id, cache_manager)
    # Report the stage-1 file count, mirroring the species-count print in
    # lambda_handler_2 (previously this value was computed but never used).
    print('Process file count: ', num_process_files)
class CacheManager:
    """Small key/value cache backed by pysppin's SQLite ``Sql`` utility.

    All cache artifacts are stored under ``<cache_root>/sppin`` in a single
    table named ``cache`` with ``key``/``value`` columns.
    """

    def __init__(self, cache_root):
        self.cache_folder = "sppin"
        self.cache_path = f"{cache_root}/{self.cache_folder}"
        self.table_name = 'cache'
        self.sql_cache = pysppin.utils.Sql(cache_location=self.cache_path)

    def get_from_cache(self, key):
        """Return the cached value for ``key``, or ``None`` when absent."""
        rows = self.sql_cache.get_select_records(self.cache_folder, self.table_name, 'key = ?', key)
        if not rows:
            return None
        return rows[0]["value"]

    def add_to_cache(self, key, value):
        """Insert a ``key``/``value`` row and return the insert result."""
        return self.sql_cache.insert_record(self.cache_folder, self.table_name, {"key": key, "value": value})
# Guard the kickoff so importing this module no longer triggers a full
# pipeline run as a side effect; behavior when executed as a script is
# unchanged.
if __name__ == "__main__":
    lambda_handler({
        "run_id": "ef33db60-543d-11ea-a34e-023f40fa784e",
        "sb_item_id": "56d720ece4b015c306f442d5",
        "download_uri": cache_root
    }, {})