Skip to content

Commit

Permalink
Update aiomqtt, dependecy injector
Browse files Browse the repository at this point in the history
  • Loading branch information
lukipuki committed May 24, 2024
1 parent cad2dd0 commit ce485b0
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ classifiers = [
dependencies = [
'aiohttp-retry==2.8.3',
'aiohttp==3.9.*',
'aiomqtt==1.2.*',
'aiomqtt==2.1.*',
'dbus-next==0.2.*; platform_system == "Linux"',
'dependency-injector==4.41.*',
'dependency-injector-fork==4.42.*',
'gpiozero==1.6.*',
'pillow==10.3.*',
'protobuf==4.25.*',
Expand Down
14 changes: 6 additions & 8 deletions python/yaroc/clients/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
from typing import Dict

from aiomqtt import Client as AioMqttClient
from aiomqtt import MqttError
from aiomqtt import MqttCodeError, MqttError
from aiomqtt.client import Will
from aiomqtt.error import MqttCodeError

from ..pb.punches_pb2 import Punch, Punches
from ..pb.status_pb2 import Disconnected, Status
Expand Down Expand Up @@ -70,7 +69,7 @@ def __init__(
self.broker_port,
timeout=20,
will=will,
client_id=self.name,
identifier=self.name,
clean_session=False,
max_inflight_messages=100,
logger=logging.getLogger(),
Expand Down Expand Up @@ -133,11 +132,10 @@ async def loop(self):
try:
async with self.client:
logging.info(f"Connected to mqtt://{BROKER_URL}")
async with self.client.messages() as messages:
topics = self.get_topics(self.mac_addr)
await self.client.subscribe(topics.command)
async for message in messages:
logging.info("Got a command message, processing is not implemented")
topics = self.get_topics(self.mac_addr)
await self.client.subscribe(topics.command)
async for message in self.client.messages:
logging.info("Got a command message, processing is not implemented")

except MqttError:
logging.error(f"Connection lost to mqtt://{BROKER_URL}")
Expand Down
2 changes: 1 addition & 1 deletion python/yaroc/clients/roc.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def loop(self):
client_session=session, raise_for_status=True, retry_options=retry_options
)
async with self.client:
await asyncio.sleep(10000000) # We need to sleep, otherwise the client will be GC-ed
await asyncio.sleep(10000000) # We need to sleep, otherwise the client will be GC-ed

async def send_punch(
self,
Expand Down
21 changes: 10 additions & 11 deletions python/yaroc/sources/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,16 @@ async def loop(self):
logger=logging.getLogger(),
) as client:
logging.info(f"Connected to mqtt://{BROKER_URL}")
async with client.messages() as messages:
for mac_addr in online_macs:
await client.subscribe(f"yar/{mac_addr}/#", qos=1)
for mac_addr in radio_macs:
await client.subscribe(f"yar/2/e/serial/!{mac_addr}", qos=1)
await client.subscribe(
f"yar/2/e/{self.meshtastic_channel}/!{mac_addr}", qos=1
)

async for message in messages:
asyncio.create_task(self._on_message(message))
for mac_addr in online_macs:
await client.subscribe(f"yar/{mac_addr}/#", qos=1)
for mac_addr in radio_macs:
await client.subscribe(f"yar/2/e/serial/!{mac_addr}", qos=1)
await client.subscribe(
f"yar/2/e/{self.meshtastic_channel}/!{mac_addr}", qos=1
)

async for message in client.messages:
asyncio.create_task(self._on_message(message))
except MqttError:
logging.error(f"Connection lost to mqtt://{BROKER_URL}")
await asyncio.sleep(5.0)
Expand Down

0 comments on commit ce485b0

Please sign in to comment.