Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: websocket implementation #7

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions examples/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
import os

from fishfish.ws import FishWebsocket, models, WebsocketEvents


async def on_domain_create(data: models.WSDomainCreate):
print(f"{data.domain} was just created")


async def on_error(error: Exception):
raise error


async def main():
ws: FishWebsocket = FishWebsocket(os.environ["API_KEY"])
ws.register_error_handler(on_error)
ws.register_listener(WebsocketEvents.DOMAIN_CREATE, on_domain_create)

task = await ws.start()
await task # Run until complete


asyncio.run(main())
2 changes: 1 addition & 1 deletion fishfish/models/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Domain:
category: Category
added: datetime.datetime
checked: datetime.datetime
target: Optional[str]
target: Optional[str] = None

@classmethod
def from_dict(cls, data) -> Domain:
Expand Down
2 changes: 1 addition & 1 deletion fishfish/models/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class URL:
category: Category
added: datetime.datetime
checked: datetime.datetime
target: Optional[str]
target: Optional[str] = None

@classmethod
def from_dict(cls, data) -> URL:
Expand Down
2 changes: 2 additions & 0 deletions fishfish/ws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .events import WebsocketEvents
from .fish_websocket import FishWebsocket
41 changes: 41 additions & 0 deletions fishfish/ws/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

from enum import Enum

from fishfish.ws.models import *


class WebsocketEvents(str, Enum):
URL_CREATE = "url_create"
URL_UPDATE = "url_update"
URL_DELETE = "url_delete"
DOMAIN_CREATE = "domain_create"
DOMAIN_UPDATE = "domain_update"
DOMAIN_DELETE = "domain_delete"

@classmethod
def from_string(cls, event_type: str) -> WebsocketEvents:
"""Create the correct event based on the event type.

Raises
------
ValueError
Unknown event type
"""
ctx = getattr(cls, event_type)
if ctx is None:
raise ValueError(f"Unknown event type {event_type}")

return ctx

def create_model(self, data: dict):
"""Create the correct data model for this event type."""
factories = {
WebsocketEvents.URL_CREATE: WSUrlCreate,
WebsocketEvents.URL_UPDATE: WSUrlUpdate,
WebsocketEvents.URL_DELETE: WSUrlDelete,
WebsocketEvents.DOMAIN_CREATE: WSDomainCreate,
WebsocketEvents.DOMAIN_UPDATE: WSDomainUpdate,
WebsocketEvents.DOMAIN_DELETE: WSDomainDelete,
}
return factories[self].from_dict(data)
113 changes: 113 additions & 0 deletions fishfish/ws/fish_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import asyncio
import datetime
import json
import logging
import traceback
from typing import Optional

import httpx
import websockets as websockets

from fishfish import Token, Unauthorized
from fishfish.ws import WebsocketEvents

log = logging.getLogger(__name__)


class FishWebsocket:
def __init__(self, api_key: str):
self.__refresh_token: Optional[str] = api_key
self.__current_session_token: Optional[Token] = None
self._listeners: dict[WebsocketEvents, list] = {
WebsocketEvents.URL_CREATE: [],
WebsocketEvents.URL_UPDATE: [],
WebsocketEvents.URL_DELETE: [],
WebsocketEvents.DOMAIN_CREATE: [],
WebsocketEvents.DOMAIN_UPDATE: [],
WebsocketEvents.DOMAIN_DELETE: [],
}
self._error_handler = None

def _set_session_key(self):
if (
self.__current_session_token
and not self.__current_session_token.has_expired
):
return

r = httpx.post(
"https://api.fishfish.gg/v1/users/@me/tokens",
headers={"Authorization": self.__refresh_token},
)
if r.status_code == 401:
raise Unauthorized("Your provided FishFish token is invalid.")

data = r.json()
token = data["token"]
expires = datetime.datetime.fromtimestamp(data["expires"])
self.__current_session_token = Token(token, expires)

async def _process_event(self, data):
try:
event_type: WebsocketEvents = WebsocketEvents.from_string(data["type"])
model = event_type.create_model(data["data"])
iters = [coro(model) for coro in self._listeners[event_type]]
if iters:
await asyncio.gather(*iters)
except Exception as e:
if not self._error_handler:
log.error("Attempting to process %s threw %s", str(e), str(e))
log.error("%s", "".join(traceback.format_exception(e)))
return

asyncio.create_task(self._error_handler(e))

async def _ws_loop(self):
self._set_session_key()
async with websockets.connect(
"wss://api.fishfish.gg/v1/stream/",
extra_headers={"Authorization": self.__current_session_token.token},
) as websocket:
async for message in websocket:
asyncio.create_task(self._process_event(json.loads(message)))

async def start(self) -> asyncio.Task:
return asyncio.create_task(self._ws_loop())

def register_listener(self, event: WebsocketEvents, coro):
"""Register a listener for a websocket event.

Parameters
----------
event: WebsocketEvents
The event to listen for.
coro
An async function or method to call
when this event is fired.

The first parameter will be the event data.

Raises
------
ValueError
The provided function or method was not async.
"""
if not asyncio.iscoroutinefunction(coro):
raise ValueError(
f"The provided value for 'coro' must be a coroutine function."
)

self._listeners[event].append(coro)

def register_error_handler(self, coro):
"""Register a function to handle errors.

Parameters
----------
coro
An async function or method to call
when an error occurs.

The first parameter will be the exception.
"""
self._error_handler = coro
2 changes: 2 additions & 0 deletions fishfish/ws/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .urls import WSUrlDelete, WSUrlCreate, WSUrlUpdate
from .domains import WSDomainCreate, WSDomainUpdate, WSDomainDelete
63 changes: 63 additions & 0 deletions fishfish/ws/models/domains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class WSDomainDelete:
"""A websocket domain delete event model.

Attributes
----------
domain: str
The domain which was deleted.
"""

domain: str

@classmethod
def from_dict(cls, data: dict) -> WSDomainDelete:
return cls(**data)


@dataclass
class WSDomainCreate(WSDomainDelete):
"""A websocket domain create event model.

Attributes
----------
domain: str
The domain which was deleted.
description: Optional[str]
The description for this domain.
category: Optional[str]
The category for this domain.
target: Optional[str]
The target for this domain.
"""

description: Optional[str] = None
category: Optional[str] = None
target: Optional[str] = None


@dataclass
class WSDomainUpdate(WSDomainCreate):
"""A websocket domain update event model.

Attributes
----------
domain: str
The domain which was deleted.
description: Optional[str]
The description for this domain.
category: Optional[str]
The category for this domain.
target: Optional[str]
The target for this domain.
checked: Optional[int]
When this domain was checked into the db?
"""

checked: Optional[int] = None
63 changes: 63 additions & 0 deletions fishfish/ws/models/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class WSUrlDelete:
"""A websocket url delete event model.

Attributes
----------
url: str
The url which was deleted.
"""

url: str

@classmethod
def from_dict(cls, data: dict) -> WSUrlDelete:
return cls(**data)


@dataclass
class WSUrlCreate(WSUrlDelete):
"""A websocket url create event model.

Attributes
----------
url: str
The url which was deleted.
description: Optional[str]
The description for this url.
category: Optional[str]
The category for this url.
target: Optional[str]
The target for this url.
"""

description: Optional[str] = None
category: Optional[str] = None
target: Optional[str] = None


@dataclass
class WSUrlUpdate(WSUrlCreate):
"""A websocket url update event model.

Attributes
----------
url: str
The url which was deleted.
description: Optional[str]
The description for this url.
category: Optional[str]
The category for this url.
target: Optional[str]
The target for this url.
checked: Optional[int]
When this url was checked into the db?
"""

checked: Optional[int] = None
Loading