From 7a2b080700f96bc1c9cba245d46b717428b7fe4b Mon Sep 17 00:00:00 2001 From: Jeremy Hooke Date: Thu, 19 Sep 2024 10:42:35 +1000 Subject: [PATCH] Add id-based bulk reprocess command --- scene_select/bulk_process.py | 475 ++++++++++++++++++++++------------- scene_select/collections.py | 14 +- scene_select/library.py | 20 +- 3 files changed, 335 insertions(+), 174 deletions(-) diff --git a/scene_select/bulk_process.py b/scene_select/bulk_process.py index 774bea25..027ec992 100644 --- a/scene_select/bulk_process.py +++ b/scene_select/bulk_process.py @@ -7,6 +7,82 @@ (It is intended to have good defaults, so you don't need to remember our AOI, where our Level 1 lives on NCI, or where logs should go, etc... ) + +""" + +import os +import shlex +import stat +import sys +from collections import defaultdict +from datetime import datetime +from itertools import chain +from pathlib import Path +from textwrap import dedent +from typing import List, Dict + +import click +import structlog +from attr import define, field +from datacube import Datacube +from datacube.index.abstract import AbstractIndex +from datacube.model import Range, Dataset +from datacube.ui import click as ui +from packaging import version + +from scene_select.collections import get_collection, get_product +from scene_select.do_ard import calc_node_with_defaults +from scene_select.library import Level1Dataset, ArdProduct, ArdCollection, ArdDataset +from scene_select.scene_filters import parse_expressions, GreaterThan, LessThan +from scene_select.utils import structlog_setup + +DEFAULT_WORK_DIR = Path("/g/data/v10/work/bulk-runs") + + +def expression_parse(ctx, param, value): + return parse_expressions(*list(value)) + + +@define(unsafe_hash=True) +class Job: + level1: Level1Dataset = field(eq=True, hash=False) + replacement_uuids: List[str] = field(eq=False, hash=False) + target_ard_product: ArdProduct = field(eq=False, hash=False) + +@click.group("ard-bulk-process", help=__doc__) +@ui.environment_option +@ui.config_option +@click.option( + "--max-count", default=sys.maxsize, help="Maximum number of scenes to process" +) +@click.option( + "--work-dir", + type=Path, + default=DEFAULT_WORK_DIR, + help="Base folder for working files (will create subfolders for each job)", +) +@click.option( + "--pkg-dir", + type=Path, + default=None, + help="Output package base path (default: work-dir/pkg)", +) +@click.option("--workers-per-node", type=int, default=48, help="Workers per node") +@ui.pass_index(app_name="bulk-reprocess") +@click.pass_context +def cli(ctx, index: AbstractIndex, + max_count: int, + workers_per_node: int, + work_dir: Path, + pkg_dir: Path): + ctx.ensure_object(dict) + ctx.obj['index'] = index + ctx.obj['max_count'] = max_count + ctx.obj['workers_per_node'] = workers_per_node + ctx.obj['work_dir'] = work_dir + ctx.obj['pkg_dir'] = pkg_dir + +@cli.command("search-ards", help=""" This will search, filter the scenes, and create the ard_pbs script. You can then run the result if it looks good. The first argument is which "collections" to process. This can be "ls", "s2", or more specific matches @@ -59,202 +135,265 @@ ard-bulk-reprocess s2 --max-count 500 2> bulk-reprocess.jsonl -""" - -import os -import shlex -import stat -import sys -from pathlib import Path -from textwrap import dedent -from typing import List, Dict - -import click -import structlog -from datacube import Datacube -from datacube.index.abstract import AbstractIndex -from datacube.model import Range -from datacube.ui import click as ui - -from scene_select.do_ard import calc_node_with_defaults -from scene_select.collections import get_collection -from packaging import version - -from scene_select.library import Level1Dataset -from scene_select.scene_filters import parse_expressions, GreaterThan, LessThan -from scene_select.utils import structlog_setup - -DEFAULT_WORK_DIR = Path("/g/data/v10/work/bulk-runs") - - -def expression_parse(ctx, param, value): - return parse_expressions(*list(value)) - - -@click.command("ard-bulk-reprocess", help=__doc__) -@ui.environment_option -@ui.config_option +""") @click.argument("prefix") @click.argument("expressions", callback=expression_parse, nargs=-1) -@click.option( - "--max-count", default=sys.maxsize, help="Maximum number of scenes to process" -) -@click.option( - "--work-dir", - type=Path, - default=DEFAULT_WORK_DIR, - help="Base folder for working files (will create subfolders for each job)", -) -@click.option( - "--pkg-dir", - type=Path, - default=None, - help="Output package base path (default: work-dir/pkg)", -) -@click.option("--workers-per-node", type=int, default=48, help="Workers per node") -@click.command("ard-bulk-reprocess", help=__doc__) -@ui.environment_option -@ui.config_option -@click.argument("prefix") -@ui.pass_index(app_name="bulk-reprocess") -def cli( - index: AbstractIndex, prefix: str, max_count: int, workers_per_node:int, work_dir: Path, pkg_dir: Path, expressions: dict -): +@click.pass_context +def cli_search(ctx, prefix: str, expressions: dict): import wagl - current_wagl_version = wagl.__version__ + structlog_setup() - software_expressions = pop_software_expressions(expressions) - + log = structlog.get_logger() if len(prefix) < 2: raise ValueError( "Sorry, prefix needs to be at least ls or s2, as we can't process multiple sensors in one go (yet)" ) - platform = prefix[:2] - if platform not in ("ls", "s2"): - raise ValueError( - f"Expected collection to begin with either ls or s2, got {prefix}" - ) - environment_file = f"/g/data/v10/work/landsat_downloads/landsat-downloader/config/dass-prod-wagl-{platform}.env" - structlog_setup() - - log = structlog.get_logger() + index = ctx.obj['index'] + max_count = ctx.obj['max_count'] + work_dir = ctx.obj['work_dir'] + workers_per_node = ctx.obj['workers_per_node'] + pkg_dir = ctx.obj['pkg_dir'] with Datacube(index=index) as dc: collection = get_collection(dc, prefix) + jobs = find_jobs_by_odc_ard_search(dc, collection, expressions, max_count, log) - log.info("chosen_products", products=[c.name for c in collection.products]) - # Filter to our set of ARD products. - - # The level1s to process, and the ids of datasets that will be replaced by them. - level1s_to_process: Dict[Level1Dataset, List[str]] = {} - unique_products = set() + platform = prefix[:2] + create_pbs_jobs( + jobs=jobs, + platform=platform, + work_dir=work_dir, + workers_per_node=workers_per_node, + pkg_dir=pkg_dir, + log=log, + ) + + +@cli.command("ard-ids", help='Specify a list of ARD UUIDs to reprocess') +# Via a text file +@click.option("-f", "--ids-file", type=click.File("r"), help="File containing UUIDs to process") +@click.argument("ids", nargs=-1) +@click.pass_context +def cli_search(ctx, ids_file, ids: List[str]): + structlog_setup() - for ard_product, ard_dataset in collection.iterate_indexed_ard_datasets( - expressions - ): - ilog = log.bind(dataset_id=ard_dataset.dataset_id) - if not ard_dataset.metadata_path.exists(): - ilog.warning("dataset_missing_from_disk") - continue + all_ids = list(ids) + for id_line in ids_file.readlines(): + all_ids.append(id_line.strip()) - if software_expressions: - if not matches_software_expressions( - ard_dataset.software_versions(), software_expressions, log=ilog - ): - continue - - level1 = dc.index.datasets.get(ard_dataset.level1_id) - if level1 is None: - ilog.warning( - "skip.source_level1_not_indexed", dataset_id=ard_dataset.dataset_id - ) - # TODO: Perhaps a newer one exists? Or on disk? - continue + log = structlog.get_logger() - # TODO: Does a newer Level 1 exist? We'd rather use that. - level1_product = [ - s for s in ard_product.sources if s.name == level1.product.name - ][0] + index = ctx.obj['index'] + max_count = ctx.obj['max_count'] + work_dir = ctx.obj['work_dir'] + workers_per_node = ctx.obj['workers_per_node'] + pkg_dir = ctx.obj['pkg_dir'] + platform = None - level1_dataset = Level1Dataset.from_odc(level1, level1_product) - level1s_to_process.setdefault(level1_dataset, []).append( - str(ard_dataset.dataset_id) + with Datacube(index=index) as dc: + jobs = [] + for ard_id in all_ids: + odc_ard = dc.index.datasets.get(ard_id, include_sources=True) + ard_dataset = ArdDataset.from_odc(odc_ard) + ard_product = get_product(odc_ard.product.name) + + this_platform = get_platform(odc_ard) + if platform and (this_platform != platform): + raise ValueError(f"All IDs should be of the same platform: {platform} != {this_platform}") + platform = this_platform + + odc_level1:Dataset = odc_ard.sources['level1'] + [level1_product] = [s for s in ard_product.sources if s.name == odc_level1.product.name] + + level1_dataset = Level1Dataset.from_odc(odc_level1, level1_product) + jobs.append( + Job(level1_dataset, [ard_dataset.dataset_id], ard_product) ) - unique_products.add(ard_product) - if len(level1s_to_process) >= max_count: - log.info("reached_max_dataset_count", max_count=max_count) - break + create_pbs_jobs( + jobs=jobs, + platform=platform, + work_dir=work_dir, + workers_per_node=workers_per_node, + pkg_dir=pkg_dir, + log=log, + ) + + +def get_platform(odc_ard): + if odc_ard.metadata.platform.startswith('s'): + ourplatform = 's2' + elif odc_ard.metadata.platform.startswith('l'): + ourplatform = 'ls' + else: + raise ValueError(f"Unknown platform: {odc_ard.metadata.platform}") + return ourplatform + + +def merge_duplicate_level1s(jobs:List[Job]) -> List[Job]: + jobs_by_level1_id :Dict[str, List[Job]]= defaultdict(list) + for job in jobs: + jobs_by_level1_id[job.level1.dataset_id].append(job) + + for level1_id, job_list in jobs_by_level1_id.items(): + first, *remaining = job_list + if not remaining: + yield first + else: + yield Job( + level1=first.level1, + replacement_uuids=list(chain(*[j.replacement_uuids for j in job_list])), + target_ard_product=first.target_ard_product, + ) - if not level1s_to_process: - log.info("no_datasets_to_process") - return +def create_pbs_jobs( + jobs: List[Job], + platform:str, + work_dir: Path, + workers_per_node: int, + pkg_dir: Path, + log, +): + if not jobs: + log.info("no_datasets_to_process") + return - from datetime import datetime + # Check for any duplicate level1s, and join the jobs together (just their uuids to archive) + jobs = list(merge_duplicate_level1s(jobs)) - job_directory = ( - work_dir / platform / datetime.now().strftime("%Y-%m/%Y-%m-%d-%H%M%S") - ) - job_directory = job_directory.resolve() - job_directory.mkdir(parents=True, exist_ok=False) - scene_list_path = job_directory / "scene-level1-path-list.txt" - scene_archive_path = job_directory / "scene-archive-list.csv" - - # Write file of level1s to process. - with scene_list_path.open("w") as fid: - for level1 in level1s_to_process.keys(): - fid.write(str(level1.data_path) + "\n") - - # And a list of ARDs to archive (folder and uuid)? - with scene_archive_path.open("w") as fid: - for level1_dataset, uuids_to_archive in level1s_to_process.items(): - fid.write(f'{level1_dataset.data_path},{",".join(uuids_to_archive)}\n') - - dirs = dict( - workdir=job_directory / "run", - pkgdir=pkg_dir or job_directory / "pkg", - # ard-pbs creates subfolders for each batch log anyway. - logdir=job_directory, + if platform not in ("ls", "s2"): + raise ValueError( + f"Expected collection to begin with either ls or s2, got {platform}" ) - for _, dir_path in dirs.items(): - dir_path.mkdir(parents=True, exist_ok=True) - - if level1_product.separate_metadata_directory: - dirs["yamls-dir"] = level1_product.separate_metadata_directory - ard_args = dict( - project="v10", - walltime="10:00:00", - env=environment_file, - nodes=None, - workers=workers_per_node, - **dirs, + + environment_file = f"/g/data/v10/work/landsat_downloads/landsat-downloader/config/dass-prod-wagl-{platform}.env" + + job_directory = ( + work_dir / platform / datetime.now().strftime("%Y-%m/%Y-%m-%d-%H%M%S") + ) + job_directory = job_directory.resolve() + job_directory.mkdir(parents=True, exist_ok=False) + scene_list_path = job_directory / "scene-level1-path-list.txt" + scene_archive_path = job_directory / "scene-archive-list.csv" + + # Write file of level1s to process. + with scene_list_path.open("w") as fid: + for level1_job in jobs: + fid.write(str(level1_job.level1.data_path) + "\n") + + # And a list of ARDs to archive (folder and uuid)? + with scene_archive_path.open("w") as fid: + for job in jobs: + fid.write(f'{job.level1.data_path},{",".join(job.replacement_uuids)}\n') + + dirs = dict( + workdir=job_directory / "run", + pkgdir=pkg_dir or job_directory / "pkg", + # ard-pbs creates subfolders for each batch log anyway. + logdir=job_directory, + ) + for _, dir_path in dirs.items(): + dir_path.mkdir(parents=True, exist_ok=True) + + level1_products = list(set(l1.level1.product for l1 in jobs)) + log.info("level1_products", level1_products=[l.name for l in level1_products]) + separate_metadata_dirs = set( + product.separate_metadata_directory for product in level1_products + ) + if len(separate_metadata_dirs) > 1: + raise ValueError( + f"Level1s have inconsistent separate metadata directory settings: {[p.name for p in level1_products]}" ) - calc_node_with_defaults(ard_args, len(level1s_to_process)) - pbs = dedent(f""" - #!/bin/bash - module purge - module load pbs + if any(separate_metadata_dirs): + [dirs["yamls-dir"]] = separate_metadata_dirs + log.info("Using separate metadata directory", directory=dirs["yamls-dir"]) + + ard_args = dict( + project="v10", + walltime="10:00:00", + env=environment_file, + nodes=None, + workers=workers_per_node, + **dirs, + ) + calc_node_with_defaults(ard_args, len(jobs)) + pbs = dedent( + f""" + #!/bin/bash + + module purge + module load pbs + + source {environment_file} + + ard_pbs --level1-list {scene_list_path} {dict_to_cli_args(ard_args, multiline_indent=16)} + """ + ).lstrip() - source {environment_file} + script_path = job_directory / "run_ard_pbs.sh" + with open(script_path, "w") as src: + src.write(pbs) + os.chmod(script_path, os.stat(script_path).st_mode | stat.S_IEXEC) - ard_pbs --level1-list {scene_list_path} {dict_to_cli_args(ard_args, multiline_indent=16)} - """).lstrip() + log.info( + "created_job", + dataset_count=len(jobs), + job_directory=str(job_directory), + script_path=str(script_path), + ) - script_path = job_directory / "run_ard_pbs.sh" - with open(script_path, "w") as src: - src.write(pbs) - os.chmod(script_path, os.stat(script_path).st_mode | stat.S_IEXEC) - log.info( - "created_job", - dataset_count=len(level1s_to_process), - products=len(unique_products), - job_directory=str(job_directory), - script_path=str(script_path), - ) +def find_jobs_by_odc_ard_search( + dc: Datacube, collection: ArdCollection, expressions: dict, max_count: int, log +) -> List[Job]: + software_expressions = pop_software_expressions(expressions) + + # The level1s to process, and the ids of datasets that will be replaced by them. + level1s_to_process: List[Job] = [] + + log.info("chosen_products", products=[c.name for c in collection.products]) + # Filter to our set of ARD products. + for ard_product, ard_dataset in collection.iterate_indexed_ard_datasets( + expressions + ): + ilog = log.bind(dataset_id=ard_dataset.dataset_id) + if not ard_dataset.metadata_path.exists(): + ilog.warning("dataset_missing_from_disk") + continue + + if software_expressions: + if not matches_software_expressions( + ard_dataset.software_versions(), software_expressions, log=ilog + ): + continue + + level1 = dc.index.datasets.get(ard_dataset.level1_id) + if level1 is None: + ilog.warning( + "skip.source_level1_not_indexed", dataset_id=ard_dataset.dataset_id + ) + # TODO: Perhaps a newer one exists? Or on disk? + continue + + # TODO: Does a newer Level 1 exist? We'd rather use that. + level1_product = [ + s for s in ard_product.sources if s.name == level1.product.name + ][0] + level1_dataset = Level1Dataset.from_odc(level1, level1_product) + level1s_to_process.append(Job( + level1=level1_dataset, + replacement_uuids=[ard_dataset.dataset_id], + target_ard_product=ard_product, + )) + + if len(level1s_to_process) >= max_count: + log.info("reached_max_dataset_count", max_count=max_count) + break + + return level1s_to_process def dict_to_cli_args(args: dict, multiline_indent=None) -> str: @@ -283,12 +422,14 @@ def dict_to_cli_args(args: dict, multiline_indent=None) -> str: ard_arg_string = " ".join(ard_params) return ard_arg_string + def pop_software_expressions(expressions: dict) -> dict: """ Any key ending in `_version` is removed from the expressions and returned as a separate dict. """ return {k: expressions.pop(k) for k in list(expressions) if k.endswith("_version")} + def matches_software_expressions( software_versions: dict, software_expressions: dict, log ) -> bool: @@ -353,7 +494,5 @@ def matches_software_expressions( return True - - if __name__ == "__main__": cli() diff --git a/scene_select/collections.py b/scene_select/collections.py index 4475a00c..c8db7755 100644 --- a/scene_select/collections.py +++ b/scene_select/collections.py @@ -97,10 +97,22 @@ } -def get_collection(dc: Datacube, prefix: str) -> ArdCollection: +def get_product(product_name:str) -> ArdProduct: + products = { + product for product in ARD_PRODUCTS if product.name == product_name + } + if not products: + raise ValueError(f"No products found for {product_name=}") + if len(products) > 1: + raise RuntimeError(f"Multiple products should never be found for one product name? {product_name=}") + [product] = products + return product + +def get_collection(dc: Datacube, prefix: str=None) -> ArdCollection: products = { product for product in ARD_PRODUCTS if product.name.startswith(f"ga_{prefix}") } + if not products: raise ValueError(f"No products found for {prefix=}") diff --git a/scene_select/library.py b/scene_select/library.py index 81dbdcae..7cbf289c 100644 --- a/scene_select/library.py +++ b/scene_select/library.py @@ -16,23 +16,23 @@ _LOG = structlog.get_logger() -@define +@define(hash=True) class Level1Product: - name: str + name: str = field(eq=True, hash=True) # Examples: # /g/data/fj7/Copernicus/Sentinel-2/MSI/L1C/2021/2021-01/30S110E-35S115E/S2B_MSIL1C_20210124T023249_N0209_R103_T50JLL_20210124T035242.zip # /g/data/da82/AODH/USGS/L1/Landsat/C2/092_079/LC80920792024074/LC08_L1TP_092079_20240314_20240401_02_T1.odc-metadata.yaml - base_collection_path: Path + base_collection_path: Path = field(eq=False, hash=False) # The metadata, if it's stored separately from the L1 data itself. # (if None, assuming metadata sits alongise the data) - separate_metadata_directory: Optional[Path] = None + separate_metadata_directory: Optional[Path] = field(eq=False, hash=False, default=None) # Is this still receiving new data? ie. do we expect ongoing downloads # (false if the satellite is retired, or if a newer collection is available) - is_active: bool = False + is_active: bool = field(eq=False, hash=False, default=False) @define(unsafe_hash=True) @@ -65,6 +65,7 @@ def metadata_doc(self) -> dict: @define(unsafe_hash=True) class Level1Dataset(BaseDataset): + product: Level1Product = field(eq=False, hash=False) # The zip or tar file data_path: Path = field(eq=False, hash=False) @@ -128,6 +129,7 @@ def from_odc(cls, dataset: Dataset, product: Level1Product): dataset_id=str(dataset.id), metadata_path=metadata_path, data_path=data_path, + product=product, ) @@ -173,6 +175,14 @@ def software_versions(self): def level1_id(self): return self.metadata_doc()["lineage"]["level1"][0] + @classmethod + def from_odc(cls, ard_dataset:Dataset): + return ArdDataset( + dataset_id = str(ard_dataset.id), + metadata_path=ard_dataset.local_path, + maturity=ard_dataset.metadata.dataset_maturity, + ) + class ArdCollection: def __init__(