-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
prometheus.py
289 lines (240 loc) · 10.2 KB
/
prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from discord.ext import commands, tasks
from discord import Interaction, InteractionType
import logging
from pymongo.errors import ServerSelectionTimeoutError
from prometheus_client import start_http_server
from typing import cast, List, Dict
from psutil import virtual_memory, cpu_percent
from killua.metrics import *
from killua.static.constants import DB, API_ROUTES
from killua.utils.classes import User
from killua.bot import BaseBot as Bot
log = logging.getLogger("prometheus")
class PrometheusCog(commands.Cog):
    """
    A Cog to be added to a discord bot. The prometheus server will start once the bot is ready
    using the `on_ready` listener.

    Metrics are only collected and exposed when the bot runs inside Docker
    (`client.run_in_docker`); otherwise the background loops are never started
    and `on_ready` only logs a message.
    """

    def __init__(self, client: Bot, port: int = 8000):
        """
        Parameters:
            client: The Discord bot
            port: The port for the Prometheus server
        """
        self.client = client
        self.port = port
        # Set once the first `on_ready` has been handled so later `on_ready`
        # events do not re-initialise the gauges or restart the server.
        self.initial = False
        # Set by `start_prometheus` once the HTTP server has been started.
        self.started = False
        # Last seen per-route totals from the API diagnostics endpoint, used
        # to turn the reported absolute totals into counter increments.
        self.api_previous: Dict[str, Dict[str, int]] = {}
        # Last seen total of requests to routes that are not in API_ROUTES.
        self.spam_previous: int = 0

        if self.client.run_in_docker:
            for loop in (self.latency_loop, self.system_usage_loop, self.db_loop):
                if not loop.is_running():
                    loop.start()

    async def update_api_stats(self) -> None:
        """
        Fetch `/diagnostics` from the API and update the API related metrics.

        Returns without touching any metric when the endpoint does not answer
        with status 200.
        """
        if self.client.is_dev:
            host = "api" if self.client.run_in_docker else "0.0.0.0"
            url = f"http://{host}:{self.client.dev_port}"
        else:
            url = self.client.url

        data = await self.client.session.get(
            url + "/diagnostics", headers={"Authorization": self.client.secret_api_key}
        )
        if data.status != 200:
            return

        payload = await data.json()

        # The header may be absent or malformed; that must not prevent the
        # remaining statistics from being recorded.
        response_time = data.headers.get("X-Response-Time")
        if response_time is not None:
            try:
                API_RESPONSE_TIME.set(int(response_time.replace("ms", "")))
            except ValueError:
                log.warning(
                    "Unexpected X-Response-Time header value: %r", response_time
                )

        ipc_response_time = cast(dict, payload["ipc"]).get("response_time")
        if ipc_response_time is not None:  # Can be None
            IPC_RESPONSE_TIME.set(ipc_response_time)

        total_requests = 0
        known_requests = 0
        for route, stats in cast(dict, payload["usage"]).items():
            request_count = len(stats["requests"])
            total_requests += request_count

            if route not in API_ROUTES:
                continue
            known_requests += request_count

            previous = self.api_previous.setdefault(route, {})

            # Counters refuse negative increments, so a shrinking total
            # (e.g. the API was restarted) is recorded as "nothing new"
            # rather than raising.
            new_requests = request_count - previous.get("requests", 0)
            previous["requests"] = request_count
            API_REQUESTS_COUNTER.labels(route, "requests").inc(
                amount=max(new_requests, 0)
            )

            new_success = stats["successful_responses"] - previous.get(
                "successful_responses", 0
            )
            previous["successful_responses"] = stats["successful_responses"]
            API_REQUESTS_COUNTER.labels(route, "success").inc(
                amount=max(new_success, 0)
            )

        # Everything that hit a route outside of API_ROUTES is counted as spam.
        spam_total = total_requests - known_requests
        new_spam = spam_total - self.spam_previous
        self.spam_previous = spam_total
        API_SPAM_REQUESTS.inc(max(new_spam, 0))

    async def save_locales(self) -> None:
        """
        Set the LOCALE gauge to the number of stored documents per locale label.

        The label is the upper-cased part after the last "-" of the stored
        locale, or the whole upper-cased locale when it contains no "-".
        Locales that map to the same label are summed.
        """
        label_counts: Dict[str, int] = {}
        async for doc in DB.teams.find({"locale": {"$exists": True}}):
            locale = doc.get("locale")
            if not locale:
                continue
            # split("-")[-1] returns the whole string when there is no "-".
            label = cast(str, locale).split("-")[-1].upper()
            label_counts[label] = label_counts.get(label, 0) + 1

        for label, count in label_counts.items():
            LOCALE.labels(label).set(count)

    async def init_gauges(self) -> None:
        """
        Populate all gauges with their current values.

        Called once from `on_ready`. Missing `growth` or `usage` documents are
        skipped with a warning so the remaining gauges are still initialised.
        """
        log.debug("Initializing gauges")

        all_commands = self.get_all_commands()
        COMMANDS_GAUGE.set(len(all_commands))

        # The main point of this is to initialise the Counter
        # with the correct labels, so that the labels are present
        # in the metrics even if no one has voted there yet.
        VOTES.labels("topgg")
        VOTES.labels("discordbotlist")

        registered_users = await DB.teams.count_documents({})
        REGISTERED_USER_GAUGE.set(registered_users)

        growth_doc = await DB.const.find_one({"_id": "growth"})
        growth = (growth_doc or {}).get("growth") or []
        if growth:
            DAILY_ACTIVE_USERS.set(growth[-1]["daily_users"])
        else:
            log.warning("No growth data found, skipping daily active users gauge")

        APPROXIMATE_USER_COUNT.set(await self.client.get_approximate_user_count())
        USER_INSTALLS.set(
            (await self.client.application_info()).approximate_user_install_count
        )

        await self.save_locales()

        # Update command stats
        usage_doc = await DB.const.find_one({"_id": "usage"})
        usage_data: Dict[str, int] = (usage_doc or {}).get("command_usage") or {}
        if not usage_data:
            log.warning("No command usage data found, skipping command usage gauges")

        for cmd in all_commands:
            if not cmd.extras or "id" not in cmd.extras:
                continue
            command_id = str(cmd.extras["id"])
            if command_id not in usage_data:
                continue
            # The id is passed as a string, matching `on_command`.
            COMMAND_USAGE.labels(
                self.client._get_group(cmd), cmd.name, command_id
            ).set(usage_data[command_id])

        await self.update_api_stats()

    def get_all_commands(self) -> List[commands.Command]:
        """Return the commands reported by the bot's `get_raw_formatted_commands`."""
        return self.client.get_raw_formatted_commands()

    def start_prometheus(self) -> None:
        """Start the Prometheus HTTP server on `self.port` and mark it as started."""
        log.debug(f"Starting Prometheus Server on port {self.port}")
        start_http_server(self.port)
        self.started = True

    @tasks.loop(seconds=5)
    async def latency_loop(self):
        """Record the latency of every shard every 5 seconds."""
        for shard, latency in self.client.latencies:
            LATENCY_GAUGE.labels(shard).set(latency)

    @tasks.loop(minutes=10)
    async def db_loop(self):
        """
        Refresh the database backed gauges and the API stats every 10 minutes.

        Any exception is logged and swallowed so that a single failing
        iteration does not stop the loop.
        """
        try:
            registered_users = await DB.teams.count_documents({})
            REGISTERED_USER_GAUGE.set(registered_users)

            APPROXIMATE_USER_COUNT.set(await self.client.get_approximate_user_count())
            # This is not a db stat but to my knowledge no event exists for it so we have to do it here
            USER_INSTALLS.set(
                (await self.client.application_info()).approximate_user_install_count
            )

            todo_list_amount = await DB.todo.count_documents({})
            TODO_LISTS.set(todo_list_amount)

            # An async comprehension has to be materialised as a list before
            # it can be handed to sum().
            todos = sum([len(todo["todos"]) async for todo in DB.todo.find({})])
            TODOS.set(todos)

            tags = await DB.guilds.find({"tags": {"$exists": True}}).to_list(
                length=None
            )
            TAGS.set(sum(len(guild["tags"]) for guild in tags))

            await self.save_locales()
            await self.update_api_stats()
        except ServerSelectionTimeoutError:
            # Skip this iteration
            log.warning("Failed to save mongodb stats to DB due to connection error")
        except Exception as e:  # The loop should not be stopped due to an error
            log.critical(
                f"Failed to save mongodb stats to DB due to an unexpected error: {e}"
            )

    @tasks.loop(seconds=5)
    async def system_usage_loop(self):
        """Record RAM and CPU usage every 5 seconds."""
        # Take a single snapshot so both memory gauges describe the same moment.
        memory = virtual_memory()
        RAM_USAGE_GAUGE.set(memory.percent)
        CPU_USAGE_GAUGE.set(cpu_percent())
        # Percentage of total memory that is currently available.
        MEMORY_USAGE_GAUGE.set(memory.available * 100 / memory.total)

    @commands.Cog.listener()
    async def on_ready(self):
        """Initialise the gauges and start the Prometheus server on the first ready event."""
        # This is very intentionally not cog_load.
        # I hope future me remembers why. For whatever reason,
        # commands which are NOT part of a GroupCog are only
        # loaded AFTER the Bot is ready. I am not sure why, they
        # could already be added since it doesn't need any info
        # from Discord, but it is what it is and this is the way
        # to make sure all commands are returned by get_all_commands.
        #
        # Given how significant it is especially for the command stats
        # to take into account historical data, I am glad I spotted this.
        if self.initial:
            return

        self.initial = True

        if not self.client.run_in_docker:
            log.info("Running outside of Docker, not starting Prometheus server")
            return

        GUILD_GAUGE.set(len(self.client.guilds))
        await self.init_gauges()

        # Set connection back up (since we are in on_ready)
        CONNECTION_GAUGE.labels(None).set(1)

        self.start_prometheus()

    @commands.Cog.listener()
    async def on_command(self, ctx: commands.Context):
        """Count every invoked command, and per-command usage when the command has an id."""
        ON_COMMAND_COUNTER.inc()

        if not ctx.command.extras.get("id"):
            return

        COMMAND_USAGE.labels(
            self.client._get_group(ctx.command),
            ctx.command.name,
            str(ctx.command.extras["id"]),
        ).inc()

    @commands.Cog.listener()
    async def on_interaction(self, interaction: Interaction):
        """Log the user's locale and count application command interactions."""
        # NOTE(review): `log_locale` appears to return the previously stored
        # locale when it changed - confirm against `User.log_locale`.
        old = await (await User.new(interaction.user.id)).log_locale(
            interaction.locale[-1]
        )
        if old:
            LOCALE.labels(old).dec()

        if interaction.type != InteractionType.application_command:
            # don't save just any interaction
            return

        ON_INTERACTION_COUNTER.labels(not interaction.is_guild_integration()).inc()

    @commands.Cog.listener()
    async def on_connect(self):
        """Mark the connection (label `None`) as up."""
        CONNECTION_GAUGE.labels(None).set(1)

    @commands.Cog.listener()
    async def on_resumed(self):
        """Mark the connection (label `None`) as up."""
        CONNECTION_GAUGE.labels(None).set(1)

    @commands.Cog.listener()
    async def on_disconnect(self):
        """Mark the connection (label `None`) as down."""
        CONNECTION_GAUGE.labels(None).set(0)

    @commands.Cog.listener()
    async def on_shard_ready(self, shard_id):
        """Mark the connection of the given shard as up."""
        CONNECTION_GAUGE.labels(shard_id).set(1)

    @commands.Cog.listener()
    async def on_shard_connect(self, shard_id):
        """Mark the connection of the given shard as up."""
        CONNECTION_GAUGE.labels(shard_id).set(1)

    @commands.Cog.listener()
    async def on_shard_resumed(self, shard_id):
        """Mark the connection of the given shard as up."""
        CONNECTION_GAUGE.labels(shard_id).set(1)

    @commands.Cog.listener()
    async def on_shard_disconnect(self, shard_id):
        """Mark the connection of the given shard as down."""
        CONNECTION_GAUGE.labels(shard_id).set(0)

    @commands.Cog.listener()
    async def on_guild_join(self, _):
        """Update the guild gauge after the bot joined a guild."""
        GUILD_GAUGE.set(len(self.client.guilds))

    @commands.Cog.listener()
    async def on_guild_remove(self, _):
        """Update the guild gauge after the bot was removed from a guild."""
        GUILD_GAUGE.set(len(self.client.guilds))
# Module-level alias for PrometheusCog.
# NOTE(review): presumably the name the bot's extension loader looks up - confirm against the loader.
Cog = PrometheusCog