-
Notifications
You must be signed in to change notification settings - Fork 0
/
main_verify.py
83 lines (63 loc) · 2.38 KB
/
main_verify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Основной файл приложения."""
import asyncio
import json
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI
from pydantic_settings import BaseSettings
from config.config import VERIFICATION_PORT
from config.kafka_setup import (
start_consumer,
start_producer,
stop_consumer,
stop_producer,
)
from credit_card_verify.src.routers import readiness
class Settings(BaseSettings):
"""Конфигурация приложения."""
app_host: str = '0.0.0.0' # noqa: F401, S104
app_port: int = VERIFICATION_PORT
app = FastAPI()
app.include_router(readiness.router, tags=['readiness'])
@app.on_event('startup')
async def startup_event():
"""Начало работы приложения."""
app.state.kafka_producer = await start_producer()
app.state.kafka_consumer = await start_consumer()
asyncio.create_task(kafka_listener(app))
@app.on_event('shutdown')
async def shutdown_event():
"""Окончание работы приложения."""
await stop_consumer(app.state.kafka_consumer)
await stop_producer(app.state.kafka_producer)
executor = ProcessPoolExecutor(max_workers=1)
async def kafka_listener(app: FastAPI): # noqa: WPS442, WPS210
"""
Слушатель запросов на верификацию.
Args:
app (FastAPI): Экземпляр приложения.
"""
from credit_card_verify.src.services.verify_service import VerifyService
consumer = app.state.kafka_consumer
producer = app.state.kafka_producer
async for message in consumer:
message_data = json.loads(message.value.decode('utf-8'))
verification_service = VerifyService()
verify_result = await verification_service.verify(
card_number=message_data['card_number'],
selfie_path=message_data['selfie_path'],
document_path=message_data['document_path'],
)
response_data = {
'request_id': message_data['request_id'],
'response': str(verify_result),
}
message_data_bytes = json.dumps(response_data).encode('utf-8')
await producer.send('gran_verify_response', value=message_data_bytes)
if __name__ == '__main__':
import uvicorn # noqa: WPS433
settings = Settings()
uvicorn.run(
app,
host=settings.app_host,
port=settings.app_port,
)