forked from kendrik-wah/cs3237_weather_nowcasting
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cc2650_manual_read.py
141 lines (108 loc) · 4.64 KB
/
cc2650_manual_read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
"""
TI CC2650 SensorTag
-------------------
Adapted by Ashwin from the following sources:
- https://github.com/IanHarvey/bluepy/blob/a7f5db1a31dba50f77454e036b5ee05c3b7e2d6e/bluepy/sensortag.py
- https://github.com/hbldh/bleak/blob/develop/examples/sensortag.py
"""
import asyncio
import datetime
import platform
import struct
import time
from bleak import BleakClient
class Service:
    """
    Base type for one GATT service exposed by the TI CC2650 SensorTag.

    On the SensorTag a sensor (Light Sensor, Humidity Sensor, ...) is defined
    by a control characteristic plus a data characteristic. This class only
    holds the UUID slots; subclasses assign the ones they actually use.

    Background reading:
    - BLE / GATT concepts:
      https://learn.adafruit.com/introduction-to-bluetooth-low-energy/gatt
    - Official TI user guide:
      https://processors.wiki.ti.com/index.php/CC2650_SensorTag_User's_Guide
    """

    def __init__(self):
        # Every UUID starts out unset.
        self.data_uuid = self.ctrl_uuid = self.period_uuid = None

    async def read(self, client):
        """Read this service through ``client``; subclasses must override."""
        raise NotImplementedError
class Sensor(Service):
    """
    A service that is switched on via its control characteristic and then
    polled via its data characteristic.

    Subclasses provide the UUIDs and implement ``callback`` to decode the raw
    bytes of one reading.
    """

    def callback(self, sender: int, data: bytearray):
        """Decode the raw bytes in ``data``; subclasses must override."""
        raise NotImplementedError

    async def enable(self, client, *args):
        """
        Start the sensor on the device and program its period.

        Extra positional arguments are accepted and ignored. Returns ``self``
        so that construction and enabling can be chained in one expression.
        """
        # Written in this order: 0x01 to the control characteristic starts the
        # sensor, then 0x0A goes to the period characteristic (check the
        # applicable period values in the SensorTag user guide).
        for uuid, payload in ((self.ctrl_uuid, 0x01), (self.period_uuid, 0x0A)):
            await client.write_gatt_char(uuid, bytearray([payload]))
        return self

    async def read(self, client):
        """Read the data characteristic once and return the decoded value."""
        raw = await client.read_gatt_char(self.data_uuid)
        # The sender argument is not meaningful for a manual read; 1 is passed
        # as a placeholder.
        return self.callback(1, raw)
class OpticalSensor(Sensor):
    """Light sensor of the SensorTag (characteristics f000aa71/72/73)."""

    def __init__(self):
        super().__init__()
        self.data_uuid = "f000aa71-0451-4000-b000-000000000000"
        self.ctrl_uuid = "f000aa72-0451-4000-b000-000000000000"
        self.period_uuid = "f000aa73-0451-4000-b000-000000000000"

    def callback(self, sender: int, data: bytearray):
        """
        Convert one raw light-sensor reading into a float.

        Args:
            sender: Identifier of the originating characteristic (unused).
            data: Exactly two bytes holding one little-endian 16-bit value.

        Returns:
            ``0.01 * (mantissa << exponent)``, where the mantissa is the low
            12 bits and the exponent the top 4 bits of the raw value
            (presumably lux - confirm against the SensorTag user guide).

        Raises:
            struct.error: If ``data`` is not exactly two bytes long.
        """
        # The raw value is a bit-field, not a signed quantity, so unpack it
        # as unsigned; the top four bits can then be taken with a plain shift.
        (raw,) = struct.unpack('<H', data)
        mantissa = raw & 0x0FFF
        exponent = raw >> 12
        return 0.01 * (mantissa << exponent)
class HumiditySensor(Sensor):
    """
    Humidity sensor of the SensorTag (characteristics f000aa21/22/23).

    Each reading carries two values: a temperature and a relative humidity.
    """

    def __init__(self):
        super().__init__()
        self.data_uuid = "f000aa21-0451-4000-b000-000000000000"
        self.ctrl_uuid = "f000aa22-0451-4000-b000-000000000000"
        self.period_uuid = "f000aa23-0451-4000-b000-000000000000"

    def callback(self, sender: int, data: bytearray):
        """
        Decode one reading into a ``(temperature, relative_humidity)`` tuple.

        ``data`` must be exactly four bytes: two little-endian unsigned 16-bit
        words, temperature first. Each word is scaled as a fraction of 65536:
        temperature maps onto -40.0 .. 125.0 and humidity onto 0.0 .. 100.0.
        """
        raw_temp, raw_humidity = struct.unpack('<HH', data)
        temp_fraction = raw_temp / 65536.0
        humidity_fraction = raw_humidity / 65536.0
        return -40.0 + 165.0 * temp_fraction, 100.0 * humidity_fraction
class BatteryService(Service):
    """
    Battery reading exposed through characteristic 0x2A19.

    Only a data characteristic is used; the control and period UUIDs stay
    unset.
    """

    def __init__(self):
        super().__init__()
        self.data_uuid = "00002a19-0000-1000-8000-00805f9b34fb"

    async def read(self, client):
        """Read the characteristic and return its first byte as an ``int``."""
        payload = await client.read_gatt_char(self.data_uuid)
        first_byte = payload[0]
        return int(first_byte)
async def run(address):
    """
    Connect to the SensorTag at ``address`` and print its readings forever.

    After connecting, the light and humidity sensors are enabled and the
    battery is read once. The coroutine then loops without end: it polls both
    sensors concurrently, prints the pair of results, and re-reads the battery
    whenever more than 15 minutes have passed since the previous battery read.

    Args:
        address: Device address (or platform-specific identifier) handed to
            ``BleakClient``.

    This coroutine never returns normally; it ends only through an exception
    or cancellation, at which point the ``async with`` block disconnects.
    """
    battery_interval = 15 * 60  # seconds between battery reads (15 mins)

    async with BleakClient(address) as client:
        # Old bleak releases expose ``is_connected`` as a coroutine method,
        # newer ones as a plain bool property. Awaiting the property would
        # raise TypeError, so only call/await it when it is callable.
        connected = client.is_connected
        if callable(connected):
            connected = await connected()
        print("Connected: {0}".format(connected))

        light_sensor = await OpticalSensor().enable(client)
        humidity_sensor = await HumiditySensor().enable(client)
        battery = BatteryService()

        # monotonic() is immune to wall-clock adjustments, which matters for
        # measuring an elapsed interval.
        last_battery_read = time.monotonic()
        print("Battery Reading", await battery.read(client))

        while True:
            # set according to your period in the sensors; otherwise sensor
            # will return same value for all the readings till the sensor
            # refreshes as defined in the period
            await asyncio.sleep(0.08)  # slightly less than 100ms to accommodate time to print results
            data = await asyncio.gather(
                light_sensor.read(client),
                humidity_sensor.read(client),
            )
            print(data)

            if time.monotonic() - last_battery_read > battery_interval:
                print("Battery Reading", await battery.read(client))
                last_battery_read = time.monotonic()
if __name__ == "__main__":
    # To find the address, once your sensor tag is blinking the green led
    # after pressing the button, run the discover.py file which was provided
    # as an example from bleak to identify the sensor tag device.
    import os

    os.environ["PYTHONASYNCIODEBUG"] = "1"
    address = (
        "54:6c:0e:b5:56:00"
        if platform.system() != "Darwin"
        else "6FFBA6AE-0802-4D92-B1CD-041BE4B4FEB9"
    )
    # asyncio.run creates, runs and closes the event loop itself. debug=True
    # is passed explicitly so debug mode is on regardless of how the Python
    # version in use treats the environment variable inside asyncio.run.
    # run() loops forever, so nothing needs to follow this call.
    asyncio.run(run(address), debug=True)