Skip to content

Commit

Permalink
CLeanup MqttClient
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed Oct 26, 2023
1 parent 7590b55 commit f377975
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/yaroc/clients/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ class MqttClient(Client):
def __init__(
self,
mac_address: str,
name_prefix: str = "PahoMQTT",
name_prefix: str = "aiomqtt",
broker_url: str = BROKER_URL,
broker_port: int = BROKER_PORT,
):
self.topic_punches, self.topic_coords, self.topic_status = topics_from_mac(mac_address)
self.name = f"{name_prefix}-{mac_address}"
self.broker_url = broker_url
self.broker_port = broker_port
self.lock = asyncio.Lock()

def __del__(self):
self.client.loop_stop()
Expand All @@ -58,26 +59,34 @@ async def send_punch(
punches = Punches()
punches.punches.append(create_punch_proto(card_number, si_time, code, mode, process_time))
punches.sending_timestamp.GetCurrentTime()
return await self._send(self.topic_punches, punches.SerializeToString())
await self._send(self.topic_punches, punches.SerializeToString(), qos=1)

async def send_coords(self, lat: float, lon: float, alt: float, timestamp: datetime):
coords = create_coords_proto(lat, lon, alt, timestamp)
return await self._send(self.topic_coords, coords.SerializeToString())
await self._send(self.topic_coords, coords.SerializeToString(), qos=0)

async def send_mini_call_home(self, mch: MiniCallHome):
status = Status()
status.mini_call_home.CopyFrom(mch)
return await self._send(self.topic_status, status.SerializeToString(), qos=0)
await asyncio.sleep(1.0) # TODO: lock the client
await self._send(self.topic_status, status.SerializeToString(), qos=0)

async def _send(self, topic: str, message: bytes, qos: int = 1):
async def _send(self, topic: str, msg: bytes, qos: int):
async with self.lock:
try:
await self.client.publish(topic, payload=msg, qos=qos)
logging.info("Message sent")
except MqttCodeError as e:
logging.error(f"Message not sent: {e}")

async def loop(self):
disconnected = Disconnected()
disconnected.client_name = self.name

status = Status()
status.disconnected.CopyFrom(disconnected)
will = Will(topic=self.topic_status, payload=status.SerializeToString(), qos=1)

# TODO: as a first hack this is fine, but the client should be persisted
async with AioMqttClient(
self.broker_url,
self.broker_port,
Expand All @@ -88,13 +97,11 @@ async def _send(self, topic: str, message: bytes, qos: int = 1):
max_inflight_messages=100,
logger=logging.getLogger(),
) as client:
# TODO: Add connection/disconnected notifications
self.client = client
logging.info(f"Connected to mqtt://{BROKER_URL}")
# TODO: emit asyncio.Event

try:
await client.publish(topic, payload=message, qos=qos)
logging.info("Message sent") # TODO: message ID
except MqttCodeError as e:
logging.error(f"Message not sent: {e}")
await asyncio.sleep(10000000.0) # TODO: what about disconnects?


class SIM7020MqttClient(Client):
Expand Down
3 changes: 3 additions & 0 deletions src/yaroc/scripts/send_punch.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def loop(self):
asyncio.run_coroutine_threadsafe(self.periodic_mini_call_home(), async_loop)
asyncio.run_coroutine_threadsafe(self.send_punches(), async_loop)
asyncio.run_coroutine_threadsafe(self.udev_events(), async_loop)

for client in self.clients: # TODO: separate loop?
asyncio.run_coroutine_threadsafe(client.loop(), async_loop)
async_loop.run_forever()


Expand Down

0 comments on commit f377975

Please sign in to comment.