Skip to content

Commit

Permalink
Merge pull request #149 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.9.0.17
  • Loading branch information
PalNilsson authored Oct 15, 2024
2 parents 3999f8a + f4ec9ee commit de0e57c
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 17 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.2.8
3.9.0.17
9 changes: 6 additions & 3 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ def main() -> int:
https_setup(args, get_pilot_version())
args.amq = None

# update the OIDC token if necessary
update_local_oidc_token_info(args.url, args.port)

# let the server know that the worker has started
if args.update_server and args.workerpilotstatusupdate:
send_worker_status(
Expand Down Expand Up @@ -149,6 +146,12 @@ def main() -> int:
logger.fatal(error)
return error.get_error_code()

# update the OIDC token if necessary (after queuedata has been downloaded, since PQ.catchall can contain instruction to prevent token renewal)
if 'no_token_renewal' in infosys.queuedata.catchall:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)

# handle special CRIC variables via params
# internet protocol versions 'IPv4' or 'IPv6' can be set via CRIC PQ.params.internet_protocol_version
# (must be defined per PQ if wanted). The pilot default is IPv6
Expand Down
4 changes: 3 additions & 1 deletion pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
entry.is_altstaged = True

logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files))
f = [entry.lfn for entry in remain_files]
job.piloterrordiags.append(f'Alternative stage-out for {f}')
client.transfer(xdata, activity, **kwargs)

except PilotException as error:
Expand Down Expand Up @@ -1087,7 +1089,7 @@ def generate_fileinfo(job: JobData) -> dict:
'surl': entry.turl
}
if entry.is_altstaged:
dat['ddmendpoint'] = entry.ddmendpoint
dat['endpoint'] = entry.ddmendpoint

fileinfo[entry.lfn] = dat

Expand Down
3 changes: 2 additions & 1 deletion pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3226,7 +3226,8 @@ def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int:
# job.completed will anyway be checked in https::send_update()
if job.serverstate not in {'finished', 'failed'}:
logger.info(f'will send heartbeat for job in \'{job.state}\' state')
send_state(job, args, job.state)
logger.info("note: will only send \'running\' state to server to prevent sending any final state too early")
send_state(job, args, "running")
update_time = time.time()
else:
logger.debug('will not send any job update')
Expand Down
5 changes: 4 additions & 1 deletion pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
if tokendownloadchecktime:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
update_local_oidc_token_info(args.url, args.port)
if 'no_token_renewal' in queuedata.catchall:
logger.info("OIDC token will not be renewed by the pilot")
else:
update_local_oidc_token_info(args.url, args.port)

# abort if kill signal arrived too long time ago, ie loop is stuck
if args.kill_time and int(time.time()) - args.kill_time > MAX_KILL_WAIT_TIME:
Expand Down
6 changes: 3 additions & 3 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@

# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '8' # build number should be reset to '1' for every new development cycle
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '17' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
221 changes: 219 additions & 2 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23
# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24

"""Functions for executing commands."""

Expand All @@ -31,6 +31,7 @@
import threading

from os import environ, getcwd, getpgid, kill #, setpgrp, getpgid #setsid
from queue import Queue
from signal import SIGTERM, SIGKILL
from time import sleep
from typing import Any, TextIO
Expand All @@ -46,7 +47,223 @@
execute_lock = threading.Lock()


def execute(executable: Any, **kwargs: dict) -> Any:
def execute(executable: Any, **kwargs: dict) -> Any: # noqa: C901
"""
Executes the command with its options in the provided executable list using subprocess time-out handler.
The function also determines whether the command should be executed within a container.
:param executable: command to be executed (str or list)
:param kwargs: kwargs (dict)
:return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
"""
usecontainer = kwargs.get('usecontainer', False)
job = kwargs.get('job')
#shell = kwargs.get("shell", False)
obscure = kwargs.get('obscure', '') # if this string is set, hide it in the log message

# convert executable to string if it is a list
if isinstance(executable, list):
executable = ' '.join(executable)

# switch off pilot controlled containers for user defined containers
if job and job.imagename != "" and "runcontainer" in executable:
usecontainer = False
job.usecontainer = usecontainer

# Import user specific code if necessary (in case the command should be executed in a container)
# Note: the container.wrapper() function must at least be declared
if usecontainer:
executable, diagnostics = containerise_executable(executable, **kwargs)
if not executable:
return None if kwargs.get('returnproc', False) else -1, "", diagnostics

if not kwargs.get('mute', False):
print_executable(executable, obscure=obscure)

# always use a timeout to prevent stdout buffer problem in nodes with lots of cores
timeout = get_timeout(kwargs.get('timeout', None))

exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

# try: intercept exception such as OSError -> report e.g. error.RESOURCEUNAVAILABLE: "Resource temporarily unavailable"
exit_code = 0
stdout = ''
stderr = ''

# Acquire the lock before creating the subprocess
process = None
with execute_lock:
process = subprocess.Popen(exe,
bufsize=-1,
stdout=kwargs.get('stdout', subprocess.PIPE),
stderr=kwargs.get('stderr', subprocess.PIPE),
cwd=kwargs.get('cwd', getcwd()),
preexec_fn=os.setsid, # setpgrp
encoding='utf-8',
errors='replace')
if kwargs.get('returnproc', False):
return process

# Create threads to read stdout and stderr asynchronously
stdout_queue = Queue()
stderr_queue = Queue()

def read_output(stream, queue):
while True:
try:
line = stream.readline()
except AttributeError:
# Handle the case where stream is None
break

if not line:
break

queue.put(line)

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))

stdout_thread.start()
stderr_thread.start()

try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
stdout, stderr = process.communicate(timeout=timeout)
except subprocess.TimeoutExpired as exc:
# make sure that stdout buffer gets flushed - in case of time-out exceptions
# flush_handler(name="stream_handler")
stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
logger.warning(stderr)
exit_code = errors.COMMANDTIMEDOUT
stderr = kill_all(process, stderr)
except Exception as exc:
logger.warning(f'exception caused when executing command: {executable}: {exc}')
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()

# Wait for the threads to finish reading
stdout_thread.join()
stderr_thread.join()

# Read the remaining output from the queues
while not stdout_queue.empty():
stdout += stdout_queue.get()
while not stderr_queue.empty():
stderr += stderr_queue.get()

# wait for the process to finish
# (not strictly necessary when process.communicate() is used)
try:
# wait for the process to complete with a timeout of 60 seconds
if process:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
# Handle the case where the process did not complete within the timeout
if process:
logger.warning("process did not complete within the timeout of 60s - terminating")
process.terminate()

# remove any added \n
if stdout and stdout.endswith('\n'):
stdout = stdout[:-1]

return exit_code, stdout, stderr


def execute_old2(executable: Any, **kwargs: dict) -> Any: # noqa: C901
usecontainer = kwargs.get('usecontainer', False)
job = kwargs.get('job')
obscure = kwargs.get('obscure', '')

if isinstance(executable, list):
executable = ' '.join(executable)

if job and job.imagename != "" and "runcontainer" in executable:
usecontainer = False
job.usecontainer = usecontainer

if usecontainer:
executable, diagnostics = containerise_executable(executable, **kwargs)
if not executable:
return None if kwargs.get('returnproc', False) else -1, "", diagnostics

if not kwargs.get('mute', False):
print_executable(executable, obscure=obscure)

timeout = get_timeout(kwargs.get('timeout', None))
exe = ['/usr/bin/python'] + executable.split() if kwargs.get('mode', 'bash') == 'python' else ['/bin/bash', '-c', executable]

exit_code = 0
stdout = ''
stderr = ''

def read_output(pipe, output_list):
for line in iter(pipe.readline, ''):
output_list.append(line)
pipe.close()

process = None
with execute_lock:
process = subprocess.Popen(exe,
bufsize=-1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=kwargs.get('cwd', getcwd()),
preexec_fn=os.setsid,
encoding='utf-8',
errors='replace')
if kwargs.get('returnproc', False):
return process

stdout_lines = []
stderr_lines = []

stdout_thread = threading.Thread(target=read_output, args=(process.stdout, stdout_lines))
stderr_thread = threading.Thread(target=read_output, args=(process.stderr, stderr_lines))

stdout_thread.start()
stderr_thread.start()

try:
logger.debug(f'subprocess.communicate() will use timeout {timeout} s')
process.wait(timeout=timeout)
except subprocess.TimeoutExpired as exc:
stderr += f'subprocess communicate sent TimeoutExpired: {exc}'
logger.warning(stderr)
exit_code = errors.COMMANDTIMEDOUT
stderr = kill_all(process, stderr)
except Exception as exc:
logger.warning(f'exception caused when executing command: {executable}: {exc}')
exit_code = errors.UNKNOWNEXCEPTION
stderr = kill_all(process, str(exc))
else:
exit_code = process.poll()

stdout_thread.join()
stderr_thread.join()

stdout = ''.join(stdout_lines)
stderr = ''.join(stderr_lines)

try:
if process:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
if process:
logger.warning("process did not complete within the timeout of 60s - terminating")
process.terminate()

if stdout and stdout.endswith('\n'):
stdout = stdout[:-1]

return exit_code, stdout, stderr


def execute_old(executable: Any, **kwargs: dict) -> Any:
"""
Execute the command with its options in the provided executable list using subprocess time-out handler.
Expand Down
9 changes: 5 additions & 4 deletions pilot/util/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ def request2(url: str = "",
# Send the request securely
try:
logger.debug('sending data to server')
with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
with urllib.request.urlopen(req, context=ssl_context, timeout=config.Pilot.http_maxtime) as response:
# Handle the response here
logger.debug(f"response.status={response.status}, response.reason={response.reason}")
ret = response.read().decode('utf-8')
Expand All @@ -870,13 +870,14 @@ def request2(url: str = "",
ret = ""
else:
if secure and isinstance(ret, str):
if ret.startswith('{') and ret.endswith('}'):
if ret == 'Succeeded': # this happens for sending modeOn (debug mode)
ret = {'StatusCode': '0'}
elif ret.startswith('{') and ret.endswith('}'):
try:
ret = json.loads(ret)
except json.JSONDecodeError as e:
logger.warning(f'failed to parse response: {e}')
else:
# For panda server interactions, the response should be in dictionary format
else: # response="StatusCode=_some number_"
# Parse the query string into a dictionary
query_dict = parse_qs(ret)

Expand Down
Loading

0 comments on commit de0e57c

Please sign in to comment.