-
Notifications
You must be signed in to change notification settings - Fork 0
/
mh_z19b_mcu_reset.py
311 lines (270 loc) · 10.8 KB
/
mh_z19b_mcu_reset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# coding=utf-8
import time
import copy
from mycodo.inputs.base_input import AbstractInput
from mycodo.inputs.sensorutils import is_device
def constraints_pass_measure_range(mod_input, value):
"""
Check if the user input is acceptable
:param mod_input: SQL object with user-saved Input options
:param value: float
:return: tuple: (bool, list of strings)
"""
errors = []
all_passed = True
# Ensure valid range is selected
if value not in ['1000', '2000', '3000', '5000', '10000']:
all_passed = False
errors.append("Invalid range")
return all_passed, errors, mod_input
# Measurements
measurements_dict = {
0: {
'measurement': 'co2',
'unit': 'ppm'
}
}
# Input information
INPUT_INFORMATION = {
'input_name_unique': 'MH_Z19B_MCU_RESET',
'input_manufacturer': 'Winsen',
'input_name': 'MH-Z19B w/Reset',
'input_library': 'serial',
'measurements_name': 'CO2',
'measurements_dict': measurements_dict,
'url_manufacturer': 'https://www.winsen-sensor.com/sensors/co2-sensor/mh-z19b.html',
'url_datasheet': 'https://www.winsen-sensor.com/d/files/MH-Z19B.pdf',
'message': 'This is the B version of the sensor that includes the ability to conduct '
'automatic baseline correction (ABC).',
'options_enabled': [
'uart_location',
'uart_baud_rate',
'period',
'pre_output'
],
'options_disabled': ['interface'],
'interfaces': ['UART'],
'uart_location': '/dev/ttyAMA0',
'uart_baud_rate': 9600,
'custom_options': [
{
'id': 'abc_enable',
'type': 'bool',
'default_value': False,
'name': 'Automatic Baseline Correction',
'phrase': 'Enable automatic baseline correction (ABC)'
},
{
'id': 'measure_range',
'type': 'select',
'default_value': '5000',
'options_select': [
('1000', '0 - 1000 ppmv'),
('2000', '0 - 2000 ppmv'),
('3000', '0 - 3000 ppmv'),
('5000', '0 - 5000 ppmv'),
('10000', '0 - 10000 ppmv'),
],
'required': True,
'constraints_pass': constraints_pass_measure_range,
'name': 'Measurement Range',
'phrase': 'Set the measuring range of the sensor'
}
],
'custom_commands_message': 'Zero point calibration: activate the sensor in a 400 ppmv CO2 environment (outside '
'air), allow to run for 20 minutes, then press the Calibrate Zero Point button.<br>Span '
'point calibration: activate the sensor in an environment with a stable CO2 concentration'
' between 1000 and 2000 ppmv (2000 recommended), allow to run for 20 minutes, enter the '
'ppmv value in the Span Point (ppmv) input field, then press the Calibrate Span Point '
'button. If running a span point calibration, run a zero point calibration first. A span '
'point calibration is not necessary and should only be performed if you know what you are'
' doing and can accurately produce a 2000 ppmv environment.',
'custom_commands': [
{
'id': 'calibrate_zero_point',
'type': 'button',
'name': 'Calibrate Zero Point'
},
{
'id': 'span_point_value_ppmv',
'type': 'integer',
'default_value': 2000,
'name': 'Span Point (ppmv)',
'phrase': 'The ppmv concentration for a span point calibration'
},
{
'id': 'calibrate_span_point',
'type': 'button',
'name': 'Calibrate Span Point'
},
{
'id': 'mcu_reset',
'type': 'button',
'name': 'MCU Reset',
'phrase': 'Click to reset your unit (if you have perhaps accidentally calibrated a span point)'
}
]
}
class InputModule(AbstractInput):
"""A sensor support class that monitors the MH-Z19's CO2 concentration."""
def __init__(self, input_dev, testing=False):
super().__init__(input_dev, testing=testing, name=__name__)
self.ser = None
self.measuring = None
self.calibrating = None
self.measure_range = None
self.abc_enable = False
if not testing:
self.setup_custom_options(
INPUT_INFORMATION['custom_options'], input_dev)
self.try_initialize()
def initialize(self):
import serial
if is_device(self.input_dev.uart_location):
try:
self.ser = serial.Serial(
port=self.input_dev.uart_location,
baudrate=self.input_dev.baud_rate,
timeout=1,
writeTimeout=5)
except serial.SerialException:
self.logger.exception('Opening serial')
else:
self.logger.error('Could not open "{dev}". Check the device location is correct.'.format(
dev=self.input_dev.uart_location))
if self.abc_enable:
self.abcon()
else:
self.abcoff()
if self.measure_range:
self.set_measure_range(self.measure_range)
time.sleep(0.1)
def get_measurement(self):
"""Gets the MH-Z19's CO2 concentration in ppmv."""
if not self.ser:
self.logger.error("Error 101: Device not set up. See https://kizniche.github.io/Mycodo/Error-Codes#error-101 for more info.")
return
self.return_dict = copy.deepcopy(measurements_dict)
while self.calibrating:
time.sleep(0.1)
self.measuring = True
try:
self.ser.flushInput()
self.ser.write(bytearray([0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]))
time.sleep(.01)
resp = self.ser.read(9)
if not resp:
self.logger.debug("No response")
elif len(resp) < 4:
self.logger.debug("Too few values in response '{}'".format(resp))
elif resp[0] != 0xff or resp[1] != 0x86:
self.logger.error("Bad checksum")
elif len(resp) >= 4:
high = resp[2]
low = resp[3]
co2 = (high * 256) + low
self.value_set(0, co2)
except:
self.logger.exception("get_measurement()")
finally:
self.measuring = False
return self.return_dict
def abcoff(self):
"""
Turns off Automatic Baseline Correction feature of "B" type sensor.
Should be run once at the beginning of every activation.
"""
self.ser.write(bytearray([0xff, 0x01, 0x79, 0x00, 0x00, 0x00, 0x00, 0x00, 0x86]))
def abcon(self):
"""
Turns on Automatic Baseline Correction feature of "B" type sensor.
"""
self.ser.write(bytearray([0xff, 0x01, 0x79, 0xa0, 0x00, 0x00, 0x00, 0x00, 0xe6]))
def set_measure_range(self, measure_range):
"""
Sets the measurement range. Options are: '1000', '2000', '3000', '5000' or '10000' (ppmv)
:param measure_range: string
:return: None
"""
if measure_range == '1000':
self.ser.write(bytearray([0xff, 0x01, 0x99, 0x00, 0x00, 0x00, 0x03, 0xe8, 0x7b]))
elif measure_range == '2000':
self.ser.write(bytearray([0xff, 0x01, 0x99, 0x00, 0x00, 0x00, 0x07, 0xd0, 0x8f]))
elif measure_range == '3000':
self.ser.write(bytearray([0xff, 0x01, 0x99, 0x00, 0x00, 0x00, 0x0b, 0xb8, 0xa3]))
elif measure_range == '5000':
self.ser.write(bytearray([0xff, 0x01, 0x99, 0x00, 0x00, 0x00, 0x13, 0x88, 0xcb]))
elif measure_range == '10000':
self.ser.write(bytearray([0xff, 0x01, 0x99, 0x00, 0x00, 0x00, 0x27, 0x10, 0x2f]))
else:
return "out of range"
def calibrate_span_point(self, args_dict):
"""
Span Point Calibration
from https://github.com/UedaTakeyuki/mh-z19
"""
if 'span_point_value_ppmv' not in args_dict:
self.logger.error("Cannot conduct span point calibration without a ppmv value")
return
if not isinstance(args_dict['span_point_value_ppmv'], int):
self.logger.error("ppmv value does not represent an integer: '{}', type: {}".format(
args_dict['span_point_value_ppmv'], type(args_dict['span_point_value_ppmv'])))
return
while self.measuring:
time.sleep(0.1)
self.calibrating = True
try:
self.logger.info("Conducting span point calibration with a value of {} ppmv".format(
args_dict['span_point_value_ppmv']))
b3 = args_dict['span_point_value_ppmv'] // 256
b4 = args_dict['span_point_value_ppmv'] % 256
c = self.checksum([0x01, 0x88, b3, b4])
self.ser.write(bytearray([0xff, 0x01, 0x88, b3, b4, 0x00, 0x0b, 0xb8, c]))
time.sleep(0.1)
except:
self.logger.exception()
finally:
self.calibrating = False
# byte3 = struct.pack('B', b3)
# byte4 = struct.pack('B', b4)
# request = b"\xff\x01\x88" + byte3 + byte4 + b"\x00\x00\x00" + c
# self.ser.write(request)
def calibrate_zero_point(self, args_dict):
"""
Zero Point Calibration
from https://github.com/UedaTakeyuki/mh-z19
"""
while self.measuring:
time.sleep(0.1)
self.calibrating = True
try:
self.logger.info("Conducting zero point calibration")
self.ser.write(bytearray([0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]))
time.sleep(0.1)
except:
self.logger.exception()
finally:
self.calibrating = False
# request = b"\xff\x01\x87\x00\x00\x00\x00\x00\x78"
# self.ser.write(request)
def mcu_reset(self, args_dict):
"""
MCU Reset
"""
while self.measuring:
time.sleep(0.1)
# CMD for MCU Reset: 0x8D
# Command format: 0xFF, 0x01, CMD, (5 bytes of parameters), CKSUM
# There will be no response
try:
self.logger.info("Conducting MCU reset")
self.ser.write(bytearray([0xff, 0x01, 0x8D, 0x00, 0x00, 0x00, 0x00, 0x00, 0x73]))
time.sleep(0.1)
except:
self.logger.exception()
# request = b"\xff\x01\0x8D\x00\x00\x00\x00\x00\x73"
# self.ser.write(request)
@staticmethod
def checksum(array):
return 0xff - (sum(array) % 0x100) + 1
# return struct.pack('B', 0xff - (sum(array) % 0x100) + 1)