diff --git a/lawluigi_configs/KingMaker_law.cfg b/lawluigi_configs/KingMaker_law.cfg index ca35ca2..786fe8a 100644 --- a/lawluigi_configs/KingMaker_law.cfg +++ b/lawluigi_configs/KingMaker_law.cfg @@ -14,9 +14,11 @@ ProduceMultiFriends ProduceSamples QuantitiesMap -# [logging] +[logging] # law: DEBUG # luigi-interface: DEBUG +# gfal2: DEBUG +# xrootd.stat: DEBUG [luigi_worker] keep_alive: True @@ -35,4 +37,4 @@ base: root://cmsdcache-kit-disk.gridka.de//store/user/${USER}/CROWN/ntuples/ # base: root://eosuser.cern.ch//eos/user/${USER_FIRST_LETTER}/${USER}/CROWN/ntuples/ use_cache: True cache_root: /tmp/${USER}/ -cache_max_size: 20000 \ No newline at end of file +cache_max_size: 20000 diff --git a/lawluigi_configs/KingMaker_luigi.cfg b/lawluigi_configs/KingMaker_luigi.cfg index 74fc724..38edaac 100644 --- a/lawluigi_configs/KingMaker_luigi.cfg +++ b/lawluigi_configs/KingMaker_luigi.cfg @@ -56,6 +56,11 @@ files_per_task = 10 scopes = mt,et shifts = None +; Only set this parameter to True if the tarball with the law tasks needs to +; be repacked. This might be needed if task code significantly changed between +; different runs of the workflow, and the tarball is already present in the cache. In this case, setting this parameter to True will force the repacking of the tarball, even if it is already present in the cache. +force_repack_tarball = False + ################################################### NOTE ##################################################### # Parameters of tasks that were not explicitly called in the cli will be set through the 'requires' functions. # # Only parameters that are listed in 'exclude_params_req' are excluded from this. 
# diff --git a/processor/framework.py b/processor/framework.py index 842e437..d320661 100644 --- a/processor/framework.py +++ b/processor/framework.py @@ -327,11 +327,16 @@ class HTCondorWorkflow(Task, law.htcondor.HTCondorWorkflow): default="source /opt/conda/bin/activate env", significant=False, ) + force_repack_tarball = luigi.BoolParameter( + default=False, + description="Force repacking and re-uploading of the job tarball, even if it already exists remotely. Use this after updating task code without removing task outputs.", + significant=False, + ) # Use proxy file located in $X509_USER_PROXY or /tmp/x509up_u$(id) if empty htcondor_user_proxy = law.wlcg.get_vomsproxy_file() - # Do not propagate certain parameters via the ".req()" methode + # Do not propagate certain parameters via the ".req()" method exclude_set = { "ENV_NAME", "htcondor_requirements", @@ -344,6 +349,7 @@ class HTCondorWorkflow(Task, law.htcondor.HTCondorWorkflow): "htcondor_universe", "htcondor_docker_image", "additional_files", + "force_repack_tarball", "workflow", } exclude_params_req = ( @@ -508,7 +514,7 @@ def htcondor_job_config(self, config, job_num, branches): "processor.tar.gz", ) ) - if not tarball.exists(): + if not tarball.exists() or self.force_repack_tarball: # Make new tarball # get absolute path to tarball dir tarball_dir = os.path.abspath(f"tarballs/{self.production_tag}") @@ -524,7 +530,7 @@ def htcondor_job_config(self, config, job_num, branches): ) tarball_local.parent.touch() # Create tarball containing: - # The processor directory, thhe relevant config files, law + # The processor directory, the relevant config files, law # and any other files specified in the additional_files parameter command = [ "tar", diff --git a/processor/tasks/CROWNRun.py b/processor/tasks/CROWNRun.py index 88fda34..89560e5 100644 --- a/processor/tasks/CROWNRun.py +++ b/processor/tasks/CROWNRun.py @@ -37,7 +37,8 @@ def create_branch_map(self): branchcounter = 0 dataset = ConfigureDatasets.req(self) 
# since we use the filelist from the dataset, we need to run it first - dataset.run() + if not dataset.complete(): + dataset.run() datsetinfo = dataset.output() with datsetinfo.localize("r") as _file: inputdata = _file.load() diff --git a/processor/tasks/helpers/helpers.py b/processor/tasks/helpers/helpers.py index 649f664..0f2ab8e 100644 --- a/processor/tasks/helpers/helpers.py +++ b/processor/tasks/helpers/helpers.py @@ -1,10 +1,32 @@ from functools import cache import os import re +import traceback +import logging +from law.logger import get_logger from XRootD.client import FileSystem from XRootD.client.flags import StatInfoFlags +# Get the law logger for this module +logger = get_logger("xrootd.stat") + + +# Patch FileSystem.stat to trace all XRootD stat calls with their call site. +# Only active when the "xrootd.stat" logger is enabled for DEBUG. +_original_fs_stat = FileSystem.stat + + +def _traced_fs_stat(self, path, *args, **kwargs): + if logger.isEnabledFor(logging.DEBUG): + logger.debug(f"[XRootD STAT] {self.url}{path}") + logger.debug("".join(traceback.format_stack(limit=6))) + return _original_fs_stat(self, path, *args, **kwargs) + + +FileSystem.stat = _traced_fs_stat + + def convert_to_comma_seperated(listobject): """ The function converts a list of elements into a comma-separated string. @@ -98,10 +120,10 @@ def get_alternate_file_uri( # Check whether the file fulfills the pattern of a usual XRootD file path. # If not, just return the file without modifying the path. Otherwise, # extract the file path without the server address. - m = re.match(r"^(root://[^/]+)/+(.+)$", file) + m = re.match(r"^((root|davs)://[^/]+)/+(.+)$", file) if m is None: return file - path = f"/{m.group(2).rstrip('/')}" + path = f"/{m.group(3).rstrip('/')}" # Cycle through the given XRootD servers and check if the file exists # there. Return the first one that is found. If no file is found on the