forked from misdoro/Electronic_load_px100
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
56 lines (43 loc) · 1.65 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from signal import signal, SIGTERM, SIGINT
from sys import exit
from PyQt5.QtCore import QCoreApplication, QThreadPool
from data_store import DataStore
from gui.gui import GUI
from instr_thread import InstrumentWorker
class Main:
    """Application controller connecting the instrument worker, data store and GUI.

    Rows and status updates emitted by the instrument worker are forwarded
    to every receiver registered through :meth:`subscribe`.
    """

    def __init__(self):
        """Configure the application, start the instrument worker and build the GUI."""
        QCoreApplication.setOrganizationName('github.com/misdoro')
        QCoreApplication.setApplicationName('Battery tester')

        # State read by data_callback()/status_callback(). It is created
        # before the worker is started so that a callback delivered early
        # cannot hit a missing attribute.
        self.datastore = DataStore()
        self.data_receivers = set()

        self.threadpool = QThreadPool()
        self.instr_thread()

        # Installed only after the worker exists: terminate_process() calls
        # at_exit(), which needs self.instr_worker.
        signal(SIGTERM, self.terminate_process)
        signal(SIGINT, self.terminate_process)

        # NOTE(review): GUI presumably registers itself via subscribe() and
        # runs the Qt event loop -- confirm in gui.gui.
        GUI(self)

    def instr_thread(self):
        """Create the instrument worker, connect its signals and start it."""
        self.instr_worker = InstrumentWorker()
        self.instr_worker.signals.data_row.connect(self.data_callback)
        self.instr_worker.signals.status_update.connect(self.status_callback)
        self.threadpool.start(self.instr_worker)
        self.instr_worker.signals.start.emit()

    def subscribe(self, receiver):
        """Register *receiver* for data rows (and status updates, if supported).

        The receiver must be hashable and provide ``data_row(datastore, data)``;
        a ``status_update(status)`` method is optional.
        """
        self.data_receivers.add(receiver)

    def data_callback(self, data):
        """Store *data* and pass it, together with the data store, to all receivers."""
        self.datastore.append(data)
        # Iterate over a snapshot: a receiver may call subscribe() while it
        # handles the row, and mutating a set during iteration raises
        # RuntimeError.
        for receiver in tuple(self.data_receivers):
            receiver.data_row(self.datastore, data)

    def status_callback(self, status):
        """Forward *status* to every receiver that defines ``status_update``."""
        # Snapshot for the same reason as in data_callback().
        for receiver in tuple(self.data_receivers):
            if hasattr(receiver, 'status_update'):
                receiver.status_update(status)

    def send_command(self, command):
        """Emit *command* on the instrument worker's ``command`` signal."""
        self.instr_worker.signals.command.emit(command)

    def at_exit(self):
        """Ask the instrument worker to exit and wait for the thread pool to finish."""
        self.instr_worker.signals.exit.emit()
        self.threadpool.waitForDone()

    def terminate_process(self, signal, _stack):
        """Handle SIGTERM/SIGINT: shut the worker down, then exit the process.

        The parameters follow the ``signal.signal`` handler convention
        (signal number, current stack frame); neither is used. Note that the
        ``signal`` parameter shadows the imported ``signal`` function inside
        this method only.
        """
        self.at_exit()
        exit()
if __name__ == "__main__":
    # Build the application only when this file is executed as a script,
    # so importing the module has no side effects.
    Main()