-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.py
74 lines (60 loc) · 2.56 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import configparser
import json
import logging
import os
import sys
import paho.mqtt.publish as publish
from bleak import BleakClient, BleakScanner
characteristic = "0000ffe4-0000-1000-8000-00805f9b34fb"
wait_time = 120 # I haven't yet figured out how to make ad-hoc read
class InkBird:
def __init__(self, config):
self.config: configparser.ConfigParser = config
self.client = None
async def connect(self):
logging.info('Searching for device..')
inkbird = await BleakScanner.find_device_by_name(self.config['device']['alias'])
if not inkbird:
return logging.error('Device not found!')
else:
logging.info(f'Found device {inkbird}')
async with BleakClient(inkbird) as client:
self.client = client
logging.info(f"Client connection: {client.is_connected}")
await client.start_notify(characteristic, self.notification_callback)
await asyncio.sleep(wait_time)
await self.disconnect()
async def notification_callback(self, sender, data):
sign = data[4]
temperature = data[5] << 8 | data[6]
json_data = {
'temperature': (temperature if sign == 0 else -temperature) / 10,
'co2': data[9] << 8 | data[10],
'humidity': (data[7] << 8 | data[8]) / 10,
'atmospheric_pressure': data[11] << 8 | data[12]
}
logging.info(json_data)
self.publish_mqtt(json_data)
await self.disconnect()
def publish_mqtt(self, json_data):
user = self.config['mqtt']['user']
password = self.config['mqtt']['password']
auth = None if not user or not password else {"username": user, "password": password}
publish.single(
self.config['mqtt']['topic'], payload=json.dumps(json_data),
hostname=self.config['mqtt']['server'], port=self.config['mqtt'].getint('port'),
auth=auth, client_id="inkbird-reader"
)
async def disconnect(self):
if self.client and self.client.is_connected:
await self.client.stop_notify(characteristic)
await self.client.disconnect()
os._exit(os.EX_OK)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
config_file = sys.argv[1] if len(sys.argv) > 1 else 'config.ini'
config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), config_file)
config = configparser.ConfigParser(inline_comment_prefixes=('#'))
config.read(config_path)
asyncio.run(InkBird(config).connect())