[stat_update] #391
Merged (3 commits, Aug 11, 2024)
9 changes: 9 additions & 0 deletions inputJsonsFiles/ConnectionMap/conn_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,9 @@
{
"connectionsMap":
{
"r1":["mainServer", "r2"],
"r2":["r3", "s1"],
"r3":["r4", "c1"],
"r4":["r1", "c2"]
}
}
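The connection map wires the four routers into a ring (r1 → r2 → r3 → r4 → r1), with the main server, the source, and the two clients each hanging off one router, so every entity can reach every other through at most a few hops. A minimal reachability sketch, not part of this PR, assuming the JSON layout above and that links are treated as bidirectional:

```python
import json

# Load the connection map added in this PR (path as committed above).
with open("inputJsonsFiles/ConnectionMap/conn_EEG_1d_2c_1s_4r_4w.json") as f:
    conn = json.load(f)["connectionsMap"]

# Build an undirected adjacency view: assume each declared link works both ways.
adj = {}
for router, peers in conn.items():
    for peer in peers:
        adj.setdefault(router, set()).add(peer)
        adj.setdefault(peer, set()).add(router)

# Walk the graph from r1; every declared entity should be reachable.
seen, stack = {"r1"}, ["r1"]
while stack:
    node = stack.pop()
    for nxt in adj.get(node, ()):
        if nxt not in seen:
            seen.add(nxt)
            stack.append(nxt)
print(sorted(seen))  # expect c1, c2, mainServer, r1..r4, s1
```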
117 changes: 117 additions & 0 deletions inputJsonsFiles/DistributedConfig/dc_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,117 @@
{
"nerlnetSettings": {
"frequency": "5",
"batchSize": "10"
},
"mainServer": {
"port": "8081",
"args": ""
},
"apiServer": {
"port": "8082",
"args": ""
},
"devices": [
{
"name": "pc1",
"ipv4": "10.0.0.30",
"entities": "c1,c2,r2,r1,r3,r4,s1,apiServer,mainServer"
}
],
"routers": [
{
"name": "r1",
"port": "8086",
"policy": "0"
},
{
"name": "r2",
"port": "8087",
"policy": "0"
},
{
"name": "r3",
"port": "8088",
"policy": "0"
},
{
"name": "r4",
"port": "8089",
"policy": "0"
}
],
"sources": [
{
"name": "s1",
"port": "8085",
"frequency": "200",
"policy": "0",
"epochs": "1",
"type": "0"
}
],
"clients": [
{
"name": "c1",
"port": "8083",
"workers": "w1,w2"
},
{
"name": "c2",
"port": "8084",
"workers": "w3,w4"
}
],
"workers": [
{
"name": "w1",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w2",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w3",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w4",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
}
],
"model_sha": {
"d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa": {
"modelType": "0",
"_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image-classification:4 | text-classification:5 | text-generation:6 | auto-association:7 | autoencoder:8 | ae-classifier:9 |",
"modelArgs": "",
"layersSizes": "70x1x1k5x1x1x64p0s1t0,66x1x64k2x1p0s1,65x1x64k5x1x64x64p0s1t0,61x1x64k2x1p0s1,60x1x64k5x1x64x32p0s1t0,1,32,16,9",
"_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]",
"layerTypesList": "2,4,2,4,2,9,3,3,3",
"_doc_LayerTypes": " Default:0 | Scaling:1 | CNN:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |",
"layers_functions": "11,2,11,2,11,1,6,6,11",
"_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |",
"_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |",
"_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |",
"_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |",
"lossMethod": "2",
"_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |",
"lr": "0.00001",
"_doc_lr": "Positve float",
"epochs": "1",
"_doc_epochs": "Positve Integer",
"optimizer": "5",
"_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |",
"optimizerArgs": "",
"_doc_optimizerArgs": "String",
"infraType": "0",
"_doc_infraType": " opennn:0 | wolfengine:1 |",
"distributedSystemType": "0",
"_doc_distributedSystemType": " none:0 | fedClientAvg:1 | fedServerAvg:2 |",
"distributedSystemArgs": "",
"_doc_distributedSystemArgs": "String",
"distributedSystemToken": "none",
"_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server"
}
}
}
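The model is declared as three parallel comma-separated lists that must stay index-aligned: layersSizes, layerTypesList, and layers_functions. Read together with the _doc_ strings above, this instance is a 1-D CNN stack (conv, pool, conv, pool, conv, flatten) followed by three perceptron layers ending in the 9 output labels. A sketch that zips the lists to make the alignment easy to eyeball, assuming only the encoding shown in this file:

```python
# Type codes taken from _doc_LayerTypes above; only the ones used here are mapped.
LAYER_TYPES = {"2": "CNN", "3": "Perceptron", "4": "Pooling", "9": "Flatten"}

sizes = ("70x1x1k5x1x1x64p0s1t0,66x1x64k2x1p0s1,65x1x64k5x1x64x64p0s1t0,"
         "61x1x64k2x1p0s1,60x1x64k5x1x64x32p0s1t0,1,32,16,9").split(",")
types = "2,4,2,4,2,9,3,3,3".split(",")
funcs = "11,2,11,2,11,1,6,6,11".split(",")

assert len(sizes) == len(types) == len(funcs)  # the three lists must line up
for i, (size, ltype, func) in enumerate(zip(sizes, types, funcs)):
    print(f"layer {i}: {LAYER_TYPES.get(ltype, ltype):<10} size={size:<24} function_code={func}")
```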
41 changes: 41 additions & 0 deletions inputJsonsFiles/experimentsFlow/exp_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,41 @@
{
"experimentName": "EEG_Valence_Recognition_DEAP",
"experimentType": "classification",
"batchSize": 10,
"csvFilePath": "/home/nerlnet/workspace/1_3_persons_normalize_bins_valence.csv",
"numOfFeatures": "70",
"numOfLabels": "9",
"headersNames": "1,2,3,4,5,6,7,8,9",
"Phases":
[
{
"phaseName": "training_phase",
"phaseType": "training",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "10",
"numOfBatches": "10",
"workers": "w1,w2,w3,w4",
"nerltensorType": "float"
}
]
},
{
"phaseName": "prediction_phase",
"phaseType": "prediction",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "16510",
"numOfBatches": "10",
"workers": "w1,w2,w3,w4",
"nerltensorType": "float"
}
]
}
]
}
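With batchSize 10 and numOfBatches 10 per source piece, each phase streams 100 samples, and the starting offsets keep the training and prediction windows disjoint. A quick arithmetic check, not part of the PR, assuming each batch carries batchSize consecutive rows starting at startingSample:

```python
# Sample window covered by each phase under the assumption above.
batch_size = 10
phases = [("training_phase", 10, 10), ("prediction_phase", 16510, 10)]
for name, start, num_batches in phases:
    end = start + num_batches * batch_size
    print(f"{name}: rows [{start}, {end})")
# training_phase: rows [10, 110)
# prediction_phase: rows [16510, 16610)
```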

18 changes: 14 additions & 4 deletions src_py/apiServer/stats.py
@@ -55,7 +55,7 @@ def get_loss_by_source(self , plot : bool = False , saveToFile : bool = False):
"""
pass

def get_loss_ts(self , plot : bool = False , saveToFile : bool = False):
def get_loss_ts(self , plot : bool = False , saveToFile : bool = False, smoothing : bool = False, log_plot : bool = False):
"""
Returns a dictionary of {worker : loss list} for each worker in the experiment.
use plot=True to plot the loss function.
@@ -83,6 +83,11 @@ def get_loss_ts(self , plot : bool = False , saveToFile : bool = False):

df = pd.DataFrame(loss_dict)
self.loss_ts_pd = df

if smoothing:
for column in df.columns:
for i in range(1, len(df)):
df.at[i, column] = (df.at[i, column] + df.at[i-1, column]) / 2

if plot:
sns.set(style="whitegrid")
@@ -103,6 +108,9 @@ def get_loss_ts(self , plot : bool = False , saveToFile : bool = False):
if saveToFile:
plt.savefig('training_loss_function.png', bbox_inches='tight')

if log_plot:
plt.yscale('log')

plt.show()
return df

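A note on the smoothing loop: because it writes back into df as it goes, df.at[i-1, column] is already smoothed when row i is computed, so the result is an exponentially weighted moving average with alpha = 0.5 rather than a plain two-point window; the loop also runs after self.loss_ts_pd = df and mutates the frame in place, so the stored copy is smoothed too. If the EWMA behaviour is the intent, pandas expresses it directly; a sketch demonstrating the equivalence, not a change request:

```python
import pandas as pd

df = pd.DataFrame({"w1": [1.0, 3.0, 5.0, 7.0]})

looped = df.copy()
for i in range(1, len(looped)):  # the in-place loop from this PR
    looped.at[i, "w1"] = (looped.at[i, "w1"] + looped.at[i - 1, "w1"]) / 2

ewma = df.ewm(alpha=0.5, adjust=False).mean()  # closed-form equivalent
assert (looped["w1"] == ewma["w1"]).all()  # [1.0, 2.0, 3.5, 5.25] both ways
```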
@@ -282,12 +290,13 @@ def recieved_batches_key(phase_name, source_name, worker_name):
workers_model_db_list = self.nerl_model_db.get_workers_model_db_list()
for source_piece_inst in sources_pieces_list:
source_name = source_piece_inst.get_source_name()
source_epoch = int(globe.components.sourceEpochs[source_name])
target_workers_string = source_piece_inst.get_target_workers()
target_workers_names = target_workers_string.split(',')
for worker_db in workers_model_db_list:
worker_name = worker_db.get_worker_name()
if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source
for batch_id in range(source_piece_inst.get_num_of_batches()):
for batch_id in range(source_epoch * source_piece_inst.get_num_of_batches()):
batch_db = worker_db.get_batch(source_name, str(batch_id))
if batch_db: # if batch is received
recieved_batch_key_str = recieved_batches_key(phase_name, source_name, worker_name)
@@ -311,13 +320,14 @@ def missed_batches_key(phase_name, source_name, worker_name):
for source_piece_inst in sources_pieces_list:
source_name = source_piece_inst.get_source_name()
source_policy = globe.components.sources_policy_dict[source_name] # 0 -> casting , 1 -> round robin, 2 -> random
source_epoch = int(globe.components.sourceEpochs[source_name])
target_workers_string = source_piece_inst.get_target_workers()
target_workers_names = target_workers_string.split(',')
if source_policy == '0': # casting policy
for worker_db in workers_model_db_list:
worker_name = worker_db.get_worker_name()
if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source
for batch_id in range(source_piece_inst.get_num_of_batches()):
for batch_id in range(source_epoch * source_piece_inst.get_num_of_batches()):
batch_db = worker_db.get_batch(source_name, str(batch_id))
if not batch_db: # if batch is missing
missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name)
Expand All @@ -326,7 +336,7 @@ def missed_batches_key(phase_name, source_name, worker_name):
missed_batches_dict[missed_batch_key_str].append(batch_id)
elif source_policy == '1': # round robin policy
number_of_workers = len(target_workers_names)
batches_indexes = [i for i in range(source_piece_inst.get_num_of_batches())]
batches_indexes = [i for i in range(source_epoch * source_piece_inst.get_num_of_batches())]
batch_worker_tuple = [(batch_index, target_workers_names[batch_index % number_of_workers]) for batch_index in batches_indexes] # (batch_index, worker_name_that_should_receive_the_batch)
worker_batches_dict = {worker_name: [] for worker_name in target_workers_names} # Create a dictionary to hold batches id for each worker
for batch_index, worker_name in batch_worker_tuple:
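The substance of the stats.py change: a source that replays its piece for E epochs emits E × numOfBatches batches in total, so both the received-batch and missed-batch bookkeeping now iterate over source_epoch * num_of_batches batch ids instead of a single epoch's worth. Under the round-robin policy the owning worker is still batch_id % number_of_workers. A sketch of the expected assignment, mirroring the loop above with hypothetical inputs:

```python
# Hypothetical helper, not part of the PR: which worker should receive each
# batch under round-robin once epochs are folded into the batch-id range.
def expected_round_robin(workers, num_of_batches, source_epoch):
    total = source_epoch * num_of_batches
    return {batch_id: workers[batch_id % len(workers)] for batch_id in range(total)}

print(expected_round_robin(["w1", "w2", "w3", "w4"], num_of_batches=3, source_epoch=2))
# {0: 'w1', 1: 'w2', 2: 'w3', 3: 'w4', 4: 'w1', 5: 'w2'}
```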