From 339d48def728004f483ad70f3f03b035bd164758 Mon Sep 17 00:00:00 2001
From: Maria Lainez <98marialainez@gmail.com>
Date: Tue, 16 Jan 2024 18:16:55 +0100
Subject: [PATCH 1/5] Check state of the jobs launched by autocloser

---
Review note: the retry loop added to post_process() originally combined its
two conditions with the bitwise operator `&`. Because `&` binds tighter than
`<=`, and `not` binds loosest of all, the condition was evaluated as
`not ((finished & n) <= n_max)`, which is always False: the loop body never
ran and `n` stayed at 0. The condition now uses the boolean `and`, so the
closer really waits (and re-checks) until the jobs are completed or the
retry budget is exhausted. The blob ids on the closer.py `index` lines of
this series were not regenerated after this one-token change.

 src/osa/job.py            | 27 +++++++++++++++++++++++++++
 src/osa/scripts/closer.py | 32 +++++++++++++++++++++++++++++++-
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/src/osa/job.py b/src/osa/job.py
index 410e3e7c..4688cf54 100644
--- a/src/osa/job.py
+++ b/src/osa/job.py
@@ -702,6 +702,33 @@ def get_sacct_output(sacct_output: StringIO) -> pd.DataFrame:
     return sacct_output
 
 
+def get_closer_sacct_output(sacct_output) -> pd.DataFrame:
+    """
+    Fetch the information of jobs in the queue launched by AUTOCLOSER using the sacct
+    SLURM output and store it in a pandas dataframe.
+
+    Returns
+    -------
+    queue_list: pd.DataFrame
+    """
+    sacct_output = pd.read_csv(sacct_output, names=FORMAT_SLURM)
+
+    # Keep only the jobs corresponding to AUTOCLOSER sequences
+    sacct_output = sacct_output[
+        (sacct_output["JobName"].str.contains("lstchain"))
+        | (sacct_output["JobName"].str.contains("provproces"))
+    ]
+
+    try:
+        sacct_output["JobID"] = sacct_output["JobID"].apply(lambda x: x.split("_")[0])
+        sacct_output["JobID"] = sacct_output["JobID"].str.strip(".batch").astype(int)
+
+    except AttributeError:
+        log.debug("No job info could be obtained from sacct")
+
+    return sacct_output
+
+
 def filter_jobs(job_info: pd.DataFrame, sequence_list: Iterable):
     """Filter the job info list to get the values of the jobs in the current queue."""
     sequences_info = pd.DataFrame([vars(seq) for seq in sequence_list])
diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py
index 2f97bcec..c471d130 100644
--- a/src/osa/scripts/closer.py
+++ b/src/osa/scripts/closer.py
@@ -8,6 +8,7 @@
 import shutil
 import subprocess
 import sys
+import time
 from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Tuple, Iterable, List
@@ -15,7 +16,12 @@
 from osa import osadb
 from osa.configs import options
 from osa.configs.config import cfg
-from osa.job import are_all_jobs_correctly_finished, save_job_information
+from osa.job import (
+    are_all_jobs_correctly_finished,
+    save_job_information,
+    run_sacct,
+    get_closer_sacct_output
+)
 from osa.nightsummary.extract import extract_runs, extract_sequences
 from osa.nightsummary.nightsummary import run_summary_table
 from osa.paths import destination_dir
@@ -169,6 +175,20 @@ def post_process(seq_tuple):
         if not options.no_dl2:
             merge_files(seq_list, data_level="DL2")
 
+        time.sleep(300)
+
+        # Check if all jobs launched by autocloser finished correctly
+        # before creating the NightFinished.txt file
+        n_max = 10
+        n = 0
+        while not all_closer_jobs_finished_correctly() and n <= n_max:
+            log.info(
+                "All jobs launched by autocloser did not finished correctly yet. "
+                "Checking again in 5 minutes..."
+            )
+            time.sleep(300)
+            n += 1
+
         if options.seqtoclose is None:
             database = cfg.get("database", "path")
             if database:
@@ -490,5 +510,15 @@ def daily_datacheck(cmd: List[str]):
         log.debug("Simulate launching scripts")
 
 
+def all_closer_jobs_finished_correctly():
+    """Check if all the jobs launched by autocloser finished correctly."""
+    sacct_output = run_sacct()
+    jobs_closer = get_closer_sacct_output(sacct_output)
+    if len(jobs_closer[jobs_closer["State"]!="COMPLETED"])==0:
+        return True
+    else:
+        return False
+
+
 if __name__ == "__main__":
     main()

From 62d9920932d238e73a55f85bd34e960544b89ab6 Mon Sep 17 00:00:00 2001
From: Maria Lainez <98marialainez@gmail.com>
Date: Wed, 17 Jan 2024 12:53:19 +0100
Subject: [PATCH 2/5] For the moment, do not check the status of muon files merging

---
 src/osa/job.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/osa/job.py b/src/osa/job.py
index 4688cf54..c4174db2 100644
--- a/src/osa/job.py
+++ b/src/osa/job.py
@@ -713,9 +713,12 @@ def get_closer_sacct_output(sacct_output) -> pd.DataFrame:
     """
     sacct_output = pd.read_csv(sacct_output, names=FORMAT_SLURM)
 
-    # Keep only the jobs corresponding to AUTOCLOSER sequences
+    # Keep only the jobs corresponding to AUTOCLOSER sequences
+    # Until the merging of muon files is fixed, check all jobs except "lstchain_merge_muon_files"
     sacct_output = sacct_output[
-        (sacct_output["JobName"].str.contains("lstchain"))
+        (sacct_output["JobName"].str.contains("lstchain_merge_hdf5_files"))
+        | (sacct_output["JobName"].str.contains("lstchain_check_dl1"))
+        | (sacct_output["JobName"].str.contains("lstchain_longterm_dl1_check"))
         | (sacct_output["JobName"].str.contains("provproces"))
     ]
 

From 9b49c5560a4f85b614f6aadd0dff7f1a200e2ff9 Mon Sep 17 00:00:00 2001
From: Maria Lainez <98marialainez@gmail.com>
Date: Wed, 17 Jan 2024 13:09:09 +0100
Subject: [PATCH 3/5] Check status of jobs every 10 min up to 1 hour

---
Review note: the unchanged `while` context line in the hunk below carries
the boolean `and` so that it matches the condition as corrected in
patch 1/5; nothing else in this patch differs from the original.

 src/osa/scripts/closer.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py
index c471d130..2775bf07 100644
--- a/src/osa/scripts/closer.py
+++ b/src/osa/scripts/closer.py
@@ -175,18 +175,18 @@ def post_process(seq_tuple):
         if not options.no_dl2:
             merge_files(seq_list, data_level="DL2")
 
-        time.sleep(300)
+        time.sleep(600)
 
         # Check if all jobs launched by autocloser finished correctly
         # before creating the NightFinished.txt file
-        n_max = 10
+        n_max = 6
         n = 0
         while not all_closer_jobs_finished_correctly() and n <= n_max:
             log.info(
                 "All jobs launched by autocloser did not finished correctly yet. "
-                "Checking again in 5 minutes..."
+                "Checking again in 10 minutes..."
             )
-            time.sleep(300)
+            time.sleep(600)
             n += 1
 
         if options.seqtoclose is None:

From 4551950852e3628ad71494ba4a25eb79aac582e1 Mon Sep 17 00:00:00 2001
From: Maria Lainez <98marialainez@gmail.com>
Date: Wed, 17 Jan 2024 14:36:19 +0100
Subject: [PATCH 4/5] Send an email if autocloser does not finish correctly

---
 src/osa/scripts/closer.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/osa/scripts/closer.py b/src/osa/scripts/closer.py
index 2775bf07..5f609388 100644
--- a/src/osa/scripts/closer.py
+++ b/src/osa/scripts/closer.py
@@ -30,6 +30,7 @@
 from osa.utils.cliopts import closercliparsing
 from osa.utils.logging import myLogger
 from osa.utils.register import register_found_pattern
+from osa.utils.mail import send_warning_mail
 from osa.utils.utils import (
     night_finished_flag,
     is_day_closed,
@@ -189,6 +190,10 @@ def post_process(seq_tuple):
             time.sleep(600)
             n += 1
 
+        if n > n_max:
+            send_warning_mail(date=options.date)
+            return False
+
         if options.seqtoclose is None:
             database = cfg.get("database", "path")
             if database:

From 228d3155bb9b1decc5cb67892213f1abe450727c Mon Sep 17 00:00:00 2001
From: Maria Lainez <98marialainez@gmail.com>
Date: Mon, 12 Feb 2024 18:17:27 +0100
Subject: [PATCH 5/5] Add lstchain_cherenkov_transparency job to be checked

---
 src/osa/job.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/osa/job.py b/src/osa/job.py
index c4174db2..b151a69e 100644
--- a/src/osa/job.py
+++ b/src/osa/job.py
@@ -719,6 +719,7 @@ def get_closer_sacct_output(sacct_output) -> pd.DataFrame:
         (sacct_output["JobName"].str.contains("lstchain_merge_hdf5_files"))
         | (sacct_output["JobName"].str.contains("lstchain_check_dl1"))
         | (sacct_output["JobName"].str.contains("lstchain_longterm_dl1_check"))
+        | (sacct_output["JobName"].str.contains("lstchain_cherenkov_transparency"))
         | (sacct_output["JobName"].str.contains("provproces"))
     ]
 