From f60eb46dd0dd73a578815c622dcce85518622a99 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 24 Sep 2024 14:32:56 +0200 Subject: [PATCH 01/16] New version --- PILOTVERSION | 2 +- pilot/util/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index e88f7664..2aa0dce2 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.2.8 \ No newline at end of file +3.8.3.1 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index e497600c..6949fdbc 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '8' # build number should be reset to '1' for every new development cycle +REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '1' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From c2033c81023af5c9449012939be2c9d577826b16 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 24 Sep 2024 14:34:29 +0200 Subject: [PATCH 02/16] Added timeout --- pilot/util/https.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/util/https.py b/pilot/util/https.py index a26c75ce..eb5336cf 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -858,7 +858,7 @@ def request2(url: str = "", # Send the request securely try: logger.debug('sending data to server') - with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response: + with urllib.request.urlopen(req, context=ssl_context, timeout=config.Pilot.http_maxtime) as response: # Handle the response here logger.debug(f"response.status={response.status}, response.reason={response.reason}") ret = response.read().decode('utf-8') From b71b5364719b868a612e3087e7afcb08482ed997 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 25 Sep 2024 10:03:21 +0200 Subject: [PATCH 03/16] Testing new execute function --- pilot/util/container.py | 91 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/pilot/util/container.py b/pilot/util/container.py index 5243eca6..215cba6e 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 """Functions for executing commands.""" @@ -47,6 +47,95 @@ def execute(executable: Any, **kwargs: dict) -> Any: + usecontainer = kwargs.get('usecontainer', False) + job = kwargs.get('job') + obscure = kwargs.get('obscure', '') + + if isinstance(executable, list): + executable = ' '.join(executable) + + if job and job.imagename != "" and "runcontainer" in executable: + usecontainer = False + job.usecontainer = usecontainer + + if usecontainer: + executable, diagnostics = containerise_executable(executable, **kwargs) + if not executable: + return None if kwargs.get('returnproc', False) else -1, "", diagnostics + + if not kwargs.get('mute', False): + print_executable(executable, obscure=obscure) + + timeout = get_timeout(kwargs.get('timeout', None)) + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + + exit_code = 0 + stdout = '' + stderr = '' + + def read_output(pipe, output_list): + for line in iter(pipe.readline, ''): + output_list.append(line) + pipe.close() + + process = None + with execute_lock: + process = subprocess.Popen(exe, + bufsize=-1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=kwargs.get('cwd', getcwd()), + preexec_fn=os.setsid, + encoding='utf-8', + errors='replace') + if kwargs.get('returnproc', False): + return process + + stdout_lines = [] + stderr_lines = [] + + stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines)) + stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines)) + + stdout_thread.start() + stderr_thread.start() + + try: + logger.debug(f'subprocess.communicate() will use timeout {timeout} s') + process.wait(timeout=timeout) + except subprocess.TimeoutExpired as exc: + stderr += f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) + else: + exit_code = process.poll() + + stdout_thread.join() + stderr_thread.join() + + stdout = ''.join(stdout_lines) + stderr = ''.join(stderr_lines) + + try: + if process: + process.wait(timeout=60) + except subprocess.TimeoutExpired: + if process: + logger.warning("process did not complete within the timeout of 60s - terminating") + process.terminate() + + if stdout and stdout.endswith('\n'): + stdout = stdout[:-1] + + return exit_code, stdout, stderr + + +def execute_old(executable: Any, **kwargs: dict) -> Any: """ Execute the command with its options in the provided executable list using subprocess time-out handler. From da69c8e0d0640ddace1d3ca2c4932ccd4beefb8d Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 25 Sep 2024 10:51:38 +0200 Subject: [PATCH 04/16] Testing new execute function --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 96 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 2aa0dce2..42da0da0 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.1 \ No newline at end of file +3.8.3.2 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 6949fdbc..cce88a97 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '2' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index 215cba6e..fc1511b3 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -73,6 +73,102 @@ def execute(executable: Any, **kwargs: dict) -> Any: stdout = '' stderr = '' + def read_output(pipe, file): + for line in iter(pipe.readline, ''): + file.write(line) + pipe.close() + + process = None + with execute_lock: + process = subprocess.Popen(exe, + bufsize=-1, + stdout=kwargs.get('stdout', subprocess.PIPE), + stderr=kwargs.get('stderr', subprocess.PIPE), + cwd=kwargs.get('cwd', getcwd()), + preexec_fn=os.setsid, + encoding='utf-8', + errors='replace') + if kwargs.get('returnproc', False): + return process + + stdout_file = kwargs.get('stdout', subprocess.PIPE) + stderr_file = kwargs.get('stderr', subprocess.PIPE) + + stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_file)) + stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_file)) + + stdout_thread.start() + stderr_thread.start() + + try: + logger.debug(f'subprocess.communicate() will use timeout {timeout} s') + process.wait(timeout=timeout) + except subprocess.TimeoutExpired as exc: + stderr += f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) + else: + exit_code = process.poll() + + stdout_thread.join() + stderr_thread.join() + + if stdout_file != subprocess.PIPE: + stdout_file.flush() + stdout_file.seek(0) + stdout = stdout_file.read() + + if stderr_file != subprocess.PIPE: + stderr_file.flush() + stderr_file.seek(0) + stderr = stderr_file.read() + + try: + if process: + process.wait(timeout=60) + except subprocess.TimeoutExpired: + if process: + logger.warning("process did not complete within the timeout of 60s - terminating") + process.terminate() + + if stdout and stdout.endswith('\n'): + stdout = stdout[:-1] + + return exit_code, stdout, stderr + + +def execute_old2(executable: Any, **kwargs: dict) -> Any: + usecontainer = kwargs.get('usecontainer', False) + job = kwargs.get('job') + obscure = kwargs.get('obscure', '') + + if isinstance(executable, list): + executable = ' '.join(executable) + + if job and job.imagename != "" and "runcontainer" in executable: + usecontainer = False + job.usecontainer = usecontainer + + if usecontainer: + executable, diagnostics = containerise_executable(executable, **kwargs) + if not executable: + return None if kwargs.get('returnproc', False) else -1, "", diagnostics + + if not kwargs.get('mute', False): + print_executable(executable, obscure=obscure) + + timeout = get_timeout(kwargs.get('timeout', None)) + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + + exit_code = 0 + stdout = '' + stderr = '' + def read_output(pipe, output_list): for line in iter(pipe.readline, ''): output_list.append(line) From e64f03d0b7543e78cc7ee28533d5b002eb637f72 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 27 Sep 2024 16:23:03 +0200 Subject: [PATCH 05/16] Added thread based reading of stdout/err --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 110 ++++++++++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 42da0da0..6a586bf6 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.2 \ No newline at end of file +3.8.3.5 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index cce88a97..e14e395b 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '2' # build number should be reset to '1' for every new development cycle +BUILD = '5' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index fc1511b3..74c109b9 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -31,6 +31,7 @@ import threading from os import environ, getcwd, getpgid, kill #, setpgrp, getpgid #setsid +from queue import Queue from signal import SIGTERM, SIGKILL from time import sleep from typing import Any, TextIO @@ -46,18 +47,32 @@ execute_lock = threading.Lock() -def execute(executable: Any, **kwargs: dict) -> Any: +def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901 + """ + Executes the command with its options in the provided executable list using subprocess time-out handler. + + The function also determines whether the command should be executed within a container. + + :param executable: command to be executed (str or list) + :param kwargs: kwargs (dict) + :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument). + """ usecontainer = kwargs.get('usecontainer', False) job = kwargs.get('job') - obscure = kwargs.get('obscure', '') + #shell = kwargs.get("shell", False) + obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message + # convert executable to string if it is a list if isinstance(executable, list): executable = ' '.join(executable) + # switch off pilot controlled containers for user defined containers if job and job.imagename != "" and "runcontainer" in executable: usecontainer = False job.usecontainer = usecontainer + # Import user specific code if necessary (in case the command should be executed in a container) + # Note: the container.wrapper() function must at least be declared if usecontainer: executable, diagnostics = containerise_executable(executable, **kwargs) if not executable: @@ -66,18 +81,17 @@ def execute(executable: Any, **kwargs: dict) -> Any: if not kwargs.get('mute', False): print_executable(executable, obscure=obscure) + # always use a timeout to prevent stdout buffer problem in nodes with lots of cores timeout = get_timeout(kwargs.get('timeout', None)) + exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable] + # try: intercept exception such as OSError -> report e.g. error.RESOURCEUNAVAILABLE: "Resource temporarily unavailable" exit_code = 0 stdout = '' stderr = '' - def read_output(pipe, file): - for line in iter(pipe.readline, ''): - file.write(line) - pipe.close() - + # Acquire the lock before creating the subprocess process = None with execute_lock: process = subprocess.Popen(exe, @@ -85,64 +99,82 @@ def read_output(pipe, file): stdout=kwargs.get('stdout', subprocess.PIPE), stderr=kwargs.get('stderr', subprocess.PIPE), cwd=kwargs.get('cwd', getcwd()), - preexec_fn=os.setsid, + preexec_fn=os.setsid, # setpgrp encoding='utf-8', errors='replace') if kwargs.get('returnproc', False): return process - stdout_file = kwargs.get('stdout', subprocess.PIPE) - stderr_file = kwargs.get('stderr', subprocess.PIPE) + # Create threads to read stdout and stderr asynchronously + stdout_queue = Queue() + stderr_queue = Queue() - stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_file)) - stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_file)) + def read_output(stream, queue): + while True: + try: + line = stream.readline() + except AttributeError: + # Handle the case where stream is None + break - stdout_thread.start() - stderr_thread.start() + if not line: + break - try: - logger.debug(f'subprocess.communicate() will use timeout {timeout} s') - process.wait(timeout=timeout) - except subprocess.TimeoutExpired as exc: - stderr += f'subprocess communicate sent TimeoutExpired: {exc}' - logger.warning(stderr) - exit_code = errors.COMMANDTIMEDOUT - stderr = kill_all(process, stderr) - except Exception as exc: - logger.warning(f'exception caused when executing command: {executable}: {exc}') - exit_code = errors.UNKNOWNEXCEPTION - stderr = kill_all(process, str(exc)) - else: - exit_code = process.poll() + queue.put(line) - stdout_thread.join() - stderr_thread.join() + stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) + stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) - if stdout_file != subprocess.PIPE: - stdout_file.flush() - stdout_file.seek(0) - stdout = stdout_file.read() + stdout_thread.start() + stderr_thread.start() + + try: + logger.debug(f'subprocess.communicate() will use timeout {timeout} s') + stdout, stderr = process.communicate(timeout=timeout) + except subprocess.TimeoutExpired as exc: + # make sure that stdout buffer gets flushed - in case of time-out exceptions + # flush_handler(name="stream_handler") + stderr += f'subprocess communicate sent TimeoutExpired: {exc}' + logger.warning(stderr) + exit_code = errors.COMMANDTIMEDOUT + stderr = kill_all(process, stderr) + except Exception as exc: + logger.warning(f'exception caused when executing command: {executable}: {exc}') + exit_code = errors.UNKNOWNEXCEPTION + stderr = kill_all(process, str(exc)) + else: + exit_code = process.poll() - if stderr_file != subprocess.PIPE: - stderr_file.flush() - stderr_file.seek(0) - stderr = stderr_file.read() + # Wait for the threads to finish reading + stdout_thread.join() + stderr_thread.join() + # Read the remaining output from the queues + while not stdout_queue.empty(): + stdout += stdout_queue.get() + while not stderr_queue.empty(): + stderr += stderr_queue.get() + + # wait for the process to finish + # (not strictly necessary when process.communicate() is used) try: + # wait for the process to complete with a timeout of 60 seconds if process: process.wait(timeout=60) except subprocess.TimeoutExpired: + # Handle the case where the process did not complete within the timeout if process: logger.warning("process did not complete within the timeout of 60s - terminating") process.terminate() + # remove any added \n if stdout and stdout.endswith('\n'): stdout = stdout[:-1] return exit_code, stdout, stderr -def execute_old2(executable: Any, **kwargs: dict) -> Any: +def execute_old2(executable: Any, **kwargs: dict) -> Any: # noqa: C901 usecontainer = kwargs.get('usecontainer', False) job = kwargs.get('job') obscure = kwargs.get('obscure', '') From fecb02231a6a60186507f23d0e3850f606ca75da Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Sat, 28 Sep 2024 11:00:03 +0200 Subject: [PATCH 06/16] Reporting alternative stage-out with piloterrordiag --- PILOTVERSION | 2 +- pilot/control/data.py | 1 + pilot/util/constants.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 6a586bf6..20610fcc 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.5 \ No newline at end of file +3.8.3.7 \ No newline at end of file diff --git a/pilot/control/data.py b/pilot/control/data.py index 28db36a3..86c9ed1a 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -938,6 +938,7 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: entry.is_altstaged = True logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files)) + job.piloterrordiags.append(f'Alternative stage-out for {remain_files}') client.transfer(xdata, activity, **kwargs) except PilotException as error: diff --git a/pilot/util/constants.py b/pilot/util/constants.py index e14e395b..c3b7d3cb 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '5' # build number should be reset to '1' for every new development cycle +BUILD = '7' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From fac2cae4e6d53250bdd5861fdea787595d552d3a Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Mon, 30 Sep 2024 09:29:16 +0200 Subject: [PATCH 07/16] Only sending running state from send_heartbeat_if_time() --- pilot/control/job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pilot/control/job.py b/pilot/control/job.py index f9b80c25..d9b4d1fa 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -3226,7 +3226,8 @@ def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int: # job.completed will anyway be checked in https::send_update() if job.serverstate not in {'finished', 'failed'}: logger.info(f'will send heartbeat for job in \'{job.state}\' state') - send_state(job, args, job.state) + logger.info("note: will only send \'running\' state to server to prevent sending any final state too early") + send_state(job, args, "running") update_time = time.time() else: logger.debug('will not send any job update') From 21af98e30bb71781fc04aad6ba1a846389402675 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Mon, 30 Sep 2024 09:30:04 +0200 Subject: [PATCH 08/16] Only sending running state from send_heartbeat_if_time() --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 20610fcc..8d4fd203 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.7 \ No newline at end of file +3.8.3.8 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index c3b7d3cb..0820e8f4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '7' # build number should be reset to '1' for every new development cycle +BUILD = '8' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From ce8b3b8075cfa7093ea9538f282f21cfd5049f59 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 1 Oct 2024 15:54:25 +0200 Subject: [PATCH 09/16] Endpoint fix. Writing explicit file names in alt stage-out --- PILOTVERSION | 2 +- pilot/control/data.py | 5 +++-- pilot/util/constants.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 8d4fd203..11fe8cf7 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.8 \ No newline at end of file +3.8.3.10 \ No newline at end of file diff --git a/pilot/control/data.py b/pilot/control/data.py index 86c9ed1a..c3d1af73 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -938,7 +938,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title: entry.is_altstaged = True logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files)) - job.piloterrordiags.append(f'Alternative stage-out for {remain_files}') + f = [entry.lfn for entry in remain_files] + job.piloterrordiags.append(f'Alternative stage-out for {f}') client.transfer(xdata, activity, **kwargs) except PilotException as error: @@ -1088,7 +1089,7 @@ def generate_fileinfo(job: JobData) -> dict: 'surl': entry.turl } if entry.is_altstaged: - dat['ddmendpoint'] = entry.ddmendpoint + dat['endpoint'] = entry.ddmendpoint fileinfo[entry.lfn] = dat diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 0820e8f4..12195687 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '8' # build number should be reset to '1' for every new development cycle +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 92034a2a85c0a1a5de48aecf339ab5dfae9ffd41 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 4 Oct 2024 10:49:22 +0200 Subject: [PATCH 10/16] Using a stack instead of recursion --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/processes.py | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 11fe8cf7..402ac81b 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.10 \ No newline at end of file +3.8.3.11 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 12195687..62e8c580 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '10' # build number should be reset to '1' for every new development cycle +BUILD = '11' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 4a16dcad..0741cc47 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -54,6 +54,44 @@ def find_processes_in_group(cpids: list, pid: int, ps_cache: str = ""): The cpids input parameter list gets updated in the function. + :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int) + :param pid: parent process id (int) + :param ps_cache: ps command output (str). + """ + visited = set() + stack = [pid] + + while stack: + current_pid = stack.pop() + if current_pid in visited: + continue + visited.add(current_pid) + cpids.append(current_pid) + lines = grep_str([str(current_pid)], ps_cache) + + if lines and lines != ['']: + for line in lines: + try: + thispid, thisppid = [int(x) for x in line.split()[:2]] + except Exception as error: + logger.warning(f'exception caught: {error}') + else: + if thisppid == current_pid: + stack.append(thispid) + + +def find_processes_in_group_old(cpids: list, pid: int, ps_cache: str = ""): + """ + Find all processes that belong to the same group using the given ps command output. + + Recursively search for the children processes belonging to pid and return their pid's. + pid is the parent pid and cpids is a list that has to be initialized before calling this function and it contains + the pids of the children AND the parent. + + ps_cache is expected to be the output from the command "ps -eo pid,ppid -m". + + The cpids input parameter list gets updated in the function. + :param cpids: list of pid's for all child processes to the parent pid, as well as the parent pid itself (int) :param pid: parent process id (int) :param ps_cache: ps command output (str). From e766cafdb87db56b4049428adca5a27e810602db Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 4 Oct 2024 11:18:23 +0200 Subject: [PATCH 11/16] Moved token renewal to after queuedata download, only renew token if NO_TOKEN_RENEWAL not present in catchall --- pilot.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pilot.py b/pilot.py index bb1bd797..c9ce5fd6 100755 --- a/pilot.py +++ b/pilot.py @@ -118,9 +118,6 @@ def main() -> int: https_setup(args, get_pilot_version()) args.amq = None - # update the OIDC token if necessary - update_local_oidc_token_info(args.url, args.port) - # let the server know that the worker has started if args.update_server and args.workerpilotstatusupdate: send_worker_status( @@ -149,6 +146,10 @@ def main() -> int: logger.fatal(error) return error.get_error_code() + # update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal) + if 'NO_TOKEN_RENEWAL' in infosys.queuedata.catchall: + update_local_oidc_token_info(args.url, args.port) + # handle special CRIC variables via params # internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version # (must be defined per PQ if wanted). The pilot default is IPv6 From 381c47d17b4979ce4938af1aabe75fad5632e409 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 4 Oct 2024 11:18:39 +0200 Subject: [PATCH 12/16] Only renew token if NO_TOKEN_RENEWAL not present in catchall --- pilot/control/monitor.py | 2 +- pilot/util/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index 24e056de..54a2e20c 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -103,7 +103,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 break # check if the OIDC token needs to be refreshed - if tokendownloadchecktime: + if tokendownloadchecktime and 'NO_TOKEN_RENEWAL' not in queuedata.catchall: if int(time.time() - last_token_check) > tokendownloadchecktime: last_token_check = time.time() update_local_oidc_token_info(args.url, args.port) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 62e8c580..ec79f3aa 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '11' # build number should be reset to '1' for every new development cycle +BUILD = '12' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 1d15c5a25ebddcfa123b6eca8024405b067533be Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 4 Oct 2024 16:29:31 +0200 Subject: [PATCH 13/16] Implemented skipping OIDC token renewal if requested. Corrected request2() for debug mode --- PILOTVERSION | 2 +- pilot.py | 4 +++- pilot/control/monitor.py | 6 +++++- pilot/util/constants.py | 6 +++--- pilot/util/https.py | 7 ++++--- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 402ac81b..0ce40c12 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.3.11 \ No newline at end of file +3.9.0.14 \ No newline at end of file diff --git a/pilot.py b/pilot.py index c9ce5fd6..ba9b5654 100755 --- a/pilot.py +++ b/pilot.py @@ -147,7 +147,9 @@ def main() -> int: return error.get_error_code() # update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal) - if 'NO_TOKEN_RENEWAL' in infosys.queuedata.catchall: + if 'no_token_renewal' in infosys.queuedata.catchall and args.version_tag == "RC": + logger.info("OIDC token will not be renewed by the pilot") + else: update_local_oidc_token_info(args.url, args.port) # handle special CRIC variables via params diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index 54a2e20c..d1f7522a 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -103,10 +103,14 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 break # check if the OIDC token needs to be refreshed - if tokendownloadchecktime and 'NO_TOKEN_RENEWAL' not in queuedata.catchall: + if tokendownloadchecktime: if int(time.time() - last_token_check) > tokendownloadchecktime: last_token_check = time.time() update_local_oidc_token_info(args.url, args.port) + #if 'no_token_renewal' in queuedata.catchall: + # logger.info("OIDC token will not be renewed by the pilot") + #else: + # update_local_oidc_token_info(args.url, args.port) # abort if kill signal arrived too long time ago, ie loop is stuck if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME: diff --git a/pilot/util/constants.py b/pilot/util/constants.py index ec79f3aa..b8145545 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -26,9 +26,9 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 -VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '3' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '12' # build number should be reset to '1' for every new development cycle +VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates +REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '14' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index eb5336cf..70bcacc8 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -870,13 +870,14 @@ def request2(url: str = "", ret = "" else: if secure and isinstance(ret, str): - if ret.startswith('{') and ret.endswith('}'): + if ret == 'Succeeded': # this happens for sending modeOn (debug mode) + ret = {'StatusCode': '0'} + elif ret.startswith('{') and ret.endswith('}'): try: ret = json.loads(ret) except json.JSONDecodeError as e: logger.warning(f'failed to parse response: {e}') - else: - # For panda server interactions, the response should be in dictionary format + else: # response="StatusCode=_some number_" # Parse the query string into a dictionary query_dict = parse_qs(ret) From 0e340335f0c0f0a295ab67ee101e4b55387c81ea Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Mon, 7 Oct 2024 15:22:06 +0200 Subject: [PATCH 14/16] Removed test code --- PILOTVERSION | 2 +- pilot.py | 2 +- pilot/control/monitor.py | 9 ++++----- pilot/util/constants.py | 2 +- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 0ce40c12..28db2469 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.0.14 \ No newline at end of file +3.9.0.15 \ No newline at end of file diff --git a/pilot.py b/pilot.py index ba9b5654..5405cdb9 100755 --- a/pilot.py +++ b/pilot.py @@ -147,7 +147,7 @@ def main() -> int: return error.get_error_code() # update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal) - if 'no_token_renewal' in infosys.queuedata.catchall and args.version_tag == "RC": + if 'no_token_renewal' in infosys.queuedata.catchall: logger.info("OIDC token will not be renewed by the pilot") else: update_local_oidc_token_info(args.url, args.port) diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index d1f7522a..f2110c18 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -106,11 +106,10 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 if tokendownloadchecktime: if int(time.time() - last_token_check) > tokendownloadchecktime: last_token_check = time.time() - update_local_oidc_token_info(args.url, args.port) - #if 'no_token_renewal' in queuedata.catchall: - # logger.info("OIDC token will not be renewed by the pilot") - #else: - # update_local_oidc_token_info(args.url, args.port) + if 'no_token_renewal' in queuedata.catchall: + logger.info("OIDC token will not be renewed by the pilot") + else: + update_local_oidc_token_info(args.url, args.port) # abort if kill signal arrived too long time ago, ie loop is stuck if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME: diff --git a/pilot/util/constants.py b/pilot/util/constants.py index b8145545..3f11d37f 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '14' # build number should be reset to '1' for every new development cycle +BUILD = '15' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From a3a39f3d8c80214d24842565fa43762b21c6b813 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Fri, 11 Oct 2024 10:56:56 +0200 Subject: [PATCH 15/16] Updated version number --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 28db2469..bbafcf0b 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.0.15 \ No newline at end of file +3.9.0.17 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 3f11d37f..98d66af2 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '15' # build number should be reset to '1' for every new development cycle +BUILD = '17' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 304daf9ee35630c0e40b527dcf0a0e16c5a8b1ec Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Tue, 15 Oct 2024 10:17:17 +0200 Subject: [PATCH 16/16] Removed space --- pilot/util/processes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pilot/util/processes.py b/pilot/util/processes.py index 0741cc47..b1c16536 100644 --- a/pilot/util/processes.py +++ b/pilot/util/processes.py @@ -654,7 +654,7 @@ def cleanup(job: JobData, args: object): logger.info("collected zombie processes") if job.pid: - logger.info(f"will attempt to kill all subprocesses of pid={job.pid}") + logger.info(f"will attempt to kill all subprocesses of pid={job.pid}") kill_processes(job.pid) else: logger.warning('cannot kill any subprocesses since job.pid is not set')