-
Notifications
You must be signed in to change notification settings - Fork 1
/
com_monitor.py
70 lines (58 loc) · 1.96 KB
/
com_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from queue import Queue, Empty
import threading
import time
import serial
class ComMonitorThread(threading.Thread):
    """Background thread that reads a serial port and queues what it receives.

    Every non-empty chunk read from the port is put on ``data_q`` as a
    ``(data, timestamp)`` tuple, where ``data`` is what the port's ``read()``
    calls returned and ``timestamp`` is ``time.time()`` taken right after the
    read. A ``serial.SerialException`` raised while opening the port or while
    reading from it is put on ``error_q`` as a string, after which the thread
    exits.

    Call :meth:`join` to ask the thread to stop and wait for it to finish.
    """

    def __init__(self, data_q, error_q, port_num, port_baud,
                 port_stopbits=serial.STOPBITS_ONE,
                 port_parity=serial.PARITY_NONE, port_timeout=0.01):
        """Store the port settings; the port itself is opened in :meth:`run`.

        Args:
            data_q: Queue that receives ``(data, timestamp)`` tuples.
            error_q: Queue that receives error messages as strings.
            port_num: Passed to ``serial.Serial`` as ``port``.
            port_baud: Passed to ``serial.Serial`` as ``baudrate``.
            port_stopbits: Passed to ``serial.Serial`` as ``stopbits``.
            port_parity: Passed to ``serial.Serial`` as ``parity``.
            port_timeout: Passed to ``serial.Serial`` as ``timeout``. The
                stop flag is only re-checked between reads, so this value
                presumably bounds how quickly :meth:`join` takes effect.
        """
        super().__init__()
        self.serial_port = None
        self.serial_arg = dict(port=port_num,
                               baudrate=port_baud,
                               stopbits=port_stopbits,
                               parity=port_parity,
                               timeout=port_timeout)
        self.data_q = data_q
        self.error_q = error_q
        # Set while the thread should keep running; cleared by join().
        self.alive = threading.Event()
        self.alive.set()

    def run(self):
        """Open the port and forward incoming data until asked to stop."""
        try:
            if self.serial_port:
                self.serial_port.close()
            self.serial_port = serial.Serial(**self.serial_arg)
        except serial.SerialException as e:
            self.error_q.put(str(e))
            return

        try:
            while self.alive.is_set():
                # Wait (up to the port timeout) for one byte, then drain
                # whatever else is already buffered so a burst is delivered
                # as a single chunk.
                data = self.serial_port.read(1)
                # inWaiting() is kept (rather than the newer in_waiting
                # attribute) so older pySerial releases keep working.
                data += self.serial_port.read(self.serial_port.inWaiting())
                if data:
                    self.data_q.put((data, time.time()))
        except serial.SerialException as e:
            # A failure mid-stream is reported the same way as a failure to
            # open the port, instead of killing the thread silently.
            self.error_q.put(str(e))
        finally:
            # Always release the port, including when a read fails.
            self.serial_port.close()

    def join(self, timeout=None):
        """Signal the read loop to stop, then wait for the thread to finish.

        Args:
            timeout: Forwarded unchanged to ``threading.Thread.join``.
        """
        self.alive.clear()
        super().join(timeout)
if __name__ == '__main__':
    # Demo: start a monitor on a pseudo-terminal and print whatever arrives,
    # polling twice per second. Stop with Ctrl-C.
    data_q = Queue()
    error_q = Queue()
    com = ComMonitorThread(data_q,
                           error_q,
                           '/dev/pts/5',
                           38400)
    com.start()
    try:
        while True:
            # An error from the monitor ends the demo instead of leaving the
            # loop to print 'no item' forever.
            try:
                print('serial error:', error_q.get_nowait())
                break
            except Empty:
                pass

            try:
                item = data_q.get_nowait()
                print(item)
            except Empty:
                print('no item')
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the demo.
        pass
    finally:
        # Stop the reader thread so the interpreter can exit; it is not a
        # daemon thread and would otherwise keep the process alive.
        com.join()