Skip to content

Commit

Permalink
add lock to try to deal with concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsteinbach committed Oct 29, 2023
1 parent a49bcd7 commit 8f1f362
Showing 1 changed file with 65 additions and 56 deletions.
121 changes: 65 additions & 56 deletions weconnect_mqtt/weconnect_mqtt_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import time
import ssl
from threading import Thread, Lock

from dateutil import tz

Expand Down Expand Up @@ -362,6 +363,7 @@ def __init__(self, clientId=None, protocol=paho.mqtt.client.MQTTv311, transport=
pictureFormat=None, topicFilterRegex=None, convertTimezone=None, timeFormat=None, withRawJsonTopic=False, passive=False,
updateOnConnect=True):
super().__init__(client_id=clientId, transport=transport, protocol=protocol)
self.lock = Lock()
self.weConnect = None
self.prefix = prefix
self.interval = interval
Expand Down Expand Up @@ -460,60 +462,61 @@ def connectWeConnect(self, weConnect):
def updateWeConnect(self, reraise=False): # noqa: C901
if self.passive:
return
LOG.info('Update data from WeConnect')
self.hasChanges = False
try:
self.weConnect.update(updateCapabilities=self.updateCapabilities, updatePictures=self.updatePictures, selective=self.selective, force=True)
self.setConnected(connected=True)
self.setError(code=WeConnectErrors.SUCCESS)
topic = f'{self.prefix}/mqtt/weconnectUpdated'
convertedTime = datetime.utcnow().replace(microsecond=0, tzinfo=timezone.utc)
if self.convertTimezone is not None:
convertedTime = convertedTime.astimezone(self.convertTimezone)
if self.timeFormat is not None:
convertedTimeString = convertedTime.strftime(self.timeFormat)
else:
convertedTimeString = convertedTime.isoformat()
self.publish(topic=topic, qos=1, retain=True,
payload=convertedTimeString)
if topic not in self.topics:
self.addTopic(topic)
except errors.TooManyRequestsError as error:
self.temporaryInterval = 900
self.setConnected(connected=False)
errorMessage = 'Retrieval error during update. Too many requests from your account. Will try again after 15 minutes'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.RetrievalError as error:
self.setConnected(connected=False)
errorMessage = f'Retrieval error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.APICompatibilityError as error:
self.setConnected(connected=False)
errorMessage = f'API compatibility error ({str(error)}) during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.API_COMPATIBILITY, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.TemporaryAuthentificationError as error:
self.setConnected(connected=False)
errorMessage = f'Temporary authentification error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.AUTHENTIFICATION, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except socket.error as error:
self.setConnected(connected=False)
errorMessage = f'Socket error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
with self.lock:
LOG.info('Update data from WeConnect')
self.hasChanges = False
try:
self.weConnect.update(updateCapabilities=self.updateCapabilities, updatePictures=self.updatePictures, selective=self.selective, force=True)
self.setConnected(connected=True)
self.setError(code=WeConnectErrors.SUCCESS)
topic = f'{self.prefix}/mqtt/weconnectUpdated'
convertedTime = datetime.utcnow().replace(microsecond=0, tzinfo=timezone.utc)
if self.convertTimezone is not None:
convertedTime = convertedTime.astimezone(self.convertTimezone)
if self.timeFormat is not None:
convertedTimeString = convertedTime.strftime(self.timeFormat)
else:
convertedTimeString = convertedTime.isoformat()
self.publish(topic=topic, qos=1, retain=True,
payload=convertedTimeString)
if topic not in self.topics:
self.addTopic(topic)
except errors.TooManyRequestsError as error:
self.temporaryInterval = 900
self.setConnected(connected=False)
errorMessage = 'Retrieval error during update. Too many requests from your account. Will try again after 15 minutes'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.RetrievalError as error:
self.setConnected(connected=False)
errorMessage = f'Retrieval error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.APICompatibilityError as error:
self.setConnected(connected=False)
errorMessage = f'API compatibility error ({str(error)}) during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.API_COMPATIBILITY, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except errors.TemporaryAuthentificationError as error:
self.setConnected(connected=False)
errorMessage = f'Temporary authentification error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.AUTHENTIFICATION, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
except socket.error as error:
self.setConnected(connected=False)
errorMessage = f'Socket error during update. Will try again after configured interval of {self.interval}s'
self.setError(code=WeConnectErrors.RETRIEVAL_FAILED, message=errorMessage)
LOG.info(errorMessage)
if reraise:
raise error
if self.withRawJsonTopic and self.hasChanges:
topic = f'{self.prefix}/rawjson'
if topic not in self.topics:
Expand Down Expand Up @@ -623,7 +626,10 @@ def on_connect_callback(self, mqttc, obj, flags, rc): # noqa: C901
self.setConnected()

if self.updateOnConnect:
self.updateWeConnect()
backgroundThread = Thread(target=self.updateWeConnect, daemon=True, name='Update in Background')
backgroundThread.start()
else:
LOG.info('waiting for first update from server')
elif rc == 1:
LOG.error('Could not connect (%d): incorrect protocol version', rc)
elif rc == 2:
Expand Down Expand Up @@ -668,7 +674,10 @@ def on_connect_callback_v5(self, mqttc, obj, flags, reasonCode, properties): #
self.setConnected()

if self.updateOnConnect:
self.updateWeConnect()
backgroundThread = Thread(target=self.updateWeConnect, daemon=True, name='Update in Background')
backgroundThread.start()
else:
LOG.info('waiting for first update from server')
elif reasonCode == 128:
LOG.error('Could not connect (%d): Unspecified error', reasonCode)
elif reasonCode == 129:
Expand Down

0 comments on commit 8f1f362

Please sign in to comment.