From 46d0ac0511a10c009b31402eefc335b832719d59 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Tue, 15 Oct 2024 14:43:19 +0200 Subject: [PATCH 01/16] New version --- PILOTVERSION | 2 +- pilot/util/constants.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index bbafcf0b..425ed0a8 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.0.17 \ No newline at end of file +3.9.1.1 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 98d66af2..f5aae5cb 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '17' # build number should be reset to '1' for every new development cycle +REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '1' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 07a6b47ee4b3868e4bcf0d614cec952db446a2b6 Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 16 Oct 2024 10:38:49 +0200 Subject: [PATCH 02/16] Updated and modernized grep --- pilot/util/filehandling.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py index b4d63b48..316cbed0 100644 --- a/pilot/util/filehandling.py +++ b/pilot/util/filehandling.py @@ -252,6 +252,32 @@ def grep(patterns: list, file_name: str) -> list: """ Search for the patterns in the given list in a file. + Example: + grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt") + -> [list containing the lines below] + CaloTrkMuIdAlg2.sysExecute() ERROR St9bad_alloc + AthAlgSeq.sysExecute() FATAL Standard std::exception is caught + + :param patterns: list of regexp patterns (list) + :param file_name: file name (str) + :return: list of matched lines in file (list). + """ + matched_lines = [] + compiled_patterns = [re.compile(pattern) for pattern in patterns] + + with open(file_name, 'r', encoding='utf-8') as _file: + matched_lines = [ + line for line in _file + if any(compiled_pattern.search(line) for compiled_pattern in compiled_patterns) + ] + + return matched_lines + + +def grep_old(patterns: list, file_name: str) -> list: + """ + Search for the patterns in the given list in a file. + Example: grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt") -> [list containing the lines below] From 999ce12244ff6e59c064b600ac2a550c0b9169fd Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 16 Oct 2024 10:55:14 +0200 Subject: [PATCH 03/16] Corrected a few type hints, removed unused code --- pilot/util/auxiliary.py | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index 4ab06b2c..69cc7e48 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -444,7 +444,7 @@ def get_memory_usage(pid: int) -> (int, str, str): return execute(f'ps aux -q {pid}', timeout=60) -def extract_memory_usage_value(output: str) -> int: +def extract_memory_usage_value(output: str) -> str: """ Extract the memory usage value from the ps output (in kB). @@ -482,34 +482,15 @@ def cut_output(txt: str, cutat: int = 1024, separator: str = '\n[...]\n') -> str return txt -def has_instruction_set(instruction_set: str) -> bool: - """ - Determine whether a given CPU instruction set is available. - - The function will use grep to search in /proc/cpuinfo (both in upper and lower case). - - :param instruction_set: instruction set (e.g. AVX2) (str) - :return: True if given instruction set is available, False otherwise (bool). - """ - status = False - cmd = fr"grep -o \'{instruction_set.lower()}[^ ]*\|{instruction_set.upper()}[^ ]*\' /proc/cpuinfo" - exit_code, stdout, stderr = execute(cmd) - if not exit_code and not stderr: - if instruction_set.lower() in stdout.split() or instruction_set.upper() in stdout.split(): - status = True - - return status - - -def has_instruction_sets(instruction_sets: str) -> bool: +def has_instruction_sets(instruction_sets: list) -> str: """ Determine whether a given list of CPU instruction sets is available. The function will use grep to search in /proc/cpuinfo (both in upper and lower case). Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2" - :param instruction_sets: instruction set (e.g. AVX2) (str) - :return: True if given instruction set is available, False otherwise (bool). + :param instruction_sets: instruction set (e.g. AVX2) (list) + :return: string of pipe-separated instruction sets (str). """ ret = "" pattern = "" @@ -709,8 +690,8 @@ def get_host_name(): else: try: host = socket.gethostname() - except socket.herror as exc: - logger.warning(f'failed to get host name: {exc}') + except socket.herror as e: + logger.warning(f'failed to get host name: {e}') host = 'localhost' return host.split('.')[0] From e61c0da51ba4f12923e784aac01b4f3e87a3637b Mon Sep 17 00:00:00 2001 From: Paul Nilsson Date: Wed, 16 Oct 2024 11:12:12 +0200 Subject: [PATCH 04/16] Corrected a few type hints, removed unused code --- pilot/control/job.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pilot/control/job.py b/pilot/control/job.py index d9b4d1fa..506b0ffc 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -470,7 +470,7 @@ def send_state(job: Any, args: Any, state: str, xml: str = "", metadata: str = " return False -def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, int): +def get_job_status_from_server(job_id: int, url: str, port: int) -> (str, int, int): """ Return the current status of job from the dispatcher. @@ -482,7 +482,7 @@ def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, i In the case of time-out, the dispatcher will be asked one more time after 10 s. :param job_id: PanDA job id (int) - :param url: PanDA server URL (str + :param url: PanDA server URL (int) :param port: PanDA server port (str) :return: status (string; e.g. holding), attempt_nr (int), status_code (int). """ @@ -1512,10 +1512,6 @@ def get_dispatcher_dictionary(args: Any, taskid: str = "") -> dict: if 'HARVESTER_WORKER_ID' in os.environ: data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID') -# instruction_sets = has_instruction_sets(['AVX', 'AVX2']) -# if instruction_sets: -# data['cpuConsumptionUnit'] = instruction_sets - return data From 8c383c9fbe6da3ec6e49821ad08b12539e755b6f Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 16 Oct 2024 14:08:34 +0200 Subject: [PATCH 05/16] Created uuidgen_t() to replace call to external command --- pilot/util/auxiliary.py | 10 ++++++++++ pilot/util/tracereport.py | 16 +++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index 69cc7e48..df646abe 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -33,6 +33,7 @@ from numbers import Number from time import sleep from typing import Any +from uuid import uuid4 from pilot.util.constants import ( SUCCESS, @@ -802,3 +803,12 @@ def is_kubernetes_resource() -> bool: return True else: return False + + +def uuidgen_t() -> str: + """ + Generate a UUID string in the same format as "uuidgen -t". + + :return: A UUID in the format "00000000-0000-0000-0000-000000000000" (str). + """ + return str(uuid4()) diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index 9c73c6be..d6b25ca5 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -18,21 +18,25 @@ # Authors: # - Alexey Anisenkov, alexey.anisenkov@cern.ch, 2017 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 import hashlib import os import socket import time + from sys import exc_info from json import dumps, loads from os import environ, getuid from pilot.common.exception import FileHandlingFailure -from pilot.util.auxiliary import correct_none_types +from pilot.util.auxiliary import ( + correct_none_types, + uuidgen_t +) from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version -from pilot.util.container import execute, execute2 +from pilot.util.container import execute2 from pilot.util.filehandling import append_to_file, write_file from pilot.util.https import request2 @@ -130,10 +134,8 @@ def init(self, job): s = 'ppilot_%s' % job.jobdefinitionid self['uuid'] = hashlib.md5(s.encode('utf-8')).hexdigest() # hash_pilotid, Python 2/3 else: - #self['uuid'] = commands.getoutput('uuidgen -t 2> /dev/null').replace('-', '') # all LFNs of one request have the same uuid - cmd = 'uuidgen -t 2> /dev/null' - exit_code, stdout, stderr = execute(cmd, timeout=10) - self['uuid'] = stdout.replace('-', '') + _uuid = uuidgen_t() # 'uuidgen -t 2> /dev/null' + self['uuid'] = _uuid.replace('-', '') def get_value(self, key): """ From 0204f1b4efca1594131aca6962405f1f13a0221d Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 16 Oct 2024 16:29:47 +0200 Subject: [PATCH 06/16] Updated oom_score handling and reporting --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/monitoring.py | 28 +++++++++++++++++++--------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 425ed0a8..8cf995ff 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.1 \ No newline at end of file +3.9.1.5 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index f5aae5cb..fd659dcc 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '1' # build number should be reset to '1' for every new development cycle +BUILD = '5' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 3cc621bd..03f48c80 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -30,7 +30,7 @@ from signal import SIGKILL from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import PilotException, MiddlewareImportFailure +from pilot.common.exception import PilotException, MiddlewareImportFailure #, FileHandlingFailure from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.constants import PILOT_PRE_PAYLOAD @@ -40,7 +40,7 @@ remove_files, get_local_file_size, read_file, - zip_files + zip_files, write_file ) from pilot.util.loopingjob import looping_job from pilot.util.math import ( @@ -135,7 +135,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i if exit_code != 0: return exit_code, diagnostics - # display OOM process info + # display OOM process info (once) display_oom_info(job.pid) # should the pilot abort the payload? @@ -204,20 +204,30 @@ def display_oom_info(payload_pid): :param payload_pid: payload pid (int). """ - + fname = f"/proc/{payload_pid}/oom_score_adj" payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN' pilot_score = get_score(os.getpid()) - logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}') + if isinstance(pilot_score, str) and pilot_score == 'UNKNOWN': + logger.warning(f'could not get oom_score for pilot process: {pilot_score}') + else: + relative_payload_score = "1" + + # write the payload oom_score to the oom_score_adj file + try: + write_file(path=fname, contents=relative_payload_score) + except Exception as e: # FileHandlingFailure + logger.warning(f'could not write oom_score to file: {e}') + + logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})') -def get_score(pid): +def get_score(pid) -> str: """ Get the OOM process score. - :param pid: process id (int). - :return: score (string). + :param pid: process id (int) + :return: score (str). """ - try: score = '%s' % read_file('/proc/%d/oom_score' % pid) except Exception as error: From 093188d2fc3c8a3ad96b162e265ab9b8afe805b1 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Wed, 16 Oct 2024 16:47:16 +0200 Subject: [PATCH 07/16] Reverted to older arcproxy code --- PILOTVERSION | 2 +- pilot/user/atlas/proxy.py | 113 +++++++++++++++++++++++++++++++++++++- pilot/util/constants.py | 2 +- 3 files changed, 114 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 8cf995ff..e7cdf6bd 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.5 \ No newline at end of file +3.9.1.6 \ No newline at end of file diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 011dd5af..c85cf3d6 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -110,13 +110,124 @@ def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", else: envsetup = '' - return verify_arcproxy(envsetup, limit, proxy_id=proxy_id, test=test) # exit_code, diagnostics + exit_code, diagnostics = verify_arcproxy(envsetup, limit, proxy_id=proxy_id, test=test) + if exit_code != 0 and exit_code != -1: + return exit_code, diagnostics + elif exit_code == -1: + pass # go to next test + else: + return 0, diagnostics + + return 0, diagnostics def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]: # noqa: C901 """ Verify the proxy using arcproxy. + :param envsetup: general setup string for proxy commands (string). + :param limit: time limit in hours (int). + :param proxy_id: proxy unique id name. The verification result will be cached for this id. If None the result will not be cached (string) + :return: exit code (int), error diagnostics (string). + """ + exit_code = 0 + diagnostics = "" + proxies = ['cert', 'proxy'] + + if test: + return errors.VOMSPROXYABOUTTOEXPIRE, 'dummy test' + #return errors.NOVOMSPROXY, 'dummy test' + + try: + logger.debug(f'proxy_id={proxy_id}') + logger.debug(f'verify_arcproxy.cache={verify_arcproxy.cache}') + logger.debug(f'verify_arcproxy.cache[proxy_id]={verify_arcproxy.cache[proxy_id]}') + except Exception as exc: + logger.debug(f'exc={exc}') + + if proxy_id is not None: + if not hasattr(verify_arcproxy, "cache"): + verify_arcproxy.cache = {} + + if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache + validity_end_cert = verify_arcproxy.cache[proxy_id][0] + validity_end = verify_arcproxy.cache[proxy_id][1] + if validity_end < 0: # previous validity check failed, do not try to re-check + exit_code = -1 + diagnostics = "arcproxy verification failed (cached result)" + else: + # + validities = [validity_end_cert, validity_end] + for proxyname, validity in list(zip(proxies, validities)): + exit_code, diagnostics = check_time_left(proxyname, validity, limit) + if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: + # remove the proxy_id from the dictionary to trigger a new entry after a new proxy has been downloaded + del verify_arcproxy.cache[proxy_id] + + return exit_code, diagnostics + + # options and options' sequence are important for parsing, do not change it + # -i validityEnd -i validityLeft: time left for the certificate + # -i vomsACvalidityEnd -i vomsACvalidityLeft: time left for the proxy + # validityEnd - timestamp when proxy validity ends. + # validityLeft - duration of proxy validity left in seconds. + # vomsACvalidityEnd - timestamp when VOMS attribute validity ends. + # vomsACvalidityLeft - duration of VOMS attribute validity left in seconds. + cmd = f"{envsetup}arcproxy -i subject" + _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) + logger.info(f'subject={stdout}') + + cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft" + _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) + if stdout is not None: + if 'command not found' in stdout: + logger.warning(f"arcproxy is not available on this queue," + f"this can lead to memory issues with voms-proxy-info on SL6: {stdout}") + exit_code = -1 + else: + exit_code, diagnostics, validity_end_cert, validity_end = interpret_proxy_info(_exit_code, stdout, stderr, limit) + + if proxy_id and validity_end: # setup cache if requested + if exit_code == 0: + logger.info(f"caching the validity ends from arcproxy: cache[\'{proxy_id}\'] = [{validity_end_cert}, {validity_end}]") + verify_arcproxy.cache[proxy_id] = [validity_end_cert, validity_end] + else: + logger.warning('cannot store validity ends from arcproxy in cache') + verify_arcproxy.cache[proxy_id] = [-1, -1] # -1 in cache means any error in prev validation + if exit_code == 0: + + #if proxy_id in verify_arcproxy.cache: + # logger.debug('getting validity ends from arcproxy cache') + #else: + # logger.debug('using validity ends from arcproxy (cache not available)') + endtimes = [validity_end_cert, validity_end] if not proxy_id else verify_arcproxy.cache[proxy_id] + for proxyname, validity in list(zip(proxies, endtimes)): + exit_code, diagnostics = check_time_left(proxyname, validity, limit) + if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: + # remove the proxy_id from the dictionary to trigger a new entry after a new proxy has been downloaded + if proxy_id: + del verify_arcproxy.cache[proxy_id] + if exit_code == errors.CERTIFICATEHASEXPIRED: + logger.debug('certificate has expired') + break + return exit_code, diagnostics + elif exit_code == -1: # skip to next proxy test + return exit_code, diagnostics + elif exit_code == errors.NOVOMSPROXY: + return exit_code, diagnostics + else: + logger.info("will try voms-proxy-info instead") + exit_code = -1 + else: + logger.warning('command execution failed') + + return exit_code, diagnostics + + +def verify_arcproxy_bad(envsetup: str, limit: int, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]: # noqa: C901 + """ + Verify the proxy using arcproxy. + :param envsetup: general setup string for proxy commands (str) :param limit: time limit in hours (int) :param proxy_id: proxy unique id name. The verification result will be cached for this id. If None the result will not be cached (str or None) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index fd659dcc..209729a4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '5' # build number should be reset to '1' for every new development cycle +BUILD = '6' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 0262521c62ca81d30a3d5d1f160627eaf7b67d08 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 09:06:30 +0200 Subject: [PATCH 08/16] Added warning for missing queuedata object --- PILOTVERSION | 2 +- pilot/control/monitor.py | 5 ++++- pilot/util/constants.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index e7cdf6bd..871e421c 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.6 \ No newline at end of file +3.9.1.7 \ No newline at end of file diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index f2110c18..9c92c9ec 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -89,6 +89,9 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 last_minute_check = t_0 queuedata = get_queuedata_from_job(queues) + if not queuedata: + logger.warning('queuedata could not be extracted from queues') + push = args.harvester and args.harvester_submitmode.lower() == 'push' try: # overall loop counter (ignoring the fact that more than one job may be running) @@ -103,7 +106,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 break # check if the OIDC token needs to be refreshed - if tokendownloadchecktime: + if tokendownloadchecktime and queuedata: if int(time.time() - last_token_check) > tokendownloadchecktime: last_token_check = time.time() if 'no_token_renewal' in queuedata.catchall: diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 209729a4..c76832ac 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '6' # build number should be reset to '1' for every new development cycle +BUILD = '7' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 077b29b35c50653e87af5919e5453accb275bce5 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 09:12:35 +0200 Subject: [PATCH 09/16] Restored red herring problem with proxy verification --- pilot/user/atlas/proxy.py | 49 ++++++++++----------------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index c85cf3d6..84ce981c 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -110,25 +110,18 @@ def verify_proxy(limit: int = None, x509: bool = None, proxy_id: str = "pilot", else: envsetup = '' - exit_code, diagnostics = verify_arcproxy(envsetup, limit, proxy_id=proxy_id, test=test) - if exit_code != 0 and exit_code != -1: - return exit_code, diagnostics - elif exit_code == -1: - pass # go to next test - else: - return 0, diagnostics - - return 0, diagnostics + return verify_arcproxy(envsetup, limit, proxy_id=proxy_id, test=test) # exit_code, diagnostics def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]: # noqa: C901 """ Verify the proxy using arcproxy. - :param envsetup: general setup string for proxy commands (string). - :param limit: time limit in hours (int). - :param proxy_id: proxy unique id name. The verification result will be cached for this id. If None the result will not be cached (string) - :return: exit code (int), error diagnostics (string). + :param envsetup: general setup string for proxy commands (str) + :param limit: time limit in hours (int) + :param proxy_id: proxy unique id name. The verification result will be cached for this id. If None the result will not be cached (str or None) + :param test: free Boolean test parameter (bool) + :return: exit code (int), error diagnostics (str) (tuple). """ exit_code = 0 diagnostics = "" @@ -136,20 +129,12 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo if test: return errors.VOMSPROXYABOUTTOEXPIRE, 'dummy test' - #return errors.NOVOMSPROXY, 'dummy test' - - try: - logger.debug(f'proxy_id={proxy_id}') - logger.debug(f'verify_arcproxy.cache={verify_arcproxy.cache}') - logger.debug(f'verify_arcproxy.cache[proxy_id]={verify_arcproxy.cache[proxy_id]}') - except Exception as exc: - logger.debug(f'exc={exc}') if proxy_id is not None: if not hasattr(verify_arcproxy, "cache"): verify_arcproxy.cache = {} - if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache + if proxy_id in verify_arcproxy.cache: # if exists, then calculate result from current cache validity_end_cert = verify_arcproxy.cache[proxy_id][0] validity_end = verify_arcproxy.cache[proxy_id][1] if validity_end < 0: # previous validity check failed, do not try to re-check @@ -174,8 +159,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo # vomsACvalidityEnd - timestamp when VOMS attribute validity ends. # vomsACvalidityLeft - duration of VOMS attribute validity left in seconds. cmd = f"{envsetup}arcproxy -i subject" - _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) - logger.info(f'subject={stdout}') + _exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True) cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft" _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) @@ -195,11 +179,6 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo logger.warning('cannot store validity ends from arcproxy in cache') verify_arcproxy.cache[proxy_id] = [-1, -1] # -1 in cache means any error in prev validation if exit_code == 0: - - #if proxy_id in verify_arcproxy.cache: - # logger.debug('getting validity ends from arcproxy cache') - #else: - # logger.debug('using validity ends from arcproxy (cache not available)') endtimes = [validity_end_cert, validity_end] if not proxy_id else verify_arcproxy.cache[proxy_id] for proxyname, validity in list(zip(proxies, endtimes)): exit_code, diagnostics = check_time_left(proxyname, validity, limit) @@ -210,14 +189,10 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo if exit_code == errors.CERTIFICATEHASEXPIRED: logger.debug('certificate has expired') break - return exit_code, diagnostics - elif exit_code == -1: # skip to next proxy test - return exit_code, diagnostics - elif exit_code == errors.NOVOMSPROXY: - return exit_code, diagnostics - else: - logger.info("will try voms-proxy-info instead") - exit_code = -1 + if exit_code == errors.ARCPROXYLIBFAILURE: + logger.warning("currenly ignoring arcproxy library failure") + exit_code = 0 + diagnostics = "" else: logger.warning('command execution failed') From 39f0ed5e64e3fe30ab53c69bac1ad5147c8755de Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 09:56:13 +0200 Subject: [PATCH 10/16] Removed test code --- pilot/user/atlas/proxy.py | 93 +++------------------------------------ pilot/util/container.py | 5 ++- pilot/util/monitoring.py | 18 ++++---- 3 files changed, 19 insertions(+), 97 deletions(-) diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 84ce981c..5d0d7cbe 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -31,7 +31,10 @@ # from pilot.user.atlas.setup import get_file_system_root_path from pilot.common.errorcodes import ErrorCodes -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.proxy import get_proxy logger = logging.getLogger(__name__) @@ -162,93 +165,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo _exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True) cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft" - _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) - if stdout is not None: - if 'command not found' in stdout: - logger.warning(f"arcproxy is not available on this queue," - f"this can lead to memory issues with voms-proxy-info on SL6: {stdout}") - exit_code = -1 - else: - exit_code, diagnostics, validity_end_cert, validity_end = interpret_proxy_info(_exit_code, stdout, stderr, limit) - - if proxy_id and validity_end: # setup cache if requested - if exit_code == 0: - logger.info(f"caching the validity ends from arcproxy: cache[\'{proxy_id}\'] = [{validity_end_cert}, {validity_end}]") - verify_arcproxy.cache[proxy_id] = [validity_end_cert, validity_end] - else: - logger.warning('cannot store validity ends from arcproxy in cache') - verify_arcproxy.cache[proxy_id] = [-1, -1] # -1 in cache means any error in prev validation - if exit_code == 0: - endtimes = [validity_end_cert, validity_end] if not proxy_id else verify_arcproxy.cache[proxy_id] - for proxyname, validity in list(zip(proxies, endtimes)): - exit_code, diagnostics = check_time_left(proxyname, validity, limit) - if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: - # remove the proxy_id from the dictionary to trigger a new entry after a new proxy has been downloaded - if proxy_id: - del verify_arcproxy.cache[proxy_id] - if exit_code == errors.CERTIFICATEHASEXPIRED: - logger.debug('certificate has expired') - break - if exit_code == errors.ARCPROXYLIBFAILURE: - logger.warning("currenly ignoring arcproxy library failure") - exit_code = 0 - diagnostics = "" - else: - logger.warning('command execution failed') - - return exit_code, diagnostics - - -def verify_arcproxy_bad(envsetup: str, limit: int, proxy_id: str = "pilot", test: bool = False) -> tuple[int, str]: # noqa: C901 - """ - Verify the proxy using arcproxy. - - :param envsetup: general setup string for proxy commands (str) - :param limit: time limit in hours (int) - :param proxy_id: proxy unique id name. The verification result will be cached for this id. If None the result will not be cached (str or None) - :param test: free Boolean test parameter (bool) - :return: exit code (int), error diagnostics (str) (tuple). - """ - exit_code = 0 - diagnostics = "" - proxies = ['cert', 'proxy'] - - if test: - return errors.VOMSPROXYABOUTTOEXPIRE, 'dummy test' - - if proxy_id is not None: - if not hasattr(verify_arcproxy, "cache"): - verify_arcproxy.cache = {} - - if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache - validity_end_cert = verify_arcproxy.cache[proxy_id][0] - validity_end = verify_arcproxy.cache[proxy_id][1] - if validity_end < 0: # previous validity check failed, do not try to re-check - exit_code = -1 - diagnostics = "arcproxy verification failed (cached result)" - else: - # - validities = [validity_end_cert, validity_end] - for proxyname, validity in list(zip(proxies, validities)): - exit_code, diagnostics = check_time_left(proxyname, validity, limit) - if exit_code == errors.VOMSPROXYABOUTTOEXPIRE: - # remove the proxy_id from the dictionary to trigger a new entry after a new proxy has been downloaded - del verify_arcproxy.cache[proxy_id] - - return exit_code, diagnostics - - # options and options' sequence are important for parsing, do not change it - # -i validityEnd -i validityLeft: time left for the certificate - # -i vomsACvalidityEnd -i vomsACvalidityLeft: time left for the proxy - # validityEnd - timestamp when proxy validity ends. - # validityLeft - duration of proxy validity left in seconds. - # vomsACvalidityEnd - timestamp when VOMS attribute validity ends. - # vomsACvalidityLeft - duration of VOMS attribute validity left in seconds. - cmd = f"{envsetup}arcproxy -i subject" - _exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True) - - cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft" - _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) + _exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True) if stdout is not None: if 'command not found' in stdout: logger.warning(f"arcproxy is not available on this queue," diff --git a/pilot/util/container.py b/pilot/util/container.py index 74c109b9..0f55091d 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -263,12 +263,15 @@ def read_output(pipe, output_list): return exit_code, stdout, stderr -def execute_old(executable: Any, **kwargs: dict) -> Any: +def execute_nothreads(executable: Any, **kwargs: dict) -> Any: """ Execute the command with its options in the provided executable list using subprocess time-out handler. The function also determines whether the command should be executed within a container. + This variant of execute() is not using threads to read stdout and stderr. This is required for some use-cases like + executing arcproxy where the stdout is time-ordered. + :param executable: command to be executed (str or list) :param kwargs: kwargs (dict) :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument). diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 03f48c80..9b7e1d14 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -40,7 +40,8 @@ remove_files, get_local_file_size, read_file, - zip_files, write_file + zip_files, + #write_file ) from pilot.util.loopingjob import looping_job from pilot.util.math import ( @@ -204,21 +205,22 @@ def display_oom_info(payload_pid): :param payload_pid: payload pid (int). """ - fname = f"/proc/{payload_pid}/oom_score_adj" + #fname = f"/proc/{payload_pid}/oom_score_adj" payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN' pilot_score = get_score(os.getpid()) if isinstance(pilot_score, str) and pilot_score == 'UNKNOWN': logger.warning(f'could not get oom_score for pilot process: {pilot_score}') else: - relative_payload_score = "1" + #relative_payload_score = "1" # write the payload oom_score to the oom_score_adj file - try: - write_file(path=fname, contents=relative_payload_score) - except Exception as e: # FileHandlingFailure - logger.warning(f'could not write oom_score to file: {e}') + #try: + # write_file(path=fname, contents=relative_payload_score) + #except Exception as e: # FileHandlingFailure + # logger.warning(f'could not write oom_score to file: {e}') - logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})') + #logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})') + logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}') def get_score(pid) -> str: From e4e9696324e14edd1ab32849992ff1755f58d39b Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 10:20:31 +0200 Subject: [PATCH 11/16] Fixed problem with closed steams in main execute function --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 9 ++++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 871e421c..19a3590e 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.7 \ No newline at end of file +3.9.1.8 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index c76832ac..6feb7097 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '7' # build number should be reset to '1' for every new development cycle +BUILD = '8' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index 0f55091d..d4705c36 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -113,11 +113,10 @@ def read_output(stream, queue): while True: try: line = stream.readline() - except AttributeError: - # Handle the case where stream is None - break - - if not line: + if not line: + break + except (AttributeError, ValueError): + # Handle the case where stream is None (AttributeError) or closed (ValueError) break queue.put(line) From 45a513191b682fdaa17702d5696061eb97dd515e Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 11:09:51 +0200 Subject: [PATCH 12/16] Now using python native grep function instead of external command --- PILOTVERSION | 2 +- pilot/util/auxiliary.py | 30 ++++++++++++++++++++++++++++-- pilot/util/constants.py | 2 +- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 19a3590e..e99a5523 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.8 \ No newline at end of file +3.9.1.9 \ No newline at end of file diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index df646abe..7c5e3146 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 """Auxiliary functions.""" @@ -45,7 +45,10 @@ ) from pilot.common.errorcodes import ErrorCodes from pilot.util.container import execute -from pilot.util.filehandling import dump +from pilot.util.filehandling import ( + dump, + grep +) zero_depth_bases = (str, bytes, Number, range, bytearray) iteritems = 'items' @@ -484,6 +487,29 @@ def cut_output(txt: str, cutat: int = 1024, separator: str = '\n[...]\n') -> str def has_instruction_sets(instruction_sets: list) -> str: + """ + Determine whether a given CPU instruction set is available. + + The function will use grep to search in /proc/cpuinfo (both in upper and lower case). + Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2" + + :param instruction_sets: instruction set (e.g. AVX2) (list) + :return: string of pipe-separated instruction sets (str). + """ + ret = "" + + for instr in instruction_sets: + pattern = re.compile(fr'{instr.lower()}[^ ]*', re.IGNORECASE) + out = grep(patterns=[pattern], file_name="/proc/cpuinfo") + + for stdout in out: + if instr.upper() not in ret and (instr.lower() in stdout.split() or instr.upper() in stdout.split()): + ret += f'|{instr.upper()}' if ret else instr.upper() + + return ret + + +def has_instruction_sets_old(instruction_sets: list) -> str: """ Determine whether a given list of CPU instruction sets is available. diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 6feb7097..da5cc339 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '8' # build number should be reset to '1' for every new development cycle +BUILD = '9' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 65f543d9c0bf1cb7f61a8a0517fbfb1cc129a32b Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 11:23:39 +0200 Subject: [PATCH 13/16] cpu_arch execution now using execute_nothreads() --- PILOTVERSION | 2 +- pilot/user/atlas/utilities.py | 7 +++++-- pilot/util/constants.py | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index e99a5523..4452dd1e 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.9 \ No newline at end of file +3.9.1.10 \ No newline at end of file diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index 27d91384..3b087d01 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -32,7 +32,10 @@ NoSuchFile ) from pilot.info.jobdata import JobData -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.filehandling import ( read_json, copy, @@ -830,7 +833,7 @@ def filter_output(stdout): # CPU arch script has now been copied, time to execute it # (reset irrelevant stderr) - ec, stdout, stderr = execute(cmd) + ec, stdout, stderr = execute_nothreads(cmd) if ec == 0 and ('RHEL9 and clone support is relatively new' in stderr or 'RHEL8 and clones are not supported for users' in stderr): stderr = '' diff --git a/pilot/util/constants.py b/pilot/util/constants.py index da5cc339..54038f27 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '9' # build number should be reset to '1' for every new development cycle +BUILD = '10' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 621d59a264a3ff0262630bd0ba86d6eeffc593d6 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 11:25:41 +0200 Subject: [PATCH 14/16] voms-proxy-info execution now using execute_nothreads() --- PILOTVERSION | 2 +- pilot/user/atlas/proxy.py | 2 +- pilot/util/constants.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 4452dd1e..86aacf1c 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.10 \ No newline at end of file +3.9.1.11 \ No newline at end of file diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 5d0d7cbe..b6defea9 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -246,7 +246,7 @@ def verify_vomsproxy(envsetup: str, limit: int) -> tuple[int, str]: if os.environ.get('X509_USER_PROXY', '') != '': cmd = f"{envsetup}voms-proxy-info -actimeleft --timeleft --file $X509_USER_PROXY" logger.info(f'executing command: {cmd}') - _exit_code, stdout, stderr = execute(cmd, shell=True) + _exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) if stdout is not None: if "command not found" in stdout: logger.info("skipping voms proxy check since command is not available") diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 54038f27..ca926f4f 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '10' # build number should be reset to '1' for every new development cycle +BUILD = '11' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 From 43998c7dc16853628e8392c44c65c5efe2f751d0 Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 11:34:33 +0200 Subject: [PATCH 15/16] voms-proxy-info execution now using execute_nothreads() --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/proxy.py | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 86aacf1c..8116d067 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.11 \ No newline at end of file +3.9.1.12 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index ca926f4f..8a8d41ae 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '11' # build number should be reset to '1' for every new development cycle +BUILD = '12' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/proxy.py b/pilot/util/proxy.py index 685d8991..b78196e8 100644 --- a/pilot/util/proxy.py +++ b/pilot/util/proxy.py @@ -26,7 +26,10 @@ from pilot.common.exception import FileHandlingFailure from pilot.util import https from pilot.util.config import config -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.filehandling import write_file logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ def vomsproxyinfo(options: str = '-all', mute: bool = False, path: str = '') -> executable = f'voms-proxy-info {options}' if path: executable += f' --file={path}' - exit_code, stdout, stderr = execute(executable) + exit_code, stdout, stderr = execute_nothreads(executable) if not mute: logger.info(stdout + stderr) From a3c8f521d50e844d9e71c58a1feec47864301ebd Mon Sep 17 00:00:00 2001 From: PalNilsson Date: Thu, 17 Oct 2024 11:49:51 +0200 Subject: [PATCH 16/16] Added additional exception handling to execute(), to handle "Poll: bad file descriptor reading from request pipe" in stderr (harmless) --- PILOTVERSION | 2 +- pilot/util/constants.py | 2 +- pilot/util/container.py | 14 ++++++++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/PILOTVERSION b/PILOTVERSION index 8116d067..e3b1f212 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.1.12 \ No newline at end of file +3.9.1.13 \ No newline at end of file diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 8a8d41ae..e90371e1 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -28,7 +28,7 @@ RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '12' # build number should be reset to '1' for every new development cycle +BUILD = '13' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index d4705c36..e5837d14 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -21,6 +21,7 @@ """Functions for executing commands.""" +import errno import os import subprocess import logging @@ -118,6 +119,12 @@ def read_output(stream, queue): except (AttributeError, ValueError): # Handle the case where stream is None (AttributeError) or closed (ValueError) break + except OSError as e: + if e.errno == errno.EBADF: + # Handle the case where the file descriptor is bad + break + else: + raise queue.put(line) @@ -145,8 +152,11 @@ def read_output(stream, queue): exit_code = process.poll() # Wait for the threads to finish reading - stdout_thread.join() - stderr_thread.join() + try: + stdout_thread.join() + stderr_thread.join() + except Exception as e: + logger.warning(f'exception caught in execute: {e}') # Read the remaining output from the queues while not stdout_queue.empty():