From 5914ec50be0681875a8fb5be140a9dfee632daf8 Mon Sep 17 00:00:00 2001 From: schaffung Date: Fri, 30 Apr 2021 17:05:16 +0530 Subject: [PATCH 01/20] Converting threading to multiprocessing (#230) * Converting threading to multiprocessing The current implementation will have a bottleneck because of the nature of how threads are executed in python, wherein threads and the main process share the GIL even though the system might boast of multiple processor cores. To work around this bottleneck in python, multiprocessing module can be used. Now one can argue that multiprocess will create a separate process in the background and that this will be heavier than a thread and also would consume more time than spinning up a new thread. To solve this, we use the idea of worker processes taking up tasks from a queue, hence processes are spun up once and are killed only when there is no job in the queue. One more point to note is that the multiprocessing requires some mechanism of locking to prevent the main process form going ahead with its flow ( which in our case would be the execution of the disruptive test cases ). Now for the time being we have an incremental backoff system with sleep till all jobs are dead. This will have to be replaced with a sophesticated locking and signalling mechanism. Fixes: #229 Signed-off-by: srijan-sivakumar * Test result handler working now The issue was the Queue was not updating in case of non_concurrent tests which gets handled directly in the run_test now and for the concurrent tests the volume types are aggregated as well providing a single table for all the volumes for a test. Signed-off-by: Ayush Ujjwal * removed the extra print Signed-off-by: Ayush Ujjwal * Modified the code and refactored the result aggregation code Signed-off-by: Ayush Ujjwal * Using the queue only to get the work done Signed-off-by: Ayush Ujjwal * Update test_runner.py * Update test_runner.py Co-authored-by: Ayush Ujjwal --- core/redant_main.py | 10 +++--- core/result_handler.py | 18 ++++++++-- core/test_runner.py | 79 ++++++++++++++++++++++++++---------------- 3 files changed, 72 insertions(+), 35 deletions(-) diff --git a/core/redant_main.py b/core/redant_main.py index 7efcbd5a8..253460e76 100644 --- a/core/redant_main.py +++ b/core/redant_main.py @@ -40,7 +40,7 @@ def pars_args(): dest="log_level", default="I", type=str) parser.add_argument("-cc", "--concurrency-count", help="Number of concurrent test runs", - dest="semaphore_count", default=4, type=int) + dest="concur_count", default=4, type=int) parser.add_argument("-rf", "--result-file", help="Result file. By default it will be None", dest="result_path", default=None, type=str) @@ -83,14 +83,16 @@ def main(): # invoke the test_runner. TestRunner.init(test_cases_dict, param_obj, args.log_dir, - args.log_level, args.semaphore_count) - all_test_results = TestRunner.run_tests() + args.log_level, args.concur_count) + result_queue = TestRunner.run_tests() # Environment cleanup. TBD. 
print(f"\nTotal time taken by the framework: {time.time()-start} sec") + + ResultHandler.handle_results(result_queue, args.result_path) + - ResultHandler.handle_results(all_test_results, args.result_path) if __name__ == '__main__': diff --git a/core/result_handler.py b/core/result_handler.py index 8d508186d..6e3436339 100644 --- a/core/result_handler.py +++ b/core/result_handler.py @@ -72,7 +72,7 @@ def _store_results(cls, test_results: dict, result_path: str): file.close() @classmethod - def handle_results(cls, test_results: dict, result_path: str): + def handle_results(cls, result_queue, result_path: str): """ This function handles the results for the framework. It checks @@ -81,9 +81,23 @@ def handle_results(cls, test_results: dict, result_path: str): specified by the user Args: - test_results: all the tests results + result_queue: a queue of results result_path: path of the result file """ + test_results = {} + + while not result_queue.empty(): + + curr_item = result_queue.get() + key = list(curr_item.keys())[0] + value = curr_item[key] + + if key not in test_results.keys(): + test_results[key] = [] + + test_results[key].append(value) + + if result_path is None: cls._display_test_results(test_results) else: diff --git a/core/test_runner.py b/core/test_runner.py index 30f544c03..07e628e72 100644 --- a/core/test_runner.py +++ b/core/test_runner.py @@ -5,7 +5,7 @@ import uuid import time from datetime import datetime -from threading import Thread, Semaphore +from multiprocessing import Process, Queue from colorama import Fore, Style from runner_thread import RunnerThread @@ -19,15 +19,16 @@ class TestRunner: @classmethod def init(cls, test_run_dict: dict, param_obj: dict, - base_log_path: str, log_level: str, semaphore_count: int): + base_log_path: str, log_level: str, multiprocess_count: int): cls.param_obj = param_obj - cls.semaphore = Semaphore(semaphore_count) + cls.concur_count = multiprocess_count cls.base_log_path = base_log_path cls.log_level = log_level cls.concur_test = test_run_dict["nonDisruptive"] cls.non_concur_test = test_run_dict["disruptive"] cls.threadList = [] - cls.test_results = {} + cls.job_result_queue = Queue() + cls.nd_job_queue = Queue() cls._prepare_thread_tests() @classmethod @@ -36,44 +37,54 @@ def run_tests(cls): The non-disruptive tests are invoked followed by the disruptive tests. """ - for test_thread in cls.threadList: - test_thread.start() - - for test_thread in cls.threadList: - test_thread.join() - - thread_flag = False + jobs = [] + if bool(cls.concur_count): + for iter in range(cls.concur_count): + proc = Process(target=cls._worker_process, + args=(cls.nd_job_queue,)) + jobs.append(proc) + proc.start() + + # TODO replace incremental backup with a signalling and lock. + backoff_time = 0 + while len(jobs) > 0: + jobs = [job for job in jobs if job.is_alive()] + if backoff_time == 20: + time.sleep(backoff_time) + else: + backoff_time += 1 + + for iter in range(cls.concur_count): + proc.join() for test in cls.non_concur_test: - cls.test_results[test['moduleName'][:-3]] = [] + cls._run_test(test) - for test in cls.non_concur_test: - cls._run_test(test, thread_flag) + """ + Because of the infinitesimal delay in value being reflected in Queue + it was found that sometimes the Queue which was empty had been given + some value, it still showed itself as empty. + TODO: Handle it without sleep. 
+ """ + while cls.job_result_queue.empty(): + time.sleep(1) - return cls.test_results + return cls.job_result_queue @classmethod def _prepare_thread_tests(cls): """ This method creates the threadlist for non disruptive tests """ - thread_flag = True - for test in cls.concur_test: - cls.test_results[test['moduleName'][:-3]] = [] - - for test in cls.concur_test: - cls.threadList.append(Thread(target=cls._run_test, - args=(test, thread_flag,))) - + cls.nd_job_queue.put(test) + @classmethod - def _run_test(cls, test_dict: dict, thread_flag: bool): + def _run_test(cls, test_dict: dict, thread_flag: bool=False): """ A generic method handling the run of both disruptive and non disruptive tests. """ - if thread_flag: - cls.semaphore.acquire() tc_class = test_dict["testClass"] tc_log_path = cls.base_log_path+test_dict["modulePath"][5:-3]+"/" +\ test_dict["volType"]+"/"+test_dict["moduleName"][:-3]+".log" @@ -98,7 +109,17 @@ def _run_test(cls, test_dict: dict, thread_flag: bool): test_stats['testResult'] = "FAIL" print(Fore.RED + result_text) print(Style.RESET_ALL) - if thread_flag: - cls.semaphore.release() - cls.test_results[test_dict['moduleName'][:-3]].append(test_stats) + result_value = { test_dict["moduleName"][:-3] : test_stats } + cls.job_result_queue.put(result_value) + + @classmethod + def _worker_process(cls, nd_queue): + """ + Worker process would be taking up new jobs from the queue + till the queue is empty. This queue will consist only non disruptive + test cases. + """ + while not nd_queue.empty(): + job_data = nd_queue.get() + cls._run_test(job_data, True) From 863b200ea15921692c49fd5f33df31c01b52e334 Mon Sep 17 00:00:00 2001 From: Ayush Ujjwal <77244483+aujjwal-redhat@users.noreply.github.com> Date: Fri, 30 Apr 2021 17:16:28 +0530 Subject: [PATCH 02/20] Added the passwordless ssh code in rexe (#233) * Added the passwordless ssh code in rexe Added the passwordless ssh code in the remote executioner. Now no user name or password is required. To test the same you need to setup the passwordless ssh on your servers. Note: Setup passwordless ssh from one server to another server and from one server to the same server as well. Fixes: #154 Signed-off-by: Ayush Ujjwal * Modified rexe: Removed autoAddPollicy; Removed debug; Signed-off-by: Ayush Ujjwal --- common/rexe.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/common/rexe.py b/common/rexe.py index 49be6f898..a2ea7ac31 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -1,3 +1,4 @@ +import os import random import concurrent.futures import paramiko @@ -5,7 +6,6 @@ import json from multipledispatch import dispatch - class Rexe: def __init__(self, host_dict): self.host_generic = ['alls', 'allp'] @@ -28,18 +28,21 @@ def establish_connection(self): self.connect_flag = True for node in self.host_dict: + node_ssh_client = paramiko.SSHClient() - node_ssh_client.set_missing_host_key_policy( - paramiko.AutoAddPolicy()) + node_ssh_client.load_host_keys(os.path.expanduser('/root/.ssh/known_hosts')) + mykey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') try: node_ssh_client.connect( hostname=node, - username=self.host_dict[node]['user'], - password=self.host_dict[node]['passwd']) + pkey=mykey, + ) + self.logger.debug(f"SSH connection to {node} is successful.") except Exception as e: self.logger.error(f"Connection failure. 
Exception : {e}") self.connect_flag = False + raise e self.node_dict[node] = node_ssh_client def deconstruct_connection(self): @@ -93,13 +96,12 @@ def execute_command(self, cmd, node): except Exception as e: # Reconnection to be done. node_ssh_client = paramiko.SSHClient() - node_ssh_client.set_missing_host_key_policy( - paramiko.AutoAddPolicy()) + node_ssh_client.load_host_keys(os.path.expanduser('/root/.ssh/known_hosts')) + mykey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') try: node_ssh_client.connect( hostname=node, - username=self.host_dict[node]['user'], - password=self.host_dict[node]['passwd'], + pkey=mykey, ) self.logger.debug(f"SSH connection to {node} is successful.") self.node_dict[node] = node_ssh_client From c4fd7472b47b58b5eea324f68594327457639dbc Mon Sep 17 00:00:00 2001 From: schaffung Date: Sat, 1 May 2021 19:43:53 +0530 Subject: [PATCH 03/20] Delete test_file_creation.py The said TC does no valid operation. We have an equivalent volume_creation test file which handles the operations properly in the mountpoints. --- .../sample_component/test_file_creation.py | 55 ------------------- 1 file changed, 55 deletions(-) delete mode 100644 tests/example/sample_component/test_file_creation.py diff --git a/tests/example/sample_component/test_file_creation.py b/tests/example/sample_component/test_file_creation.py deleted file mode 100644 index 6128dcbf0..000000000 --- a/tests/example/sample_component/test_file_creation.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -This file contains a test-case which tests -the creation of different types of files and -some operations on it. -""" -# nonDisruptive;rep,dist,dist-rep - -from tests.parent_test import ParentTest - - -class TestCase(ParentTest): - """ - The test case contains one function to test - for the creation of different types of files and - some operations on it. - """ - - def run_test(self, redant): - """ - In the testcase: - 1) A mount-point is created - 2) A regular file is created. - 3) Block , char and pipefile is created. - 4) Append some data to file. - 5) Check if data has been appended. - 6) Look-up on mount-point. - 7) Mount-point is deleted. - """ - host = self.server_list[0] - regfile_name = "file1" - mountpoint = "/mnt/test_dir" - redant.execute_io_cmd(f"mkdir -p {mountpoint}", host) - redant.execute_io_cmd( - f"cd {mountpoint} && touch {mountpoint}/{regfile_name}", host) - - for (file_name, parameter) in [ - ("blockfile", "b"), ("charfile", "c")]: - redant.execute_io_cmd( - f"mknod {mountpoint}/{file_name} {parameter} 1 5", host) - - redant.execute_io_cmd(f"mkfifo {mountpoint}/pipefile", host) - - for (file_name, data_str) in [ - ("regfile", "regular"), - ("charfile", "character special"), - ("blockfile", "block special")]: - str_to_add = f"This is a {data_str} file." - path = f"{mountpoint}/{regfile_name}" - redant.execute_io_cmd( - f"echo '{str_to_add}' >> {path}", host) - - redant.execute_io_cmd(f"cat {path}", host) - redant.execute_io_cmd(f"ls -lR {mountpoint}", host) - redant.execute_io_cmd("ls -l /", host) - redant.execute_io_cmd(f"rm -rf {mountpoint}", host) From f571589893757e8c3f99982f4b2dadc423311d1a Mon Sep 17 00:00:00 2001 From: schaffung Date: Sat, 1 May 2021 19:45:08 +0530 Subject: [PATCH 04/20] Update test_peer_probe_detach.py Modifying the condition check. 
--- tests/functional/glusterd/test_peer_probe_detach.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/functional/glusterd/test_peer_probe_detach.py b/tests/functional/glusterd/test_peer_probe_detach.py index 571c1a5fc..ba407f4f5 100644 --- a/tests/functional/glusterd/test_peer_probe_detach.py +++ b/tests/functional/glusterd/test_peer_probe_detach.py @@ -20,9 +20,7 @@ def run_test(self, redant): 2) The storage pool is listed. """ server1 = self.server_list[0] - server2 = self.server_list[1] - server3 = self.server_list[2] - for _ in range(3): + for _ in range(2): redant.create_cluster(self.server_list) node_list = redant.nodes_from_pool_list(server1) node_ip_list = redant.convert_hosts_to_ip(node_list, server1) From 623bb7d3f5e93966040138ca9ea61c700d99d202 Mon Sep 17 00:00:00 2001 From: schaffung Date: Sat, 1 May 2021 19:46:08 +0530 Subject: [PATCH 05/20] Update test_glusterd_start_stop.py Modifying condition check. and the order too. --- tests/functional/glusterd/test_glusterd_start_stop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/glusterd/test_glusterd_start_stop.py b/tests/functional/glusterd/test_glusterd_start_stop.py index a53803977..19464ca2e 100644 --- a/tests/functional/glusterd/test_glusterd_start_stop.py +++ b/tests/functional/glusterd/test_glusterd_start_stop.py @@ -19,6 +19,6 @@ def run_test(self, redant): 1) glusterd service is started on the server. 4) glusterd service is stopped. """ - for _ in range(10): - redant.start_glusterd(self.server_list) + for _ in range(5): redant.stop_glusterd(self.server_list) + redant.start_glusterd(self.server_list) From 22e9a9a1594dc78df762faa18052ea096e80c76f Mon Sep 17 00:00:00 2001 From: schaffung Date: Sat, 1 May 2021 22:10:39 +0530 Subject: [PATCH 06/20] Delete test_peer_probe_detach.py Removing the test file. Will be added in modified form after addition from the ops. --- .../glusterd/test_peer_probe_detach.py | 30 ------------------- 1 file changed, 30 deletions(-) delete mode 100644 tests/functional/glusterd/test_peer_probe_detach.py diff --git a/tests/functional/glusterd/test_peer_probe_detach.py b/tests/functional/glusterd/test_peer_probe_detach.py deleted file mode 100644 index ba407f4f5..000000000 --- a/tests/functional/glusterd/test_peer_probe_detach.py +++ /dev/null @@ -1,30 +0,0 @@ -""" -This component has a test-case for testing -peer related operations -""" -# disruptive; - -from tests.parent_test import ParentTest - - -class TestCase(ParentTest): - """ - This TestCase class contains a function to test - for peer probe , pool list and peer detach. - """ - - def run_test(self, redant): - """ - In this testcase: - 1) Cluster is created. - 2) The storage pool is listed. - """ - server1 = self.server_list[0] - for _ in range(2): - redant.create_cluster(self.server_list) - node_list = redant.nodes_from_pool_list(server1) - node_ip_list = redant.convert_hosts_to_ip(node_list, server1) - redant.logger.info(node_ip_list) - redant.delete_cluster(self.server_list) - from time import sleep - sleep(1) From 5c6f1d1a06f22017bcce1f1b93358f0d83774e60 Mon Sep 17 00:00:00 2001 From: Ayush Ujjwal <77244483+aujjwal-redhat@users.noreply.github.com> Date: Mon, 3 May 2021 12:01:18 +0530 Subject: [PATCH 07/20] Added the total time information in the result file (#238) * Added the total time information in the result file As mentioned in the issue, the total time was not getting stored in the result file. Hence added the same. 
Fixes: #237 Signed-off-by: Ayush Ujjwal * Modified docstrings and added the data type for total_time Added docstrings for total_time. Modified the docstring describing the result_handler's operation. Added type for the total_time variable. Signed-off-by: Ayush Ujjwal * Formatted the strings instead of appending the objects The strings were getting appended and since everything in python is an object it creates another object. Hence, used formatted strings in place. Signed-off-by: Ayush Ujjwal --- core/redant_main.py | 6 ++---- core/result_handler.py | 43 +++++++++++++++++++++++++----------------- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/core/redant_main.py b/core/redant_main.py index 253460e76..49e7ab378 100644 --- a/core/redant_main.py +++ b/core/redant_main.py @@ -87,10 +87,8 @@ def main(): result_queue = TestRunner.run_tests() # Environment cleanup. TBD. - - print(f"\nTotal time taken by the framework: {time.time()-start} sec") - - ResultHandler.handle_results(result_queue, args.result_path) + total_time = time.time()-start + ResultHandler.handle_results(result_queue, args.result_path, total_time) diff --git a/core/result_handler.py b/core/result_handler.py index 6e3436339..ad2069b53 100644 --- a/core/result_handler.py +++ b/core/result_handler.py @@ -1,9 +1,9 @@ """ -This component is dedicated to -displaying the result of the tests -in the form of tables for better -understanding of the performance of -framework +This component handles the +results in two ways: + +1. display on the CLI +2. store in a file """ from prettytable import PrettyTable from colorama import Fore, Style @@ -12,7 +12,7 @@ class ResultHandler: @classmethod - def _get_output(cls, test_results: dict, colorify: bool): + def _get_output(cls, test_results: dict, colorify: bool, total_time: float): """ It generates the output in the form of tables with columns @@ -23,15 +23,17 @@ def _get_output(cls, test_results: dict, colorify: bool): test_results: All the tests result. colorify: Stores whether to show colored output or not + total_time: stores the total time taken by the framework + """ cls.result = "Table:\n" for item in test_results: if colorify: - cls.result += (Fore.BLUE + item+"\n") - cls.result += (Style.RESET_ALL+"\n") + cls.result = f"{cls.result} {Fore.BLUE}{item}\n" + cls.result = f"{cls.result} {Style.RESET_ALL}\n" else: - cls.result += (item+'\n') + cls.result = f"{cls.result} {item}\n" table = PrettyTable( ['Volume Type', 'Test Result', 'Time taken (sec)']) @@ -41,23 +43,27 @@ def _get_output(cls, test_results: dict, colorify: bool): [each_vol_test['volType'], each_vol_test['testResult'], each_vol_test['timeTaken']]) - cls.result += (str(table)+"\n") + cls.result = f"{cls.result}{str(table)}\n" + + cls.result = (f"{cls.result}\nTotal time taken by the framework is {total_time}\n") @classmethod - def _display_test_results(cls, test_results: dict): + def _display_test_results(cls, test_results: dict, total_time: float): """ This function displays the test results in the form of tables. Args: test_results: All the tests results. + total_time: stores the total time taken by the framework """ - cls._get_output(test_results, True) + cls._get_output(test_results, True, total_time) print(cls.result) + @classmethod - def _store_results(cls, test_results: dict, result_path: str): + def _store_results(cls, test_results: dict, result_path: str, total_time: float): """ This function stores the test results in the form of tables in a file. 
@@ -65,14 +71,16 @@ def _store_results(cls, test_results: dict, result_path: str): Args: test_results: All the tests results. result_path: Path of the result file + total_time: stores the total time taken by the framework """ - cls._get_output(test_results, False) + cls._get_output(test_results, False, total_time) + print(f"The results are stored in {result_path}") file = open(result_path, 'w') file.write(cls.result) file.close() @classmethod - def handle_results(cls, result_queue, result_path: str): + def handle_results(cls, result_queue, result_path: str, total_time: float): """ This function handles the results for the framework. It checks @@ -83,6 +91,7 @@ def handle_results(cls, result_queue, result_path: str): Args: result_queue: a queue of results result_path: path of the result file + total_time: stores the total time taken by the framework """ test_results = {} @@ -99,6 +108,6 @@ def handle_results(cls, result_queue, result_path: str): if result_path is None: - cls._display_test_results(test_results) + cls._display_test_results(test_results, total_time) else: - cls._store_results(test_results, result_path) + cls._store_results(test_results, result_path, total_time) From c03ff65b1b09198472bd8d88d959c8763a89b27b Mon Sep 17 00:00:00 2001 From: nishith-vihar <77044911+nishith-vihar@users.noreply.github.com> Date: Mon, 3 May 2021 18:27:49 +0530 Subject: [PATCH 08/20] User credentials are removed from config file (#242) * User credentials are removed from config file username and password for the nodes are removed from the config file as they are redundant after incorporating passwordless ssh into the test framework. Fixes: #240 Signed-off-by: srijan-sivakumar * CONFIG_README documentation change 'attributes' to attribute Fixes: #240 Signed-off-by: srijan-sivakumar Co-authored-by: srijan-sivakumar --- config/CONFIG_README.md | 27 ++++++++------------------- config/config.yml | 12 ------------ 2 files changed, 8 insertions(+), 31 deletions(-) diff --git a/config/CONFIG_README.md b/config/CONFIG_README.md index 9426b0dea..82e126947 100644 --- a/config/CONFIG_README.md +++ b/config/CONFIG_README.md @@ -6,39 +6,28 @@ The components of the config.yml file are as follows:

1. servers_info

'servers_info' is info about each server in the cluster.
-Each server is defined by its ip address.
-Each server should contain 4 attributes:
-1) ip(key representing server): ip address of the server.
-2) brick_root: the list of directories where bricks have to be created.
-3) user: the username of the server for ssh connection.
-4) passwd: the password of the server for ssh connection.
-All the above attributes have to defined by the user.
+Each server is defined by its ip address, which acts as a key representing the server.
+Each server should contain 1 attribute:
+1) brick_root: the list of directories where bricks have to be created.
+The above attribute has to be defined by the user.
If a new server has to be added, then it has to follow the convention of the
-previous servers
+previous servers.

Example format of one server:
ip:
     brick_root: ["/bricks","/gluster"]
-      user: "root"
-      passwd: "redhat"

2. clients_info

'clients_info' is info about each client in the cluster.
-Each client is defined by its ip address.
-Each client should contain 3 attributes:
-1) ip: ip address of the client.
-2) user: the username of the client for ssh connection.
-3) passwd: the password of the client for ssh connection.
-All the above attributes have to defined by the user.
+Each client is defined by its ip address, which acts as a key representing the client.
+The client does not take any attribute values.
If a new client has to be added, then it has to follow the convention of the
-previous clients
+previous clients.

Example format of one client:
ip:
-      user: "root"
-      passwd: "redhat"

3. volume_types

'volume_types' defines different volume types that we can create in diff --git a/config/config.yml b/config/config.yml index dd6d75e31..57613dbef 100644 --- a/config/config.yml +++ b/config/config.yml @@ -6,29 +6,17 @@ servers_info: "1.1.1.1": brick_root: ["/bricks"] - user: "root" - passwd: "redhat" "2.2.2.2": brick_root: ["/bricks"] - user: "root" - passwd: "redhat" "3.3.3.3": brick_root: ["/bricks"] - user: "root" - passwd: "redhat" "4.4.4.4": brick_root: ["/bricks"] - user: "root" - passwd: "redhat" #clients_info - All the relevant information about the clients clients_info: "5.5.5.5": - user: "root" - passwd: "redhat" "6.6.6.6": - user: 'root' - passwd: "redhat" #volume_types - Indivudual volume type information and minimum servers for # each volume type From 76344ca3902d348562a48f49bb9ee042fa81a6f7 Mon Sep 17 00:00:00 2001 From: Ayush Ujjwal <77244483+aujjwal-redhat@users.noreply.github.com> Date: Mon, 3 May 2021 18:30:08 +0530 Subject: [PATCH 09/20] Added option for storing results in excel sheet (#241) * Added option for storing results in excel sheet Added an option to store the results in excel sheet. Right now, it displays the basic test stats but later we can also add memory and other detailed stats as well. Fixes: #234 Signed-off-by: Ayush Ujjwal * Added xlwt to requirements.txt Added the package xlwt in requirements file Signed-off-by: Ayush Ujjwal --- core/redant_main.py | 5 ++++- core/result_handler.py | 47 +++++++++++++++++++++++++++++++++++++++++- requirements.txt | 1 + 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/core/redant_main.py b/core/redant_main.py index 49e7ab378..871b75ee8 100644 --- a/core/redant_main.py +++ b/core/redant_main.py @@ -44,6 +44,9 @@ def pars_args(): parser.add_argument("-rf", "--result-file", help="Result file. By default it will be None", dest="result_path", default=None, type=str) + parser.add_argument("-xls", "--excel-sheet", + help="Excel sheet to store the result. By default it will be None", + dest="excel_sheet", default=None, type=str) return parser.parse_args() @@ -88,7 +91,7 @@ def main(): # Environment cleanup. TBD. total_time = time.time()-start - ResultHandler.handle_results(result_queue, args.result_path, total_time) + ResultHandler.handle_results(result_queue, args.result_path, total_time, args.excel_sheet) diff --git a/core/result_handler.py b/core/result_handler.py index ad2069b53..b04f022d9 100644 --- a/core/result_handler.py +++ b/core/result_handler.py @@ -5,6 +5,8 @@ 1. display on the CLI 2. store in a file """ +import xlwt +from xlwt import Workbook from prettytable import PrettyTable from colorama import Fore, Style @@ -78,9 +80,48 @@ def _store_results(cls, test_results: dict, result_path: str, total_time: float) file = open(result_path, 'w') file.write(cls.result) file.close() + + @classmethod + def store_results_in_excelsheet(cls, excel_sheet: str, test_results: dict, total_time: float): + """ + This method stores the + results of the test(s) run + in an excel sheet for better understanding. 
+ + Args: + excel_sheet: stores the path of excel sheet + test_results: stores the test results + total_time: total time taken by the framework + """ + wb = Workbook() + + result_sheet = wb.add_sheet('Result Sheet') + + row = 0 + style = xlwt.easyxf('font: bold 1') + + for item in test_results: + result_sheet.write(row, 0, item, style) + row = row + 1 + result_sheet.write(row, 0, 'Volume Type', style) + result_sheet.write(row, 1, 'Test Result', style) + result_sheet.write(row, 2, 'Time Taken', style) + row = row + 1 + + for each_vol_test in test_results[item]: + result_sheet.write(row, 0, each_vol_test['volType']) + result_sheet.write(row, 1, each_vol_test['testResult']) + result_sheet.write(row, 2, each_vol_test['timeTaken']) + row = row + 1 + + row = row + 2 + result_sheet.write(row, 0, 'Total time taken ', style) + result_sheet.write(row, 1, total_time) + + wb.save(excel_sheet) @classmethod - def handle_results(cls, result_queue, result_path: str, total_time: float): + def handle_results(cls, result_queue, result_path: str, total_time: float, excel_sheet: str): """ This function handles the results for the framework. It checks @@ -92,6 +133,7 @@ def handle_results(cls, result_queue, result_path: str, total_time: float): result_queue: a queue of results result_path: path of the result file total_time: stores the total time taken by the framework + excel_sheet: stores the path of the excel sheet """ test_results = {} @@ -111,3 +153,6 @@ def handle_results(cls, result_queue, result_path: str, total_time: float): cls._display_test_results(test_results, total_time) else: cls._store_results(test_results, result_path, total_time) + + if excel_sheet is not None: + cls.store_results_in_excelsheet(excel_sheet, test_results, total_time) diff --git a/requirements.txt b/requirements.txt index 626ead2cf..88c4ba3e9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ comment-parser==1.2.3 colorama==0.4.4 prettytable==2.1.0 multipledispatch==0.6.0 +xlwt==1.3.0 From b7d5be67a4679814bde34f2bce4a44e76316f16f Mon Sep 17 00:00:00 2001 From: nishith-vihar <77044911+nishith-vihar@users.noreply.github.com> Date: Tue, 4 May 2021 12:22:01 +0530 Subject: [PATCH 10/20] Gluster Operations shifted from parent test constructor to parent_test_run (#244) * Gluster Operations shifted from parent test constructor to parent_test_run Gluster Operations are shifted from constructor of parent test to parent_test_run under the try-except clause Fixes: #239 Signed-off-by: Nishith Vihar Sakinala * Gluster operations from terminate are shifted to parent_run_test The gluster operates in terminate function are shifted to parent_run_test under the try-except clause which handles the exception. Fixes: #239 Signed-off-by: Nishith Vihar Sakinala --- core/runner_thread.py | 12 +++++------ core/test_runner.py | 16 ++++++++------ tests/parent_test.py | 50 +++++++++++++++++++++---------------------- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/core/runner_thread.py b/core/runner_thread.py index d4d5c786e..8c1c30cd9 100644 --- a/core/runner_thread.py +++ b/core/runner_thread.py @@ -10,12 +10,10 @@ class RunnerThread: functions for running it. """ - def __init__(self, tc_class, param_obj, mname: str, - volume_type: str, log_path: str, log_level: str, - thread_flag : bool): + def __init__(self, tc_class, param_obj, volume_type: str, + mname: str, log_path: str, log_level: str): # Creating the test case object from the test case. 
- self.tc_obj = tc_class(mname, param_obj, volume_type, - thread_flag, log_path, log_level) + self.tc_obj = tc_class(mname, param_obj, volume_type, log_path, log_level) self.run_test_func = getattr(self.tc_obj, "parent_run_test") self.terminate_test_func = getattr(self.tc_obj, "terminate") self.test_stats = { @@ -23,11 +21,11 @@ def __init__(self, tc_class, param_obj, mname: str, 'volType': volume_type } - def run_thread(self): + def run_thread(self, mname: str, volume_type: str, thread_flag: bool): """ Method to trigger the run test and the terminate test functions. """ - self.run_test_func() + self.run_test_func(mname, volume_type, thread_flag) self.terminate_test_func() self.test_stats['testResult'] = self.tc_obj.TEST_RES return self.test_stats diff --git a/core/test_runner.py b/core/test_runner.py index 07e628e72..284eeac70 100644 --- a/core/test_runner.py +++ b/core/test_runner.py @@ -78,7 +78,7 @@ def _prepare_thread_tests(cls): """ for test in cls.concur_test: cls.nd_job_queue.put(test) - + @classmethod def _run_test(cls, test_dict: dict, thread_flag: bool=False): """ @@ -86,16 +86,18 @@ def _run_test(cls, test_dict: dict, thread_flag: bool=False): disruptive tests. """ tc_class = test_dict["testClass"] + volume_type = test_dict["volType"] + mname = test_dict["moduleName"][:-3] + tc_log_path = cls.base_log_path+test_dict["modulePath"][5:-3]+"/" +\ - test_dict["volType"]+"/"+test_dict["moduleName"][:-3]+".log" + volume_type+"/"+mname+".log" # to calculate time spent to execute the test start = time.time() - runner_thread_obj = RunnerThread(tc_class, cls.param_obj, - test_dict["moduleName"][:-3], - test_dict["volType"], tc_log_path, - cls.log_level, thread_flag) - test_stats = runner_thread_obj.run_thread() + + runner_thread_obj = RunnerThread(tc_class, cls.param_obj, volume_type, + mname, tc_log_path, cls.log_level) + test_stats = runner_thread_obj.run_thread(mname, volume_type, thread_flag) test_stats['timeTaken'] = time.time() - start result_text = test_dict["moduleName"][:-3]+"-"+test_dict["volType"] diff --git a/tests/parent_test.py b/tests/parent_test.py index db8b92bf7..934f158b8 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -1,7 +1,7 @@ import traceback import abc -from common.mixin import RedantMixin from datetime import datetime +from common.mixin import RedantMixin class ParentTest(metaclass=abc.ABCMeta): @@ -24,7 +24,7 @@ class ParentTest(metaclass=abc.ABCMeta): "dist-arb" : "distributed-arbiter" } def __init__(self, mname: str, param_obj, volume_type: str, - thread_flag: bool, log_path: str, log_level: str = 'I'): + log_path: str, log_level: str = 'I'): """ Creates volume And runs the specific component in the @@ -43,22 +43,6 @@ def __init__(self, mname: str, param_obj, volume_type: str, self.client_list = param_obj.get_client_ip_list() self.brick_roots = param_obj.get_brick_roots() - if not thread_flag: - self.redant.start_glusterd() - self.redant.create_cluster(self.server_list) - - if self.volume_type != "Generic": - self.vol_name = (f"{mname}-{volume_type}") - self.redant.volume_create(self.vol_name, self.server_list[0], - self.volume_types_info[self.conv_dict[volume_type]], - self.server_list, self.brick_roots, True) - self.redant.volume_start(self.vol_name, self.server_list[0]) - self.mountpoint = (f"/mnt/{self.vol_name}") - self.redant.execute_io_cmd(f"mkdir -p {self.mountpoint}", - self.client_list[0]) - self.redant.volume_mount(self.server_list[0], self.vol_name, - self.mountpoint, self.client_list[0]) - def _configure(self, mname: str, 
server_details: dict, client_details: dict, log_path: str, log_level: str): machine_detail = {**client_details, **server_details} @@ -71,7 +55,7 @@ def _configure(self, mname: str, server_details: dict, def run_test(self): pass - def parent_run_test(self): + def parent_run_test(self, mname: str, volume_type: str, thread_flag: bool): """ Function to handle the exception logic and invokes the run_test which is overridden by every TC. @@ -82,7 +66,29 @@ def parent_run_test(self): ============================================================ ''') try: + if not thread_flag: + self.redant.start_glusterd() + self.redant.create_cluster(self.server_list) + + if self.volume_type != "Generic": + self.vol_name = (f"{mname}-{volume_type}") + self.redant.volume_create(self.vol_name, self.server_list[0], + self.volume_types_info[self.conv_dict[volume_type]], + self.server_list, self.brick_roots, True) + self.redant.volume_start(self.vol_name, self.server_list[0]) + self.mountpoint = (f"/mnt/{self.vol_name}") + self.redant.execute_io_cmd(f"mkdir -p {self.mountpoint}", + self.client_list[0]) + self.redant.volume_mount(self.server_list[0], self.vol_name, + self.mountpoint, self.client_list[0]) self.run_test(self.redant) + + if self.volume_type != 'Generic': + self.redant.volume_unmount(self.mountpoint, self.client_list[0]) + self.redant.execute_io_cmd(f"rm -rf {self.mountpoint}", + self.client_list[0]) + self.redant.volume_stop(self.vol_name, self.server_list[0], True) + self.redant.volume_delete(self.vol_name, self.server_list[0]) except Exception as error: tb = traceback.format_exc() self.redant.logger.error(error) @@ -98,10 +104,4 @@ def terminate(self): """ Closes connection for now. """ - if self.volume_type != 'Generic': - self.redant.volume_unmount(self.mountpoint, self.client_list[0]) - self.redant.execute_io_cmd(f"rm -rf {self.mountpoint}", - self.client_list[0]) - self.redant.volume_stop(self.vol_name, self.server_list[0], True) - self.redant.volume_delete(self.vol_name, self.server_list[0]) self.redant.deconstruct_connection() From 4d7a95b30bbe12df45f1ef1469251b285edd81d3 Mon Sep 17 00:00:00 2001 From: nishith-vihar <77044911+nishith-vihar@users.noreply.github.com> Date: Tue, 4 May 2021 16:24:03 +0530 Subject: [PATCH 11/20] Check is added for existence and accessibility of config_file (#232) * Check is added for existence and accessibility of config_file Fixes: #231 Signed-off-by: Nishith Vihar Sakinala * Cheking the existence of file is shifted to test_parser IOerror is thrown in test_parser and caught in redant_main Fixes: #231 Signed-off-by: Nishith Vihar Sakinala * Removing os module from params handler Fixes: #231 Signed-off-by: Nishith Vihar Sakinala * Adding a function for checking file accessibility A function is addded to check the accessibility of the file. Fixes: #231 Signed-off-by: Nishith Vihar Sakinala --- core/parsing/test_parser.py | 32 +++++++++++++++++++++++--------- core/redant_main.py | 8 ++++++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/core/parsing/test_parser.py b/core/parsing/test_parser.py index 7e131f650..683b9a914 100644 --- a/core/parsing/test_parser.py +++ b/core/parsing/test_parser.py @@ -5,13 +5,29 @@ import yaml -class Parser(): +class Parser: """ This class consists an API which parses the configuration file from the filepath. The API is called from the ParamsHandler module. """ + @staticmethod + def file_accessible(path, mode='r'): + """ + Check if the file or directory at `path` can + be accessed by the program using `mode` open flags. 
+ Args: + + """ + try: + f = open(path, mode) + f.close() + except IOError: + return False + return True + + @staticmethod def generate_config_hashmap(filepath: str) -> dict: """ @@ -22,11 +38,9 @@ def generate_config_hashmap(filepath: str) -> dict: dict: Hashmap for config file as a dictionary. None: None on failure. """ - try: - configfd = open(filepath, 'r') - config_hashmap = yaml.load(configfd, Loader=yaml.FullLoader) - configfd.close() - return config_hashmap - except IOError: - print("Error: can\'t find config file or read data.") - return None + if not Parser.file_accessible(filepath): + raise IOError + configfd = open(filepath, 'r') + config_hashmap = yaml.load(configfd, Loader=yaml.FullLoader) + configfd.close() + return config_hashmap diff --git a/core/redant_main.py b/core/redant_main.py index 871b75ee8..fdfe198fd 100644 --- a/core/redant_main.py +++ b/core/redant_main.py @@ -4,7 +4,6 @@ 2) Tests-to-run list preparation (by test_list_builder). 3) Invocation of the test_runner. """ - import sys import time import datetime @@ -15,6 +14,7 @@ from result_handler import ResultHandler from environ import environ + def pars_args(): """ Function to handle command line parsing for the redant. @@ -62,7 +62,11 @@ def main(): start = time.time() args = pars_args() - param_obj = ParamsHandler(args.config_file) + try: + param_obj = ParamsHandler(args.config_file) + except IOError: + print("Error: can't find config file or read data.") + return # Building the test list and obtaining the TC details. test_cases_tuple = TestListBuilder.create_test_dict(args.test_dir, From 93c33599c33658e87b4e264a8d4648f80f0c9204 Mon Sep 17 00:00:00 2001 From: Ayush Ujjwal <77244483+aujjwal-redhat@users.noreply.github.com> Date: Tue, 4 May 2021 16:33:08 +0530 Subject: [PATCH 12/20] Added information about the flags (#247) Added information about the flags. Signed-off-by: Ayush Ujjwal --- README.md | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a905084f9..b96a8c690 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,19 @@ Design Doc Link : [Gluster-test Design-doc](https://docs.google.com/document/d/1D8zUSmg-00ey711gsqvS6G9i_fGN2cE0EbG4u1TOsaQ/edit?usp=sharing) -### Structure: +# Contents +* [Structure](#structure) +* [Set up](#set-up) +* [About](#flags) + +## Structure: core: contains the core redant framework which includes parsing,test_list_builder,test_runner,runner_thread and redant_main.
common: consists of the libs and ops that will help in running the test cases and the mixin class.
tests: holds the test cases as performace and functional tests and includes parent test. Add any new test cases here.
+## Set up + ### To start Working: 1. Clone redant repo. @@ -55,3 +62,15 @@ For example, One can also run the scripts given under the tools dir which will reduce the lengthy commands to be typed out everytime. Check out the README.md at the link [Tools-README](https://github.com/srijan-sivakumar/redant/blob/main/tools/README.md) + +## About + +### Flags + +* -c, --config : Stores the path of the config file(s) to read. You need to provide the path else by default it is `None`. Moreover, this is a required argument so you need to provide it for sure. +* -t, --test-dir : The path of the test directory where test cases exist. You can also provide the path to the specific test file. But in that case remember the `-sp` flag :upside_down_face:. This is also a required argument so don't forget it. +* -l, --log-dir : It stores the path of the log directory where you want the log files to be kept. By default it stores `/tmp/redant` and it is not a required argument. +* -ll, --log-level : The log level you want for the execution.By default the log level is `I` (INFO). There are other log levels also like `D`(DEBUG), `W`(WARN) etc. +* -cc, --concurrency-count : It stores the number of concurrent tests run. By default it is 4. +* -rf, --result-file : It stores the path of the result file. By default it is `None` +* -xls, --excel-sheet : It stores the path of the excel sheet. By default it is `None`. \ No newline at end of file From 0ebd94f6583e1c5151c72ebe2520a0372aa49f8f Mon Sep 17 00:00:00 2001 From: srijan-sivakumar Date: Tue, 4 May 2021 22:48:27 +0530 Subject: [PATCH 13/20] Requirement of `cmd run` successfully in all ops. This seems like an overkill. Even though the improvement received might be in small units, it is but a logical step to not log for successful run of a command. The reasoning being just stating running command xyz on node or nodes xyz without showing any errors for the same command, will imply that it succeeded, hence we can eliminate these success logs. Fixes: #250 Signed-off-by: srijan-sivakumar --- common/ops/abstract_ops.py | 4 ---- common/rexe.py | 2 -- 2 files changed, 6 deletions(-) diff --git a/common/ops/abstract_ops.py b/common/ops/abstract_ops.py index 2a6311d23..2fece32c5 100644 --- a/common/ops/abstract_ops.py +++ b/common/ops/abstract_ops.py @@ -36,8 +36,6 @@ def execute_abstract_op_node(self, cmd : str, node : str=None): self.logger.error(ret['msg']['opErrstr']) raise Exception(ret['msg']['opErrstr']) - self.logger.info(f"Successfully ran {cmd} on {node}") - return ret def execute_abstract_op_multinode(self, cmd : str, node : str=None): @@ -59,8 +57,6 @@ def execute_abstract_op_multinode(self, cmd : str, node : str=None): self.logger.error(ret['msg']['opErrstr']) raise Exception(ret['msg']['opErrstr']) - self.logger.info(f"Successfully ran {cmd} on {node}") - return ret diff --git a/common/rexe.py b/common/rexe.py index a2ea7ac31..a0384bcfa 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -38,7 +38,6 @@ def establish_connection(self): pkey=mykey, ) - self.logger.debug(f"SSH connection to {node} is successful.") except Exception as e: self.logger.error(f"Connection failure. Exception : {e}") self.connect_flag = False @@ -103,7 +102,6 @@ def execute_command(self, cmd, node): hostname=node, pkey=mykey, ) - self.logger.debug(f"SSH connection to {node} is successful.") self.node_dict[node] = node_ssh_client except Exception as e: self.logger.error(f"Connection failure. 
Exceptions {e}.") From 53d73224ae0dc0a1249b572de4971b559e1b5fde Mon Sep 17 00:00:00 2001 From: srijan-sivakumar Date: Tue, 4 May 2021 23:07:29 +0530 Subject: [PATCH 14/20] Is a limit of 20 seconds good in incremental backoff ? Replaced the incremental time-of with sleep(1). Seen an improvement of 25 seconds in the total test framework runtime. Fixes: #251 Signed-off-by: srijan-sivakumar --- core/test_runner.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/test_runner.py b/core/test_runner.py index 284eeac70..4b01768ea 100644 --- a/core/test_runner.py +++ b/core/test_runner.py @@ -46,13 +46,9 @@ def run_tests(cls): proc.start() # TODO replace incremental backup with a signalling and lock. - backoff_time = 0 while len(jobs) > 0: jobs = [job for job in jobs if job.is_alive()] - if backoff_time == 20: - time.sleep(backoff_time) - else: - backoff_time += 1 + time.sleep(1) for iter in range(cls.concur_count): proc.join() From 0cb1257004fbd18adc8c8ae2d0200840b3e50165 Mon Sep 17 00:00:00 2001 From: srijan-sivakumar Date: Wed, 5 May 2021 06:41:05 +0530 Subject: [PATCH 15/20] Fixing synchronous execute_command The execute_command should be waiting for the response from the remote shell session before going to receive the response as it might happen that the user gave a certain command which is long running and it wasn't checked for its end. Updates: #248 Signed-off-by: srijan-sivakumar --- common/rexe.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/common/rexe.py b/common/rexe.py index a0384bcfa..f6971e7a1 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -108,6 +108,12 @@ def execute_command(self, cmd, node): # On rebooting the node _, stdout, stderr = self.node_dict[node].exec_command(cmd) + # Wait till command completes. + while not stdout.channel.exit_status_ready(): + time.sleep(1) + if stdout.channel.recv_ready(): + break + if stdout.channel.recv_exit_status() != 0: ret_dict['Flag'] = False ret_dict['msg'] = stdout.readlines() @@ -115,6 +121,7 @@ def execute_command(self, cmd, node): if isinstance(ret_dict['error_msg'], list): ret_dict['error_msg'] = "".join(ret_dict['error_msg']) else: + # Gluster related commands use --xml flag for more info on response if cmd.find("--xml") != -1: stdout_xml_string = "".join(stdout.readlines()) ret_dict['msg'] = json.loads(json.dumps(xmltodict.parse( From 4d218dc7a1dee45e41f15eea24605ac05d9e6819 Mon Sep 17 00:00:00 2001 From: schaffung Date: Wed, 5 May 2021 06:46:07 +0530 Subject: [PATCH 16/20] Update rexe.py Adding import for time. --- common/rexe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/common/rexe.py b/common/rexe.py index f6971e7a1..b8a5d63eb 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -1,5 +1,6 @@ import os import random +import time import concurrent.futures import paramiko import xmltodict From 0132dfe9482199ec7706bd3538b9dc5f5ad48822 Mon Sep 17 00:00:00 2001 From: srijan-sivakumar Date: Wed, 5 May 2021 07:00:25 +0530 Subject: [PATCH 17/20] Revert "Update rexe.py" This reverts commit 4d218dc7a1dee45e41f15eea24605ac05d9e6819. 
--- common/rexe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/common/rexe.py b/common/rexe.py index b8a5d63eb..f6971e7a1 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -1,6 +1,5 @@ import os import random -import time import concurrent.futures import paramiko import xmltodict From c74e086f4624e845a669d24fc339b04ec2600d13 Mon Sep 17 00:00:00 2001 From: srijan-sivakumar Date: Wed, 5 May 2021 07:00:35 +0530 Subject: [PATCH 18/20] Revert "Fixing synchronous execute_command" This reverts commit 0cb1257004fbd18adc8c8ae2d0200840b3e50165. --- common/rexe.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/common/rexe.py b/common/rexe.py index f6971e7a1..a0384bcfa 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -108,12 +108,6 @@ def execute_command(self, cmd, node): # On rebooting the node _, stdout, stderr = self.node_dict[node].exec_command(cmd) - # Wait till command completes. - while not stdout.channel.exit_status_ready(): - time.sleep(1) - if stdout.channel.recv_ready(): - break - if stdout.channel.recv_exit_status() != 0: ret_dict['Flag'] = False ret_dict['msg'] = stdout.readlines() @@ -121,7 +115,6 @@ def execute_command(self, cmd, node): if isinstance(ret_dict['error_msg'], list): ret_dict['error_msg'] = "".join(ret_dict['error_msg']) else: - # Gluster related commands use --xml flag for more info on response if cmd.find("--xml") != -1: stdout_xml_string = "".join(stdout.readlines()) ret_dict['msg'] = json.loads(json.dumps(xmltodict.parse( From bc29b124346ea1bdfa5105c35c3d72e99fdd9456 Mon Sep 17 00:00:00 2001 From: schaffung Date: Wed, 5 May 2021 11:48:42 +0530 Subject: [PATCH 19/20] Adding async capabilities in rexe. (#252) * Adding async capabilities in rexe. The current set of features don't support async behaviour and for th eops and tests to grow, this is a natural extension. Async uses the same set of base libraries as that of sync command execution. Fixes: #248 Signed-off-by: srijan-sivakumar * Update rexe.py Added docstrings. --- common/rexe.py | 127 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 3 deletions(-) diff --git a/common/rexe.py b/common/rexe.py index a0384bcfa..e08dc7043 100644 --- a/common/rexe.py +++ b/common/rexe.py @@ -1,4 +1,5 @@ import os +import time import random import concurrent.futures import paramiko @@ -6,6 +7,7 @@ import json from multipledispatch import dispatch + class Rexe: def __init__(self, host_dict): self.host_generic = ['alls', 'allp'] @@ -30,13 +32,14 @@ def establish_connection(self): for node in self.host_dict: node_ssh_client = paramiko.SSHClient() - node_ssh_client.load_host_keys(os.path.expanduser('/root/.ssh/known_hosts')) + node_ssh_client.load_host_keys( + os.path.expanduser('/root/.ssh/known_hosts')) mykey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') try: node_ssh_client.connect( hostname=node, pkey=mykey, - ) + ) except Exception as e: self.logger.error(f"Connection failure. Exception : {e}") @@ -95,7 +98,8 @@ def execute_command(self, cmd, node): except Exception as e: # Reconnection to be done. 
node_ssh_client = paramiko.SSHClient() - node_ssh_client.load_host_keys(os.path.expanduser('/root/.ssh/known_hosts')) + node_ssh_client.load_host_keys( + os.path.expanduser('/root/.ssh/known_hosts')) mykey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') try: node_ssh_client.connect( @@ -129,6 +133,123 @@ def execute_command(self, cmd, node): self.logger.debug(ret_dict) return ret_dict + @dispatch(str) + def execute_command_async(self, cmd: str) -> dict: + """ + Module to handle random node async execution. + Returns: + ret: A dictionary consisting + - cmd : Command requested + - node : Node wherein it was run + - stdout : The stdout handle + - stderr : The stderr handle + """ + return self.execute_command_async(cmd, self._random_node()) + + @dispatch(str, str) + def execute_command_async(self, cmd: str, node: str) -> dict: + """ + Function to execute command asynchronously in the given node. + Args: + cmd (string): Command to be executed. + node (string) : The node ip wherein the command is to be run. + Returns: + ret: A dictionary consisting + - cmd : Command requested + - node : Node wherein the command was run + - stdout : The stdout handle + - stderr : The stderr handle + """ + async_obj = {} + + if not self.connect_flag: + return async_obj + try: + _, stdout, stderr = self.node_dict[node].exec_command(cmd) + async_obj = {"cmd": cmd, "node": node, "stdout": stdout, + "stderr": stderr} + except Exception as e: + # Reconnection to be done. + node_ssh_client = paramiko.SSHClient() + node_ssh_client.load_host_keys( + os.path.expanduser('/root/.ssh/known_hosts')) + mykey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') + try: + node_ssh_client.connect( + hostname=node, + pkey=mykey, + ) + self.node_dict[node] = node_ssh_client + except Exception as e: + self.logger.error(f"Connection failure. Exceptions {e}.") + # On rebooting the node + _, stdout, stderr = self.node_dict[node].exec_command(cmd) + + async_obj = {"cmd": cmd, "node": node, "stdout": stdout, + "stderr": stderr} + return async_obj + + def check_async_command_status(self, async_obj: dict) -> bool: + """ + A check to see if the async execution of a command which + was dispatched has been finished. + Args: + async_obj (dict) : Contains the details about the async command, + with keys -> 'stdout', 'stderr', 'cmd', 'node' + Returns: + Bool : True if the operations is completed or else False. + """ + return async_obj["stdout"].channel.exit_status_ready() + + def collect_async_result(self, async_obj: dict) -> dict: + """ + Collect the async command's execution result after it ends. 
+ Args: + async_obj (dict) : Contains the details about the async command, + with keys -> 'stdout', 'stderr', 'cmd', 'node' + Returns: + dict: Returns the resultant dictionary + """ + ret_dict = {} + if async_obj['stdout'].channel.recv_exit_status() != 0: + ret_dict['Flag'] = False + ret_dict['msg'] = async_obj['stdout'].readlines() + ret_dict['error_msg'] = async_obj['stderr'].readlines() + if isinstance(ret_dict['error_msg'], list): + ret_dict['error_msg'] = "".join(ret_dict['error_msg']) + else: + if async_obj['cmd'].find("--xml") != -1: + stdout_xml_string = "".join(async_obj['stdout'].readlines()) + ret_dict['msg'] = json.loads(json.dumps(xmltodict.parse( + stdout_xml_string)))['cliOutput'] + else: + ret_dict['msg'] = async_obj['stdout'].readlines() + ret_dict['Flag'] = True + ret_dict['node'] = async_obj['node'] + ret_dict['cmd'] = async_obj['cmd'] + ret_dict['error_code'] = async_obj['stdout'].channel.recv_exit_status() + + self.logger.debug(ret_dict) + return ret_dict + + def wait_till_async_command_ends(self, async_obj: dict) -> dict: + """ + Stay put till the async command finished it's execution and + provide the required return value. + Args: + async_obj (dict) : Contains the details about the async command, + with keys -> 'stdout', 'stderr', 'cmd', 'node' + Returns: + dict: Returns the resultant dictionary after the command ends. + """ + while not async_obj['stdout'].channel.exit_status_ready(): + time.sleep(1) + if async_obj['stdout'].channel.recv_ready(): + ret_dict = self.collect_async_result(async_obj) + break + + return ret_dict + @dispatch(str) def execute_command_multinode(self, cmd): """ From 68eb7313f947253a10b76d4afa13bba289b45292 Mon Sep 17 00:00:00 2001 From: nishith-vihar <77044911+nishith-vihar@users.noreply.github.com> Date: Thu, 6 May 2021 08:15:13 +0530 Subject: [PATCH 20/20] Basic io functions are added (#254) Basic io functions like creating a file or a directory and checking is path exists is added. Fixes: #246 Signed-off-by: Nishith Vihar Sakinala --- common/ops/support_ops/io_ops.py | 72 ++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/common/ops/support_ops/io_ops.py b/common/ops/support_ops/io_ops.py index 7839e03f3..606d43c90 100644 --- a/common/ops/support_ops/io_ops.py +++ b/common/ops/support_ops/io_ops.py @@ -33,3 +33,75 @@ def execute_io_cmd(self, cmd: str, host: str = None): ret = self.execute_abstract_op_node(cmd, host) return ret + + def create_file(self, path: str, filename: str, node: str): + """ + Creates a file in the specified specified path + """ + cmd = f"touch {path}/{filename}" + self.execute_abstract_op_node(cmd, node) + + def create_dir(self, path: str, dirname: str, node: str): + """ + Creates a directory in the specified path + """ + cmd = f"mkdir -p {path}/{dirname}" + self.execute_abstract_op_node(cmd, node) + + def create_dirs(self, list_of_nodes: list, list_of_dir_paths: list): + """ + Create directories on nodes. + Args: + list_of_nodes (list): Nodes on which dirs has to be created. + list_of_dir_paths (list): List of dirs abs path. + Returns: + bool: True of creation of all dirs on all nodes is successful. + False otherwise. 
+ """ + if not isinstance(list_of_nodes, list): + list_of_nodes = [list_of_nodes] + + if isinstance(list_of_dir_paths, list): + list_of_dir_paths = ' '.join(list_of_dir_paths) + + # Create upload dir on each node + cmd = f"mkdir -p {list_of_dir_paths}" + _rc = True + + ret = self.execute_command_multinode(cmd,list_of_nodes) + for each_ret in ret: + if each_ret['error_code'] != 0: + self.logger.error(f"Failed to create the dirs: {list_of_dir_paths.split(' ')} " + f"on node: {each_ret['node']} - {each_ret['error_msg']}") + _rc = False + + return _rc + + + def path_exists(self, list_of_nodes, list_of_paths): + """Check if paths exist on nodes. + Args: + list_of_nodes (list): List of nodes. + list_of_paths (list): List of abs paths to verify if path exist. + Returns: + bool: True if all paths exists on all nodes. False otherwise. + """ + if not isinstance(list_of_nodes, list): + list_of_nodes = [list_of_nodes] + + if not isinstance(list_of_paths, list): + list_of_paths = (list_of_paths.split(" ")) + + + _rc = True + + for path in list_of_paths: + cmd = f"ls -l {path}" + ret = self.execute_command_multinode(cmd, list_of_nodes) + for each_ret in ret: + if each_ret['error_code'] != 0: + error_string = each_ret['error_msg'].rstrip('\n') + self.logger.error(f"{error_string} on node {each_ret['node']}") + _rc = False + + return _rc \ No newline at end of file
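
To show how the io_ops helpers added in the final patch are meant to be consumed, here is a minimal usage sketch written as a hypothetical test case body. The mount paths, directory names, and failure handling are illustrative assumptions, not part of the patch; `redant` and `self.server_list` are provided by ParentTest, as in the existing example tests.

# Hypothetical run_test body exercising the io_ops helpers added above.
# Paths, names, and the exception messages are illustrative only.
def run_test(self, redant):
    host = self.server_list[0]

    # Single-node helpers: create a directory, then a file inside it.
    redant.create_dir("/mnt", "io_demo", host)
    redant.create_file("/mnt/io_demo", "file1", host)

    # Multi-node helper: create the same dirs on every server in one call.
    if not redant.create_dirs(self.server_list,
                              ["/mnt/io_demo/dir1", "/mnt/io_demo/dir2"]):
        raise Exception("Directory creation failed on at least one node")

    # Verify that the paths now exist on all the nodes.
    if not redant.path_exists(self.server_list,
                              ["/mnt/io_demo/dir1", "/mnt/io_demo/dir2"]):
        raise Exception("Path check failed on at least one node")

Note that create_dirs and path_exists return booleans rather than raising, so the caller decides how a partial failure across nodes should be handled.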
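The async Rexe API introduced in patch 19 can be driven in a similar way. Below is a minimal sketch, assuming passwordless SSH (patch 02) is already configured, that the node IP is a placeholder, and that the logger attribute is attached by hand here (the framework normally injects it through the mixin).

import logging
import time

from common.rexe import Rexe

# Placeholder node; passwordless SSH to it must already be set up.
rexe = Rexe({"10.0.0.1": {"brick_root": ["/bricks"]}})
rexe.logger = logging.getLogger("rexe-demo")  # assumption: attached manually
rexe.establish_connection()

# Dispatch a long-running command without blocking the caller.
async_obj = rexe.execute_command_async("sleep 5 && echo done", "10.0.0.1")

# Poll until the remote command exits, then collect the usual ret_dict.
while not rexe.check_async_command_status(async_obj):
    time.sleep(1)
ret = rexe.collect_async_result(async_obj)

print(ret["Flag"], ret["error_code"], ret["msg"])
rexe.deconstruct_connection()

When the caller simply wants to block until completion, wait_till_async_command_ends can replace the polling loop above.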