-
Notifications
You must be signed in to change notification settings - Fork 6
/
example.py
57 lines (42 loc) · 1.3 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from pyeventbus import *
import time
class Events:
class EventFromA:
def __init__(self, msg):
self.msg = msg
def getMessage(self):
return self.msg
class EventFromB:
def __init__(self, msg):
self.msg = msg
def getMessage(self):
return self.msg
class A:
def __init__(self):
self.bus = PyBus.Instance()
pass
def register(self, aInstance):
PyBus.Instance().register(aInstance, self.__class__.__name__)
def post(self):
PyBus.Instance().post(Events.EventFromA("EventFromA"))
@subscribe(threadMode = Mode.PARALLEL, onEvent=Events.EventFromB)
def readEventFromB(self, event):
print 'Class A', event.getMessage()
class B:
def __init__(self):
self.bus = PyBus.Instance()
pass
def register(self, bInstance):
PyBus.Instance().register(bInstance, self.__class__.__name__)
@subscribe( threadMode = Mode.BACKGROUND, onEvent=Events.EventFromA)
def readEventFromA(self, event):
print 'Class B:', event.getMessage()
def post(self):
PyBus.Instance().post(Events.EventFromB("EventFromB"))
if __name__ == "__main__":
a = A()
a.register(a)
b = B()
b.register(b)
a.post()
b.post()