Skip to content

Commit

Permalink
Merge pull request #151 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.9.1.13
  • Loading branch information
PalNilsson authored Oct 17, 2024
2 parents de0e57c + a2f69f1 commit 41ed500
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 60 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.0.17
3.9.1.13
8 changes: 2 additions & 6 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def send_state(job: Any, args: Any, state: str, xml: str = "", metadata: str = "
return False


def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, int):
def get_job_status_from_server(job_id: int, url: str, port: int) -> (str, int, int):
"""
Return the current status of job <jobId> from the dispatcher.
Expand All @@ -482,7 +482,7 @@ def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, i
In the case of time-out, the dispatcher will be asked one more time after 10 s.
:param job_id: PanDA job id (int)
:param url: PanDA server URL (str
:param url: PanDA server URL (int)
:param port: PanDA server port (str)
:return: status (string; e.g. holding), attempt_nr (int), status_code (int).
"""
Expand Down Expand Up @@ -1512,10 +1512,6 @@ def get_dispatcher_dictionary(args: Any, taskid: str = "") -> dict:
if 'HARVESTER_WORKER_ID' in os.environ:
data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID')

# instruction_sets = has_instruction_sets(['AVX', 'AVX2'])
# if instruction_sets:
# data['cpuConsumptionUnit'] = instruction_sets

return data


Expand Down
5 changes: 4 additions & 1 deletion pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
last_minute_check = t_0

queuedata = get_queuedata_from_job(queues)
if not queuedata:
logger.warning('queuedata could not be extracted from queues')

push = args.harvester and args.harvester_submitmode.lower() == 'push'
try:
# overall loop counter (ignoring the fact that more than one job may be running)
Expand All @@ -103,7 +106,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
break

# check if the OIDC token needs to be refreshed
if tokendownloadchecktime:
if tokendownloadchecktime and queuedata:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
if 'no_token_renewal' in queuedata.catchall:
Expand Down
11 changes: 7 additions & 4 deletions pilot/user/atlas/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@

# from pilot.user.atlas.setup import get_file_system_root_path
from pilot.common.errorcodes import ErrorCodes
from pilot.util.container import execute
from pilot.util.container import (
execute,
execute_nothreads
)
from pilot.util.proxy import get_proxy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,7 +137,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
if not hasattr(verify_arcproxy, "cache"):
verify_arcproxy.cache = {}

if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache
if proxy_id in verify_arcproxy.cache: # if exists, then calculate result from current cache
validity_end_cert = verify_arcproxy.cache[proxy_id][0]
validity_end = verify_arcproxy.cache[proxy_id][1]
if validity_end < 0: # previous validity check failed, do not try to re-check
Expand Down Expand Up @@ -162,7 +165,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
_exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True)

cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft"
_exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True)
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True)
if stdout is not None:
if 'command not found' in stdout:
logger.warning(f"arcproxy is not available on this queue,"
Expand Down Expand Up @@ -243,7 +246,7 @@ def verify_vomsproxy(envsetup: str, limit: int) -> tuple[int, str]:
if os.environ.get('X509_USER_PROXY', '') != '':
cmd = f"{envsetup}voms-proxy-info -actimeleft --timeleft --file $X509_USER_PROXY"
logger.info(f'executing command: {cmd}')
_exit_code, stdout, stderr = execute(cmd, shell=True)
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True)
if stdout is not None:
if "command not found" in stdout:
logger.info("skipping voms proxy check since command is not available")
Expand Down
7 changes: 5 additions & 2 deletions pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
NoSuchFile
)
from pilot.info.jobdata import JobData
from pilot.util.container import execute
from pilot.util.container import (
execute,
execute_nothreads
)
from pilot.util.filehandling import (
read_json,
copy,
Expand Down Expand Up @@ -830,7 +833,7 @@ def filter_output(stdout):

# CPU arch script has now been copied, time to execute it
# (reset irrelevant stderr)
ec, stdout, stderr = execute(cmd)
ec, stdout, stderr = execute_nothreads(cmd)
if ec == 0 and ('RHEL9 and clone support is relatively new' in stderr or
'RHEL8 and clones are not supported for users' in stderr):
stderr = ''
Expand Down
53 changes: 35 additions & 18 deletions pilot/util/auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24

"""Auxiliary functions."""

Expand All @@ -33,6 +33,7 @@
from numbers import Number
from time import sleep
from typing import Any
from uuid import uuid4

from pilot.util.constants import (
SUCCESS,
Expand All @@ -44,7 +45,10 @@
)
from pilot.common.errorcodes import ErrorCodes
from pilot.util.container import execute
from pilot.util.filehandling import dump
from pilot.util.filehandling import (
dump,
grep
)

zero_depth_bases = (str, bytes, Number, range, bytearray)
iteritems = 'items'
Expand Down Expand Up @@ -444,7 +448,7 @@ def get_memory_usage(pid: int) -> (int, str, str):
return execute(f'ps aux -q {pid}', timeout=60)


def extract_memory_usage_value(output: str) -> int:
def extract_memory_usage_value(output: str) -> str:
"""
Extract the memory usage value from the ps output (in kB).
Expand Down Expand Up @@ -482,34 +486,38 @@ def cut_output(txt: str, cutat: int = 1024, separator: str = '\n[...]\n') -> str
return txt


def has_instruction_set(instruction_set: str) -> bool:
def has_instruction_sets(instruction_sets: list) -> str:
"""
Determine whether a given CPU instruction set is available.
The function will use grep to search in /proc/cpuinfo (both in upper and lower case).
Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"
:param instruction_set: instruction set (e.g. AVX2) (str)
:return: True if given instruction set is available, False otherwise (bool).
:param instruction_sets: instruction set (e.g. AVX2) (list)
:return: string of pipe-separated instruction sets (str).
"""
status = False
cmd = fr"grep -o \'{instruction_set.lower()}[^ ]*\|{instruction_set.upper()}[^ ]*\' /proc/cpuinfo"
exit_code, stdout, stderr = execute(cmd)
if not exit_code and not stderr:
if instruction_set.lower() in stdout.split() or instruction_set.upper() in stdout.split():
status = True
ret = ""

return status
for instr in instruction_sets:
pattern = re.compile(fr'{instr.lower()}[^ ]*', re.IGNORECASE)
out = grep(patterns=[pattern], file_name="/proc/cpuinfo")

for stdout in out:
if instr.upper() not in ret and (instr.lower() in stdout.split() or instr.upper() in stdout.split()):
ret += f'|{instr.upper()}' if ret else instr.upper()

def has_instruction_sets(instruction_sets: str) -> bool:
return ret


def has_instruction_sets_old(instruction_sets: list) -> str:
"""
Determine whether a given list of CPU instruction sets is available.
The function will use grep to search in /proc/cpuinfo (both in upper and lower case).
Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"
:param instruction_sets: instruction set (e.g. AVX2) (str)
:return: True if given instruction set is available, False otherwise (bool).
:param instruction_sets: instruction set (e.g. AVX2) (list)
:return: string of pipe-separated instruction sets (str).
"""
ret = ""
pattern = ""
Expand Down Expand Up @@ -709,8 +717,8 @@ def get_host_name():
else:
try:
host = socket.gethostname()
except socket.herror as exc:
logger.warning(f'failed to get host name: {exc}')
except socket.herror as e:
logger.warning(f'failed to get host name: {e}')
host = 'localhost'
return host.split('.')[0]

Expand Down Expand Up @@ -821,3 +829,12 @@ def is_kubernetes_resource() -> bool:
return True
else:
return False


def uuidgen_t() -> str:
"""
Generate a UUID string in the same format as "uuidgen -t".
:return: A UUID in the format "00000000-0000-0000-0000-000000000000" (str).
"""
return str(uuid4())
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '17' # build number should be reset to '1' for every new development cycle
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '13' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
28 changes: 20 additions & 8 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

"""Functions for executing commands."""

import errno
import os
import subprocess
import logging
Expand Down Expand Up @@ -113,12 +114,17 @@ def read_output(stream, queue):
while True:
try:
line = stream.readline()
except AttributeError:
# Handle the case where stream is None
break

if not line:
if not line:
break
except (AttributeError, ValueError):
# Handle the case where stream is None (AttributeError) or closed (ValueError)
break
except OSError as e:
if e.errno == errno.EBADF:
# Handle the case where the file descriptor is bad
break
else:
raise

queue.put(line)

Expand Down Expand Up @@ -146,8 +152,11 @@ def read_output(stream, queue):
exit_code = process.poll()

# Wait for the threads to finish reading
stdout_thread.join()
stderr_thread.join()
try:
stdout_thread.join()
stderr_thread.join()
except Exception as e:
logger.warning(f'exception caught in execute: {e}')

# Read the remaining output from the queues
while not stdout_queue.empty():
Expand Down Expand Up @@ -263,12 +272,15 @@ def read_output(pipe, output_list):
return exit_code, stdout, stderr


def execute_old(executable: Any, **kwargs: dict) -> Any:
def execute_nothreads(executable: Any, **kwargs: dict) -> Any:
"""
Execute the command with its options in the provided executable list using subprocess time-out handler.
The function also determines whether the command should be executed within a container.
This variant of execute() is not using threads to read stdout and stderr. This is required for some use-cases like
executing arcproxy where the stdout is time-ordered.
:param executable: command to be executed (str or list)
:param kwargs: kwargs (dict)
:return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
Expand Down
26 changes: 26 additions & 0 deletions pilot/util/filehandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,32 @@ def grep(patterns: list, file_name: str) -> list:
"""
Search for the patterns in the given list in a file.
Example:
grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt")
-> [list containing the lines below]
CaloTrkMuIdAlg2.sysExecute() ERROR St9bad_alloc
AthAlgSeq.sysExecute() FATAL Standard std::exception is caught
:param patterns: list of regexp patterns (list)
:param file_name: file name (str)
:return: list of matched lines in file (list).
"""
matched_lines = []
compiled_patterns = [re.compile(pattern) for pattern in patterns]

with open(file_name, 'r', encoding='utf-8') as _file:
matched_lines = [
line for line in _file
if any(compiled_pattern.search(line) for compiled_pattern in compiled_patterns)
]

return matched_lines


def grep_old(patterns: list, file_name: str) -> list:
"""
Search for the patterns in the given list in a file.
Example:
grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt")
-> [list containing the lines below]
Expand Down
30 changes: 21 additions & 9 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from signal import SIGKILL

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException, MiddlewareImportFailure
from pilot.common.exception import PilotException, MiddlewareImportFailure #, FileHandlingFailure
from pilot.util.auxiliary import set_pilot_state #, show_memory_usage
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_PAYLOAD
Expand All @@ -40,7 +40,8 @@
remove_files,
get_local_file_size,
read_file,
zip_files
zip_files,
#write_file
)
from pilot.util.loopingjob import looping_job
from pilot.util.math import (
Expand Down Expand Up @@ -135,7 +136,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
if exit_code != 0:
return exit_code, diagnostics

# display OOM process info
# display OOM process info (once)
display_oom_info(job.pid)

# should the pilot abort the payload?
Expand Down Expand Up @@ -204,20 +205,31 @@ def display_oom_info(payload_pid):
:param payload_pid: payload pid (int).
"""

#fname = f"/proc/{payload_pid}/oom_score_adj"
payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN'
pilot_score = get_score(os.getpid())
logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}')
if isinstance(pilot_score, str) and pilot_score == 'UNKNOWN':
logger.warning(f'could not get oom_score for pilot process: {pilot_score}')
else:
#relative_payload_score = "1"

# write the payload oom_score to the oom_score_adj file
#try:
# write_file(path=fname, contents=relative_payload_score)
#except Exception as e: # FileHandlingFailure
# logger.warning(f'could not write oom_score to file: {e}')

#logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})')
logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}')

def get_score(pid):

def get_score(pid) -> str:
"""
Get the OOM process score.
:param pid: process id (int).
:return: score (string).
:param pid: process id (int)
:return: score (str).
"""

try:
score = '%s' % read_file('/proc/%d/oom_score' % pid)
except Exception as error:
Expand Down
Loading

0 comments on commit 41ed500

Please sign in to comment.