Skip to content

Commit

Permalink
Merge pull request #194 from DNO-inc/bodya
Browse files Browse the repository at this point in the history
 version 1.0.0 & websocket based chat updates
  • Loading branch information
m-o-d-e-r authored Oct 2, 2023
2 parents 410951c + a9c60ee commit cff043b
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 76 deletions.
2 changes: 1 addition & 1 deletion burrito/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pytz import timezone


__version__ = "0.8.0 indev"
__version__ = "1.0.0 alpha"

CURRENT_TIME_ZONE = timezone("Europe/Kyiv")
9 changes: 8 additions & 1 deletion burrito/apps/comments/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
from burrito.models.tickets_model import Tickets

from burrito.utils.mongo_util import mongo_insert, mongo_update, mongo_delete
from burrito.utils.tickets_util import is_ticket_exist, am_i_own_this_ticket, send_notification, make_short_user_data
from burrito.utils.tickets_util import (
is_ticket_exist,
am_i_own_this_ticket,
send_notification,
make_short_user_data,
send_comment_update
)
from burrito.utils.permissions_checker import check_permission
from burrito.utils.auth import get_auth_core, AuthTokenPayload, BurritoJWT
from burrito.utils.query_util import STATUS_OPEN
Expand Down Expand Up @@ -61,6 +67,7 @@ async def comments__create(
body=f"Someone has created a new comment in ticket {ticket.ticket_id}"
)
)
send_comment_update(ticket.ticket_id, comment_id)

if ticket.status.status_id in (4, 6) and am_i_own_this_ticket(ticket.creator.user_id, token_payload.user_id):
create_ticket_action(
Expand Down
118 changes: 100 additions & 18 deletions burrito/apps/ws/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import time
import threading

from fastapi import HTTPException
from redis.client import PubSub
from websockets.sync.server import serve
from websockets.legacy.server import WebSocketServerProtocol

from burrito.models.tickets_model import Tickets

from burrito.utils.config_reader import get_config
from burrito.utils.logger import get_logger
from burrito.utils.auth import check_jwt_token
from burrito.utils.redis_utils import get_redis_connector
from burrito.utils.auth import AuthTokenPayload
from burrito.utils.tickets_util import is_ticket_exist


def recv_data(websocket: WebSocketServerProtocol) -> bytes:
Expand All @@ -36,45 +40,77 @@ def recv_ping(websocket: WebSocketServerProtocol) -> None:

if raw_data == "PING":
send_data(websocket, b"PONG")
get_logger().info("PING/PONG")

except Exception as exc:
get_logger().warning(f"PING/PONG {exc}")
get_logger().warning(f"PING/PONG {exc}", exc_info=True)
break

time.sleep(5)
get_logger().info(f"Pinging thread is finished {thread_id}")


def main_handler(websocket: WebSocketServerProtocol):
thread_id = threading.get_native_id()
get_logger().info(f"New thread started: {thread_id}")
def notifications_cycle(websocket: WebSocketServerProtocol, token_payload: AuthTokenPayload):
pubsub: PubSub = get_redis_connector().pubsub()
pubsub.subscribe(f"user_{token_payload.user_id}")

raw_data = recv_data(websocket)
pinging_thread = threading.Thread(target=recv_ping, args=(websocket,), daemon=True)
pinging_thread.start()

if isinstance(raw_data, bytes):
raw_data = raw_data.decode("utf-8")
try:
while True:
if not pinging_thread.is_alive():
break

if not raw_data:
send_data(websocket, b"Auth fail")
message = pubsub.get_message()

token_payload: AuthTokenPayload = None
try:
token_payload = check_jwt_token(raw_data)
if message:
data = message.get("data")

except:
send_data(websocket, b"Auth fail")
if isinstance(data, bytes):
send_data(websocket, data)

time.sleep(0.5)

except Exception as exc:
get_logger().warning(f"{exc}", exc_info=True)


def chat_cycle(websocket: WebSocketServerProtocol, token_payload: AuthTokenPayload):
chat_number = recv_data(websocket)
if chat_number.isdigit():
chat_number = int(chat_number)

try:
ticket: Tickets = is_ticket_exist(chat_number)

if ticket.hidden and token_payload.user_id not in (
ticket.creator.user_id,
ticket.assignee.user_id if ticket.assignee else -1
):
send_data(websocket, b"Is not allowed to interact with this ticket")
close_conn(websocket)
return

except HTTPException:
get_logger().warning(f"Ticket with ID {chat_number} is not exist")
send_data(websocket, f"Ticket with ID {chat_number} is not exist")
close_conn(websocket)
return

send_data(websocket, b"Auth OK")
except Exception:
get_logger().warning(f"User {token_payload.user_id} tried to join chat {ticket.ticket_id}", exc_info=True)
close_conn(websocket)

get_logger().info(f"User {token_payload.user_id} joined the chat ({chat_number})")

pubsub: PubSub = get_redis_connector().pubsub()
pubsub.subscribe(f"user_{token_payload.user_id}")
pubsub.subscribe(f"chat_{chat_number}")

pinging_thread = threading.Thread(target=recv_ping, args=(websocket,), daemon=True)
pinging_thread.start()

send_data(websocket, f"Successfully joined to chat {chat_number}".encode("utf-8"))

try:
while True:
if not pinging_thread.is_alive():
Expand All @@ -91,7 +127,53 @@ def main_handler(websocket: WebSocketServerProtocol):
time.sleep(0.5)

except Exception as exc:
get_logger().warning(f"{exc}")
get_logger().warning(f"{exc}", exc_info=True)

get_logger().info(f"User {token_payload.user_id} left the chat ({chat_number})")


__CONNECTION_MODES = {
"NOTIFICATIONS": notifications_cycle,
"CHAT": chat_cycle
}


def main_handler(websocket: WebSocketServerProtocol):
thread_id = threading.get_native_id()
get_logger().info(f"New thread started: {thread_id}")

raw_data = recv_data(websocket)

if isinstance(raw_data, bytes):
raw_data = raw_data.decode("utf-8")

if not raw_data:
send_data(websocket, b"Auth fail")

token_payload: AuthTokenPayload = None
try:
token_payload = check_jwt_token(raw_data)

except Exception:
send_data(websocket, b"Auth fail")
close_conn(websocket)
return

send_data(websocket, b"Auth OK")

connection_mode = recv_data(websocket)
if isinstance(connection_mode, bytes):
connection_mode = connection_mode.decode("utf-8")
if not connection_mode:
send_data(websocket, b"Connection mode is invalid")

connection_handler = __CONNECTION_MODES.get(connection_mode)
if connection_handler:
send_data(websocket, b"Connection mode is OK")
connection_handler(websocket, token_payload)
else:
send_data(websocket, b"Connection mode is invalid")
get_logger().warning(f"User {token_payload.user_id} has selected wrong connection mode '{connection_mode}'")

get_logger().info(f"Thread {thread_id} is finished")

Expand Down
14 changes: 1 addition & 13 deletions burrito/utils/hash_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,5 @@ def compare_password(password: str, hashed_password: str) -> bool:

try:
return _hasher.verify(hashed_password, password)
except:
except Exception:
return False


def get_verification_code() -> str:
"""_summary_
Generate and return email verification code
Returns:
str: verification code
"""

return "".join((str(SystemRandom().randint(0, 9)) for i in range(6)))
4 changes: 4 additions & 0 deletions burrito/utils/tickets_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,7 @@ def send_notification(ticket: Tickets | int, notification: Notifications):

if mongo_items_count(NotificationMetaData, notification_id=notification_id) == 0:
mongo_delete(Notifications, _id=notification_id)


def send_comment_update(ticket_id: int, comment_id: str) -> None:
get_redis_connector().publish(f"chat_{ticket_id}", comment_id.encode("utf-8"))
40 changes: 0 additions & 40 deletions burrito/utils/websockets.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,9 @@
import orjson as json
from starlette.websockets import WebSocket

from burrito.utils.singleton_pattern import singleton
from burrito.utils.auth import check_jwt_token

from burrito.models.ws_message import WebSocketMessage
from burrito.models.m_notifications_model import Notifications


@singleton
class WebsocketManager:
def __init__(self) -> None:
self._websocket_list: list[WebSocket] = []

async def accept(self, websocket: WebSocket):
await websocket.accept()
self._websocket_list.append(websocket)

async def check_token(self, websocket: WebSocket) -> bool:
token = (await self.recv(websocket)).decode("utf-8")

try:
return await check_jwt_token(token)
except:
await self.send(websocket, b"Auth fail")
await self.close(websocket)

async def recv(self, websocket: WebSocket) -> bytes:
return await websocket.receive_bytes()

async def send(self, websocket: WebSocket, data: bytes):
await websocket.send_bytes(data)

async def close(self, websocket: WebSocket):
self._websocket_list.remove(websocket)
await websocket.close()


def get_websocket_manager():
return WebsocketManager()


_WEBSOCKET_MANAGER: WebsocketManager = get_websocket_manager()


def make_websocket_message(type_: str, obj: Notifications) -> bytes:
return json.dumps(
WebSocketMessage(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "Burrito"
version = "0.8.0.dev2"
version = "1.0.0.alpha"
description = "API for the issue tracker"
authors = ["DimonBor", "m-o-d-e-r"]
readme = "README.md"
Expand Down
3 changes: 1 addition & 2 deletions scripts/burrito_cluster_ping.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import socket

import requests
from dotenv import dotenv_values, find_dotenv
import rich

Expand All @@ -26,7 +25,7 @@ def _ping(data):
try:
sock.connect(("localhost", int(data[1])))
rich.print(f"\t[green][+] Host is OK: {data}")
except:
except Exception:
rich.print(f"\t[red][-] No connection to host: {data}")
finally:
sock.close()
Expand Down

0 comments on commit cff043b

Please sign in to comment.