Skip to content

Commit

Permalink
Remove HostInfo from SI Workers
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed May 26, 2024
1 parent 0673cc0 commit 53b4327
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 63 deletions.
4 changes: 2 additions & 2 deletions python/yaroc/clients/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def send_punch(
) -> bool:
punches = Punches()
try:
punches.punches.append(create_punch_proto(punch_log))
punches.punches.append(create_punch_proto(punch_log.punch))
except Exception as err:
logging.error(f"Creation of Punch proto failed: {err}")
punches.sending_timestamp.GetCurrentTime()
Expand Down Expand Up @@ -194,7 +194,7 @@ async def send_punch(
self,
punch_log: SiPunchLog,
) -> bool:
res = await self._retries.send(create_punch_proto(punch_log))
res = await self._retries.send(create_punch_proto(punch_log.punch))
return res if res is not None else False

async def send_status(self, status: Status, mac_addr: str) -> bool:
Expand Down
6 changes: 3 additions & 3 deletions python/yaroc/pb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from google.protobuf.timestamp_pb2 import Timestamp

from ..rs import SiPunchLog
from ..rs import SiPunch
from .punches_pb2 import Punch
from .status_pb2 import Coordinates

Expand All @@ -14,9 +14,9 @@ def _datetime_to_prototime(time: datetime) -> Timestamp:
return ret


def create_punch_proto(punch_log: SiPunchLog) -> Punch:
def create_punch_proto(si_punch: SiPunch) -> Punch:
punch = Punch()
punch.raw = bytes(punch_log.punch.raw)
punch.raw = bytes(si_punch.raw)
return punch


Expand Down
14 changes: 11 additions & 3 deletions python/yaroc/rs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,23 @@ class SiPunch(object):
mode: int
raw: bytes

@staticmethod
def new(
card: int,
code: int,
time: datetime,
mode: int,
) -> "SiPunch": ...
@staticmethod
def from_raw(payload: bytes) -> "SiPunch": ...

class SiPunchLog(object):
punch: SiPunch
latency: timedelta
host_info: HostInfo

@staticmethod
def new(
card: int, code: int, time: datetime, mode: int, host_info: HostInfo, now: datetime
) -> "SiPunchLog": ...
def new(punch: SiPunch, host_info: HostInfo, now: datetime) -> "SiPunchLog": ...
@staticmethod
def from_raw(payload: bytes, host_info: HostInfo, now: datetime) -> "SiPunchLog": ...
@staticmethod
Expand Down
27 changes: 19 additions & 8 deletions python/yaroc/scripts/send_punch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import logging
import socket
import time
Expand All @@ -8,6 +9,7 @@

from ..clients.client import ClientGroup
from ..pb.status_pb2 import DeviceEvent, EventType, MiniCallHome, Status
from ..rs import HostInfo, SiPunchLog
from ..sources.si import SiPunchManager
from ..utils.container import Container, create_clients
from ..utils.sys_info import create_sys_minicallhome, eth_mac_addr
Expand All @@ -18,15 +20,15 @@ class PunchSender:
def __init__(
self,
client_group: ClientGroup,
mac_addr: str,
host_info: HostInfo,
mch_interval: int | None = 30,
si_manager: SiPunchManager = Provide[Container.si_manager],
):
if client_group.len() == 0:
logging.warning("No clients enabled, will listen to punches but nothing will be sent")
self.client_group = client_group
self.si_manager = si_manager
self.mac_addr = mac_addr
self.host_info = host_info
if mch_interval is None:
mch_interval = 30
self._mch_interval = mch_interval
Expand All @@ -40,12 +42,16 @@ async def periodic_mini_call_home(self):
mini_call_home.codes = str(self.si_manager)
status = Status()
status.mini_call_home.CopyFrom(mini_call_home)
await self.client_group.send_status(status, self.mac_addr)
await self.client_group.send_status(status, self.host_info.mac_address)
await asyncio.sleep(self._mch_interval - (time.time() - time_start))

async def send_punches(self):
async for si_punch in self.si_manager.punches():
asyncio.create_task(self.client_group.send_punch(si_punch))
asyncio.create_task(
self.client_group.send_punch(
SiPunchLog.new(si_punch, self.host_info, datetime.datetime.now().astimezone())
)
)

async def udev_events(self):
# TODO: get rid of the following sleep
Expand All @@ -58,7 +64,7 @@ async def udev_events(self):
device_event.type = EventType.Added if dev_event.added else EventType.Removed
status = Status()
status.dev_event.CopyFrom(device_event)
await self.client_group.send_status(status, self.mac_addr)
await self.client_group.send_status(status, self.host_info.mac_addr)

async def loop(self):
def handle_exception(loop, context):
Expand Down Expand Up @@ -91,16 +97,21 @@ async def main():
if "mac_addr" not in config:
config["mac_addr"] = eth_mac_addr()
assert config["mac_addr"] is not None
config["hostname"] = socket.gethostname()
hostname = socket.gethostname()
config["hostname"] = hostname

container = Container()
container.config.from_dict(config)
container.init_resources()
container.wire(modules=["yaroc.utils.container", __name__])
logging.info(f"Starting SendPunch for {config['hostname']}/{config['mac_addr']}")
logging.info(f"Starting SendPunch for {hostname}/{config['mac_addr']}")

client_group = await create_clients(container.client_factories)
ps = PunchSender(client_group, config["mac_addr"], config.get("call_home_interval", None))
ps = PunchSender(
client_group,
HostInfo.new(hostname, config["mac_addr"]),
config.get("call_home_interval", None),
)
await ps.loop()


Expand Down
52 changes: 22 additions & 30 deletions python/yaroc/sources/si.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from usbmonitor import USBMonitor
from usbmonitor.attributes import DEVNAME, ID_MODEL_ID, ID_VENDOR_ID

from ..rs import HostInfo, SiPunchLog
from ..rs import SiPunch

DEFAULT_TIMEOUT_MS = 3.0
START_MODE = 3
Expand All @@ -34,14 +34,13 @@ class SiWorker:
def __init__(self):
self._codes: set[int] = set()

async def process_punch(self, punch_log: SiPunchLog, queue: Queue):
async def process_punch(self, punch: SiPunch, queue: Queue[SiPunch]):
now = datetime.now().astimezone()
punch = punch_log.punch
logging.info(
f"{punch.card} punched {punch.code} at {punch.time:%H:%M:%S.%f}, received after "
f"{(now-punch.time).total_seconds():3.2f}s"
)
await queue.put(punch_log)
await queue.put(punch)
self._codes.add(punch.code)

def __str__(self):
Expand All @@ -52,14 +51,13 @@ def __str__(self):
class SerialSiWorker(SiWorker):
"""Serial port worker"""

def __init__(self, port: str, host_info: HostInfo):
def __init__(self, port: str):
super().__init__()
self.port = port
self.name = "srr"
self.host_info = host_info
self._finished = Event()

async def loop(self, queue: Queue[SiPunchLog]):
async def loop(self, queue: Queue[SiPunch]):
successful = False
for i in range(3):
try:
Expand All @@ -82,9 +80,7 @@ async def loop(self, queue: Queue[SiPunchLog]):
if len(data) == 0:
await asyncio.sleep(1.0)
continue
now = datetime.now().astimezone()
punch = SiPunchLog.from_raw(data, self.host_info, now)
await self.process_punch(punch, queue)
await self.process_punch(SiPunch.from_raw(data), queue)

except serial.serialutil.SerialException as err:
logging.error(f"Fatal serial exception: {err}")
Expand All @@ -100,45 +96,42 @@ def close(self):
class BtSerialSiWorker(SiWorker):
"""Bluetooth serial worker"""

def __init__(self, hostname: str, mac_addr: str):
def __init__(self, mac_addr: str):
super().__init__()
self.name = "lora"
self.host_info = HostInfo.new(hostname, mac_addr)
self.mac_address = mac_addr
logging.info(f"Starting a bluetooth serial worker, connecting to {mac_addr}")

def __hash__(self):
return self.mac_addr.__hash__()
return self.mac_address.__hash__()

async def loop(self, queue: Queue, _status_queue):
sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
sock.setblocking(False)
loop = asyncio.get_event_loop()
try:
await loop.sock_connect(sock, (self.host_info.mac_address, 1))
await loop.sock_connect(sock, (self.mac_address, 1))
except Exception as err:
logging.error(f"Error connecting to {self.host_info.mac_address}: {err}")
logging.info(f"Connected to {self.host_info.mac_address}")
logging.error(f"Error connecting to {self.mac_address}: {err}")
logging.info(f"Connected to {self.mac_address}")

while True:
try:
data = await loop.sock_recv(sock, 20)
if len(data) == 0:
await asyncio.sleep(1.0)
continue
now = datetime.now().astimezone()
punch = SiPunchLog.from_raw(data, self.host_info, now)
await self.process_punch(punch, queue)
await self.process_punch(SiPunch.from_raw(data), queue)

except Exception as err:
logging.error(f"Loop crashing: {err}")
return


class UdevSiFactory(SiWorker):
def __init__(self, name: str, mac_addr: str):
def __init__(self, name: str):
self._udev_workers: Dict[str, tuple[SerialSiWorker, Task, str]] = {}
self._device_queue: Queue[tuple[str, dict[str, Any]]] = Queue()
self.host_info = HostInfo.new(name, mac_addr)

@staticmethod
def extract_com(device_name: str) -> str:
Expand All @@ -149,7 +142,7 @@ def extract_com(device_name: str) -> str:

return match.groups()[0]

async def loop(self, queue: Queue[SiPunchLog], status_queue: Queue[DeviceEvent]):
async def loop(self, queue: Queue[SiPunch], status_queue: Queue[DeviceEvent]):
self._loop = asyncio.get_event_loop()
logging.info("Starting USB SportIdent device manager")
self.monitor = USBMonitor(({ID_VENDOR_ID: "10c4"}, {ID_VENDOR_ID: "1a86"}))
Expand Down Expand Up @@ -183,7 +176,7 @@ async def loop(self, queue: Queue[SiPunchLog], status_queue: Queue[DeviceEvent])
logging.info(f"Inserted SportIdent device {device_node}")

try:
worker = SerialSiWorker(device_node, self.host_info)
worker = SerialSiWorker(device_node)
task = asyncio.create_task(worker.loop(queue))
self._udev_workers[parent_device_node] = (worker, task, device_node)
await status_queue.put(DeviceEvent(True, device_node))
Expand Down Expand Up @@ -234,11 +227,10 @@ def __str__(self):
class FakeSiWorker(SiWorker):
"""Creates fake SportIdent events, useful for benchmarks and tests."""

def __init__(self, mac_addr: str, punch_interval_secs: float = 12):
def __init__(self, str, punch_interval_secs: float = 12):
super().__init__()
self._punch_interval = punch_interval_secs
self.mac_addr = mac_addr
self.name = "fake"
self._punch_interval = punch_interval_secs
logging.info(
"Starting a fake SportIdent worker, sending a punch every "
f"{self._punch_interval} seconds"
Expand All @@ -252,8 +244,8 @@ async def loop(self, queue: Queue, _status_queue):
while True:
time_start = time.time()
now = datetime.now().astimezone()
punch_log = SiPunchLog.new(46283, 47, now, 18, HostInfo.new("fake", self.mac_addr), now)
await self.process_punch(punch_log, queue)
punch = SiPunch.new(46283, 47, now, 18)
await self.process_punch(punch, queue)
await asyncio.sleep(self._punch_interval - (time.time() - time_start))


Expand All @@ -267,7 +259,7 @@ class SiPunchManager:

def __init__(self, workers: list[SiWorker]) -> None:
self._si_workers: set[SiWorker] = set(workers)
self._queue: Queue[SiPunchLog] = Queue()
self._queue: Queue[SiPunch] = Queue()
self._status_queue: Queue[DeviceEvent] = Queue()

def __str__(self) -> str:
Expand All @@ -281,7 +273,7 @@ async def loop(self):
await asyncio.sleep(3) # Allow some time for an MQTT connection
await asyncio.gather(*loops, return_exceptions=True)

async def punches(self) -> AsyncIterator[SiPunchLog]:
async def punches(self) -> AsyncIterator[SiPunch]:
while True:
yield await self._queue.get()

Expand Down
2 changes: 1 addition & 1 deletion src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl MessageHandler {
format!("Wrong length of chunk={length}"),
)
})?,
&host_info,
host_info,
now,
))
}
Expand Down
22 changes: 6 additions & 16 deletions src/punch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,24 +180,17 @@ impl SiPunch {
#[pymethods]
impl SiPunchLog {
#[staticmethod]
pub fn new(
card: u32,
code: u16,
time: DateTime<FixedOffset>,
mode: u8,
host_info: &HostInfo,
now: DateTime<FixedOffset>,
) -> Self {
pub fn from_raw(payload: [u8; 20], host_info: &HostInfo, now: DateTime<FixedOffset>) -> Self {
let punch = SiPunch::from_raw(payload);
Self {
punch: SiPunch::new(card, code, time, mode),
latency: now - time,
latency: now - punch.time,
punch,
host_info: host_info.clone(),
}
}

#[staticmethod]
pub fn from_raw(payload: [u8; 20], host_info: &HostInfo, now: DateTime<FixedOffset>) -> Self {
let punch = SiPunch::from_raw(payload);
pub fn new(punch: SiPunch, host_info: &HostInfo, now: DateTime<FixedOffset>) -> Self {
Self {
latency: now - punch.time,
punch,
Expand Down Expand Up @@ -315,10 +308,7 @@ mod test_punch {
mac_address: "abcdef123456".to_owned(),
};
let punch = SiPunchLog::new(
46283,
47,
time,
1,
SiPunch::new(46283, 47, time, 1),
&host_info,
time + Duration::milliseconds(2831),
);
Expand Down

0 comments on commit 53b4327

Please sign in to comment.