Skip to content

Commit

Permalink
Fix MQTT reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed Oct 29, 2023
1 parent 4b9d6bc commit 3ddb15a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/yaroc/clients/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async def loop(self):
await asyncio.sleep(10000000.0)
except MqttError:
logging.error(f"Connection lost to mqtt://{BROKER_URL}")
await asyncio.sleep(5.0)


class SIM7020MqttClient(Client):
Expand Down Expand Up @@ -134,6 +135,9 @@ def __init__(
self._send_punches, 3.0, 2.0, timedelta(hours=3), retry_loop, batch_count=4
)

async def loop(self):
await asyncio.sleep(10000000.0)

def _handle_registration(self, line: str):
return self._retries.execute(self._sim7020.mqtt_connect)

Expand Down
23 changes: 15 additions & 8 deletions src/yaroc/scripts/mqtt_forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Dict

from aiomqtt import Client as MqttClient
from aiomqtt import Message
from aiomqtt import Message, MqttError
from aiomqtt.types import PayloadType
from google.protobuf.timestamp_pb2 import Timestamp

Expand All @@ -15,6 +15,8 @@
from ..pb.punches_pb2 import Punches
from ..pb.status_pb2 import Status
from ..utils.container import Container, create_clients
BROKER_URL = "broker.hivemq.com"
BROKER_PORT = 1883


class MqttForwader:
Expand Down Expand Up @@ -134,13 +136,18 @@ async def loop(self):
for client in clients:
asyncio.run_coroutine_threadsafe(client.loop(), async_loop)

async with MqttClient("broker.hivemq.com", 1883, timeout=30) as client:
logging.info("Connected to mqtt://broker.hivemq.com")
async with client.messages() as messages:
for mac_addr in self.clients.keys():
await client.subscribe(f"yaroc/{mac_addr}/#", qos=1)
async for message in messages:
await self._on_message(message)
while True:
try:
async with MqttClient(BROKER_URL, BROKER_PORT, timeout=30) as client:
logging.info(f"Connected to mqtt://{BROKER_URL}")
async with client.messages() as messages:
for mac_addr in self.clients.keys():
await client.subscribe(f"yaroc/{mac_addr}/#", qos=1)
async for message in messages:
await self._on_message(message)
except MqttError:
logging.error(f"Connection lost to mqtt://{BROKER_URL}")
await asyncio.sleep(5.0)


def main():
Expand Down

0 comments on commit 3ddb15a

Please sign in to comment.