From 53b4327c9757071241a612b43ebe9f9d41070d85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Pol=C3=A1=C4=8Dek?= Date: Sun, 26 May 2024 02:37:27 +0200 Subject: [PATCH] Remove HostInfo from SI Workers --- python/yaroc/clients/mqtt.py | 4 +-- python/yaroc/pb/utils.py | 6 ++-- python/yaroc/rs.pyi | 14 ++++++-- python/yaroc/scripts/send_punch.py | 27 +++++++++++----- python/yaroc/sources/si.py | 52 +++++++++++++----------------- src/message_handler.rs | 2 +- src/punch.rs | 22 ++++--------- 7 files changed, 64 insertions(+), 63 deletions(-) diff --git a/python/yaroc/clients/mqtt.py b/python/yaroc/clients/mqtt.py index 296ca7b..38a2763 100644 --- a/python/yaroc/clients/mqtt.py +++ b/python/yaroc/clients/mqtt.py @@ -87,7 +87,7 @@ async def send_punch( ) -> bool: punches = Punches() try: - punches.punches.append(create_punch_proto(punch_log)) + punches.punches.append(create_punch_proto(punch_log.punch)) except Exception as err: logging.error(f"Creation of Punch proto failed: {err}") punches.sending_timestamp.GetCurrentTime() @@ -194,7 +194,7 @@ async def send_punch( self, punch_log: SiPunchLog, ) -> bool: - res = await self._retries.send(create_punch_proto(punch_log)) + res = await self._retries.send(create_punch_proto(punch_log.punch)) return res if res is not None else False async def send_status(self, status: Status, mac_addr: str) -> bool: diff --git a/python/yaroc/pb/utils.py b/python/yaroc/pb/utils.py index ecd3470..8af94a6 100644 --- a/python/yaroc/pb/utils.py +++ b/python/yaroc/pb/utils.py @@ -3,7 +3,7 @@ from google.protobuf.timestamp_pb2 import Timestamp -from ..rs import SiPunchLog +from ..rs import SiPunch from .punches_pb2 import Punch from .status_pb2 import Coordinates @@ -14,9 +14,9 @@ def _datetime_to_prototime(time: datetime) -> Timestamp: return ret -def create_punch_proto(punch_log: SiPunchLog) -> Punch: +def create_punch_proto(si_punch: SiPunch) -> Punch: punch = Punch() - punch.raw = bytes(punch_log.punch.raw) + punch.raw = bytes(si_punch.raw) return punch diff --git a/python/yaroc/rs.pyi b/python/yaroc/rs.pyi index 492a3bf..b6d4059 100644 --- a/python/yaroc/rs.pyi +++ b/python/yaroc/rs.pyi @@ -19,15 +19,23 @@ class SiPunch(object): mode: int raw: bytes + @staticmethod + def new( + card: int, + code: int, + time: datetime, + mode: int, + ) -> "SiPunch": ... + @staticmethod + def from_raw(payload: bytes) -> "SiPunch": ... + class SiPunchLog(object): punch: SiPunch latency: timedelta host_info: HostInfo @staticmethod - def new( - card: int, code: int, time: datetime, mode: int, host_info: HostInfo, now: datetime - ) -> "SiPunchLog": ... + def new(punch: SiPunch, host_info: HostInfo, now: datetime) -> "SiPunchLog": ... @staticmethod def from_raw(payload: bytes, host_info: HostInfo, now: datetime) -> "SiPunchLog": ... @staticmethod diff --git a/python/yaroc/scripts/send_punch.py b/python/yaroc/scripts/send_punch.py index 23241ab..930ceeb 100644 --- a/python/yaroc/scripts/send_punch.py +++ b/python/yaroc/scripts/send_punch.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import socket import time @@ -8,6 +9,7 @@ from ..clients.client import ClientGroup from ..pb.status_pb2 import DeviceEvent, EventType, MiniCallHome, Status +from ..rs import HostInfo, SiPunchLog from ..sources.si import SiPunchManager from ..utils.container import Container, create_clients from ..utils.sys_info import create_sys_minicallhome, eth_mac_addr @@ -18,7 +20,7 @@ class PunchSender: def __init__( self, client_group: ClientGroup, - mac_addr: str, + host_info: HostInfo, mch_interval: int | None = 30, si_manager: SiPunchManager = Provide[Container.si_manager], ): @@ -26,7 +28,7 @@ def __init__( logging.warning("No clients enabled, will listen to punches but nothing will be sent") self.client_group = client_group self.si_manager = si_manager - self.mac_addr = mac_addr + self.host_info = host_info if mch_interval is None: mch_interval = 30 self._mch_interval = mch_interval @@ -40,12 +42,16 @@ async def periodic_mini_call_home(self): mini_call_home.codes = str(self.si_manager) status = Status() status.mini_call_home.CopyFrom(mini_call_home) - await self.client_group.send_status(status, self.mac_addr) + await self.client_group.send_status(status, self.host_info.mac_address) await asyncio.sleep(self._mch_interval - (time.time() - time_start)) async def send_punches(self): async for si_punch in self.si_manager.punches(): - asyncio.create_task(self.client_group.send_punch(si_punch)) + asyncio.create_task( + self.client_group.send_punch( + SiPunchLog.new(si_punch, self.host_info, datetime.datetime.now().astimezone()) + ) + ) async def udev_events(self): # TODO: get rid of the following sleep @@ -58,7 +64,7 @@ async def udev_events(self): device_event.type = EventType.Added if dev_event.added else EventType.Removed status = Status() status.dev_event.CopyFrom(device_event) - await self.client_group.send_status(status, self.mac_addr) + await self.client_group.send_status(status, self.host_info.mac_addr) async def loop(self): def handle_exception(loop, context): @@ -91,16 +97,21 @@ async def main(): if "mac_addr" not in config: config["mac_addr"] = eth_mac_addr() assert config["mac_addr"] is not None - config["hostname"] = socket.gethostname() + hostname = socket.gethostname() + config["hostname"] = hostname container = Container() container.config.from_dict(config) container.init_resources() container.wire(modules=["yaroc.utils.container", __name__]) - logging.info(f"Starting SendPunch for {config['hostname']}/{config['mac_addr']}") + logging.info(f"Starting SendPunch for {hostname}/{config['mac_addr']}") client_group = await create_clients(container.client_factories) - ps = PunchSender(client_group, config["mac_addr"], config.get("call_home_interval", None)) + ps = PunchSender( + client_group, + HostInfo.new(hostname, config["mac_addr"]), + config.get("call_home_interval", None), + ) await ps.loop() diff --git a/python/yaroc/sources/si.py b/python/yaroc/sources/si.py index ca83d51..72b0f11 100644 --- a/python/yaroc/sources/si.py +++ b/python/yaroc/sources/si.py @@ -16,7 +16,7 @@ from usbmonitor import USBMonitor from usbmonitor.attributes import DEVNAME, ID_MODEL_ID, ID_VENDOR_ID -from ..rs import HostInfo, SiPunchLog +from ..rs import SiPunch DEFAULT_TIMEOUT_MS = 3.0 START_MODE = 3 @@ -34,14 +34,13 @@ class SiWorker: def __init__(self): self._codes: set[int] = set() - async def process_punch(self, punch_log: SiPunchLog, queue: Queue): + async def process_punch(self, punch: SiPunch, queue: Queue[SiPunch]): now = datetime.now().astimezone() - punch = punch_log.punch logging.info( f"{punch.card} punched {punch.code} at {punch.time:%H:%M:%S.%f}, received after " f"{(now-punch.time).total_seconds():3.2f}s" ) - await queue.put(punch_log) + await queue.put(punch) self._codes.add(punch.code) def __str__(self): @@ -52,14 +51,13 @@ def __str__(self): class SerialSiWorker(SiWorker): """Serial port worker""" - def __init__(self, port: str, host_info: HostInfo): + def __init__(self, port: str): super().__init__() self.port = port self.name = "srr" - self.host_info = host_info self._finished = Event() - async def loop(self, queue: Queue[SiPunchLog]): + async def loop(self, queue: Queue[SiPunch]): successful = False for i in range(3): try: @@ -82,9 +80,7 @@ async def loop(self, queue: Queue[SiPunchLog]): if len(data) == 0: await asyncio.sleep(1.0) continue - now = datetime.now().astimezone() - punch = SiPunchLog.from_raw(data, self.host_info, now) - await self.process_punch(punch, queue) + await self.process_punch(SiPunch.from_raw(data), queue) except serial.serialutil.SerialException as err: logging.error(f"Fatal serial exception: {err}") @@ -100,24 +96,24 @@ def close(self): class BtSerialSiWorker(SiWorker): """Bluetooth serial worker""" - def __init__(self, hostname: str, mac_addr: str): + def __init__(self, mac_addr: str): super().__init__() self.name = "lora" - self.host_info = HostInfo.new(hostname, mac_addr) + self.mac_address = mac_addr logging.info(f"Starting a bluetooth serial worker, connecting to {mac_addr}") def __hash__(self): - return self.mac_addr.__hash__() + return self.mac_address.__hash__() async def loop(self, queue: Queue, _status_queue): sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) sock.setblocking(False) loop = asyncio.get_event_loop() try: - await loop.sock_connect(sock, (self.host_info.mac_address, 1)) + await loop.sock_connect(sock, (self.mac_address, 1)) except Exception as err: - logging.error(f"Error connecting to {self.host_info.mac_address}: {err}") - logging.info(f"Connected to {self.host_info.mac_address}") + logging.error(f"Error connecting to {self.mac_address}: {err}") + logging.info(f"Connected to {self.mac_address}") while True: try: @@ -125,9 +121,7 @@ async def loop(self, queue: Queue, _status_queue): if len(data) == 0: await asyncio.sleep(1.0) continue - now = datetime.now().astimezone() - punch = SiPunchLog.from_raw(data, self.host_info, now) - await self.process_punch(punch, queue) + await self.process_punch(SiPunch.from_raw(data), queue) except Exception as err: logging.error(f"Loop crashing: {err}") @@ -135,10 +129,9 @@ async def loop(self, queue: Queue, _status_queue): class UdevSiFactory(SiWorker): - def __init__(self, name: str, mac_addr: str): + def __init__(self, name: str): self._udev_workers: Dict[str, tuple[SerialSiWorker, Task, str]] = {} self._device_queue: Queue[tuple[str, dict[str, Any]]] = Queue() - self.host_info = HostInfo.new(name, mac_addr) @staticmethod def extract_com(device_name: str) -> str: @@ -149,7 +142,7 @@ def extract_com(device_name: str) -> str: return match.groups()[0] - async def loop(self, queue: Queue[SiPunchLog], status_queue: Queue[DeviceEvent]): + async def loop(self, queue: Queue[SiPunch], status_queue: Queue[DeviceEvent]): self._loop = asyncio.get_event_loop() logging.info("Starting USB SportIdent device manager") self.monitor = USBMonitor(({ID_VENDOR_ID: "10c4"}, {ID_VENDOR_ID: "1a86"})) @@ -183,7 +176,7 @@ async def loop(self, queue: Queue[SiPunchLog], status_queue: Queue[DeviceEvent]) logging.info(f"Inserted SportIdent device {device_node}") try: - worker = SerialSiWorker(device_node, self.host_info) + worker = SerialSiWorker(device_node) task = asyncio.create_task(worker.loop(queue)) self._udev_workers[parent_device_node] = (worker, task, device_node) await status_queue.put(DeviceEvent(True, device_node)) @@ -234,11 +227,10 @@ def __str__(self): class FakeSiWorker(SiWorker): """Creates fake SportIdent events, useful for benchmarks and tests.""" - def __init__(self, mac_addr: str, punch_interval_secs: float = 12): + def __init__(self, str, punch_interval_secs: float = 12): super().__init__() - self._punch_interval = punch_interval_secs - self.mac_addr = mac_addr self.name = "fake" + self._punch_interval = punch_interval_secs logging.info( "Starting a fake SportIdent worker, sending a punch every " f"{self._punch_interval} seconds" @@ -252,8 +244,8 @@ async def loop(self, queue: Queue, _status_queue): while True: time_start = time.time() now = datetime.now().astimezone() - punch_log = SiPunchLog.new(46283, 47, now, 18, HostInfo.new("fake", self.mac_addr), now) - await self.process_punch(punch_log, queue) + punch = SiPunch.new(46283, 47, now, 18) + await self.process_punch(punch, queue) await asyncio.sleep(self._punch_interval - (time.time() - time_start)) @@ -267,7 +259,7 @@ class SiPunchManager: def __init__(self, workers: list[SiWorker]) -> None: self._si_workers: set[SiWorker] = set(workers) - self._queue: Queue[SiPunchLog] = Queue() + self._queue: Queue[SiPunch] = Queue() self._status_queue: Queue[DeviceEvent] = Queue() def __str__(self) -> str: @@ -281,7 +273,7 @@ async def loop(self): await asyncio.sleep(3) # Allow some time for an MQTT connection await asyncio.gather(*loops, return_exceptions=True) - async def punches(self) -> AsyncIterator[SiPunchLog]: + async def punches(self) -> AsyncIterator[SiPunch]: while True: yield await self._queue.get() diff --git a/src/message_handler.rs b/src/message_handler.rs index 4487d4a..aed39ef 100644 --- a/src/message_handler.rs +++ b/src/message_handler.rs @@ -237,7 +237,7 @@ impl MessageHandler { format!("Wrong length of chunk={length}"), ) })?, - &host_info, + host_info, now, )) } diff --git a/src/punch.rs b/src/punch.rs index f41f458..7905067 100644 --- a/src/punch.rs +++ b/src/punch.rs @@ -180,24 +180,17 @@ impl SiPunch { #[pymethods] impl SiPunchLog { #[staticmethod] - pub fn new( - card: u32, - code: u16, - time: DateTime, - mode: u8, - host_info: &HostInfo, - now: DateTime, - ) -> Self { + pub fn from_raw(payload: [u8; 20], host_info: &HostInfo, now: DateTime) -> Self { + let punch = SiPunch::from_raw(payload); Self { - punch: SiPunch::new(card, code, time, mode), - latency: now - time, + latency: now - punch.time, + punch, host_info: host_info.clone(), } } #[staticmethod] - pub fn from_raw(payload: [u8; 20], host_info: &HostInfo, now: DateTime) -> Self { - let punch = SiPunch::from_raw(payload); + pub fn new(punch: SiPunch, host_info: &HostInfo, now: DateTime) -> Self { Self { latency: now - punch.time, punch, @@ -315,10 +308,7 @@ mod test_punch { mac_address: "abcdef123456".to_owned(), }; let punch = SiPunchLog::new( - 46283, - 47, - time, - 1, + SiPunch::new(46283, 47, time, 1), &host_info, time + Duration::milliseconds(2831), );