-
Notifications
You must be signed in to change notification settings - Fork 0
/
resolverlite_lambda.py
109 lines (91 loc) · 3.84 KB
/
resolverlite_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# -*- coding: utf-8 -*-
__author__ = "SAI"
__status__ = "Dev"
import asyncio
import datetime
import ujson
import uvloop
from aiofiles import open as aiofiles_open
from os import unlink
from lib.workers import get_async_writer, create_io_reader, TargetReader, TaskProducer, Executor, OutputPrinter, \
TargetWorker
from gzip import compress as gzip_compress
from lib.core import Stats
from lib.yandex import parse_args_env
async def main(event, context):
    """Run the resolver pipeline, upload the gzipped result file to S3,
    and (optionally) announce the upload via SQS.

    Pipeline: reader -> task producer -> executor -> printer, wired with
    asyncio queues; results are written to ``config.output_file``, then the
    whole file is gzip-compressed and put into the configured S3 bucket.
    Temporary input/output files are removed afterwards (best effort).

    :param event: Lambda invocation payload, parsed by ``parse_args_env``.
    :param context: Lambda context object (unused here, kept for the
        standard handler signature).
    :return: HTTP status code of the S3 ``put_object`` response,
        or 0 if the status could not be read.
    """
    target_settings, config, s3_config, sqs_config = await parse_args_env(event)
    queue_input = asyncio.Queue()
    queue_tasks = asyncio.Queue()
    queue_prints = asyncio.Queue()
    # Bounds the number of concurrent senders inside TargetWorker.
    task_semaphore = asyncio.Semaphore(config.senders)
    statistics = Stats() if config.statistics else None
    async with aiofiles_open(config.output_file, mode=config.write_mode) as file_with_results:
        writer_coroutine = get_async_writer(config)
        target_worker = TargetWorker(statistics,
                                     task_semaphore,
                                     queue_prints,
                                     config.show_only_success,
                                     use_msgpack=config.use_msgpack)
        input_reader: TargetReader = create_io_reader(statistics, queue_input, target_settings, config)
        task_producer = TaskProducer(statistics, queue_input, queue_tasks, target_worker)
        executor = Executor(statistics, queue_tasks, queue_prints)
        printer = OutputPrinter(config.output_file, statistics, queue_prints, file_with_results, writer_coroutine)
        # Run all pipeline stages concurrently and wait for every one to finish.
        running_tasks = [asyncio.create_task(worker.run())
                         for worker in [input_reader, task_producer, executor, printer]]
        await asyncio.wait(running_tasks)
    # region send file to S3 bucket
    with open(config.output_file, 'rb') as outfile:
        data = outfile.read()
    # compresslevel=4 trades a little compression ratio for speed.
    data_packed = gzip_compress(data, compresslevel=4)
    # region sending to s3 bucket
    client_s3 = s3_config['client']
    bucket = s3_config['about_bucket']['bucket']
    key_bucket = s3_config['about_bucket']['key']
    # Default to 0 so a missing/odd response shape still yields a defined result.
    http_status = 0
    resp_from_s3 = await client_s3.put_object(Bucket=bucket,
                                              Key=key_bucket,
                                              Body=data_packed)
    try:
        http_status = resp_from_s3['ResponseMetadata']['HTTPStatusCode']
    except (KeyError, TypeError) as exp:
        print(exp)
    # endregion
    try:
        await s3_config['client'].close()
    except Exception as e:
        print(e)
        print('errors when closing S3 Client connection')
    # Delete the temporary files; each independently, so a failure on one
    # does not prevent removal of the other (bug in the original: a single
    # try covered both unlinks and a bare `except` hid every error).
    for tmp_path in (config.input_file, config.output_file):
        try:
            unlink(tmp_path)
        except OSError as unlink_error:
            print(unlink_error)
    # region sending to sqs about file saved to buckets
    if sqs_config:
        message_sqs = {'bucket': bucket,
                       'key': key_bucket,
                       'timestamp': int(datetime.datetime.now().timestamp())}
        body_message: str = ujson.dumps(message_sqs)
        status = await sqs_config['client'].send_message(QueueUrl=sqs_config['queue_url'],
                                                         MessageBody=body_message)
        try:
            status_code: int = status['ResponseMetadata']['HTTPStatusCode']
            if status_code != 200:
                print(f'SQS: errors: {status_code}')
            else:
                print(f'SQS sent: {bucket}/{key_bucket}')
        except Exception as error_send:
            print(f'SQS: error: {error_send}')
        try:
            await sqs_config['client'].close()
        except Exception as e:
            print(e)
            print('errors when closing SQS Client connection')
    # endregion
    return http_status
def handler(event, context):
    """AWS Lambda entry point: run the async pipeline on a uvloop event loop
    and wrap the S3 upload status into a Lambda-style response dict."""
    uvloop.install()
    upload_status = asyncio.run(main(event, context))
    return {'statusCode': 200, 'body': upload_status}