diff --git a/docs/contributing.qmd b/docs/contributing.qmd index 85e7dab..19c7ab3 100644 --- a/docs/contributing.qmd +++ b/docs/contributing.qmd @@ -14,19 +14,12 @@ We follow a standard feature branch workflow, as described [here.](https://docs. $ git clone https://github.com/Earth-Information-System/fireatlas.git $ cd fireatlas - # create virtual environment named `feds_env` with venv - $ python -m venv feds_env - $ source feds_env/bin/activate + # create and activate environment named `fire_env` + $ conda env create -f env.yml + $ conda activate fire_env - # OR create a virtual environment with conda - $ conda create --name feds_env python=3.11 - $ conda activate feds_env - - # install required dependencies - $ pip install -e . - - # install optional test dependencies if needed - $ pip install -e '.[test]' + # install optional dev dependencies if needed + $ pip install -e .[dev] ``` 2. Create a new branch and make your changes as needed. Name the branch {your initials}/{descriptive feature name}: @@ -39,7 +32,7 @@ We follow a standard feature branch workflow, as described [here.](https://docs. $ git commit -m "Add NOAA21 inputs" ``` -3. Run tests (from fireatlas root dir) +3. Run tests from fireatlas root dir (requires optional dependencies) ``` $ pytest -v tests/ @@ -63,6 +56,21 @@ We follow a standard feature branch workflow, as described [here.](https://docs. 6. When you are ready, [create a pull request](https://docs.github.com/en/get-started/using-github/github-flow#create-a-pull-request) to merge your branch into the main branch, `conus-dps`. Most substantive changes should be reviewed by another team member, and all changes must wait to be merged until the automated tests have passed. 
+## Jupyter Notebooks + +To use a Jupyter Notebook with the development environment: + +``` +# First, create conda environment following directions in Step 1 above +# Then, from the fireatlas root dir: +$ pip install .[dev] +$ python -m ipykernel install --user --name fire_env --display-name "Python - FEDS development environment" +# wait 30 seconds or so, then launch a new notebook. You should see this environment as an option to choose from in the launcher. + +``` + +This assumes you are using a JupyterLab interface such as the MAAP ADE. + ## Editing Docs We use Quarto to build this website. Most of the source files are plaintext `.qmd` files written in [Quarto markdown](https://quarto.org/docs/authoring/markdown-basics.html). You can edit them in any text editor, including with the MAAP ADE or even directly on GitHub. However, if you are making more extensive changes or using formatting, you should preview what the rendered website will look like. diff --git a/env.yml b/env.yml index 601f607..02ef23f 100644 --- a/env.yml +++ b/env.yml @@ -1,36 +1,11 @@ +name: fire_env channels: - conda-forge + dependencies: - python=3.11 - - xarray - - pandas - - dask - - distributed - - geopandas=0.14.2 - - shapely - - fsspec - - jupyterlab - - s3fs=2024.6.0 - - fiona - - rtree - - scikit-learn - - pyarrow - - rasterio - - pyproj - - ipykernel - - ca-certificates - - certifi - - openssl - - pydantic-settings - - pytest - - pytest-cov - - tqdm - - scalene + - awscli - pip - - numpy<2.0.0 - - pyogrio - - pip: - - git+https://github.com/MAAP-Project/maap-py.git@develop - - awscli -# NOTE: this env.yml is here for convenience but really all DPS and test installs leverage pyproject.toml -# so if you change things here do community service and change pyproject.toml too :wink: \ No newline at end of file + + - pip: + - -e . 
# Install most deps from pyproject.toml with pip \ No newline at end of file diff --git a/fireatlas/DataCheckUpdate.py b/fireatlas/DataCheckUpdate.py index 8a8e55c..7d98322 100644 --- a/fireatlas/DataCheckUpdate.py +++ b/fireatlas/DataCheckUpdate.py @@ -16,7 +16,7 @@ from fireatlas.FireLog import logger from fireatlas.preprocess import preprocess_input_file -MAP_KEY = "0e50658bd8e8ea368db7379b0be28630" +MAP_KEY = "af58dc9df0eb9c54f4cad1e73c6eae4e" N_MAX_RETRIES = 30 # ------------------------------------------------------------------------------ @@ -137,13 +137,13 @@ def update_FIRMS(d:date, sat: Literal["SNPP", "NOAA20", "NOAA21"], product: Lite logger.warning( f"{product} {sat} data is empty for {d}. This date may be outside range of data availability." ) - return + tst = d + else: + daterange = pd.to_datetime(df['acq_date']) + tst, ted = daterange.min(), daterange.max() - daterange = pd.to_datetime(df['acq_date']) - tst, ted = daterange.min(), daterange.max() - - if tst.date() != ted.date(): - raise ValueError(f"Unexpected date range for single day file: {tst} to {ted}") + if tst.date() != ted.date(): + raise ValueError(f"Unexpected date range for single day file: {tst} to {ted}") filename_out = f"FIRMS_VIIRS_{sat}_{product}_{tst.strftime('%Y%m%d')}.csv" downloaded_filepath = os.path.join(data_dir, filename_out) diff --git a/fireatlas/FireConsts.py b/fireatlas/FireConsts.py index b84b6b4..b818f7e 100644 --- a/fireatlas/FireConsts.py +++ b/fireatlas/FireConsts.py @@ -3,13 +3,18 @@ running controls """ -from typing import Literal +from typing import Literal, Optional, Tuple import os import warnings from pyproj import CRS import fsspec -from pydantic_settings import BaseSettings, SettingsConfigDict +from pydantic_settings import ( + BaseSettings, + SettingsConfigDict, + PydanticBaseSettingsSource, + YamlConfigSettingsSource, +) from pydantic import Field, validator, field_validator @@ -18,14 +23,49 @@ root_dir = 
os.path.abspath(os.path.dirname(os.path.dirname(__file__))) DOTENV_ABS_PATH = os.path.join(os.path.dirname(__file__), ".env") - +YAML_FILENAME = "run_config.yaml" +YAML_ABS_PATH = os.path.join(os.path.dirname(__file__), YAML_FILENAME) class Settings(BaseSettings): - # read in all env vars prefixed with `FEDS_` they can be in a .env file - model_config = SettingsConfigDict( - env_file=DOTENV_ABS_PATH, extra="ignore", env_prefix="FEDS_" - ) + if os.path.exists(YAML_ABS_PATH): + model_config = SettingsConfigDict( + yaml_file=YAML_ABS_PATH, + yaml_config_section="settings", + env_file=DOTENV_ABS_PATH, + extra="ignore", + env_prefix="FEDS_" # note: expects env vars as FEDS_env_var_name + ) + else: + model_config = SettingsConfigDict( + env_file=DOTENV_ABS_PATH, + extra="ignore", + env_prefix="FEDS_" + ) + + # Settings resolution order (in ascending order of priority): + # 1. Starts with default field values provided in FireConsts.py + # 2. Overrides with settings from run_config.yaml if available + # 3. Overrides with environment variables from .env file if available + # 4. Overrides with environment variables from the system env if set + # 5. 
Overrides with settings passed to the Settings class initializer if passed + # Result: init > system env vars > .env file > yaml file > default values + @classmethod + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return ( + init_settings, + env_settings, + dotenv_settings, + YamlConfigSettingsSource(settings_cls), + ) + # ------------------------------------------------------------------------------ # where data is stored @@ -48,14 +88,25 @@ class Settings(BaseSettings): OUTPUT_DIR: str = Field( "FEDSoutput-v3", description="directory where output data is stored" ) + REGIONS_DIR: str = Field( + "run_definitions", + description="directory where region definitions are stored." + ) READ_LOCATION: Location = Field( "s3", description="Final storage place for written files. This is where everything reads from", ) - LOG_FILENAME: str = Field("running.log", description="Where to write logs to.") + LOG_FILEPATH: str = Field( + os.path.join(root_dir, "running.log"), + description="Absolute path to the log file." + ) + ENV_META_FILEPATH: str = Field( + os.path.join(root_dir, "env_metadata.txt"), + description="Absolute path to the environment metadata file." 
+ ) + # ------------------------------------------------------------------------------ # spatiotemporal constraints of fire objects # ------------------------------------------------------------------------------ @@ -120,6 +171,46 @@ def check_epsg(cls, epsg: int): 1, description="the connectivity spatial threshold (to previous fire line), km" ) + # ------------------------------------------------------------------------------ + # OPTIONAL: run parameters + # Can be passed from run_config.yaml if using FireRunArchiveCoordinator.py + # or passed from the command line for all other scripts + # ------------------------------------------------------------------------------ + + TST: Optional[Tuple[int, int, int, Literal["AM", "PM"]]] = Field( + default=None, + description="start time as [year, month, day, 'AM'/'PM']" + ) + + TED: Optional[Tuple[int, int, int, Literal["AM", "PM"]]] = Field( + default=None, + description="end time as [year, month, day, 'AM'/'PM']" + ) + + @field_validator("TST", "TED", mode="after") + @classmethod + def _tuple_to_list(cls, v): + # v is already validated as a tuple of (int, int, int, str) + if v is None: + return None + return list(v) + + RUN_NAME: Optional[str] = Field( + default=None, + description="Run name, e.g. 'ArchiveCONUS' or 'ArchiveCONUS_test'." + ) + + REGION_SHAPEFILE: Optional[str] = Field( + default=None, + description="Name of the file that holds a shapefile that defines this region. " + "Assumes that this file is in the FEDSinput/run_definitions/RUN_NAME/ directory." + ) + + REGION_BBOX: Optional[list[float]] = Field( + default=None, + description="Bounding box of the region, e.g. [-126,24,-61,49]." 
+ ) + # ------------------------------------------------------------------------------ # shape parameters # ------------------------------------------------------------------------------ @@ -143,7 +234,9 @@ def check_epsg(cls, epsg: int): # MODIS pixel size MCD64buf: float = Field(231.7, description="MODIS fire perimeter buffer, m") - # fire source data + # ------------------------------------------------------------------------------ + # fire data source parameters + # ------------------------------------------------------------------------------ FIRE_SOURCE: Literal["SNPP", "NOAA20", "VIIRS", "BAMOD"] = Field( "VIIRS", description="fire source data" @@ -184,8 +277,15 @@ def check_epsg(cls, epsg: int): export_to_veda: bool = Field( False, description="whether to export data from MAAP to VEDA s3" ) - N_DASK_WORKERS: int = Field(6, description="How many dask workers to use for Run.") + # compute settings + + N_DASK_WORKERS: int = Field(6, description="How many dask workers to use for Run.") + ARCHIVE_RUN_JOB_SIZE: int = Field( + 10, + description="How many days to run in each archive job chunk.") + + # NIFC matching options DO_NIFC_MATCHING: bool = Field( False, description="If True, reads from the NIFC incident database for current " @@ -208,6 +308,8 @@ def check_epsg(cls, epsg: int): CONT_OPT: Literal["preset", "CA", "global"] = Field( "CA", description="continuity threshold option" ) + # ------------------------------------------------------------------------------ + @validator("LOCAL_PATH") def local_path_must_not_end_with_slash(cls, v: str) -> str: diff --git a/fireatlas/FireIO.py b/fireatlas/FireIO.py index 23945c2..6f1026e 100644 --- a/fireatlas/FireIO.py +++ b/fireatlas/FireIO.py @@ -27,6 +27,7 @@ from fireatlas.FireLog import logger from fireatlas.FireTypes import TimeStep +from fireatlas.FireConsts import YAML_FILENAME from fireatlas import FireTime, settings @@ -220,7 +221,7 @@ def VNP14IMGML_filepath(t: TimeStep): Parameters ---------- t : tuple, 
(int,int,int,str) - the year, month, day and 'AM'|'PM' during the initialization + the year, month, day and 'AM'|'PM' Returns ------- @@ -234,17 +235,23 @@ def VNP14IMGML_filepath(t: TimeStep): "VIIRS", "VNP14IMGML", ) - # prefers collection 2 version 3 (latest as of July 2025) - filepath = os.path.join(file_dir, f"VNP14IMGML.{year}{month:02}.C2.03.csv") - if not settings.fs.exists(filepath): - filepath = os.path.join(file_dir, f"VNP14IMGML.{year}{month:02}.C2.01.txt") - if not settings.fs.exists(filepath): - filepath = os.path.join(file_dir, f"VNP14IMGML.{year}{month:02}.C1.05.txt") - if not settings.fs.exists(filepath): - logger.warning(f"No VNP14IMGML file found for {year}-{month:02}") - return - return filepath + # filename patterns in order of preference + versions = [ + f"VNP14IMGML.{year}{month:02}.C2.04.csv", + f"VNP14IMGML.{year}{month:02}.C2.03.csv", + f"VNP14IMGML.{year}{month:02}.C2.02.csv", + f"VNP14IMGML.{year}{month:02}.C2.01.txt", + f"VNP14IMGML.{year}{month:02}.C1.05.txt", + ] + + for filename in versions: + filepath = os.path.join(file_dir, filename) + if settings.fs.exists(filepath): + return filepath + + logger.warning(f"No monthly data available for SNPP for {year}-{month:02}") + return None def read_VNP14IMGML(filepath: str): @@ -374,15 +381,16 @@ def VJ114IMGML_filepath(t: TimeStep): filepath : str Path to input data or None if file does not exist """ - year, month = t[0], t[1] - file_dir = os.path.join( + year, month = t[0], t[1] + + filepath = os.path.join( settings.dirextdata, "VIIRS", "VJ114IMGML", + f"VJ114IMGML.{year}{month:02}.C2.04.csv", ) - # looks for collection 2 version 3 (latest as of July 2025) - filepath = os.path.join(file_dir, f"VJ114IMGML.{year}{month:02}.C2.03.txt") + if not settings.fs.exists(filepath): logger.warning(f"No VJ114IMGML file found for {year}-{month:02}") return @@ -871,7 +879,7 @@ def AFP_setampm(df): the DataFrame with 'ampm' column """ # calculate local hour using the longitude and datetime column - 
localhour = (pd.to_timedelta(df.Lon / 15, unit="hours") + df["datetime"]).dt.hour + localhour = FireTime.aprox_local_datetime(df["datetime"], df["Lon"]).dt.hour # set am/pm flag based on local hour df_withampm = df.assign( @@ -2498,3 +2506,62 @@ def convert_v2_pkl_to_csv(files, output_dir, sat): output_paths.append(output_filepath) return output_paths + +def s3_log_destination_path(run_id: str, ted: TimeStep): + """Provide destination path to copy logs from local to s3 output directory for this region. + + Parameters + ---------- + run_id : str + name of run definition + ted : Timestep + last timestep of the current run + + Returns + ------- + path : str + destination path + """ + + ted_str = "ted_" + "".join(str(d) for d in ted) + "_" + + return os.path.join( + settings.get_path(location="s3"), + settings.OUTPUT_DIR, + run_id, + "logs", + ted_str + os.path.basename(settings.LOG_FILEPATH) + ) + +def s3_metadata_destination_path(run_name: str): + """Provide destination path to copy environment metadata from local to s3 output directory for this region. + + Parameters + ---------- + run_name : str + name of run definition + + Returns + ------- + path : str + destination path + """ + + return os.path.join( + settings.get_path(location="s3"), + settings.OUTPUT_DIR, + run_name, + "logs", + os.path.basename(settings.ENV_META_FILEPATH) + ) + +def s3_config_path(run_name: str): + """Provide path where the config file for run_name is expected on s3. 
+ Example: + s3://maap-ops-workspace/shared/zbecker/FEDSstaging/FEDSinput/run_definitions/{run_name}/run_config.yaml""" + return os.path.join( + settings.get_path(location="s3"), + settings.INPUT_DIR, + settings.REGIONS_DIR, + run_name, + YAML_FILENAME) \ No newline at end of file diff --git a/fireatlas/FireLog.py b/fireatlas/FireLog.py index 5610a42..6caea44 100644 --- a/fireatlas/FireLog.py +++ b/fireatlas/FireLog.py @@ -1,6 +1,10 @@ import logging import os from fireatlas import settings +import sys +import platform +import subprocess +import datetime as dt _logger_configured = False @@ -19,7 +23,7 @@ def get_logger(name): ch.setLevel(logging.INFO) # create a file handler as well - fh = logging.FileHandler(os.path.join(root_dir, settings.LOG_FILENAME)) + fh = logging.FileHandler(settings.LOG_FILEPATH) fh.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') @@ -39,3 +43,34 @@ def get_logger(name): return logger logger = get_logger(__name__) + + +def write_run_metadata(): + """ + Writes a text file to settings.ENV_META_FILEPATH that captures the output of + pip freeze and basic platform information. 
+ """ + t = dt.datetime.utcnow().isoformat() + "Z" + python_version = sys.version.replace("\n", " ") + platform_info = platform.platform() + + meta = [ + f"# Environment metadata generated: {t}", + f"# Python: {python_version}", + f"# Platform: {platform_info}", + "#" * 60 + ] + + freeze_output = subprocess.run( + ["conda", "env", "export"], + capture_output=True, + text=True, + check=True + ) + + content = "\n".join(meta) + "\n" + freeze_output.stdout + + with open(settings.ENV_META_FILEPATH, "w", encoding="utf-8") as f: + f.write(content) + + return settings.ENV_META_FILEPATH diff --git a/fireatlas/FireMain.py b/fireatlas/FireMain.py index a2b0368..8d6124a 100644 --- a/fireatlas/FireMain.py +++ b/fireatlas/FireMain.py @@ -261,7 +261,10 @@ def adjust_coincident_pixels(allpixels): duplicate_records = allpixels[(allpixels["y"] == y) & (allpixels["x"] == x)] for idx, row in duplicate_records.iterrows(): - logger.info(f" Y: {row['y']:.8f} | X: {row['x']:.8f} | Sat: {row['Sat']} | DateTime: {row['datetime']} | Version: {row['version']}") + if 'version' in duplicate_records.columns: + logger.info(f" Y: {row['y']:.8f} | X: {row['x']:.8f} | Sat: {row['Sat']} | DateTime: {row['datetime']} | Version: {row['version']}") + else: + logger.info(f" Y: {row['y']:.8f} | X: {row['x']:.8f} | Sat: {row['Sat']} | DateTime: {row['datetime']}") # Apply jitter to all records with this duplicate coordinate pair mask = (allpixels["y"] == y) & (allpixels["x"] == x) diff --git a/fireatlas/FireObj.py b/fireatlas/FireObj.py index 50c1ce1..d2f48d3 100644 --- a/fireatlas/FireObj.py +++ b/fireatlas/FireObj.py @@ -165,7 +165,7 @@ def update_gdf(self): gdf_updates = gpd.GeoDataFrame( new_rows, geometry="hull", - crs=settings.EPSG_CODE + crs=self.gdf.crs ).set_index(["fireID", "t"]) # ensure/cast types once diff --git a/fireatlas/FireRunDaskCoordinator.py b/fireatlas/FireRunDaskCoordinator.py index 9d304a8..c6daf8d 100644 --- a/fireatlas/FireRunDaskCoordinator.py +++ 
b/fireatlas/FireRunDaskCoordinator.py @@ -47,9 +47,10 @@ FIRMS_VIIRS_SNPP_NRT_filepath, FIRMS_VIIRS_NOAA20_SP_filepath, FIRMS_VIIRS_NOAA20_NRT_filepath, - FIRMS_VIIRS_NOAA21_NRT_filepath + FIRMS_VIIRS_NOAA21_NRT_filepath, + get_reg_shp ) -from fireatlas.FireTime import t_generator, d2t, t_nm, t_nd +from fireatlas.FireTime import dt2t, t2dt, t_generator, d2t, t_nm, t_nd, t_nb, get_current_timestep from fireatlas.FireLog import logger from fireatlas import settings import geopandas as gpd @@ -61,7 +62,7 @@ # via boto3/botocore common resolution paths fs = s3fs.S3FileSystem(config_kwargs={"max_pool_connections": 10}) -logger.info(settings.model_dump()) +# logger.info(settings.model_dump()) def validate_json(s): try: @@ -162,30 +163,54 @@ def job_nrt_current_day_updates(client: Client): return futures -def job_data_update_checker(client: Client, tst: TimeStep, ted: TimeStep): - """ - Checks to see if any input data within the time range needs to be preprocessed. - If settings.FIRE_NRT == False, only tries to preprocess already existing monthly - input files (VNP14IMGML and VJ114IMGML). - If settings.FIRE_NRT == True, tries to download any missing NRT input data from FIRMS, - then preprocess any unprocessed NRT data (FIRMS_VIIRS_SNPP_NRT, FIRMS_VIIRS_NOAA20_NRT, - FIRMS_VIIRS_NOAA21_NRT). Does not try to use monthly files. - NOTE: If settings.FIRE_NRT and any input files are needed, - blocks for downloads inside this function and returns only preprocessing futures. +def job_data_update_checker(client: Client, tst: TimeStep, ted: TimeStep, force: bool = True): + """Checks to see if any input data needs to be downloaded or preprocessed. + + If settings.FIRE_NRT is False, only tries to preprocess existing monthly files + (VNP14IMGML and VJ114IMGML). + + If settings.FIRE_NRT is True, tries to download any missing input data from FIRMS. + Then, preprocesses any daily files that need to be preprocessed. 
If force is True + and ted is within two days of the current time (UTC), automatically re-downloads and + re-preprocesses the current day and previous two days to catch any missed/lagging data + for NRT runs. Does not try to use monthly files. + + NOTE: When daily file downloads are needed, this function does them in sequence before returning. + This helps avoid overloading the FIRMS API with parallel requests from the Dask client. NOTE: Does not automatically reprocess a timestep that was previously preprocessed - from NRT data when the standard data product becomes available. + based on NRT data when the standard data product becomes available. - Generally, assumes that if a preprocessed file for a date already exists, it does - not need to be reprocessed unless it is from the most recent two days of the - NRT record. + Generally, assumes that if a preprocessed file for a timestep already exists, it does + not need to be reprocessed unless it is from the most recent two days of the NRT record. + + Parameters: + ----------- + client : Client + The Dask client to submit preprocessing jobs to + + tst : TimeStep + Beginning of date range to check. This function will also look for the previous + timestep's input file to make available as filepath_prev, and will log a warning if + not found, but continue. + + ted : TimeStep + End of date range to check. In some cases, this can be up to 12 hours ahead of the current + UTC time. In all cases, this function will look for the next timestep after ted as well + to make available as filepath_next, and will log a warning but continue if not found. + + force : bool = True + If settings.FIRE_NRT and force is True, if ted is within 48 hours of the current time, + automatically re-downloads and re-processes the previous two days of input data. + This serves as a rolling window to automatically backfill any data lags for NRT runs. 
Returns: -------- futures : list[dask.distributed.client.Future] List of preprocessing jobs to execute + """ source = settings.FIRE_SOURCE @@ -196,49 +221,49 @@ def job_data_update_checker(client: Client, tst: TimeStep, ted: TimeStep): futures = [] if source == "VIIRS": sats = ["SNPP", "NOAA20", "NOAA21"] - else: - sats = [source] - + else: + sats = [source] + for sat in sats: - if not settings.FIRE_NRT: - # look for already-downloaded monthly files - if sat == "SNPP": + if not settings.FIRE_NRT: + # look for already-downloaded monthly files only + if sat == "SNPP": monthly_filepath_func = VNP14IMGML_filepath - elif sat == "NOAA20": + elif sat == "NOAA20": monthly_filepath_func = VJ114IMGML_filepath - elif sat == "NOAA21": + elif sat == "NOAA21": logger.warning("No standard products available for NOAA21. " "Did you mean to set FireConsts.FIRE_NRT = True?") - continue - + continue + # gives list of timesteps for which there is no preprocessed file available timesteps = check_preprocessed_file(tst, ted, sat=sat, freq="monthly") - if len(timesteps) < 1: # no processing needed for this sat + if len(timesteps) < 1: # no processing needed for this sat continue - - monthly_filepaths = [monthly_filepath_func(t) for t in timesteps] - - # we don't need to preprocess outside of time range, but we do need + + monthly_filepaths = [monthly_filepath_func(t) for t in timesteps] + + # we don't need to preprocess outside of time range, but we do want # the previous and next months to preprocess the first and last days prev_month, next_month = t_nm(tst, "previous"), t_nm(ted, "next") for m in [prev_month, next_month]: - if not monthly_filepath_func(m): + if not monthly_filepath_func(m): logger.warning(f"No monthly input file found for {m} for {sat}") - + indices = [i for i, f in enumerate(monthly_filepaths) if f is not None] missing_indices = [i for i, f in enumerate(monthly_filepaths) if f is None] for i in missing_indices: logger.warning(f"No monthly input file found for 
{timesteps[i]} for {sat}") - + existing_timesteps = [timesteps[i] for i in indices] futures.extend(client.map(partial(preprocess_monthly_file, sat=sat), existing_timesteps)) - elif settings.FIRE_NRT: - + elif settings.FIRE_NRT: + if sat == "SNPP": nrt_filepath_func = FIRMS_VIIRS_SNPP_NRT_filepath sp_filepath_func = FIRMS_VIIRS_SNPP_SP_filepath @@ -248,79 +273,97 @@ def job_data_update_checker(client: Client, tst: TimeStep, ted: TimeStep): elif sat == "NOAA21": nrt_filepath_func = FIRMS_VIIRS_NOAA21_NRT_filepath sp_filepath_func = None - + + # use these to ensure all downloads are done before any preprocessing starts + preprocess_tasks = {} # (t, sat) -> filepath + + if force and t2dt(ted) >= dt.datetime.now() - dt.timedelta(hours=48): + # if ted is within 48 hours of current time, force re-download + # and re-process latest files + + ctime = dt.datetime.now() # Current UTC time + + datetimes = [ctime - dt.timedelta(days=n) for n in [0,1,2]] + + for d in datetimes: + fp = update_FIRMS(d, sat=sat, product="NRT") + preprocess_tasks[(tuple(dt2t(d)), sat)] = fp # tuple conversion makes hashable + + if t2dt(ted) > t2dt(dt2t(ctime)): + # for some timezones, local ted timestep can be after the current UTC day + # in thise case, mock the filepath for that input file and preprocess. 
+ # preprocess_input_file will use filepath_prev only + fp = nrt_filepath_func(ted) + preprocess_tasks[(tuple(ted), sat)] = fp + + # for timesteps in tst to ted, download and preprocess any missing files + # gives list of timesteps for which there is no preprocessed file available timesteps = check_preprocessed_file(tst, ted, sat=sat, freq="NRT") - if len(timesteps) < 1: # no processing needed for this sat - continue - - # check firms data availability - sp_start, sp_end, nrt_start, nrt_end = get_FIRMS_data_availability(sat) + if len(timesteps) < 1: # no processing needed for this sat + continue - # use these to ensure all downloads are done before any preprocessing starts - download_futures = {} # (t, sat) -> dask future - preprocess_tasks = {} # (t, sat) -> filepath + # check FIRMS data availability + sp_start, sp_end, nrt_start, nrt_end = get_FIRMS_data_availability(sat) - for t in timesteps: + for t in timesteps: d = dt.datetime(t[0], t[1], t[2]) if d > nrt_end: - logger.warning(f"No data available for {sat} on {t[0]}-{t[1]}-{t[2]}: date out of range.") - continue - elif d >= nrt_start: # in NRT availability range + logger.warning(f"{t[0]}-{t[1]}-{t[2]} is after NRT data availability range ends for {sat}") + continue + elif d>= nrt_start: # in NRT availability range fp = nrt_filepath_func(t) - if fs.exists(fp): - preprocess_tasks[(t, sat)] = fp + if fs.exists(fp): + preprocess_tasks[(t, sat)] = fp # if we don't already have this input file, try to download from FIRMS else: - download_futures[(t, sat)] = client.submit(update_FIRMS, d, sat, "NRT") - elif sp_start and d >= sp_start: # check if sp_start because NOAA21 does not have yet - # in standard product availability range + downloaded_fp = update_FIRMS(d, sat, "NRT") + preprocess_tasks[(t, sat)] = downloaded_fp + elif sp_start and d >= sp_start: # check if sp_start because NOAA21 does not yet have + # in standard product availability range fp = sp_filepath_func(t) if fs.exists(fp): - preprocess_tasks[(t, 
sat)] = fp + preprocess_tasks[(t, sat)] = fp else: - download_futures[(t, sat)] = client.submit(update_FIRMS, d, sat, "SP") + downloaded_fp = update_FIRMS(d, sat, "SP") + preprocess_tasks[(t, sat)] = downloaded_fp else: # either before sp_start, or this is NOAA21 (so, no sp_start) and it is before # nrt start. either way, warn but allow logger.warning(f"No data available for {sat} on {t[0]}-{t[1]}-{t[2]}. " "Date may be out of range.") - # need to have these available to preprocess tst and ted, if possible + # would like to have these available to preprocess tst and ted, if possible prev_day = t_nd(tst, "previous") next_day = t_nd(ted, "next") - for t in [prev_day, next_day]: + for t in [prev_day, next_day]: d = dt.datetime(t[0], t[1], t[2]) - if d > nrt_end: - logger.warning(f"No data available for {sat} on {t[0]}-{t[1]}-{t[2]}. Date out of range.") + if d > nrt_end: + logger.warning(f"{t[0]}-{t[1]}-{t[2]} is after NRT data availability range ends for {sat}") elif d >= nrt_start: fp = nrt_filepath_func(t) - if not fs.exists(fp): + if not fs.exists(fp): update_FIRMS(d, sat, "NRT") elif d >= sp_start: - fp = sp_filepath_func(t) - if not fs.exists(fp): + fp = sp_filepath_func(t) + if not fs.exists(fp): update_FIRMS(d, sat, "SP") else: logger.warning(f"No data available for {sat} on {t[0]}-{t[1]}-{t[2]}. 
" "Date may be out of range.") - if len(download_futures) > 0: - # block to finish downloads before starting any preprocessing - downloaded_paths = client.gather(download_futures) - preprocess_tasks.update(downloaded_paths) - + logger.info('Preprocess tasks: %s', preprocess_tasks) # schedule preprocessing - for (tk, satk), fp in preprocess_tasks.items(): + for (tk, satk), fp in preprocess_tasks.items(): tk = list(tk) futures.append(client.submit(preprocess_daily_file, fp, tk, satk)) - return futures + return futures @timed def Run_local(region: Region, tst: TimeStep, ted: TimeStep, copy_to_veda: bool=False): @@ -410,12 +453,10 @@ def Run(region: Region, tst: TimeStep, ted: TimeStep, copy_to_veda: bool):\ if tst in (None, "", []): # if no start is given, run from beginning of year tst = [ctime.year, 1, 1, 'AM'] - if ted in (None, "", []): # if no end time is given, set it as the most recent time - if ctime.hour >= 18: - ampm = 'PM' - else: - ampm = 'AM' - ted = [ctime.year, ctime.month, ctime.day, ampm] + if ted in (None, "", []): + # if no end time is given, set it as the most recent timestep for this region + reg_shp = get_reg_shp(region) + ted = t_nb(FireTime.get_current_timestep(reg_shp), "previous") # most recent completed timestep logger.info(f"------------- Starting full run from {tst=} to {ted=} -------------") diff --git a/fireatlas/FireTime.py b/fireatlas/FireTime.py index 100720a..fee075c 100644 --- a/fireatlas/FireTime.py +++ b/fireatlas/FireTime.py @@ -13,10 +13,9 @@ """ -from datetime import date, timedelta, datetime +from datetime import date, timedelta, datetime, timezone import pandas as pd - def t_nb(t, nb="next"): """Calculate the next or previous time step (year, month, day, ampm) Parameters @@ -232,22 +231,33 @@ def d2t(year, month, day, ampm): return t -def dt2t(dt): +def dt2t(indt): """convert datetime to a t tuple Parameters ---------- - dt : datetime datetime - datetime - Returns + dt : datetime object in local time + + + Expected behavior: 
+ 1/1 07:00 to 1/1 17:59 -> 1/1 PM + 1/1 18:00 to 1/2 06:59 -> 1/2 AM + 1/2 07:00 to 1/2 17:59 -> 1/2 PM + 1/2 18:00 to 1/3 06:59 -> 1/3 AM + + This is based on the existing logic in FireIO.AFP_setampm + + Returns ------- - t : tuple, (int,int,int,str) - the year, month, day and 'AM'|'PM' + t : TimeStep ([int year, int month, int day, "AM" or "PM"]) """ - dlh = {"AM": 0, "PM": 12} - dhl = {0: "AM", 12: "PM"} - t = [dt.year, dt.month, dt.day, dhl[dt.hour]] - return t + if indt.hour <= 6: + return [indt.year, indt.month, indt.day, "AM"] + elif indt.hour < 18: + return [indt.year, indt.month, indt.day, "PM"] + else: # belongs to AM overpass for following day + nextday = indt + timedelta(days=1) + return [nextday.year, nextday.month, nextday.day, "AM"] def ftrange(firstday, lastday): @@ -319,3 +329,56 @@ def update_tst_ted(polygon_series, tst=None, ted=None): ) return tst, ted + +def aprox_local_datetime(datetime_utc, lon): + """ + Calculate aproximate local solar datetime from UTC datetime and longitude. + + Parameters + ---------- + datetime_utc : datetime object or pd.Series + Datetime in UTC + lon : float or pd.Series + Longtitude (assumed to be in decimal degrees) + + Returns + ------- + datetime or pd.Series + Local datetime(s) + """ + + # Validate longitude range + if isinstance(lon, pd.Series): + if (lon < -180).any() or (lon > 180).any(): + invalid_values = lon[(lon < -180) | (lon > 180)] + raise ValueError( + f"Longitude values must be in range [-180, 180]. Input expected in decimal degrees." + f"Found {len(invalid_values)} invalid value(s): " + f"{invalid_values.values[:5]}{'...' if len(invalid_values) > 5 else ''}" + ) + else: + if lon < -180 or lon > 180: + raise ValueError( + f"Longitude must be in range [-180, 180]. Input expected in decimal degrees. 
Got: {lon}" + ) + + return pd.to_timedelta(lon / 15, unit="hours") + datetime_utc + +def get_current_timestep(reg_shp): + """ + Returns the most recent Timestep for a given region based on the current time, based on + aprox local solar time at the centroid of the region geometry. + + Parameters: + ----------- + reg_shp : Any Shapely geometry, assumed to be lat/lon + + Returns: + -------- + local_timestep : FireTypes.TimeStep + + """ + ctime = datetime.now(tz=timezone.utc) + reg_lon = reg_shp.centroid.x + local_timestep = dt2t(aprox_local_datetime(ctime, reg_lon)) + return local_timestep \ No newline at end of file diff --git a/fireatlas/preprocess.py b/fireatlas/preprocess.py index 7444102..5e871d9 100644 --- a/fireatlas/preprocess.py +++ b/fireatlas/preprocess.py @@ -15,7 +15,7 @@ from fireatlas.FireTypes import Region, TimeStep, Location from fireatlas.utils import timed from fireatlas.FireClustering import do_clustering -from fireatlas.FireTime import t_generator, t2dt, t_nb, t_nd, t_nm +from fireatlas.FireTime import t_generator, t2dt, t_nb, t_nd, t_nm, aprox_local_datetime from fireatlas import FireIO, FireMain, settings, FireTime @@ -156,7 +156,7 @@ def FIRMS_NRT_filepath(t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): Returns ------- filepath : str - Path to input data or None if file does not exist + Path to input data, even if there is no file there at present """ if sat == "SNPP": filepath = FireIO.FIRMS_VIIRS_SNPP_NRT_filepath(t) @@ -167,10 +167,7 @@ def FIRMS_NRT_filepath(t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): else: raise ValueError("Please set SNPP, NOAA20, or NOAA21 for sat") - if not settings.fs.exists(filepath): - return None - else: - return filepath + return filepath def FIRMS_SP_filepath(t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): """Filepath for daily SP VIIRS data from FIRMS @@ -185,7 +182,7 @@ def FIRMS_SP_filepath(t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): Returns ------- filepath : str - Path 
to input data or None if file does not exist + Path to input data even if there is no file currently present """ if sat == "SNPP": filepath = FireIO.FIRMS_VIIRS_SNPP_SP_filepath(t) @@ -194,10 +191,7 @@ def FIRMS_SP_filepath(t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): else: raise ValueError("Please set SNPP or NOAA20 for sat") - if not settings.fs.exists(filepath): - return None - else: - return filepath + return filepath def monthly_filepath(t: TimeStep, sat: Literal["NOAA20", "SNPP"]): """Filepath for monthly VIIRS data @@ -269,7 +263,7 @@ def check_preprocessed_file( @timed def preprocess_input_file(filepath: str, filepath_prev: str | None, filepath_next: str | None): """ - Preprocess monthly or daily NRT file of fire location data. + Preprocess monthly or daily NRT file of fire location data for the date inferred from in the input filepath. NOTE: Input files are named by UTC date or month. Output files are named with the aprox local solar date/time for each observation. Pixels in the @@ -277,12 +271,15 @@ def preprocess_input_file(filepath: str, filepath_prev: str | None, filepath_nex by aprox local solar time. This means that they may come from the previous or next UTC date. - NOTE: Satellite is deduced from the filepath. + NOTE: Satellite and date are deduced from the filepath. + + NOTE: If None is passed for an input filename or no input file exists for a passed filename, + this function will log a warning but continue with the other input files. Parameters ---------- filepath : str - Path to input data. Can be local or s3. + Path to input data for the input file being preprocessed. Can be local or s3. filepath_prev : str | None Path to input data for previous timestep. If None, this function will simply not check the input file for the previous UTC timestep. This can lead to @@ -297,16 +294,26 @@ def preprocess_input_file(filepath: str, filepath_prev: str | None, filepath_nex List of filepaths that this function has written to. 
""" if filepath is None: + logger.error(f"Null filepath. Args: filepath = {filepath}, " + f"filepath_prev = {filepath_prev}, filepath_next = {filepath_next}") + raise ValueError("Please provide a valid filepath") logger.info(f"preprocessing {filepath.split('/')[-1]}") dfs = [] sat = None + + location = settings.READ_LOCATION + fs = fsspec.filesystem(location, use_listings_cache=False) + + t = get_date_from_input_filename(filepath) + for f in [filepath_prev, filepath, filepath_next]: - if not f: - # it can be valid to have no prev or next file - # move on to next file - continue + + if not f or not fs.exists(f): + # it can be valid to have an empty filepath on data availability boundaries- warn but continue + logger.warning(f"No input file found for {f} while preprocessing {sat} on {t[0]}-{t[1]}-{t[2]}.") + continue # read file if "VNP14IMGTDL" in f: @@ -352,8 +359,7 @@ def preprocess_input_file(filepath: str, filepath_prev: str | None, filepath_nex df = pd.concat(dfs) # Convert from UTC to aprox local time - df["local_datetime"] = (pd.to_timedelta(df.Lon / 15, unit="hours") + df["datetime"]) - + df["local_datetime"] = aprox_local_datetime(df["datetime"], df["Lon"]) # get the date of the main input file query_year, query_month, query_day = get_date_from_input_filename(filepath) # Select only observations that are on the date of the main input file in the local timezone @@ -451,7 +457,7 @@ def preprocess_NRT_file(t: TimeStep, sat: Literal["NOAA20", "SNPP"]): def preprocess_daily_file(filepath, t: TimeStep, sat: Literal["SNPP", "NOAA20", "NOAA21"]): """Find previous and next daily input files, then preprocess this timestep. - Prefers FIRMS standard product (SP) over FIRMS NRT if we have both. + Prefers FIRMS standard product (SP) over FIRMS NRT if we have it. 
Parameters ---------- filepath : str @@ -469,13 +475,24 @@ def preprocess_daily_file(filepath, t: TimeStep, sat: Literal["SNPP", "NOAA20", day_prev = t_nd(t, "previous") day_next = t_nd(t, "next") - if settings.FIRE_NRT == True: - filepath_prev = FIRMS_NRT_filepath(day_prev, sat) - filepath_next = FIRMS_NRT_filepath(day_next, sat=sat) - - else: - filepath_prev = FIRMS_SP_filepath(day_prev, sat=sat) + if sat in ["SNPP", "NOAA20"]: + # look for SP if available for this sat + filepath_prev = FIRMS_SP_filepath(day_prev, sat=sat) filepath_next = FIRMS_SP_filepath(day_next, sat=sat) + + fs = fsspec.filesystem(settings.READ_LOCATION, use_listings_cache=False) + + if not fs.exists(filepath_prev): + filepath_prev = FIRMS_NRT_filepath(day_prev, sat=sat) + + if not fs.exists(filepath_next): + filepath_next = FIRMS_NRT_filepath(day_next, sat=sat) + else: + # NOAA21 does not yet have SP available for any days + filepath_prev = FIRMS_NRT_filepath(day_prev, sat=sat) + filepath_next = FIRMS_NRT_filepath(day_next, sat=sat) + + return preprocess_input_file(filepath, filepath_prev, filepath_next) diff --git a/maap_runtime/FireRunArchiveCoordinator.py b/maap_runtime/FireRunArchiveCoordinator.py new file mode 100644 index 0000000..083cb97 --- /dev/null +++ b/maap_runtime/FireRunArchiveCoordinator.py @@ -0,0 +1,253 @@ +import argparse +import os +import subprocess +import s3fs +import glob +import dask.config +import geopandas as gpd +import datetime as dt +from functools import partial +from dask.distributed import Client +from fireatlas.FireMain import Fire_Forward +from fireatlas.FireRunDaskCoordinator import ( + job_data_update_checker, + job_preprocess_region, + job_preprocess_region_t, + get_timesteps_needing_region_t_processing +) +from fireatlas.FireConsts import YAML_ABS_PATH +from fireatlas.FireIO import ( + copy_from_local_to_s3, + copy_from_local_to_veda_s3, + s3_log_destination_path, s3_config_path, + s3_metadata_destination_path, + get_reg_shp +) +from fireatlas.FireTime 
import dt2t, t2dt, t_nb, get_current_timestep +from fireatlas.postprocess import ( + all_dir, + allfires_filepath, + allpixels_filepath, + combined_lf_perims_nifc_join, + find_largefires, + get_t_of_last_allfires_run, + read_allfires_gdf, + read_allpixels, + save_large_fires_layers, + save_large_fires_nplist, + save_snapshots +) +from fireatlas.utils import timed +from fireatlas import settings +from fireatlas.FireLog import logger, write_run_metadata +from maap.maap import MAAP + +dask.config.set({'logging.distributed': 'error'}) + +# NOTE: this expects credentials to be resolvable globally +# via boto3/botocore common resolution paths +fs = s3fs.S3FileSystem(config_kwargs={"max_pool_connections": 10}) + +def main(run_name, copy_to_veda=False): + + wallclock_start = dt.datetime.now() + + if os.path.exists(YAML_ABS_PATH): + logger.info(f"run_config.yaml file found at {YAML_ABS_PATH}. Including settings overrides.") + else: + logger.info(f"run_config.yaml NOT found at {YAML_ABS_PATH}.") + + logger.info(settings.model_dump()) + + if settings.RUN_NAME is None or settings.TST is None: + raise ValueError("Run parameters are not defined in run_config.yaml. " + "To use this script, you must define the full run parameters and settings in " + " FEDSinput/run_definitions/{run_name}/run_config.yaml.") + else: + + + regnm = settings.RUN_NAME + if settings.REGION_SHAPEFILE is not None: + region = [regnm, settings.REGION_SHAPEFILE] + elif settings.REGION_BBOX is not None: + region = [regnm, settings.REGION_BBOX] + else: + raise ValueError("No region shape found. 
Did you set settings.REGION_SHAPEFILE or" + "settings.REGION_BBOX in run_config.yaml?") + + # parse TST and TED + tst = settings.TST + if settings.TED is not None: + ted = settings.TED + else: + # if no end time set, use current time (for NRT runs) + reg_shp = get_reg_shp(region[1]) + ted = t_nb(get_current_timestep(reg_shp), "previous") # most recent completed timestep + + gpd.show_versions() # for debugging + + # log commit hash of current fireatlas version + try: + logger.info("fireatlas current branch: " + subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"],stderr=subprocess.DEVNULL, text=True).strip()) + logger.info("commit hash of fireatlas version used: " + subprocess.check_output(["git", "rev-parse", "HEAD"],stderr=subprocess.DEVNULL, text=True).strip()) + except: + pass + + logger.info(f"------------- Starting full run from {tst=} to {ted=} -------------") + + client = Client(n_workers=settings.N_DASK_WORKERS) + logger.info(f"dask workers = {len(client.cluster.workers)}") + + # run the first two jobs in parallel + data_update_futures = job_data_update_checker(client, tst, ted) + region_future = client.submit(job_preprocess_region, region) + + # block until data update is complete + client.gather(data_update_futures) + + data_upload_futures = client.map( + partial(copy_from_local_to_s3, fs=fs), + glob.glob(f"{settings.LOCAL_PATH}/{settings.PREPROCESSED_DIR}/*/*.txt") + ) + # block until half-day timesteps and region are on s3 + timed(client.gather, text=f"Dask upload of {len(data_upload_futures) + 1} files")([*data_upload_futures, region_future]) + + logger.info("------------- Done with preprocessing t -------------") + + # then run all region-plus-t in parallel that need it + timesteps_needing_processing = get_timesteps_needing_region_t_processing( + tst, ted, region, force=True + ) + region_and_t_futures = client.map( + partial(job_preprocess_region_t, region=region), + timesteps_needing_processing + ) + # block until preprocessing is 
complete + client.gather(region_and_t_futures) + + logger.info("------------- Done with preprocessing region + t -------------") + + # run fire forward for the next settings.ARCHIVE_RUN_JOB_SIZE days + # produce and upload only allfires and allpixels + + # get t of latest allfires/allpixels if any + t_saved = get_t_of_last_allfires_run(tst, ted, region=region, location=settings.READ_LOCATION) + + # Pick up where we left off if possible; calculate size of next run chunk + if t_saved is None or t2dt(t_saved) <= t2dt(tst) or t2dt(t_saved) > t2dt(ted): + run_tst = tst + else: + run_tst = t_saved + run_ted = dt2t(min(t2dt(ted), t2dt(run_tst) + dt.timedelta(days=settings.ARCHIVE_RUN_JOB_SIZE))) + + logger.info(f"------------- Running Fire_Forward for {run_tst=} to {run_ted=} -------------") + + try: + allfires, allpixels, t_saved = Fire_Forward(tst=run_tst, ted=run_ted, region=region, restart=False) + allfires_gdf = allfires.gdf + copy_from_local_to_s3(allpixels_filepath(run_tst, run_ted, region, location="local"), fs=fs) + copy_from_local_to_s3(allfires_filepath(run_tst, run_ted, region, location="local"), fs=fs) + except KeyError as e: + logger.warning(f"Fire_Forward has already run. {e}") + allpixels = read_allpixels(tst, ted, region) + allfires_gdf = read_allfires_gdf(tst, ted, region) + t_saved = run_ted + + logger.info(f"------------- Done running Fire_Forward for {run_tst=} to {run_ted=} -------------") + + if t2dt(run_ted) < t2dt(ted): + + logger.info(f"------------- Submitting next job for {t_nb(run_ted)} to {ted} -------------") + + maap = MAAP(maap_host='api.maap-project.org') + job = maap.submitJob( + identifier=f"job-eis-feds-archive:checkpoints", + algo_id="eis-feds-archive", + version="checkpoints", + username="zbecker", + queue="maap-dps-eis-worker-128gb", + run_id=run_name + ) + + logger.info(f"------------- Submitted next job to DPS. 
Submission status: {job.status} -------------") + + else: + # all done with run: do postprocessing + if t_saved is not None: + snapshot_tst = t_saved + else: + snapshot_tst = run_tst + snapshot_futures = save_snapshots(allfires_gdf, region, snapshot_tst, ted, client=client) + large_fires = find_largefires(allfires_gdf) + save_large_fires_nplist(allpixels, region, large_fires, tst) + save_large_fires_layers(allfires_gdf, region, large_fires, tst, ted, client=client) + + client.gather(snapshot_futures) + + # If flag matching flat set, add overlaps with this year's NIFC incidents to + # CombinedLargefire/lf_perimeter.fgb for ted only. + if settings.DO_NIFC_MATCHING: + logger.info("Started NIFC matching") + combined_lf_perims_nifc_join(tst, ted, region, active_only=True, time_filter=None) + logger.info("Finished NIFC matching") + + + # take all fire forward output and upload all outputs in parallel + data_dir = all_dir(tst, region, location="local") + fgb_s3_upload_futures = client.map( + partial(copy_from_local_to_s3, fs=fs), + glob.glob(os.path.join(data_dir, "*", "*", "*.fgb")) + ) + # block until everything is uploaded + timed(client.gather, text=f"Dask upload of {len(fgb_s3_upload_futures)} files")(fgb_s3_upload_futures) + + if copy_to_veda: + # take latest fire forward output and upload to VEDA S3 in parallel + fgb_veda_upload_futures = client.map( + partial(copy_from_local_to_veda_s3, fs=fs, regnm=region[0]), + glob.glob(os.path.join(data_dir, "*", f"{ted[0]}{ted[1]:02}{ted[2]:02}{ted[3]}", "*.fgb")) + ) + timed(client.gather, text=f"Dask upload of {len(fgb_veda_upload_futures)} files")(fgb_veda_upload_futures) + + logger.info("------------- Full run completed -------------") + + wallclock_end = dt.datetime.now() + run_duration = wallclock_end - wallclock_start + logger.info(f"This job completed in {str(run_duration)}") + + client.close() + + # write environment metadata and copy to s3 + metadata_path = write_run_metadata() + fs.put_file(metadata_path, 
s3_metadata_destination_path(region[0])) + logger.info(f"Copied environment information to {s3_metadata_destination_path(region[0])}") + + # finally, copy log file to s3 + fs.put_file(settings.LOG_FILEPATH, s3_log_destination_path(region[0], run_ted)) + + return + +if __name__ == "__main__": + """ + CLI script for coordinating retrospective "archive" runs of + FEDS on the MAAP DPS platform. + + If the user-specified time range is greater than + settings.ARCHIVE_RUN_JOB_SIZE days, the run will be split into multiple + jobs. Each job reads the latest checkpoint data, does any necessary preprocessing, + then runs Fire_Forward on the region for settings.ARCHIVE_RUN_JOB_SIZE days. + Upon completion, if we still haven't reached the end of the time range, + this script will submit another job for the next chunk of time. + + Parameters + ---------- + run_name : str + Name of the run definition to execute, e.g. "ArchiveCONUS" + + """ + + parser = argparse.ArgumentParser() + parser.add_argument("run_name", type=str, help="Name of the run definition to execute") + args = parser.parse_args() + + main(args.run_name) diff --git a/maap_runtime/archive/algorithm_config.yaml b/maap_runtime/archive/algorithm_config.yaml new file mode 100644 index 0000000..0a57ce8 --- /dev/null +++ b/maap_runtime/archive/algorithm_config.yaml @@ -0,0 +1,15 @@ +algorithm_name: eis-feds-archive +algorithm_description: "Run FEDS for large regions/time periods retrospectively" +algorithm_version: checkpoints +environment: ubuntu +repository_url: https://github.com/Earth-Information-System/fireatlas.git +docker_container_url: "mas.maap-project.org/root/maap-workspaces/custom_images/maap_base:v4.3.0" +queue: "maap-dps-eis-worker-128gb" +run_command: "fireatlas/maap_runtime/run_archive_cli.sh" +build_command: "fireatlas/maap_runtime/run_dps_build.sh" +disk_space: "250GB" +inputs: + config: [ ] + positional: + - name: run_id # trigger this run; FEDSinput/run_definitions/run_id/config.yaml must exist on s3 + 
download: False diff --git a/maap_runtime/register-and-run-archive.ipynb b/maap_runtime/register-and-run-archive.ipynb new file mode 100644 index 0000000..1d8b61d --- /dev/null +++ b/maap_runtime/register-and-run-archive.ipynb @@ -0,0 +1,57 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "6f8909ae-5044-4038-80de-0f0bbf7b0f71", + "metadata": {}, + "outputs": [], + "source": [ + "from maap.maap import MAAP\n", + "maap = MAAP(maap_host='api.maap-project.org')\n", + "response = maap.register_algorithm_from_yaml_file(\"./archive/algorithm_config.yaml\")\n", + "print(response.text)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e04de52f-5e37-4b3f-a355-04eeb790cf1a", + "metadata": {}, + "outputs": [], + "source": [ + "resp = maap.submitJob(\n", + " identifier=f\"job-eis-feds-archive:staging\",\n", + " algo_id=\"eis-feds-archive\",\n", + " version=\"staging\",\n", + " username=\"zbecker\", \n", + " queue=\"maap-dps-eis-worker-128gb\",\n", + " run_id=\"TEST_ArchiveCoordinator_Creek\"\n", + ")\n", + "\n", + "resp" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:pangeo] *", + "language": "python", + "name": "conda-env-pangeo-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/maap_runtime/run_archive_cli.sh b/maap_runtime/run_archive_cli.sh new file mode 100644 index 0000000..634caff --- /dev/null +++ b/maap_runtime/run_archive_cli.sh @@ -0,0 +1,66 @@ +#!/bin/bash +set -eo pipefail + +copy_s3_object() { + local from_path="$1" + local to_path="$2" + if ! 
out=$(aws s3 cp "$from_path" "$to_path" 2>&1); then + echo "Copy failed from $from_path to $to_path" >&2 + echo "$out" >&2 + return 0 + else + echo "Copy succeeded from $from_path to $to_path" + fi +} + + +run_id=$1 + +echo "Running script with run_id: $run_id" + +output_dir=${PWD}/output +mkdir "${output_dir}" + +basedir=$( cd "$(dirname "$0")"; pwd -P ) +echo "Basedir: $basedir" +echo "Initial working directory: $(pwd -P)" +echo "conda: $(which conda)" +echo "Python: $(which python)" + +python --version +source activate fire_env +conda list | grep s3fs + +handle_exit() { + popd + echo "Copying log to special output dir" + cp "$basedir/../running.log" "$output_dir" + # force the calling process to know we've encountered an error put this in DPS failed state + exit 128 +} + +trap 'handle_exit' EXIT + +pushd "$basedir" +echo "Running in directory: $(pwd -P)" +# we now secretly look for s3://maap-ops-workspace/shared/gsfc_landslides/FEDSpreprocessed//.env +# and copy it locally to ../fireatlas/.env so that pydantic can pick up our overrides +copy_s3_object "s3://maap-ops-workspace/shared/gsfc_landslides/FEDSpreprocessed/${run_id}/.env" ../fireatlas/.env +copy_s3_object "s3://maap-ops-workspace/shared/zbecker/FEDSstaging/FEDSinput/run_definitions/${run_id}/run_config.yaml" ../fireatlas/run_config.yaml +ls -lah ../fireatlas/ + +python FireRunArchiveCoordinator.py "$run_id" + +popd +echo "Copying log to special output dir" +cp "$basedir/../running.log" "$output_dir" + +# unset trap since we are successful and send exit +trap - EXIT +echo "Done!" +exit 0 + + + + + diff --git a/maap_runtime/run_dps_build.sh b/maap_runtime/run_dps_build.sh index 39c8b5e..340a1c5 100644 --- a/maap_runtime/run_dps_build.sh +++ b/maap_runtime/run_dps_build.sh @@ -10,10 +10,9 @@ python --version # where mamba should be default resolver pushd "$basedir" -conda create -n "fire_env" python=3.11 +conda env create -f ../env.yml source activate fire_env -echo "Doing fireatlas install next..." 
-/opt/conda/envs/fire_env/bin/pip install -e .. -/opt/conda/envs/fire_env/bin/pip install "git+https://github.com/MAAP-Project/maap-py.git@develop" +echo "Installing maap-py..." +/opt/conda/envs/fire_env/bin/pip install "git+https://github.com/MAAP-Project/maap-py.git@master" diff --git a/pyproject.toml b/pyproject.toml index 42365ff..1d204e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,14 +23,14 @@ classifiers = [ keywords = ["eis", "fireatlas"] dependencies = [ "xarray", - "pandas", + "pandas<3", "dask", "distributed", - "geopandas==0.14.4", + "geopandas<1", "fiona", "shapely", "fsspec", - "s3fs==2024.6.0", + "s3fs>=2024.6", "rtree", "scikit-learn", "pyarrow", @@ -38,11 +38,11 @@ dependencies = [ "rasterio", "pyproj", "tqdm", - "scalene", - "awscli", - "numpy<2.0.0", + "scalene", + "numpy<2", "pyogrio", - "requests" + "requests", + "aiobotocore[boto3]==2.24.1" ] # [project.urls] @@ -69,6 +69,11 @@ test = [ "pytest", "pytest-cov", ] +dev = [ + "pytest", + "pytest-cov", + "ipykernel" +] [tool.coverage.run] omit = [ diff --git a/tests/test_firetime.py b/tests/test_firetime.py new file mode 100644 index 0000000..7d661cd --- /dev/null +++ b/tests/test_firetime.py @@ -0,0 +1,102 @@ +import pytest +from fireatlas.FireTime import aprox_local_datetime, dt2t +import datetime as dt +import pandas as pd + + +""" +Test cases: + +Single values +Pandas series + +On UTC line (no change) +Positive change +negative change + +After that: run Ofunato again to check? 
+ +""" + +def test_aprox_local_scalar_values_positive_longitude(): + """Test with single datetime and positive longitude.""" + utc_time = dt.datetime(2025, 1, 1, 12, 0, 0) # Noon UTC + lon = 15.0 # Should add 1 hour + + result = aprox_local_datetime(utc_time, lon) + expected = dt.datetime(2025, 1, 1, 13, 0, 0) + + assert result == expected + +def test_aprox_local_scalar_values_negative_longitude(): + """Test with single datetime and negative longitude (western hemisphere).""" + utc_time = dt.datetime(2025, 1, 1, 12, 0, 0) # Noon UTC + lon = -75.0 # Should subtract 5 hours + + result = aprox_local_datetime(utc_time, lon) + expected = dt.datetime(2025, 1, 1, 7, 0, 0) + + assert result == expected + +def test_aprox_local_scalar_values_zero_longitude(): + """Test with longitude = 0 (Prime Meridian).""" + utc_time = dt.datetime(2025, 1, 1, 12, 0, 0) + lon = 0.0 + + result = aprox_local_datetime(utc_time, lon) + + assert result == utc_time + +def test_aprox_local_crossing_dateline_forward(): + """Test with different expected day""" + utc = dt.datetime(2025, 1, 1, 22, 0, 0) + lon = 180.0 + res = aprox_local_datetime(utc, lon) + expected = dt.datetime(2025, 1, 2, 10, 0, 0) + assert res == expected + +def test_aprox_local_crossing_dateline_backward(): + """Test with different expected day""" + utc = dt.datetime(2025, 1, 1, 10, 0, 0) + lon = -180.0 + res = aprox_local_datetime(utc, lon) + expected = dt.datetime(2024, 12, 31, 22, 0, 0) + assert res == expected + +def test_aprox_local_series(): + """Test with series inputs""" + utc = pd.Series([ + dt.datetime(2025, 1, 1, 12, 0, 0), + dt.datetime(2025, 1, 1, 12, 0, 0), + dt.datetime(2025, 1, 1, 12, 0, 0) + ]) + + lons = pd.Series([15.0, -30.0, 45.0]) + + res = aprox_local_datetime(utc, lons) + expected = pd.Series([ + dt.datetime(2025, 1, 1, 13, 0 ,0), # +1 hour + dt.datetime(2025, 1, 1, 10, 0, 0), # -2 hours + dt.datetime(2025, 1, 1, 15, 0, 0) # + 3 + ]) + + pd.testing.assert_series_equal(res, expected) + +def 
test_dt2t_simple(): + """Test with old version (00:00:00 and 12:00:00 only)""" + am = dt.datetime(2025, 1, 1, 0, 0, 0) + pm = dt.datetime(2025, 1, 1, 12, 0, 0) + + assert dt2t(am) == [2025, 1, 1, "AM"] + assert dt2t(pm) == [2025, 1, 1, "PM"] + +def test_dt2t(): + + assert dt2t(dt.datetime(2025, 1, 1, 7, 0, 0)) == [2025, 1, 1, "PM"] + assert dt2t(dt.datetime(2025, 1, 1, 17, 59, 59)) == [2025, 1, 1, "PM"] + assert dt2t(dt.datetime(2025, 1, 1, 18, 0, 0 )) == [2025, 1, 2, "AM"] + assert dt2t(dt.datetime(2025, 1, 2, 6, 59, 59)) == [2025, 1, 2, "AM"] + assert dt2t(dt.datetime(2025, 1, 2, 7, 0, 0)) == [2025, 1, 2, "PM"] + assert dt2t(dt.datetime(2025, 1, 2, 17, 59, 59)) == [2025, 1, 2, "PM"] + assert dt2t(dt.datetime(2025, 1, 2, 18, 0, 0)) == [2025, 1, 3, "AM"] + \ No newline at end of file