Skip to content

Commit

Permalink
displaying MASTER and SLAVE printout statements
Browse files Browse the repository at this point in the history
  • Loading branch information
anaskhan96 committed Dec 6, 2017
1 parent c0bcedb commit 23b838c
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions project/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,28 @@ def watch_children(children):
# master died
print("NOTIFICATION:", deadSet, " [master] died. ")
if portno == deadInstanceBackup:
printout("[MASTER-BACKUP]", MAGENTA)
print("MASTER DIED, MASTER'S BACKUP DOING HANDOVER. ")
# master's backup slave
zk.set('/meta/master', (str(deadInstanceBackup)+'/backup').encode('utf-8')) # setting new master
print("NOTIFICATION:", deadSet, " [master] died. ")
printout("[MASTER-BACKUP]", MAGENTA)
print("SLAVE PERSISTING DEAD MASTER CONFIG")
config["lastDead"]["portno"] = deadInstancePort
config["lastDead"]["backup"] = deadInstanceBackup
printout("[MASTER-BACKUP]", MAGENTA)
print("HANDOVER COMPLETE. ")
printout("[MASTER-BACKUP]", MAGENTA)
print("SETTING CONFIG: ", config)
zk.set('/meta/config', json.dumps(config).encode('utf-8'))
else:
# slave died
printout("[SLAVE-BACKUP]", MAGENTA)
print("NOTIFICATION:", deadSet, " [slave] died. ")
masterPort, _ = zk.get('meta/master')
masterPort = masterPort.decode("utf-8")
if portno == int(masterPort):
#inside master
printout("[SLAVE-BACKUP]", MAGENTA)
print("MASTER PERSISTING DEAD SLAVE CONFIG")
config["lastDead"]["portno"] = deadInstancePort
config["lastDead"]["backup"] = deadInstanceBackup
Expand All @@ -83,6 +88,7 @@ def watch_children(children):
if value == deadInstancePort:
MasterService.keyRanges[key] = str(deadInstanceBackup) + "/backup"

printout("[SLAVE-BACKUP]", MAGENTA)
print("SETTING CONFIG: ", config)
zk.set('/meta/config', json.dumps(config).encode('utf-8'))

Expand All @@ -91,7 +97,9 @@ def watch_children(children):


def scheduleSignals(a='default'):
printout("[MASTER]", RED)
print("Start Signals Scheduled")
printout("[MASTER]", RED)
print("Child Watcher Scheduled")
scheduleChildWatcher()
if portno == 8080:
Expand All @@ -114,14 +122,17 @@ def scheduleSignals(a='default'):

def scheduleMasterKeySet():
# master keyset and propogate to backup
print("MASTER KEYSET RUNNING")
printout("[MASTER]", RED)
print("MASTER BACKUP KEYSET SCHEDULED")
MasterService.keyRange["range"] = ranges[0]
MasterService.keyRange["status"] = "True"
lastport, _ = zk.get('/meta/lastport')
lastport = int(lastport.decode('utf-8'))
numOfServers = lastport - 8080 + 1
MasterService.keyRange["backupPort"] = 8080 + ((8080 - portno) + 1) % numOfServers
printout("[MASTER]", RED)
print("KEYRANGE: ", MasterService.keyRange["range"])
printout("[MASTER]", RED)
print("ws://127.0.0.1:" + str(MasterService.keyRange["backupPort"]) + "/backup")
ss = create_connection("ws://localhost:" + str(MasterService.keyRange["backupPort"]) + "/backup")

Expand All @@ -130,18 +141,18 @@ def scheduleMasterKeySet():
"params": {"data": MasterService.keyRange["range"], "master": "true", "keyRanges": MasterService.keyRanges, "backupPort": MasterService.keyRange["backupPort"]}
}

print("SIGNAL", signal)

ss.send(json.dumps(signal))
ss.close()
printout("[MASTER]", RED)
print ("GOT ACK. ")

time.sleep(6)

scheduleSignals()

def clusterStatusUp():
print(" ------------- CLUSTER STATUS UP ----------------- ")
printout("[MASTER]", RED)
print(" ------- CLUSTER STATUS UP ------- ")
zk.set('/meta/status', 'STABLE'.encode('utf-8'))

def signalScheduler():
Expand Down Expand Up @@ -213,7 +224,6 @@ def put(data, key, value):
class BaseService:

def __init__(self, proto):
print("init")
self.proto = proto
self.is_closed = False

Expand All @@ -230,7 +240,6 @@ class KeyStoreService(BaseService):

data = {}
keyRange = {"status": "false", "range": "", "backupPort": ""}
test = 0

def onMessage(self, payload, isBinary):
if not isBinary:
Expand All @@ -257,6 +266,7 @@ def onMessage(self, payload, isBinary):
lastport, _ = zk.get('/meta/lastport')
lastport = int(lastport.decode('utf-8'))
numOfServers = lastport - 8080 + 1
printout("[SLAVE]", YELLOW)
print("PORT NUMBER", portno)
self.keyRange["backupPort"] = 8080 + (abs(8080 - portno) + 1) % numOfServers
printout("[SLAVE]", YELLOW)
Expand All @@ -265,7 +275,9 @@ def onMessage(self, payload, isBinary):
#print("Sleeping for all clients to awake. ")
#time.sleep(10)
payload["params"]["backupPort"] = self.keyRange["backupPort"]
printout("[SLAVE]", YELLOW)
print("Finished Sleeping")
printout("[SLAVE]", YELLOW)
print("ws://127.0.0.1:" + str(self.keyRange["backupPort"]) + "/backup")
success = 0
ss = None
Expand All @@ -274,11 +286,13 @@ def onMessage(self, payload, isBinary):
ss = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup")
success = 1
except Exception:
printout("[SLAVE]", YELLOW)
print("Server busy, trying again. ")
time.sleep(5)

ss.send(json.dumps(payload))
ss.close()
printout("[SLAVE]", YELLOW)
print ("GOT ACK. ")

global persistList
Expand Down Expand Up @@ -330,12 +344,15 @@ def onMessage(self, payload, isBinary):
try:
ws = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup")
ws.send(json.dumps(payload))
print ("Reeiving...")
printout("[SLAVE]", YELLOW)
print ("Receiving...")
result = ws.recv()
print ("got ack. ")
printout("[SLAVE]", YELLOW)
print ("Got ACK")
ws.close()
except Exception as e:
print("UNABLE TO CONTACT BACKUP SERVER. MAYBE SERVER DOWN? ")
printout("[SLAVE]", YELLOW)
print("UNABLE TO CONTACT BACKUP SERVER. SERVER MIGHT POSSIBLY BE DOWN ")
else:
res['status'] = codes.ERR_KEY_NOT_RESPONSIBLE

Expand All @@ -347,8 +364,8 @@ def onMessage(self, payload, isBinary):
res['status'] = str(codes.FAIL.name)
res['data'] = "Operation not permitted"

print(res)
msg = json.dumps(res)
printout("[SLAVE]", YELLOW)
print("SERVER SENT: " + msg)
self.proto.sendMessage(msg.encode('utf8'))

Expand Down Expand Up @@ -376,6 +393,7 @@ def checkKeyRange(self, key):
def onMessage(self, payload, isBinary):
if not isBinary:
payload = json.loads(payload.decode('utf-8'))
printout("[MASTER]", RED)
print("MASTER SERVER RECEIVED: " + str(payload))
payloadType = payload['type'].upper()
payloadParams = payload['params']
Expand All @@ -387,6 +405,7 @@ def onMessage(self, payload, isBinary):
res['status'] = str(codes.ERR_SERVER_NOT_INIT.name)

if payloadType == 'REINCARNATE':
printout("[MASTER]", RED)
print("REINCARNATION REQUEST RECIEVED")
deadInstancePort = payloadParams["deadPort"]
deadInstanceBackup = payloadParams["backupPort"]
Expand All @@ -395,6 +414,7 @@ def onMessage(self, payload, isBinary):
if value == str(deadInstanceBackup)+"/backup":
self.keyRanges[key] = deadInstancePort

printout("[MASTER]", RED)
print("REINCARNATION SUCCESSFUL")

elif payloadType == 'REPLICA':
Expand All @@ -407,7 +427,6 @@ def onMessage(self, payload, isBinary):
res['status'] = str(codes.SUCCESS.name)

elif payloadType == 'GET':
print(self.keyRange["range"])
keyRangeCheck = self.checkKeyRange(payloadParams['key'])
if keyRangeCheck == codes.SUCCESS:
# key in master
Expand All @@ -421,7 +440,6 @@ def onMessage(self, payload, isBinary):
# key not responsible
res["status"] = str(codes.ERR_KEY_NOT_RESPONSIBLE.name)
res["data"] = str(keyRangeCheck)
print(res)

#elif payloadType == 'GETMULTIPLE':
# result = {}
Expand All @@ -441,12 +459,15 @@ def onMessage(self, payload, isBinary):
try:
ws = create_connection("ws://localhost:" + str(self.keyRange["backupPort"]) + "/backup")
ws.send(json.dumps(payload))
print ("Reeiving...")
printout("[MASTER]", RED)
print ("Receiving...")
result = ws.recv()
print ("got ack. ")
printout("[MASTER]", RED)
print ("Got ACK")
ws.close()
except Exception as e:
print("UNABLE TO CONTACT BACKUP SERVER. MAYBE SERVER DOWN?")
printout("[MASTER]", RED)
print("UNABLE TO CONTACT BACKUP SERVER. SERVER MIGHT POSSIBLY BE DOWN")
if result == codes.ERR_KEY_ALREADY_EXISTS:
res['status'] = str(result.name)
else:
Expand All @@ -462,6 +483,7 @@ def onMessage(self, payload, isBinary):
res['data'] = "Operation not permitted"

msg = json.dumps(res)
printout("[MASTER]", RED)
print("SERVER SENT: " + msg)
self.proto.sendMessage(msg.encode('utf8'))

Expand Down Expand Up @@ -489,14 +511,17 @@ def onMessage(self, payload, isBinary):
if not isBinary:

payload = json.loads(payload.decode('utf-8'))
print("BACKUP RECEIVED: ", str(payload))
payloadType = payload['type'].upper()
payloadParams = payload['params']
printStr = "[MASTER-BACKUP]" if "master" in payloadParams else "[SLAVE-BACKUP]"
printout(printStr, MAGENTA)
print("BACKUP RECEIVED: ", str(payload))
res = {
'status': str(codes.SUCCESS.name)
}

if payloadType == 'SETKEY':
printout(printStr, MAGENTA)
print("KEYSET RECIEVED")
self.keyRange["range"] = payloadParams['data']
self.keyRange["status"] = "true"
Expand Down Expand Up @@ -606,6 +631,7 @@ def onMessage(self, payload, isBinary):
res['data'] = "Operation not permitted"

msg = json.dumps(res)
printout(printStr, MAGENTA)
print("SERVER SENT: " + msg)
self.proto.sendMessage(msg.encode('utf8'))

Expand Down Expand Up @@ -722,9 +748,6 @@ def onClose(self, wasClean, code, reason):
for port in range(8081, portnum + 1):
MasterService.keyRanges[ranges[port - 8080]] = port


print(MasterService.keyRanges)

printout("[MASTER]", RED)
print("RANGES: ", ranges)

Expand All @@ -736,9 +759,7 @@ def onClose(self, wasClean, code, reason):

else:
try:
print("getting /meta/config")
config, _ = zk.get('/meta/config')
print("got /meta/config")
# dead server detected
# REINCARNATION MODE
config = json.loads(config.decode("utf-8"))
Expand All @@ -748,15 +769,17 @@ def onClose(self, wasClean, code, reason):
portno = config["lastDead"]["portno"]
masterDied = True if portno == 8080 else False
portbackup = config["lastDead"]["backup"]


printStr = "[MASTER]" if masterDied else "[SLAVE]"
colorr = RED if masterDied else YELLOW
printout(printStr, colorr)
print("REINCARNATION SERVER BOOTING FOR ", "[master]: " if masterDied else "[slave]: ", portno)

# get it servers own data from the next server/backup
payload = {
"type":"REPLICA",
"params": ""
}
printout(printStr, colorr)
print("ws://127.0.0.1:" + str(portbackup) + "/backup")
ss = create_connection("ws://localhost:" + str(portbackup) + "/backup")
ss.send(json.dumps(payload))
Expand All @@ -772,16 +795,14 @@ def onClose(self, wasClean, code, reason):
# get its /backup contents from previous servers/keystore or /master
backupContentPort = (8080 + config["numOfServers"] - 1) if (abs(portno-8080)-1) < 0 else portno-1
backupContentPortAppend = "/master" if backupContentPort == 8080 else "/keystore"

printout(printStr, colorr)
print("ws://localhost:" + str(backupContentPort) + str(backupContentPortAppend))
ss = create_connection("ws://localhost:" + str(backupContentPort) + str(backupContentPortAppend))
ss.send(json.dumps(payload))
backupResponse= ss.recv()
ss.close()
backupResponse = json.loads(backupResponse)

print("/backup content response", backupResponse)
print("/content response", response)

new_config = copy.deepcopy(config)

Expand All @@ -791,7 +812,7 @@ def onClose(self, wasClean, code, reason):
"portno": -1
}
zk.set('/meta/config', json.dumps(new_config).encode('utf-8'))

zk.set("/meta/status", "STABLE".encode("utf-8"))
if masterDied:

MasterService.keyRange = response["data"]["keyRange"]
Expand All @@ -801,9 +822,10 @@ def onClose(self, wasClean, code, reason):
BackupKeyStoreService.keyRange = backupResponse["data"]["keyRange"]
BackupKeyStoreService.data = backupResponse["data"]

printout("[MASTER]", RED)
print ("MASTER REINCARNATING ITSELF")
zk.set('/meta/master', str(portno).encode('utf-8'))
zk.set("/meta/status", "STABLE".encode("utf-8"))
printout("[MASTER]", RED)
print ("MASTER KEY HANDOVER DONE")
scheduleChildWatcher()

Expand Down Expand Up @@ -839,14 +861,12 @@ def onClose(self, wasClean, code, reason):
#response = ss.recv()
ss.close()
scheduleChildWatcher()

printout("[SLAVE]", YELLOW)
print("SERVER BOOTED")


except Exception as e:
print(e)
print(traceback.format_exc())
print("Failed to get /meta/config")
pass

if not serverDied:
printout("[SLAVE]", YELLOW)
Expand All @@ -856,6 +876,7 @@ def onClose(self, wasClean, code, reason):
zk.set('/meta/lastport', str(portno).encode())
printout("[SLAVE]", YELLOW)
print("LISTENING FOR KEYSET")
printout("[SLAVE]", YELLOW)
print("Child watcher scheduled")
scheduleChildWatcher()

Expand All @@ -866,6 +887,4 @@ def onClose(self, wasClean, code, reason):
factory.protocol = ServiceServerProtocol
listenWS(factory)


reactor.run()
print("after")

0 comments on commit 23b838c

Please sign in to comment.