Skip to content

Commit

Permalink
Merge pull request #87 from PanDAWMS/next
Browse files Browse the repository at this point in the history
3.6.1.31
  • Loading branch information
PalNilsson authored Jun 27, 2023
2 parents 283157b + 2aff558 commit 23235f3
Show file tree
Hide file tree
Showing 33 changed files with 755 additions and 1,096 deletions.
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.6.0.108
3.6.1.31
117 changes: 65 additions & 52 deletions pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Authors:
# - Mario Lassnig, mario.lassnig@cern.ch, 2016-2017
# - Daniel Drizhuk, d.drizhuk@gmail.com, 2017
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2022
# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2023

from __future__ import print_function # Python 2 (2to3 complains about this)
from __future__ import absolute_import
Expand All @@ -21,17 +21,40 @@
from os import getcwd, chdir, environ
from os.path import exists, join
from shutil import rmtree
from typing import Any

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException
from pilot.util.config import config
from pilot.info import infosys
from pilot.util.auxiliary import pilot_version_banner, shell_exit_code
from pilot.util.constants import SUCCESS, FAILURE, ERRNO_NOJOBS, PILOT_START_TIME, PILOT_END_TIME, get_pilot_version, \
SERVER_UPDATE_NOT_DONE, PILOT_MULTIJOB_START_TIME
from pilot.util.filehandling import get_pilot_work_dir, mkdirs, establish_logging
from pilot.util.harvester import is_harvester_mode
from pilot.util.https import get_panda_server, https_setup, send_update
from pilot.util.auxiliary import (
pilot_version_banner,
shell_exit_code,
)
from pilot.util.constants import (
get_pilot_version,
SUCCESS,
FAILURE,
ERRNO_NOJOBS,
PILOT_START_TIME,
PILOT_END_TIME,
SERVER_UPDATE_NOT_DONE,
PILOT_MULTIJOB_START_TIME,
)
from pilot.util.filehandling import (
get_pilot_work_dir,
mkdirs,
)
from pilot.util.harvester import (
is_harvester_mode,
kill_worker,
)
from pilot.util.https import (
get_panda_server,
https_setup,
send_update,
)
from pilot.util.loggingsupport import establish_logging
from pilot.util.timing import add_to_pilot_timing

errors = ErrorCodes()
Expand Down Expand Up @@ -99,7 +122,7 @@ def main():

# set requested workflow
logger.info('pilot arguments: %s', str(args))
workflow = __import__('pilot.workflow.%s' % args.workflow, globals(), locals(), [args.workflow], 0)
workflow = __import__(f'pilot.workflow.{args.workflow}', globals(), locals(), [args.workflow], 0)

# execute workflow
try:
Expand All @@ -115,24 +138,26 @@ def main():
return exitcode


def str2bool(_var: 'str | bool') -> bool:
    """
    Helper function to convert string to bool.

    The comparison is case-insensitive. A value that is already a bool is returned unchanged.

    :param _var: value to convert (string or bool).
    :return: True for 'yes', 'true', 't', 'y', '1'; False for 'no', 'false', 'f', 'n', '0' (bool).
    :raises argparse.ArgumentTypeError: if the value is not one of the recognised strings.
    """

    if isinstance(_var, bool):  # already converted - nothing to do
        return _var
    if not isinstance(_var, str):
        # report unsupported types with the same error as unrecognised strings,
        # rather than failing with AttributeError on .lower()
        raise argparse.ArgumentTypeError('Boolean value expected.')

    lowered = _var.lower()  # normalise once instead of once per branch
    if lowered in {'yes', 'true', 't', 'y', '1'}:
        return True
    if lowered in {'no', 'false', 'f', 'n', '0'}:
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')


def get_args():
"""
Return the args from the arg parser.
:return: args (arg parser object).
:return: args (arg parser object - type <class 'argparse.Namespace'>).
"""

arg_parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -432,7 +457,7 @@ def create_main_work_dir():
mkdirs(_mainworkdir)
except PilotException as error:
# print to stderr since logging has not been established yet
print('failed to create workdir at %s -- aborting: %s' % (_mainworkdir, error), file=sys.stderr)
print(f'failed to create workdir at {_mainworkdir} -- aborting: {error}', file=sys.stderr)
exitcode = shell_exit_code(error._errorCode)
else:
_mainworkdir = getcwd()
Expand All @@ -448,8 +473,6 @@ def set_environment_variables():
Set environment variables. To be replaced with singleton implementation.
This function sets PILOT_WORK_DIR, PILOT_HOME, PILOT_SITENAME, PILOT_USER and PILOT_VERSION and others.
Note: args and mainworkdir, used in this function, are defined in outer scope.
:return:
"""

# working directory as set with a pilot option (e.g. ..)
Expand All @@ -475,8 +498,8 @@ def set_environment_variables():
environ['PILOT_WRAP_UP'] = 'NORMAL'

# proxy verifications
environ['PILOT_PROXY_VERIFICATION'] = '%s' % args.verify_proxy
environ['PILOT_PAYLOAD_PROXY_VERIFICATION'] = '%s' % args.verify_payload_proxy
environ['PILOT_PROXY_VERIFICATION'] = f'{args.verify_proxy}'
environ['PILOT_PAYLOAD_PROXY_VERIFICATION'] = f'{args.verify_payload_proxy}'

# keep track of the server updates, if any
environ['SERVER_UPDATE'] = SERVER_UPDATE_NOT_DONE
Expand All @@ -485,7 +508,7 @@ def set_environment_variables():
environ['PILOT_RESOURCE_NAME'] = args.hpc_resource

# allow for the possibility of turning off rucio traces
environ['PILOT_USE_RUCIO_TRACES'] = str(args.use_rucio_traces)
environ['PILOT_USE_RUCIO_TRACES'] = f'{args.use_rucio_traces}'

# event service executor type
environ['PILOT_ES_EXECUTOR_TYPE'] = args.executor_type
Expand All @@ -495,9 +518,9 @@ def set_environment_variables():

# keep track of the server urls
environ['PANDA_SERVER_URL'] = get_panda_server(args.url, args.port, update_server=args.update_server)
environ['QUEUEDATA_SERVER_URL'] = '%s' % args.queuedata_url
environ['QUEUEDATA_SERVER_URL'] = f'{args.queuedata_url}'
if args.storagedata_url:
environ['STORAGEDATA_SERVER_URL'] = '%s' % args.storagedata_url
environ['STORAGEDATA_SERVER_URL'] = f'{args.storagedata_url}'


def wrap_up():
Expand All @@ -515,30 +538,29 @@ def wrap_up():
chdir(args.sourcedir)
try:
rmtree(mainworkdir)
except Exception as exc:
logging.warning("failed to remove %s: %s", mainworkdir, exc)
except OSError as exc:
logging.warning(f"failed to remove {mainworkdir}: {exc}")
else:
logging.info("removed %s", mainworkdir)
logging.info(f"removed {mainworkdir}")

# in Harvester mode, create a kill_worker file that will instruct Harvester that the pilot has finished
if args.harvester:
from pilot.util.harvester import kill_worker
kill_worker()

try:
exitcode = trace.pilot['error_code']
except Exception:
except KeyError:
exitcode = trace
else:
logging.info('traces error code: %d', exitcode)
logging.info(f'traces error code: {exitcode}')
if trace.pilot['nr_jobs'] <= 1:
if exitcode != 0:
logging.info('an exit code was already set: %d (will be converted to a standard shell code)', exitcode)
logging.info(f'an exit code was already set: {exitcode} (will be converted to a standard shell code)')
elif trace.pilot['nr_jobs'] > 0:
if trace.pilot['nr_jobs'] == 1:
logging.getLogger(__name__).info('pilot has finished (%d job was processed)', trace.pilot['nr_jobs'])
logging.getLogger(__name__).info("pilot has finished (1 job was processed)")
else:
logging.getLogger(__name__).info('pilot has finished (%d jobs were processed)', trace.pilot['nr_jobs'])
logging.getLogger(__name__).info(f"pilot has finished ({trace.pilot['nr_jobs']} jobs were processed)")
exitcode = SUCCESS
elif trace.pilot['state'] == FAILURE:
logging.critical('pilot workflow failure -- aborting')
def get_pilot_source_dir():
    """
    Return the pilot source directory.

    If the current working directory contains pilot3/pilot.py, the pilot3 sub-directory is
    returned; otherwise the current working directory itself is returned. No error is raised
    when pilot.py cannot be found in either place.

    :return: full path to pilot source directory (string).
    """

    cwd = getcwd()
    if exists(join(cwd, 'pilot3', 'pilot.py')):  # in case wrapper has untarred src as pilot3 in init dir
        cwd = join(cwd, 'pilot3')
    return cwd


def send_worker_status(status, queue, url, port, logger, internet_protocol_version):
def send_worker_status(status: str, queue: str, url: str, port: str, logger: Any, internet_protocol_version: str):
"""
Send worker info to the server to let it know that the worker has started
Note: the function can fail, but if it does, it will be ignored.
Expand All @@ -587,7 +605,6 @@ def send_worker_status(status, queue, url, port, logger, internet_protocol_versi
:param port: server port (string).
:param logger: logging object.
:param internet_protocol_version: internet protocol version, IPv4 or IPv6 (string).
:return:
"""

# worker node structure to be sent to the server
Expand All @@ -604,12 +621,9 @@ def send_worker_status(status, queue, url, port, logger, internet_protocol_versi
logger.warning('workerID/harvesterID not known, will not send worker status to server')


def set_lifetime(args):
def set_lifetime():
"""
Update the pilot lifetime if set by an environment variable (PANDAPILOT_LIFETIME) (in seconds).
:param args: pilot args.
:return:
"""

lifetime = os.environ.get('PANDAPILOT_LIFETIME', None)
Expand All @@ -622,9 +636,10 @@ def set_lifetime(args):
args.lifetime = lifetime


def set_redirectall(args):
def set_redirectall():
"""
Set args redirectall field.
Currently not used.
"""

redirectall = os.environ.get('PANDAPILOT_REDIRECTALL', False)
Expand All @@ -638,9 +653,7 @@ def set_redirectall(args):


if __name__ == '__main__':
"""
Main function of pilot module.
"""
# Main function of pilot module.

# get the args from the arg parser
args = get_args()
Expand Down Expand Up @@ -671,7 +684,7 @@ def set_redirectall(args):
if exit_code != 0:
sys.exit(exit_code)

set_lifetime(args)
set_lifetime()

# setup and establish standard logging
establish_logging(debug=args.debug, nopilotlog=args.nopilotlog, redirectstdout=args.redirectstdout)
Expand Down
Loading

0 comments on commit 23235f3

Please sign in to comment.