diff --git a/src/yaroc/clients/mqtt.py b/src/yaroc/clients/mqtt.py index de2e256..430f2d8 100644 --- a/src/yaroc/clients/mqtt.py +++ b/src/yaroc/clients/mqtt.py @@ -35,7 +35,7 @@ class MqttClient(Client): def __init__( self, mac_address: str, - name_prefix: str = "PahoMQTT", + name_prefix: str = "aiomqtt", broker_url: str = BROKER_URL, broker_port: int = BROKER_PORT, ): @@ -43,6 +43,7 @@ def __init__( self.name = f"{name_prefix}-{mac_address}" self.broker_url = broker_url self.broker_port = broker_port + self.initialized = asyncio.Event() def __del__(self): self.client.loop_stop() @@ -58,18 +59,26 @@ async def send_punch( punches = Punches() punches.punches.append(create_punch_proto(card_number, si_time, code, mode, process_time)) punches.sending_timestamp.GetCurrentTime() - return await self._send(self.topic_punches, punches.SerializeToString()) + await self._send(self.topic_punches, punches.SerializeToString(), qos=1) async def send_coords(self, lat: float, lon: float, alt: float, timestamp: datetime): coords = create_coords_proto(lat, lon, alt, timestamp) - return await self._send(self.topic_coords, coords.SerializeToString()) + await self._send(self.topic_coords, coords.SerializeToString(), qos=0) async def send_mini_call_home(self, mch: MiniCallHome): status = Status() status.mini_call_home.CopyFrom(mch) - return await self._send(self.topic_status, status.SerializeToString(), qos=0) + await self._send(self.topic_status, status.SerializeToString(), qos=0) - async def _send(self, topic: str, message: bytes, qos: int = 1): + async def _send(self, topic: str, msg: bytes, qos: int): + await self.initialized.wait() + try: + await self.client.publish(topic, payload=msg, qos=qos) + logging.info("Message sent") + except MqttCodeError as e: + logging.error(f"Message not sent: {e}") + + async def loop(self): disconnected = Disconnected() disconnected.client_name = self.name @@ -77,7 +86,6 @@ async def _send(self, topic: str, message: bytes, qos: int = 1): status.disconnected.CopyFrom(disconnected) will = Will(topic=self.topic_status, payload=status.SerializeToString(), qos=1) - # TODO: as a first hack this is fine, but the client should be persisted async with AioMqttClient( self.broker_url, self.broker_port, @@ -88,13 +96,12 @@ async def _send(self, topic: str, message: bytes, qos: int = 1): max_inflight_messages=100, logger=logging.getLogger(), ) as client: - # TODO: Add connection/disconnected notifications + self.client = client + self.initialized.set() + logging.info(f"Connected to mqtt://{BROKER_URL}") + # TODO: emit asyncio.Event - try: - await client.publish(topic, payload=message, qos=qos) - logging.info("Message sent") # TODO: message ID - except MqttCodeError as e: - logging.error(f"Message not sent: {e}") + await asyncio.sleep(10000000.0) # TODO: what about disconnects? class SIM7020MqttClient(Client): diff --git a/src/yaroc/scripts/send_punch.py b/src/yaroc/scripts/send_punch.py index 9a0705e..f77874f 100644 --- a/src/yaroc/scripts/send_punch.py +++ b/src/yaroc/scripts/send_punch.py @@ -76,6 +76,9 @@ def loop(self): asyncio.run_coroutine_threadsafe(self.periodic_mini_call_home(), async_loop) asyncio.run_coroutine_threadsafe(self.send_punches(), async_loop) asyncio.run_coroutine_threadsafe(self.udev_events(), async_loop) + + for client in self.clients: # TODO: separate loop? + asyncio.run_coroutine_threadsafe(client.loop(), async_loop) async_loop.run_forever()