-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
64 lines (53 loc) · 2.39 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Source: https://stackoverflow.com/questions/35237245/how-to-create-a-websocket-client-by-using-qwebsocket-in-pyqt5
import json
from PyQt6 import QtCore, QtNetwork, QtWebSockets
from PyQt6.QtWidgets import QApplication
# Seed chat payload. It carries the same two keys ("user" and "msg") that
# MyServer.processTextMessage reads from every incoming JSON text message.
last = dict(
    user="System",
    msg="Welcome! You first here!",
)
class MyServer(QtCore.QObject):
    """WebSocket chat relay: every JSON text message is rebroadcast to all clients.

    Attributes:
        clients: list of currently connected ``QWebSocket`` objects.
        server: the listening ``QWebSocketServer`` created in ``__init__``.
        clientConnection: optional single socket that binary messages are
            forwarded to. Nothing in this class assigns it, so it stays
            ``None`` unless a caller sets it explicitly.
    """

    def __init__(self, parent):
        """Create the listening server and start accepting connections.

        Args:
            parent: a ``QWebSocketServer`` used as a template (its server name
                and secure mode are copied) and as the Qt parent of both this
                object and the listening server.
        """
        super().__init__(parent)
        self.clients = []
        self.clientConnection = None
        self.server = QtWebSockets.QWebSocketServer(
            parent.serverName(), parent.secureMode(), parent
        )
        # QHostAddress only parses IP literals; the host name "localhost" is
        # not an IP literal, so use the predefined loopback address instead.
        loopback = QtNetwork.QHostAddress(
            QtNetwork.QHostAddress.SpecialAddress.LocalHost
        )
        if self.server.listen(loopback, 1302):
            name = self.server.serverName()
            address = self.server.serverAddress().toString()
            port = self.server.serverPort()
            print(f"Connected: {name} : {address}:{port}")
        else:
            print('error')
            # quit() has no effect before the event loop is running, so queue
            # it to fire as soon as exec() starts.
            QtCore.QTimer.singleShot(0, QApplication.quit)
        self.server.newConnection.connect(self.onNewConnection)
        print(self.server.isListening())

    def onNewConnection(self):
        """Register the next pending client socket and wire up its signals."""
        sock = self.server.nextPendingConnection()
        if sock is None:
            # Nothing pending; there is no socket to register.
            return
        sock.textMessageReceived.connect(
            lambda msg: self.processTextMessage(sock, msg)
        )
        sock.binaryMessageReceived.connect(
            lambda msg: self.processBinaryMessage(sock, msg)
        )
        sock.disconnected.connect(lambda: self.socketDisconnected(sock))
        print("newClient")
        self.clients.append(sock)

    def processTextMessage(self, sock: QtWebSockets.QWebSocket, message):
        """Log a JSON chat message, remember it, and relay it to all clients.

        Args:
            sock: the socket the message arrived on (not used for routing;
                the sender receives the message back like everyone else).
            message: JSON text expected to decode to an object with ``user``
                and ``msg`` keys. Anything else is logged and dropped.
        """
        global last
        try:
            data = json.loads(message)
            user, text = data['user'], data['msg']
        except (ValueError, KeyError, TypeError) as exc:
            # Client input is untrusted: a bad frame must not raise out of
            # the Qt slot and take the whole server down.
            print(f"Ignoring malformed message: {exc!r}")
            return
        print(f"[{user}]: {text}")
        last = data
        # Broadcast unconditionally. The earlier `if self.clientConnection`
        # guard was always false, so nothing was ever relayed.
        for client in self.clients:
            client.sendTextMessage(message)

    def processBinaryMessage(self, sock: QtWebSockets.QWebSocket, message):
        """Log a binary message and forward it to ``clientConnection`` if set.

        NOTE(review): ``clientConnection`` is never assigned within this
        class, so by default binary messages are only printed. Confirm
        whether they should be broadcast like text messages.
        """
        print("b:", message)
        if self.clientConnection:
            self.clientConnection.sendBinaryMessage(message)

    def socketDisconnected(self, sock: QtWebSockets.QWebSocket):
        """Forget a disconnected client and schedule its socket for deletion."""
        if sock is None:
            return
        try:
            self.clients.remove(sock)
        except ValueError:
            # Already removed or never registered; nothing to unregister.
            pass
        sock.deleteLater()
if __name__ == '__main__':
    # Script entry point: start the Qt event loop with one MyServer instance.
    import sys

    app = QApplication(sys.argv)
    # Template server: MyServer copies its name and secure mode and uses it
    # as the Qt parent. NonSecureMode is the named form of SslMode(1).
    serverObject = QtWebSockets.QWebSocketServer(
        'My Socket',
        QtWebSockets.QWebSocketServer.SslMode.NonSecureMode,
    )
    server = MyServer(serverObject)
    serverObject.closed.connect(app.quit)
    # The template never listens; the server that actually accepts clients is
    # `server.server`, so its closed() signal must also end the application.
    server.server.closed.connect(app.quit)
    print(f"Exit code: {app.exec()}")