From 4a5988303f8bec00624091027fbea8537ed80f58 Mon Sep 17 00:00:00 2001 From: noeliacastrejon <146071261+noeliacastrejon@users.noreply.github.com> Date: Mon, 5 Feb 2024 11:13:29 +0100 Subject: [PATCH 01/12] Add files via upload --- src/osa/scripts/data_reduction.py | 249 ++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 src/osa/scripts/data_reduction.py diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py new file mode 100644 index 00000000..3acc1ef6 --- /dev/null +++ b/src/osa/scripts/data_reduction.py @@ -0,0 +1,249 @@ +"""Script to run the gain selection over a list of dates.""" +import logging +import re +import shutil +import glob +import pandas as pd +import os +import csv +import subprocess as sp +from pathlib import Path +from textwrap import dedent +from io import StringIO + +import click +from astropy.table import Table +from lstchain.paths import run_info_from_filename, parse_r0_filename + +from osa.scripts.reprocessing import get_list_of_dates, check_job_status_and_wait +from osa.utils.utils import wait_for_daytime +from osa.utils.logging import myLogger +from osa.job import get_sacct_output, FORMAT_SLURM + +log = myLogger(logging.getLogger(__name__)) + +PATH = "PATH=/fefs/aswg/lstosa/DVR/offline_data_volume_reduction_v0.2.1/build/:$PATH" +LD_LIBRARY_PATH = "LD_LIBRARY_PATH=/usr/lib64/" + +#PART 1: we generate the .sh pointing out the main indications for SLURM, introducing the instructions of the function to be executed and that calculates the time it takes to execute this job + +def get_sbatch_script( + run_id, log_dir, name_job,i +): + """Build the sbatch job pilot script for running the pixel selection.""" + sbatch_part = dedent( + f"""\ + #!/bin/bash + + #SBATCH -D {log_dir} + #SBATCH --export {PATH},{LD_LIBRARY_PATH} + #SBATCH -p long + """) + if name_job: + sbatch_part+=dedent( + f"""\ + #SBATCH -o "pixel_selection_{run_id:05d}_%j.log" + #SBATCH --job-name "pixel_selection_{run_id:05d}" + """ + ) + else: + sbatch_part += dedent( + f"""\ + #SBATCH -o "pixel_selection_{run_id:05d}_{i}_%j.log" + #SBATCH --job-name "pixel_selection_{run_id:05d}_{i}" + """ + ) + sbatch_part+= dedent( + f"""\ + echo $PATH + echo $LD_LIBRARY_PATH + echo " Hostname : " + /usr/bin/hostname + echo " " + n_subruns=0 + total_time=0 + """) + + return sbatch_part + +def get_sbatch_instruction( + run_id, log_dir, input_file, output_dir, pixelmap_file +): + return dedent( + f"""\ + start=$(/usr/bin/date '+%H:%M:%S') + echo $start + lst_dvr {input_file} {output_dir} {pixelmap_file} + end=$(/usr/bin/date '+%H:%M:%S') + echo $end + subruntime=$(($(/usr/bin/date -d "$end" +%s) - $(/usr/bin/date -d "$start" +%s))) + echo $subruntime + n_subruns=$((n_subruns + 1)) + total_time=$((total_time + subruntime)) + """ + ) +def get_sbatch_time(): + return dedent( + f"""\ + time_aprox=$((total_time / n_subruns)) + echo $time_aprox + """ + ) + +#PART 2:In this function check that pixel_mask exist to write the job (file.sh this is the job_file to launch) + +def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file,i): + #checks if the pixel file exists + new_file = Path(f"{original_dir}/LST-1.1.Run{run_id:05d}.{subrun:04d}.fits.fz") + pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") + + if not os.path.exists(pixel_file): + pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/recreated/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") + if not os.path.exists(pixel_file): + all_streams=original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") + for all_stream in all_streams: + #print(f"Copia este new_file: {all_stream}") + sp.run(["cp", all_stream, output_dir]) + continue # Skip creating instructions for this subrun + else: + if not write_job_file: + write_job_file = True + with open(job_file, "a") as f: + if subrun == first_subrun : # Only write instructions for the first subrun of the run + f.write(get_sbatch_script(run_id, log_dir,name_job,i)) + f.write( + get_sbatch_instruction( + run_id, + log_dir, + new_file, + output_dir, + pixel_file + ) + ) + + + else: + if not write_job_file: + write_job_file = True + with open(job_file, "a") as f: + if subrun == first_subrun: # Only write instructions for the first subrun of the run + f.write(get_sbatch_script(run_id, log_dir,name_job,i)) + f.write( + get_sbatch_instruction( + run_id, + log_dir, + new_file, + output_dir, + pixel_file + ) + ) + + if write_job_file: + with open(job_file, "a") as f: + f.write(get_sbatch_time()) + +#PART 3: In this function apply pixel_selection for files which have got pixel_mask(only data_runs have pixel mask!!!) and copy for files that haven't got it. So for those that are reduced we call the function that writes the job to check if the files exist(drafts_job_file), and write the sh with the 3 functions of part 1. +#def apply_pixel_selection(date: str): +def apply_pixel_selection(date): + """ + Submit the jobs to apply the pixel selection to the data for a given date + on a run-by-run basis. + """ + + run_summary_dir = Path("/fefs/aswg/data/real/monitoring/RunSummary") + run_summary_file = run_summary_dir / f"RunSummary_{date}.ecsv" + summary_table = Table.read(run_summary_file) + # Apply pixel selection only to DATA runs + data_runs = summary_table[summary_table["run_type"] == "DATA"] + output_basedir = Path("/fefs/aswg/data/real/R0V") + output_dir = output_basedir / date + log_dir = output_basedir / "log" / date + output_dir.mkdir(parents=True, exist_ok=True) + log_dir.mkdir(parents=True, exist_ok=True) + original_dir = Path(f"/fefs/aswg/data/real/R0G/{date}") + if not os.path.exists(original_dir): + original_dir= Path (f"/fefs/aswg/data/real/R0/{date}") +# d_run = data_runs[data_runs["run_id"] == run] +# print(d_run) +# for run in d_run: + for run in data_runs: + # Check slurm queue status and sleep for a while to avoid overwhelming the queue + #check_job_status_and_wait(max_jobs=1500) + # Avoid running jobs while it is still night time + #wait_for_daytime(start=12, end=18) + + run_id = run["run_id"] + files = glob.glob(f"{original_dir}/LST-1.?.Run{run_id:05d}.????.fits.fz") + subrun_numbers = [int(file[-12:-8]) for file in files] + run=int(run_id) + n_subruns = max(subrun_numbers) + write_job_file = False + #check the number of subruns, because if it is more than 200 we split the run into many jobs + if n_subruns>=190: + group_size = 100 + i=0 + for start_subrun in range(0, n_subruns+1, group_size): + end_subrun = min(start_subrun + group_size, n_subruns+1) + i=i+1 + + job_file = log_dir / f"dvr_reduction_{run:05d}_{start_subrun}-{end_subrun}.sh" + first_subrun=start_subrun + for subrun in range(start_subrun, end_subrun): + name_job=False + job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) + + if os.path.exists(job_file): + #print(f"se va a lanzar el siguiente job{job_file}") + sp.run(["sbatch", job_file], check=True) + + else: + job_file_2 = log_dir / f"dvr_reduction_{run:05d}.sh" + first_subrun=0 + i=0 + for subrun in range (n_subruns +1): + name_job=True + job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) + + if os.path.exists(job_file_2): + #print(f"se va a lanzar el siguiente job{job_file_2}") + sp.run(["sbatch", job_file_2], check=True) + + #the calibration files won't reduced + calib_runs = summary_table[summary_table["run_type"] != "DATA"] + + for run in calib_runs: + # Avoid copying files while it is still night time + #wait_for_daytime(start=12, end=18) + + run_id = run["run_id"] + r0_files = original_dir.glob(f"LST-1.?.Run{run_id:05d}.????.fits.fz") + + for file in r0_files: + #print(f"copia este archivo: {file}") + sp.run(["cp", file, output_dir]) + +@click.command() +@click.argument("dates-file", type=click.Path(exists=True, path_type=Path)) +def main(dates_file: Path = None): + """ + Loop over the dates listed in the input file and launch the gain selection + script for each of them. The input file should list the dates in the format + YYYYMMDD one date per line. + """ + log.setLevel(logging.INFO) + + list_of_dates = get_list_of_dates(dates_file) +# with open('DVR/PixelSel/time_limit.csv', mode='r', newline='') as file: +# reader = csv.reader(file) +# header = next(reader) +# for row in reader: +# date = row[0] +# run = int(row[1]) + for date in list_of_dates: + print (date) + apply_pixel_selection(date) + log.info("Done! No more dates to process.") + + +if __name__ == "__main__": + main() From e850c397a0d62e4d2babd9104d4efa766469c958 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 5 Feb 2024 12:15:41 +0100 Subject: [PATCH 02/12] Remove unused packages --- src/osa/scripts/data_reduction.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 3acc1ef6..5da1f9a1 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -1,24 +1,17 @@ """Script to run the gain selection over a list of dates.""" import logging -import re -import shutil import glob -import pandas as pd import os -import csv import subprocess as sp from pathlib import Path from textwrap import dedent -from io import StringIO import click from astropy.table import Table -from lstchain.paths import run_info_from_filename, parse_r0_filename from osa.scripts.reprocessing import get_list_of_dates, check_job_status_and_wait from osa.utils.utils import wait_for_daytime from osa.utils.logging import myLogger -from osa.job import get_sacct_output, FORMAT_SLURM log = myLogger(logging.getLogger(__name__)) From 100aa9c20095a103479972bb9e7b1e821974d886 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 5 Feb 2024 12:17:05 +0100 Subject: [PATCH 03/12] Remove unnecessary f-strings --- src/osa/scripts/data_reduction.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 5da1f9a1..03611f71 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -47,7 +47,7 @@ def get_sbatch_script( """ ) sbatch_part+= dedent( - f"""\ + """\ echo $PATH echo $LD_LIBRARY_PATH echo " Hostname : " @@ -77,7 +77,7 @@ def get_sbatch_instruction( ) def get_sbatch_time(): return dedent( - f"""\ + """\ time_aprox=$((total_time / n_subruns)) echo $time_aprox """ From 64deab22ad762ab147bc0fa7852dc35d9bfcdb39 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 5 Feb 2024 12:20:00 +0100 Subject: [PATCH 04/12] Use check_job_status_and_wait and wait_for_daytime functions --- src/osa/scripts/data_reduction.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 03611f71..0a1fbf01 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -161,9 +161,9 @@ def apply_pixel_selection(date): # for run in d_run: for run in data_runs: # Check slurm queue status and sleep for a while to avoid overwhelming the queue - #check_job_status_and_wait(max_jobs=1500) + check_job_status_and_wait(max_jobs=1500) # Avoid running jobs while it is still night time - #wait_for_daytime(start=12, end=18) + wait_for_daytime(start=10, end=18) run_id = run["run_id"] files = glob.glob(f"{original_dir}/LST-1.?.Run{run_id:05d}.????.fits.fz") @@ -206,7 +206,7 @@ def apply_pixel_selection(date): for run in calib_runs: # Avoid copying files while it is still night time - #wait_for_daytime(start=12, end=18) + wait_for_daytime(start=10, end=18) run_id = run["run_id"] r0_files = original_dir.glob(f"LST-1.?.Run{run_id:05d}.????.fits.fz") From 8c4e16aa07cab86e35a0810ddc62fbe725eb638b Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Mon, 5 Feb 2024 12:21:21 +0100 Subject: [PATCH 05/12] Comment unused lines, but check why --- src/osa/scripts/data_reduction.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 0a1fbf01..753aa970 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -183,7 +183,7 @@ def apply_pixel_selection(date): first_subrun=start_subrun for subrun in range(start_subrun, end_subrun): name_job=False - job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) + #job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) if os.path.exists(job_file): #print(f"se va a lanzar el siguiente job{job_file}") @@ -195,7 +195,7 @@ def apply_pixel_selection(date): i=0 for subrun in range (n_subruns +1): name_job=True - job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) + #job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) if os.path.exists(job_file_2): #print(f"se va a lanzar el siguiente job{job_file_2}") From 9ba952f537f9be91482b7f9e6dac545e33bf7089 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 15 Feb 2024 17:31:46 +0100 Subject: [PATCH 06/12] Make some simplifications --- src/osa/scripts/data_reduction.py | 49 +++++++++++++++---------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 753aa970..f24459c6 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -1,4 +1,4 @@ -"""Script to run the gain selection over a list of dates.""" +"""Script to run the data volume reduction over a list of dates.""" import logging import glob import os @@ -18,10 +18,9 @@ PATH = "PATH=/fefs/aswg/lstosa/DVR/offline_data_volume_reduction_v0.2.1/build/:$PATH" LD_LIBRARY_PATH = "LD_LIBRARY_PATH=/usr/lib64/" -#PART 1: we generate the .sh pointing out the main indications for SLURM, introducing the instructions of the function to be executed and that calculates the time it takes to execute this job def get_sbatch_script( - run_id, log_dir, name_job,i + run_id, log_dir, name_job, i ): """Build the sbatch job pilot script for running the pixel selection.""" sbatch_part = dedent( @@ -75,7 +74,9 @@ def get_sbatch_instruction( total_time=$((total_time + subruntime)) """ ) + def get_sbatch_time(): + """Calculate the time it takes to execute the job.""" return dedent( """\ time_aprox=$((total_time / n_subruns)) @@ -83,21 +84,18 @@ def get_sbatch_time(): """ ) -#PART 2:In this function check that pixel_mask exist to write the job (file.sh this is the job_file to launch) - def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file,i): - #checks if the pixel file exists + """Check if the pixel_mask file exists and write the job file to be launched.""" new_file = Path(f"{original_dir}/LST-1.1.Run{run_id:05d}.{subrun:04d}.fits.fz") pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") - if not os.path.exists(pixel_file): + if not pixel_file.exists(): pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/recreated/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") - if not os.path.exists(pixel_file): - all_streams=original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") - for all_stream in all_streams: - #print(f"Copia este new_file: {all_stream}") + if not pixel_file.exists(): + all_streams = original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") + for stream in all_streams: + log.info(f"Copying file {stream} to {output_dir}") sp.run(["cp", all_stream, output_dir]) - continue # Skip creating instructions for this subrun else: if not write_job_file: write_job_file = True @@ -135,14 +133,12 @@ def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id with open(job_file, "a") as f: f.write(get_sbatch_time()) -#PART 3: In this function apply pixel_selection for files which have got pixel_mask(only data_runs have pixel mask!!!) and copy for files that haven't got it. So for those that are reduced we call the function that writes the job to check if the files exist(drafts_job_file), and write the sh with the 3 functions of part 1. -#def apply_pixel_selection(date: str): def apply_pixel_selection(date): """ Submit the jobs to apply the pixel selection to the data for a given date - on a run-by-run basis. + on a run-by-run basis. Only data runs have pixel mask files, the rest of + the files are directly copied without being reduced. """ - run_summary_dir = Path("/fefs/aswg/data/real/monitoring/RunSummary") run_summary_file = run_summary_dir / f"RunSummary_{date}.ecsv" summary_table = Table.read(run_summary_file) @@ -154,8 +150,8 @@ def apply_pixel_selection(date): output_dir.mkdir(parents=True, exist_ok=True) log_dir.mkdir(parents=True, exist_ok=True) original_dir = Path(f"/fefs/aswg/data/real/R0G/{date}") - if not os.path.exists(original_dir): - original_dir= Path (f"/fefs/aswg/data/real/R0/{date}") + if not original_dir.exists(): + original_dir = Path (f"/fefs/aswg/data/real/R0/{date}") # d_run = data_runs[data_runs["run_id"] == run] # print(d_run) # for run in d_run: @@ -171,7 +167,8 @@ def apply_pixel_selection(date): run=int(run_id) n_subruns = max(subrun_numbers) write_job_file = False - #check the number of subruns, because if it is more than 200 we split the run into many jobs + + # If the number of subruns is above 200, the run is split into multiple jobs if n_subruns>=190: group_size = 100 i=0 @@ -185,8 +182,8 @@ def apply_pixel_selection(date): name_job=False #job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) - if os.path.exists(job_file): - #print(f"se va a lanzar el siguiente job{job_file}") + if job_file.exists(): + log.info(f"Launching job {job_file}") sp.run(["sbatch", job_file], check=True) else: @@ -197,11 +194,11 @@ def apply_pixel_selection(date): name_job=True #job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) - if os.path.exists(job_file_2): - #print(f"se va a lanzar el siguiente job{job_file_2}") + if job_file_2.exists(): + log.info(f"Launching job{job_file_2}") sp.run(["sbatch", job_file_2], check=True) - #the calibration files won't reduced + # Non-data files won't be reduced calib_runs = summary_table[summary_table["run_type"] != "DATA"] for run in calib_runs: @@ -212,14 +209,14 @@ def apply_pixel_selection(date): r0_files = original_dir.glob(f"LST-1.?.Run{run_id:05d}.????.fits.fz") for file in r0_files: - #print(f"copia este archivo: {file}") + log.info(f"Copying {file} to {output_dir}") sp.run(["cp", file, output_dir]) @click.command() @click.argument("dates-file", type=click.Path(exists=True, path_type=Path)) def main(dates_file: Path = None): """ - Loop over the dates listed in the input file and launch the gain selection + Loop over the dates listed in the input file and launch the data reduction script for each of them. The input file should list the dates in the format YYYYMMDD one date per line. """ From cb1055207c11a34e843b048bc655af8eff8e1099 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 15 Feb 2024 17:39:16 +0100 Subject: [PATCH 07/12] Do not check for files in the recreated directory, since it's not used anymore --- src/osa/scripts/data_reduction.py | 59 +++++++++++-------------------- 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index f24459c6..b7d65da4 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -90,48 +90,29 @@ def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") if not pixel_file.exists(): - pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/recreated/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") - if not pixel_file.exists(): - all_streams = original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") - for stream in all_streams: - log.info(f"Copying file {stream} to {output_dir}") - sp.run(["cp", all_stream, output_dir]) - else: - if not write_job_file: - write_job_file = True - with open(job_file, "a") as f: - if subrun == first_subrun : # Only write instructions for the first subrun of the run - f.write(get_sbatch_script(run_id, log_dir,name_job,i)) - f.write( - get_sbatch_instruction( - run_id, - log_dir, - new_file, - output_dir, - pixel_file - ) - ) - - + all_streams = original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") + for stream in all_streams: + log.info(f"Copying file {stream} to {output_dir}") + sp.run(["cp", all_stream, output_dir]) else: - if not write_job_file: - write_job_file = True - with open(job_file, "a") as f: - if subrun == first_subrun: # Only write instructions for the first subrun of the run - f.write(get_sbatch_script(run_id, log_dir,name_job,i)) - f.write( - get_sbatch_instruction( - run_id, - log_dir, - new_file, - output_dir, - pixel_file - ) - ) + if not write_job_file: + write_job_file = True + with open(job_file, "a") as f: + if subrun == first_subrun: # Only write instructions for the first subrun of the run + f.write(get_sbatch_script(run_id, log_dir,name_job,i)) + f.write( + get_sbatch_instruction( + run_id, + log_dir, + new_file, + output_dir, + pixel_file + ) + ) if write_job_file: - with open(job_file, "a") as f: - f.write(get_sbatch_time()) + with open(job_file, "a") as f: + f.write(get_sbatch_time()) def apply_pixel_selection(date): """ From c2f1f486f121147ef8d45902e1d6aebd8cc556c3 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 15 Feb 2024 17:59:09 +0100 Subject: [PATCH 08/12] Simplify drafts_job_file function --- src/osa/scripts/data_reduction.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index b7d65da4..5378af70 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -84,22 +84,22 @@ def get_sbatch_time(): """ ) -def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file,i): +def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,job_file,i,write_job_file=True): """Check if the pixel_mask file exists and write the job file to be launched.""" new_file = Path(f"{original_dir}/LST-1.1.Run{run_id:05d}.{subrun:04d}.fits.fz") - pixel_file = Path(f"/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5") + pixel_masks_dir = Path("/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/") + pixel_file = pixel_masks_dir / f"Pixel_selection_LST-1.Run{run_id:05d}.{subrun:04d}.h5" if not pixel_file.exists(): all_streams = original_dir.glob(f"LST-1.?.Run{run_id:05d}.{subrun:04d}.fits.fz") for stream in all_streams: - log.info(f"Copying file {stream} to {output_dir}") - sp.run(["cp", all_stream, output_dir]) + log.info(f"No PixelMask file found for run {run_id:05d}.{subrun}, \ + copying file {stream} to {output_dir}") + sp.run(["cp", all_stream, output_dir]) else: - if not write_job_file: - write_job_file = True with open(job_file, "a") as f: if subrun == first_subrun: # Only write instructions for the first subrun of the run - f.write(get_sbatch_script(run_id, log_dir,name_job,i)) + f.write(get_sbatch_script(run_id, log_dir,name_job, i)) f.write( get_sbatch_instruction( run_id, @@ -109,8 +109,6 @@ def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id pixel_file ) ) - - if write_job_file: with open(job_file, "a") as f: f.write(get_sbatch_time()) From 2498496c14d86c769ffc5bd2eec6a08b73e5223b Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Thu, 15 Feb 2024 18:36:46 +0100 Subject: [PATCH 09/12] Add start and end time as input arguments --- src/osa/scripts/data_reduction.py | 43 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 5378af70..8dddcd8f 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -112,7 +112,7 @@ def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id with open(job_file, "a") as f: f.write(get_sbatch_time()) -def apply_pixel_selection(date): +def apply_pixel_selection(date: str, start: int, end: int): """ Submit the jobs to apply the pixel selection to the data for a given date on a run-by-run basis. Only data runs have pixel mask files, the rest of @@ -123,14 +123,16 @@ def apply_pixel_selection(date): summary_table = Table.read(run_summary_file) # Apply pixel selection only to DATA runs data_runs = summary_table[summary_table["run_type"] == "DATA"] + output_basedir = Path("/fefs/aswg/data/real/R0V") output_dir = output_basedir / date log_dir = output_basedir / "log" / date output_dir.mkdir(parents=True, exist_ok=True) log_dir.mkdir(parents=True, exist_ok=True) original_dir = Path(f"/fefs/aswg/data/real/R0G/{date}") + if not original_dir.exists(): - original_dir = Path (f"/fefs/aswg/data/real/R0/{date}") + original_dir = Path (f"/fefs/aswg/data/real/R0/{date}") # d_run = data_runs[data_runs["run_id"] == run] # print(d_run) # for run in d_run: @@ -138,25 +140,24 @@ def apply_pixel_selection(date): # Check slurm queue status and sleep for a while to avoid overwhelming the queue check_job_status_and_wait(max_jobs=1500) # Avoid running jobs while it is still night time - wait_for_daytime(start=10, end=18) + wait_for_daytime(start, end) run_id = run["run_id"] files = glob.glob(f"{original_dir}/LST-1.?.Run{run_id:05d}.????.fits.fz") subrun_numbers = [int(file[-12:-8]) for file in files] - run=int(run_id) + run = int(run_id) n_subruns = max(subrun_numbers) write_job_file = False - # If the number of subruns is above 200, the run is split into multiple jobs + # If the number of subruns is above 190, the run is split into multiple jobs if n_subruns>=190: group_size = 100 - i=0 + i = 0 for start_subrun in range(0, n_subruns+1, group_size): end_subrun = min(start_subrun + group_size, n_subruns+1) - i=i+1 - + i = i+1 job_file = log_dir / f"dvr_reduction_{run:05d}_{start_subrun}-{end_subrun}.sh" - first_subrun=start_subrun + first_subrun = start_subrun for subrun in range(start_subrun, end_subrun): name_job=False #job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) @@ -164,25 +165,24 @@ def apply_pixel_selection(date): if job_file.exists(): log.info(f"Launching job {job_file}") sp.run(["sbatch", job_file], check=True) - else: - job_file_2 = log_dir / f"dvr_reduction_{run:05d}.sh" - first_subrun=0 - i=0 + job_file = log_dir / f"dvr_reduction_{run:05d}.sh" + first_subrun = 0 + i = 0 for subrun in range (n_subruns +1): name_job=True #job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) - if job_file_2.exists(): - log.info(f"Launching job{job_file_2}") - sp.run(["sbatch", job_file_2], check=True) + if job_file.exists(): + log.info(f"Launching job{job_file}") + sp.run(["sbatch", job_file], check=True) # Non-data files won't be reduced calib_runs = summary_table[summary_table["run_type"] != "DATA"] for run in calib_runs: # Avoid copying files while it is still night time - wait_for_daytime(start=10, end=18) + wait_for_daytime(start, end) run_id = run["run_id"] r0_files = original_dir.glob(f"LST-1.?.Run{run_id:05d}.????.fits.fz") @@ -193,7 +193,9 @@ def apply_pixel_selection(date): @click.command() @click.argument("dates-file", type=click.Path(exists=True, path_type=Path)) -def main(dates_file: Path = None): +@click.option("-s", "--start-time", type=int, default=10, help="Time to (re)start data reduction in HH format.") +@click.option("-e", "--end-time", type=int, default=18, help="Time to stop data reduction in HH format.") +def main(dates_file: Path = None, start_time: int = 10, end_time: int = 18): """ Loop over the dates listed in the input file and launch the data reduction script for each of them. The input file should list the dates in the format @@ -209,8 +211,9 @@ def main(dates_file: Path = None): # date = row[0] # run = int(row[1]) for date in list_of_dates: - print (date) - apply_pixel_selection(date) + log.info(f"Applying pixel selection for date {date}") + apply_pixel_selection(date, start_time, end_time) + log.info("Done! No more dates to process.") From 126da88ff22fe370d83d231b2eee6e57a997b743 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Fri, 16 Feb 2024 10:51:06 +0100 Subject: [PATCH 10/12] Uncomment necessary lines + remove unnecesary variable definition --- src/osa/scripts/data_reduction.py | 36 ++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 8dddcd8f..65b5fa4a 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -133,9 +133,7 @@ def apply_pixel_selection(date: str, start: int, end: int): if not original_dir.exists(): original_dir = Path (f"/fefs/aswg/data/real/R0/{date}") -# d_run = data_runs[data_runs["run_id"] == run] -# print(d_run) -# for run in d_run: + for run in data_runs: # Check slurm queue status and sleep for a while to avoid overwhelming the queue check_job_status_and_wait(max_jobs=1500) @@ -159,8 +157,19 @@ def apply_pixel_selection(date: str, start: int, end: int): job_file = log_dir / f"dvr_reduction_{run:05d}_{start_subrun}-{end_subrun}.sh" first_subrun = start_subrun for subrun in range(start_subrun, end_subrun): - name_job=False - #job = drafts_job_file(original_dir, output_dir, log_dir, name_job,first_subrun,run_id, subrun,write_job_file, job_file,i) + name_job = False + drafts_job_file( + original_dir, + output_dir, + log_dir, + name_job, + first_subrun, + run_id, + subrun, + write_job_file, + job_file, + i + ) if job_file.exists(): log.info(f"Launching job {job_file}") @@ -169,9 +178,20 @@ def apply_pixel_selection(date: str, start: int, end: int): job_file = log_dir / f"dvr_reduction_{run:05d}.sh" first_subrun = 0 i = 0 - for subrun in range (n_subruns +1): - name_job=True - #job3=drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,write_job_file,job_file_2,i) + for subrun in range(n_subruns+1): + name_job = True + drafts_job_file( + original_dir, + output_dir, + log_dir, + name_job, + first_subrun, + run_id, + subrun, + write_job_file, + job_file, + i + ) if job_file.exists(): log.info(f"Launching job{job_file}") From 32edd45d220a525652289c54d8b0f15f1de94557 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Fri, 16 Feb 2024 10:55:18 +0100 Subject: [PATCH 11/12] Remove unnecessary argument (write_job_file) --- src/osa/scripts/data_reduction.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 65b5fa4a..5a2393cc 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -1,7 +1,6 @@ """Script to run the data volume reduction over a list of dates.""" import logging import glob -import os import subprocess as sp from pathlib import Path from textwrap import dedent @@ -84,7 +83,7 @@ def get_sbatch_time(): """ ) -def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,job_file,i,write_job_file=True): +def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id,subrun,job_file,i): """Check if the pixel_mask file exists and write the job file to be launched.""" new_file = Path(f"{original_dir}/LST-1.1.Run{run_id:05d}.{subrun:04d}.fits.fz") pixel_masks_dir = Path("/fefs/aswg/data/real/auxiliary/DataVolumeReduction/PixelMasks/") @@ -95,7 +94,7 @@ def drafts_job_file(original_dir,output_dir,log_dir,name_job,first_subrun,run_id for stream in all_streams: log.info(f"No PixelMask file found for run {run_id:05d}.{subrun}, \ copying file {stream} to {output_dir}") - sp.run(["cp", all_stream, output_dir]) + sp.run(["cp", stream, output_dir]) else: with open(job_file, "a") as f: if subrun == first_subrun: # Only write instructions for the first subrun of the run @@ -145,7 +144,6 @@ def apply_pixel_selection(date: str, start: int, end: int): subrun_numbers = [int(file[-12:-8]) for file in files] run = int(run_id) n_subruns = max(subrun_numbers) - write_job_file = False # If the number of subruns is above 190, the run is split into multiple jobs if n_subruns>=190: @@ -166,7 +164,6 @@ def apply_pixel_selection(date: str, start: int, end: int): first_subrun, run_id, subrun, - write_job_file, job_file, i ) @@ -188,7 +185,6 @@ def apply_pixel_selection(date: str, start: int, end: int): first_subrun, run_id, subrun, - write_job_file, job_file, i ) From 3b9534dc8347434f97fb02f40d665f3e7c621d51 Mon Sep 17 00:00:00 2001 From: Maria Lainez <98marialainez@gmail.com> Date: Fri, 16 Feb 2024 10:59:38 +0100 Subject: [PATCH 12/12] Remove unnecessary lines --- src/osa/scripts/data_reduction.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/osa/scripts/data_reduction.py b/src/osa/scripts/data_reduction.py index 5a2393cc..8d328d5d 100644 --- a/src/osa/scripts/data_reduction.py +++ b/src/osa/scripts/data_reduction.py @@ -220,12 +220,6 @@ def main(dates_file: Path = None, start_time: int = 10, end_time: int = 18): log.setLevel(logging.INFO) list_of_dates = get_list_of_dates(dates_file) -# with open('DVR/PixelSel/time_limit.csv', mode='r', newline='') as file: -# reader = csv.reader(file) -# header = next(reader) -# for row in reader: -# date = row[0] -# run = int(row[1]) for date in list_of_dates: log.info(f"Applying pixel selection for date {date}") apply_pixel_selection(date, start_time, end_time)