diff --git a/project/server/server.py b/project/server/server.py index 762e438..61b9d90 100755 --- a/project/server/server.py +++ b/project/server/server.py @@ -58,23 +58,28 @@ def watch_children(children): # master died print("NOTIFICATION:", deadSet, " [master] died. ") if portno == deadInstanceBackup: + printout("[MASTER-BACKUP]", MAGENTA) print("MASTER DIED, MASTER'S BACKUP DOING HANDOVER. ") # master's backup slave zk.set('/meta/master', (str(deadInstanceBackup)+'/backup').encode('utf-8')) # setting new master - print("NOTIFICATION:", deadSet, " [master] died. ") + printout("[MASTER-BACKUP]", MAGENTA) print("SLAVE PERSISTING DEAD MASTER CONFIG") config["lastDead"]["portno"] = deadInstancePort config["lastDead"]["backup"] = deadInstanceBackup + printout("[MASTER-BACKUP]", MAGENTA) print("HANDOVER COMPLETE. ") + printout("[MASTER-BACKUP]", MAGENTA) print("SETTING CONFIG: ", config) zk.set('/meta/config', json.dumps(config).encode('utf-8')) else: # slave died + printout("[SLAVE-BACKUP]", MAGENTA) print("NOTIFICATION:", deadSet, " [slave] died. ") masterPort, _ = zk.get('meta/master') masterPort = masterPort.decode("utf-8") if portno == int(masterPort): #inside master + printout("[SLAVE-BACKUP]", MAGENTA) print("MASTER PERSISTING DEAD SLAVE CONFIG") config["lastDead"]["portno"] = deadInstancePort config["lastDead"]["backup"] = deadInstanceBackup @@ -83,6 +88,7 @@ def watch_children(children): if value == deadInstancePort: MasterService.keyRanges[key] = str(deadInstanceBackup) + "/backup" + printout("[SLAVE-BACKUP]", MAGENTA) print("SETTING CONFIG: ", config) zk.set('/meta/config', json.dumps(config).encode('utf-8')) @@ -91,7 +97,9 @@ def watch_children(children): def scheduleSignals(a='default'): + printout("[MASTER]", RED) print("Start Signals Scheduled") + printout("[MASTER]", RED) print("Child Watcher Scheduled") scheduleChildWatcher() if portno == 8080: @@ -114,14 +122,17 @@ def scheduleSignals(a='default'): def scheduleMasterKeySet(): # master keyset and propogate to backup - print("MASTER KEYSET RUNNING") + printout("[MASTER]", RED) + print("MASTER BACKUP KEYSET SCHEDULED") MasterService.keyRange["range"] = ranges[0] MasterService.keyRange["status"] = "True" lastport, _ = zk.get('/meta/lastport') lastport = int(lastport.decode('utf-8')) numOfServers = lastport - 8080 + 1 MasterService.keyRange["backupPort"] = 8080 + ((8080 - portno) + 1) % numOfServers + printout("[MASTER]", RED) print("KEYRANGE: ", MasterService.keyRange["range"]) + printout("[MASTER]", RED) print("ws://127.0.0.1:" + str(MasterService.keyRange["backupPort"]) + "/backup") ss = create_connection("ws://localhost:" + str(MasterService.keyRange["backupPort"]) + "/backup") @@ -130,10 +141,9 @@ def scheduleMasterKeySet(): "params": {"data": MasterService.keyRange["range"], "master": "true", "keyRanges": MasterService.keyRanges, "backupPort": MasterService.keyRange["backupPort"]} } - print("SIGNAL", signal) - ss.send(json.dumps(signal)) ss.close() + printout("[MASTER]", RED) print ("GOT ACK. ") time.sleep(6) @@ -141,7 +151,8 @@ def scheduleMasterKeySet(): scheduleSignals() def clusterStatusUp(): - print(" ------------- CLUSTER STATUS UP ----------------- ") + printout("[MASTER]", RED) + print(" ------- CLUSTER STATUS UP ------- ") zk.set('/meta/status', 'STABLE'.encode('utf-8')) def signalScheduler(): @@ -213,7 +224,6 @@ def put(data, key, value): class BaseService: def __init__(self, proto): - print("init") self.proto = proto self.is_closed = False @@ -230,7 +240,6 @@ class KeyStoreService(BaseService): data = {} keyRange = {"status": "false", "range": "", "backupPort": ""} - test = 0 def onMessage(self, payload, isBinary): if not isBinary: @@ -257,6 +266,7 @@ def onMessage(self, payload, isBinary): lastport, _ = zk.get('/meta/lastport') lastport = int(lastport.decode('utf-8')) numOfServers = lastport - 8080 + 1 + printout("[SLAVE]", YELLOW) print("PORT NUMBER", portno) self.keyRange["backupPort"] = 8080 + (abs(8080 - portno) + 1) % numOfServers printout("[SLAVE]", YELLOW) @@ -265,7 +275,9 @@ def onMessage(self, payload, isBinary): #print("Sleeping for all clients to awake. ") #time.sleep(10) payload["params"]["backupPort"] = self.keyRange["backupPort"] + printout("[SLAVE]", YELLOW) print("Finished Sleeping") + printout("[SLAVE]", YELLOW) print("ws://127.0.0.1:" + str(self.keyRange["backupPort"]) + "/backup") success = 0 ss = None @@ -274,11 +286,13 @@ def onMessage(self, payload, isBinary): ss = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup") success = 1 except Exception: + printout("[SLAVE]", YELLOW) print("Server busy, trying again. ") time.sleep(5) ss.send(json.dumps(payload)) ss.close() + printout("[SLAVE]", YELLOW) print ("GOT ACK. ") global persistList @@ -330,12 +344,15 @@ def onMessage(self, payload, isBinary): try: ws = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup") ws.send(json.dumps(payload)) - print ("Reeiving...") + printout("[SLAVE]", YELLOW) + print ("Receiving...") result = ws.recv() - print ("got ack. ") + printout("[SLAVE]", YELLOW) + print ("Got ACK") ws.close() except Exception as e: - print("UNABLE TO CONTACT BACKUP SERVER. MAYBE SERVER DOWN? ") + printout("[SLAVE]", YELLOW) + print("UNABLE TO CONTACT BACKUP SERVER. SERVER MIGHT POSSIBLY BE DOWN ") else: res['status'] = codes.ERR_KEY_NOT_RESPONSIBLE @@ -347,8 +364,8 @@ def onMessage(self, payload, isBinary): res['status'] = str(codes.FAIL.name) res['data'] = "Operation not permitted" - print(res) msg = json.dumps(res) + printout("[SLAVE]", YELLOW) print("SERVER SENT: " + msg) self.proto.sendMessage(msg.encode('utf8')) @@ -376,6 +393,7 @@ def checkKeyRange(self, key): def onMessage(self, payload, isBinary): if not isBinary: payload = json.loads(payload.decode('utf-8')) + printout("[MASTER]", RED) print("MASTER SERVER RECEIVED: " + str(payload)) payloadType = payload['type'].upper() payloadParams = payload['params'] @@ -387,6 +405,7 @@ def onMessage(self, payload, isBinary): res['status'] = str(codes.ERR_SERVER_NOT_INIT.name) if payloadType == 'REINCARNATE': + printout("[MASTER]", RED) print("REINCARNATION REQUEST RECIEVED") deadInstancePort = payloadParams["deadPort"] deadInstanceBackup = payloadParams["backupPort"] @@ -395,6 +414,7 @@ def onMessage(self, payload, isBinary): if value == str(deadInstanceBackup)+"/backup": self.keyRanges[key] = deadInstancePort + printout("[MASTER]", RED) print("REINCARNATION SUCCESSFUL") elif payloadType == 'REPLICA': @@ -407,7 +427,6 @@ def onMessage(self, payload, isBinary): res['status'] = str(codes.SUCCESS.name) elif payloadType == 'GET': - print(self.keyRange["range"]) keyRangeCheck = self.checkKeyRange(payloadParams['key']) if keyRangeCheck == codes.SUCCESS: # key in master @@ -421,7 +440,6 @@ def onMessage(self, payload, isBinary): # key not responsible res["status"] = str(codes.ERR_KEY_NOT_RESPONSIBLE.name) res["data"] = str(keyRangeCheck) - print(res) #elif payloadType == 'GETMULTIPLE': # result = {} @@ -441,12 +459,15 @@ def onMessage(self, payload, isBinary): try: ws = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup") ws.send(json.dumps(payload)) - print ("Reeiving...") + printout("[MASTER]", RED) + print ("Receiving...") result = ws.recv() - print ("got ack. ") + printout("[MASTER]", RED) + print ("Got ACK") ws.close() except Exception as e: - print("UNABLE TO CONTACT BACKUP SERVER. MAYBE SERVER DOWN?") + printout("[MASTER]", RED) + print("UNABLE TO CONTACT BACKUP SERVER. SERVER MIGHT POSSIBLY BE DOWN") if result == codes.ERR_KEY_ALREADY_EXISTS: res['status'] = str(result.name) else: @@ -462,6 +483,7 @@ def onMessage(self, payload, isBinary): res['data'] = "Operation not permitted" msg = json.dumps(res) + printout("[MASTER]", RED) print("SERVER SENT: " + msg) self.proto.sendMessage(msg.encode('utf8')) @@ -489,14 +511,17 @@ def onMessage(self, payload, isBinary): if not isBinary: payload = json.loads(payload.decode('utf-8')) - print("BACKUP RECEIVED: ", str(payload)) payloadType = payload['type'].upper() payloadParams = payload['params'] + printStr = "[MASTER-BACKUP]" if "master" in payloadParams else "[SLAVE-BACKUP]" + printout(printStr, MAGENTA) + print("BACKUP RECEIVED: ", str(payload)) res = { 'status': str(codes.SUCCESS.name) } if payloadType == 'SETKEY': + printout(printStr, MAGENTA) print("KEYSET RECIEVED") self.keyRange["range"] = payloadParams['data'] self.keyRange["status"] = "true" @@ -606,6 +631,7 @@ def onMessage(self, payload, isBinary): res['data'] = "Operation not permitted" msg = json.dumps(res) + printout(printStr, MAGENTA) print("SERVER SENT: " + msg) self.proto.sendMessage(msg.encode('utf8')) @@ -722,9 +748,6 @@ def onClose(self, wasClean, code, reason): for port in range(8081, portnum + 1): MasterService.keyRanges[ranges[port - 8080]] = port - - print(MasterService.keyRanges) - printout("[MASTER]", RED) print("RANGES: ", ranges) @@ -736,9 +759,7 @@ def onClose(self, wasClean, code, reason): else: try: - print("getting /meta/config") config, _ = zk.get('/meta/config') - print("got /meta/config") # dead server detected # REINCARNATION MODE config = json.loads(config.decode("utf-8")) @@ -748,8 +769,9 @@ def onClose(self, wasClean, code, reason): portno = config["lastDead"]["portno"] masterDied = True if portno == 8080 else False portbackup = config["lastDead"]["backup"] - - + printStr = "[MASTER]" if masterDied else "[SLAVE]" + colorr = RED if masterDied else YELLOW + printout(printStr, colorr) print("REINCARNATION SERVER BOOTING FOR ", "[master]: " if masterDied else "[slave]: ", portno) # get it servers own data from the next server/backup @@ -757,6 +779,7 @@ def onClose(self, wasClean, code, reason): "type":"REPLICA", "params": "" } + printout(printStr, colorr) print("ws://127.0.0.1:" + str(portbackup) + "/backup") ss = create_connection("ws://localhost:" + str(portbackup) + "/backup") ss.send(json.dumps(payload)) @@ -772,7 +795,7 @@ def onClose(self, wasClean, code, reason): # get its /backup contents from previous servers/keystore or /master backupContentPort = (8080 + config["numOfServers"] - 1) if (abs(portno-8080)-1) < 0 else portno-1 backupContentPortAppend = "/master" if backupContentPort == 8080 else "/keystore" - + printout(printStr, colorr) print("ws://localhost:" + str(backupContentPort) + str(backupContentPortAppend)) ss = create_connection("ws://localhost:" + str(backupContentPort) + str(backupContentPortAppend)) ss.send(json.dumps(payload)) @@ -780,8 +803,6 @@ def onClose(self, wasClean, code, reason): ss.close() backupResponse = json.loads(backupResponse) - print("/backup content response", backupResponse) - print("/content response", response) new_config = copy.deepcopy(config) @@ -791,7 +812,7 @@ def onClose(self, wasClean, code, reason): "portno": -1 } zk.set('/meta/config', json.dumps(new_config).encode('utf-8')) - + zk.set("/meta/status", "STABLE".encode("utf-8")) if masterDied: MasterService.keyRange = response["data"]["keyRange"] @@ -801,9 +822,10 @@ def onClose(self, wasClean, code, reason): BackupKeyStoreService.keyRange = backupResponse["data"]["keyRange"] BackupKeyStoreService.data = backupResponse["data"] + printout("[MASTER]", RED) print ("MASTER REINCARNATING ITSELF") zk.set('/meta/master', str(portno).encode('utf-8')) - zk.set("/meta/status", "STABLE".encode("utf-8")) + printout("[MASTER]", RED) print ("MASTER KEY HANDOVER DONE") scheduleChildWatcher() @@ -839,14 +861,12 @@ def onClose(self, wasClean, code, reason): #response = ss.recv() ss.close() scheduleChildWatcher() - + printout("[SLAVE]", YELLOW) print("SERVER BOOTED") except Exception as e: - print(e) - print(traceback.format_exc()) - print("Failed to get /meta/config") + pass if not serverDied: printout("[SLAVE]", YELLOW) @@ -856,6 +876,7 @@ def onClose(self, wasClean, code, reason): zk.set('/meta/lastport', str(portno).encode()) printout("[SLAVE]", YELLOW) print("LISTENING FOR KEYSET") + printout("[SLAVE]", YELLOW) print("Child watcher scheduled") scheduleChildWatcher() @@ -866,6 +887,4 @@ def onClose(self, wasClean, code, reason): factory.protocol = ServiceServerProtocol listenWS(factory) - reactor.run() -print("after")