Skip to content
6 changes: 4 additions & 2 deletions lawluigi_configs/KingMaker_law.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ ProduceMultiFriends
ProduceSamples
QuantitiesMap

# [logging]
[logging]
# law: DEBUG
# luigi-interface: DEBUG
# gfal2: DEBUG
# xrootd.stat: DEBUG

[luigi_worker]
keep_alive: True
Expand All @@ -35,4 +37,4 @@ base: root://cmsdcache-kit-disk.gridka.de//store/user/${USER}/CROWN/ntuples/
# base: root://eosuser.cern.ch//eos/user/${USER_FIRST_LETTER}/${USER}/CROWN/ntuples/
use_cache: True
cache_root: /tmp/${USER}/
cache_max_size: 20000
cache_max_size: 20000
5 changes: 5 additions & 0 deletions lawluigi_configs/KingMaker_luigi.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ files_per_task = 10
scopes = mt,et
shifts = None

; Set this parameter to True only if the tarball with the law tasks needs to
; be repacked, e.g. when the task code changed significantly between runs of
; the workflow. If True, the tarball is repacked and re-uploaded even if it
; is already present in the cache.
force_repack_tarball = False

################################################### NOTE #####################################################
# Parameters of tasks that were not explicitly called in the cli will be set through the 'requires' functions. #
# Only parameters that are listed in 'exclude_params_req' are excluded from this. #
Expand Down
12 changes: 9 additions & 3 deletions processor/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,16 @@ class HTCondorWorkflow(Task, law.htcondor.HTCondorWorkflow):
default="source /opt/conda/bin/activate env",
significant=False,
)
force_repack_tarball = luigi.BoolParameter(
default=False,
description="Force repacking and re-uploading of the job tarball, even if it already exists remotely. Use this after updating task code without removing task outputs.",
significant=False,
)

# Use proxy file located in $X509_USER_PROXY or /tmp/x509up_u$(id) if empty
htcondor_user_proxy = law.wlcg.get_vomsproxy_file()

# Do not propagate certain parameters via the ".req()" methode
# Do not propagate certain parameters via the ".req()" method
exclude_set = {
"ENV_NAME",
"htcondor_requirements",
Expand All @@ -344,6 +349,7 @@ class HTCondorWorkflow(Task, law.htcondor.HTCondorWorkflow):
"htcondor_universe",
"htcondor_docker_image",
"additional_files",
"force_repack_tarball",
"workflow",
}
exclude_params_req = (
Expand Down Expand Up @@ -508,7 +514,7 @@ def htcondor_job_config(self, config, job_num, branches):
"processor.tar.gz",
)
)
if not tarball.exists():
if not tarball.exists() or self.force_repack_tarball:
# Make new tarball
# get absolute path to tarball dir
tarball_dir = os.path.abspath(f"tarballs/{self.production_tag}")
Expand All @@ -524,7 +530,7 @@ def htcondor_job_config(self, config, job_num, branches):
)
tarball_local.parent.touch()
# Create tarball containing:
# The processor directory, thhe relevant config files, law
# The processor directory, the relevant config files, law
# and any other files specified in the additional_files parameter
command = [
"tar",
Expand Down
3 changes: 2 additions & 1 deletion processor/tasks/CROWNRun.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def create_branch_map(self):
branchcounter = 0
dataset = ConfigureDatasets.req(self)
# since we use the filelist from the dataset, we need to run it first
dataset.run()
if not dataset.complete():
dataset.run()
datsetinfo = dataset.output()
with datsetinfo.localize("r") as _file:
inputdata = _file.load()
Expand Down
26 changes: 24 additions & 2 deletions processor/tasks/helpers/helpers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
from functools import cache
import os
import re
import traceback
import logging
from law.logger import get_logger
from XRootD.client import FileSystem
from XRootD.client.flags import StatInfoFlags


# Get the law logger for this module
logger = get_logger("xrootd.stat")


# Patch FileSystem.stat to trace all XRootD stat calls with their call site.
# Only active when the "xrootd.stat" logger is enabled for DEBUG.
_original_fs_stat = FileSystem.stat


def _traced_fs_stat(self, path, *args, **kwargs):
    """Log an XRootD ``stat`` call and its call site, then delegate.

    Installed in place of ``FileSystem.stat``. Tracing is emitted only when
    the ``xrootd.stat`` logger is enabled for DEBUG, so the wrapper adds
    essentially no overhead otherwise.

    :param self: the patched ``XRootD.client.FileSystem`` instance
    :param path: remote path handed to ``stat``
    :param args: forwarded unchanged to the original ``stat``
    :param kwargs: forwarded unchanged to the original ``stat``
    :returns: whatever the original ``FileSystem.stat`` returns
    """
    if logger.isEnabledFor(logging.DEBUG):
        # Lazy %-style args: the message is only built if the record is emitted.
        logger.debug("[XRootD STAT] %s%s", self.url, path)
        # limit=6 keeps the stack short while still showing the caller chain.
        logger.debug("".join(traceback.format_stack(limit=6)))
    return _original_fs_stat(self, path, *args, **kwargs)


FileSystem.stat = _traced_fs_stat


def convert_to_comma_seperated(listobject):
"""
The function converts a list of elements into a comma-separated string.
Expand Down Expand Up @@ -98,10 +120,10 @@ def get_alternate_file_uri(
# Check whether the file fulfills the pattern of a usual XRootD file path.
# If not, just return the file without modifying the path. Otherwise,
# extract the file path without the server address.
m = re.match(r"^(root://[^/]+)/+(.+)$", file)
m = re.match(r"^((root|davs)://[^/]+)/+(.+)$", file)
if m is None:
return file
path = f"/{m.group(2).rstrip('/')}"
path = f"/{m.group(3).rstrip('/')}"

# Cycle through the given XRootD servers and check if the file exists
# there. Return the first one that is found. If no file is found on the
Expand Down
Loading