forked from oliver-sanders/actions-dns-test
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dns_test.py
69 lines (59 loc) · 1.6 KB
/
dns_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import socket
import time
import zmq
from zmq.asyncio import Context, Poller
import asyncio
from async_timeout import timeout
# Shared asyncio-aware ZeroMQ context (zmq.asyncio.Context); receiver() and
# sender() both create their sockets from it.
ctx = Context.instance()
async def receiver(host, port):
    """Receive messages from ``tcp://{host}:{port}`` until ``END`` or timeout.

    Connects a PULL socket to the endpoint and polls it, printing each
    multipart message as it arrives. The loop ends when the ``[b'END']``
    message is received, when polling fails with a ZeroMQ error, or when the
    overall 10 second budget is exhausted. The socket is always closed
    before the coroutine finishes.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
    """
    pull = ctx.socket(zmq.PULL)
    try:
        pull.connect(f'tcp://{host}:{port}')
        poller = Poller()
        poller.register(pull, zmq.POLLIN)
        try:
            async with timeout(10):
                while True:
                    try:
                        events = await poller.poll()
                    except zmq.ZMQError as exc:
                        # Best effort: report the failure and stop polling
                        # instead of silently swallowing every exception.
                        print('poll failed', exc)
                        break
                    if pull in dict(events):
                        print("recving", events)
                        msg = await pull.recv_multipart()
                        print('recvd', msg)
                        if msg == [b'END']:
                            break
        except asyncio.TimeoutError:
            # The 10 s budget ran out (e.g. the host did not resolve or no
            # END arrived). This is an expected outcome of the test, so fall
            # through to the normal exit path rather than leaving an
            # unretrieved task exception behind.
            pass
    finally:
        # Release the socket even on error; the script runs this coroutine
        # once per host and would otherwise leak one PULL socket each time.
        pull.close()
    print('done')
async def sender(port):
    """Bind a PUSH socket on *port* and emit three timestamps, then ``END``.

    Each of the three messages carries the seconds elapsed since the
    coroutine started, ASCII-encoded, and is followed by a one second pause.
    A final ``END`` frame is sent afterwards. The socket is closed whether
    or not sending succeeds.
    """
    started = time.time()
    sock = ctx.socket(zmq.PUSH)
    try:
        sock.bind(f'tcp://*:{port}')
        for _tick in range(3):
            print("sending")
            elapsed = time.time() - started
            payload = str(elapsed).encode('ascii')
            await sock.send_multipart([payload])
            await asyncio.sleep(1)
        await sock.send_multipart([b'END'])
    finally:
        sock.close()
# TCP port used for every receiver/sender round.
port = 43087


async def _run_pair(host, port):
    """Run one receiver/sender round against *host* and report failures."""
    # gather() wraps the coroutines in tasks itself; passing bare coroutines
    # to asyncio.wait() is rejected on Python 3.11+. return_exceptions=True
    # keeps the "wait for both, never raise" behaviour while making sure
    # every exception is actually retrieved.
    results = await asyncio.gather(
        receiver(host, port),
        sender(port),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, BaseException):
            print(f'error: {result!r}')


async def _main():
    """Exercise the PUSH/PULL pair via loopback, hostname and FQDN in turn."""
    hosts = [
        '127.0.0.1',
        socket.gethostname(),
        socket.getfqdn(),
    ]
    for host in hosts:
        print('\n\n\n')
        print(f'# {host}:{port}')
        await _run_pair(host, port)
        print('\n\n\n')


# A single event loop serves all rounds, as in the original script.
asyncio.run(_main())