-
Notifications
You must be signed in to change notification settings - Fork 3
/
metrics.py
103 lines (84 loc) · 2.79 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import datetime as dt
import logging
import time
from aiogram import types
from aiogram.dispatcher.middlewares import BaseMiddleware
from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import WriteType
from config import INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN, ENVIRONMENT, ENABLE_METRICS
logger = logging.getLogger(__name__)
update_types = {
"message",
"edited_message",
"channel_post",
"edited_channel_post",
"inline_query",
"chosen_inline_result",
"callback_query",
"shipping_query",
"pre_checkout_query",
}
if ENABLE_METRICS:
influxdb_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_options = WriteOptions(
write_type=WriteType.asynchronous,
batch_size=500,
flush_interval=5_000,
retry_interval=1_000,
max_retries=1,
max_retry_delay=2_000,
exponential_base=1
)
writer = influxdb_client.write_api(write_options=write_options)
def send_metric(measurement: str, tags: dict, fields: dict):
tags.update(service="bot-api-search", testing=ENVIRONMENT)
data = {
"measurement": measurement,
"tags": tags,
"fields": fields
}
record = Point.from_dict(data).time(dt.datetime.utcnow())
try:
writer.write(bucket=INFLUXDB_BUCKET, record=record)
except Exception as e:
logger.exception(f"Can't write to bucket: {e}")
def measure_time_metric(subject_type):
def decorator(func):
def wrapper(*args, **kwargs):
start = time.monotonic()
res = func(*args, **kwargs)
if not ENABLE_METRICS:
return res
time_data = {
"measurement": "spent_time",
"tags": {
"env": ENVIRONMENT,
"type": subject_type,
"func_name": func.__name__,
},
"fields": {
"time_delta": time.monotonic() - start
}
}
send_metric(time_data)
return res
wrapper.__name__ = func.__name__
return wrapper
return decorator
class MetricsMiddleware(BaseMiddleware):
async def on_pre_process_update(self, update: types.Update, *args):
for t in update_types:
if getattr(update, t):
update_type = t
break
else:
update_type = "unknown"
tags = {"type": update_type}
fields = {"id": update.update_id}
if update_type == "chosen_inline_result":
fields.update(chosen_result=update.chosen_inline_result.result_id)
send_metric(
measurement="updates",
tags=tags,
fields=fields
)