-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
68 lines (55 loc) · 2.28 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import xrpl
import wx
import asyncio
from threading import Thread
class XRPLMonitorThread(Thread):
def __init__(self, url, gui):
Thread.__init__(self, daemon = True)
self.gui = gui
self.url = url
self.loop = asyncio.get_event_loop()
self.loop.set_debug(True)
def run(self):
self.loop.run_forever()
async def watch_xrpl(self):
try:
async with xrpl.asyncio.clients.AsyncWebsocketClient(self.url) as self.client:
await self.on_connected()
async for message in self.client:
mtype = message.get("type")
if mtype == "ledgerClosed":
wx.CallAfter(self.gui.update_ledger, message)
except Exception as e:
print(f"XRPL connection error: {e}")
async def on_connected(self):
response = await self.client.request(xrpl.models.requests.Subscribe(
streams=["ledger"]
))
wx.CallAfter(self.gui.update_ledger, response.result)
class TWaXLFrame(wx.Frame):
def __init__(self, url):
wx.Frame.__init__(self, None, title="TWaXDL", size=wx.Size(800, 400))
self.build_ui()
self.worker = XRPLMonitorThread(url, self)
self.worker.start()
self.run_bg_job(self.worker.watch_xrpl())
def build_ui(self):
main_panel = wx.Panel(self)
self.ledger_info = wx.StaticText(main_panel, label="Not connected")
main_sizer = wx.BoxSizer(wx.VERTICAL)
main_sizer.Add(self.ledger_info, 1, flag=wx.EXPAND|wx.ALL, border=5)
main_panel.SetSizer(main_sizer)
def run_bg_job(self, job):
task = asyncio.run_coroutine_threadsafe(job, self.worker.loop)
def update_ledger(self, message):
close_time_iso = xrpl.utils.ripple_time_to_datetime(message["ledger_time"]).isoformat()
self.ledger_info.SetLabel(f"latest validated ledger:\n"
f"ledger index: {message['ledger_index']}\n"
f"ledger hash: {message['ledger_hash']}\n"
f"close time: {close_time_iso}")
if __name__ == "__main__":
WS_URL = "wss://s.altnet.rippletest.net:51233"
app = wx.App()
frame = TWaXLFrame(WS_URL)
frame.Show()
app.MainLoop()