diff --git a/changelog/286.improvement.md b/changelog/286.improvement.md new file mode 100644 index 00000000..c0a66555 --- /dev/null +++ b/changelog/286.improvement.md @@ -0,0 +1 @@ +Added a wrapper for the csg `compose` function to handle input data preparation (remove data which is not needed in the process) and output data handling (set coords and metadata) diff --git a/changelog/323.fix.md b/changelog/323.fix.md new file mode 100644 index 00000000..f6ae4c91 --- /dev/null +++ b/changelog/323.fix.md @@ -0,0 +1,2 @@ +* Fix a pandas stack issue in GHG_inventory_reading +* Fix `skipna` in conversions diff --git a/changelog/323.improvement.md b/changelog/323.improvement.md new file mode 100644 index 00000000..5f0c2e79 --- /dev/null +++ b/changelog/323.improvement.md @@ -0,0 +1 @@ +Added additional non-numerical codes in data reading functions. diff --git a/docs/source/api/index.rst b/docs/source/api/index.rst index 994bf3cd..2b920ada 100644 --- a/docs/source/api/index.rst +++ b/docs/source/api/index.rst @@ -63,6 +63,7 @@ source priorities and matching algorithms. csg.StrategyUnableToProcess csg.SubstitutionStrategy csg.compose + csg.create_composite_source .. currentmodule:: xarray diff --git a/docs/source/data_reading/index.md b/docs/source/data_reading/index.md index fbec3464..00b21733 100644 --- a/docs/source/data_reading/index.md +++ b/docs/source/data_reading/index.md @@ -74,21 +74,26 @@ NaN. For example "IE" stands for "included elsewhere" and thus it has to be mapped to 0 to show that emissions in this timeseries are 0 and not missing. As a default, we use easy rules combined with defined mappings for special cases. -The rules are +The rules are as follows and each data point is tested against the rules in the same order as below. +- If the code contains `FX` it is mapped to `np.nan` - If the code contains `IE` and/or `NO` it is mapped to 0 -- If the code contains `NE` and/or `NA` but neither `IE` nor `NO`, it is mapped to NaN. 
+- If the code contains `NE` and/or `NA` but neither `IE` nor `NO`, it is mapped to `np.nan`. The special cases are ```python _special_codes = { "C": np.nan, + "CC": np.nan, + "CH4": np.nan, # TODO: move to user passed codes in CRT reading "nan": np.nan, "NaN": np.nan, "-": 0, "NE0": np.nan, + "NE(1)": np.nan, "": np.nan, + "FX": np.nan, } ``` diff --git a/docs/source/data_reading/test_csv_data_sec_cat_if.yaml b/docs/source/data_reading/test_csv_data_sec_cat_if.yaml index 6c26cb42..17d5cfa3 100644 --- a/docs/source/data_reading/test_csv_data_sec_cat_if.yaml +++ b/docs/source/data_reading/test_csv_data_sec_cat_if.yaml @@ -2,9 +2,6 @@ attrs: area: area (ISO3) cat: category (IPCC2006) scen: scenario (general) - sec_cats: - - Class (class) - - Type (type) data_file: test_csv_data_sec_cat_if.csv dimensions: '*': diff --git a/docs/source/usage/csg.md b/docs/source/usage/csg.md index e101c504..14ea8769 100644 --- a/docs/source/usage/csg.md +++ b/docs/source/usage/csg.md @@ -30,6 +30,8 @@ When no missing information is left in the result timeseries, the algorithm term It also terminates if all source timeseries are used, even if missing information is left. +## The `compose` function + The core function to use is the {py:func}`primap2.csg.compose` function. It needs the following input: @@ -111,8 +113,7 @@ priority_definition = primap2.csg.PriorityDefinition( ``` ```{code-cell} ipython3 -# Currently, there is only one strategy implemented, so we use -# the empty selector {}, which matches everything, to configure +# We use the empty selector {}, which matches everything, to configure # to use the substitution strategy for all timeseries. 
strategy_definition = primap2.csg.StrategyDefinition(
     strategies=[({}, primap2.csg.SubstitutionStrategy())]
@@ -125,6 +126,7 @@
 result_ds = primap2.csg.compose(
     priority_definition=priority_definition,
     strategy_definition=strategy_definition,
     progress_bar=None,  # The animated progress bar is useless in the generated documentation
+
 )
 
 result_ds
@@ -162,3 +164,61 @@
 category 1 "lowpop" was preferred.
 For category 0, the initial timeseries did not contain NaNs, so no filling was
 needed. For category 1, there was information missing in the initial timeseries,
 so the lower-priority timeseries was used to fill the holes.
+
+## The `create_composite_source` wrapper function
+
+The {py:func}`primap2.csg.compose` function creates a composite time series according to
+the given priorities and strategies, but it does not take care of pre- and postprocessing
+of the data. It will carry along unnecessary data and the resulting dataset will miss the
+priority coordinates. The {py:func}`primap2.csg.create_composite_source` function takes care
+of these steps and prepares the input data and completes the output data to a primap2 dataset
+with all desired dimensions and metadata.
+
+The function takes the same inputs as {py:func}`primap2.csg.compose` with additional input to
+define pre- and postprocessing:
+
+* **result_prio_coords** Defines the values for the priority coordinates in the output dataset. As the
+priority coordinates differ for all input sources, there is no canonical value
+for the result and it has to be explicitly defined.
+* **metadata** Set metadata values such as title and references.
+
+```{code-cell} ipython3
+result_prio_coords = {
+    "source": {"value": "PRIMAP-test"},
+    "scenario": {"value": "HISTORY", "terminology": "PRIMAP"},
+    }
+metadata = {"references": "test-data", "contact": "test@example.xx"}
+
+```
+
+* **limit_coords** Optional parameter to remove data for coordinate values not needed for the
+composition from the input data.
The time coordinate is treated separately. +* **time_range** Optional parameter to limit the time coverage of the input data. The input can either be a pandas `DatetimeIndex` or a tuple of `str` or datetime-like in the form (year_from, year_to) where both boundaries are included in the range. Only the overlap of the supplied index or index created from the tuple with the time coordinate of the input dataset will be used. + + +```{code-cell} ipython3 +limit_coords = {'area (ISO3)': ['COL', 'ARG', 'MEX']} +time_range = ("2000", "2010") + +``` + +```{code-cell} ipython3 +complete_result_ds = primap2.csg.create_composite_source( + input_ds, + priority_definition=priority_definition, + strategy_definition=strategy_definition, + result_prio_coords=result_prio_coords, + limit_coords=limit_coords, + time_range=time_range, + metadata=metadata, + progress_bar=None, +) + +complete_result_ds +``` + + +## Filling strategies +Currently the following filling strategies are implemented +* Global least square matching: {py:func}`primap2.csg.GlobalLSStrategy` +* Straight substitution: {py:func}`primap2.csg.SubstitutionStrategy` diff --git a/primap2/_convert.py b/primap2/_convert.py index 1669247a..05ccc741 100644 --- a/primap2/_convert.py +++ b/primap2/_convert.py @@ -179,7 +179,9 @@ def _fill_category( continue # the left-hand side of the conversion formula summed up - lhs = (input_factors * self._da.loc[input_selection]).sum(dim=dim, min_count=1) + lhs = (input_factors * self._da.loc[input_selection]).sum( + dim=dim, min_count=1, skipna=True + ) # the right-hand side of the conversion formula split up rhs = lhs / output_factors @@ -190,7 +192,7 @@ def _fill_category( da = da.reindex({"category (IPCC2006)": new_categories}, fill_value=np.nan) new_output_selection = output_selection.copy() new_output_selection[new_dim] = new_category - da.loc[new_output_selection] = rhs.sum(dim=new_dim, min_count=1) + da.loc[new_output_selection] = rhs.sum(dim=new_dim, min_count=1, skipna=True) 
return output_selection[new_dim], da else: da.loc[output_selection] = rhs diff --git a/primap2/csg/__init__.py b/primap2/csg/__init__.py index 8610cc70..530877a0 100644 --- a/primap2/csg/__init__.py +++ b/primap2/csg/__init__.py @@ -13,6 +13,7 @@ from ._strategies.exceptions import StrategyUnableToProcess from ._strategies.global_least_squares import GlobalLSStrategy from ._strategies.substitution import SubstitutionStrategy +from ._wrapper import create_composite_source __all__ = [ "GlobalLSStrategy", @@ -21,4 +22,5 @@ "StrategyUnableToProcess", "SubstitutionStrategy", "compose", + "create_composite_source", ] diff --git a/primap2/csg/_wrapper.py b/primap2/csg/_wrapper.py new file mode 100644 index 00000000..9d7e46eb --- /dev/null +++ b/primap2/csg/_wrapper.py @@ -0,0 +1,168 @@ +from datetime import datetime + +import numpy as np +import pandas as pd +import tqdm +import xarray as xr + +from ._compose import compose +from ._models import PriorityDefinition, StrategyDefinition + + +def set_priority_coords( + ds: xr.Dataset, + dims: dict[str, dict[str, str]], +) -> xr.Dataset: + """Set values for priority coordinates in output dataset. + + Parameters + ---------- + ds + Input dataset. + dims + Values to be set for priority coordinates. The format is + {"name": {"value": value, "terminology": terminology}}, where the + terminology is optional. 
+ """ + for dim in dims: + if "terminology" in dims[dim]: + terminology = dims[dim]["terminology"] + else: + terminology = None + ds = ds.pr.expand_dims(dim=dim, coord_value=dims[dim]["value"], terminology=terminology) + return ds + + +def create_composite_source( + input_ds: xr.Dataset, + priority_definition: PriorityDefinition, + strategy_definition: StrategyDefinition, + result_prio_coords: dict[str, dict[str, str]], + limit_coords: dict[str, str | list[str]] | None = None, + time_range: tuple[str | np.datetime64, str | np.datetime64] | pd.DatetimeIndex | None = None, + metadata: dict[str, str] | None = None, + progress_bar: type[tqdm.tqdm] | None = tqdm.tqdm, +) -> xr.Dataset: + """Create a composite data source + + This is a wrapper around `primap2.csg.compose` that prepares the input data and sets result + values for the priority coordinates. + + Parameters + ---------- + input_ds + Dataset containing all input data + priority_definition + Defines the priorities to select timeseries from the input data. Priorities + are formed by a list of selections and are used "from left to right", where the + first matching selection has the highest priority. Each selection has to specify + values for all priority dimensions (so that exactly one timeseries is selected + from the input data), but can also specify other dimensions. That way it is, + e.g., possible to define a different priority for a specific country by listing + it early (i.e. with high priority) before the more general rules which should + be applied for all other countries. + You can also specify the "entity" or "variable" in the selection, which will + limit the rule to a specific entity or variable, respectively. For each + DataArray in the input_data Dataset, the variable is its name, the entity is + the value of the key `entity` in its attrs. + strategy_definition + Defines the filling strategies to be used when filling timeseries with other + timeseries. 
Again, the priority is defined by a list of selections and
+        corresponding strategies which are used "from left to right". Selections can use
+        any dimension and don't have to apply to only one timeseries. For example, to
+        define a default strategy which should be used for all timeseries unless
+        something else is configured, configure an empty selection as the last
+        (rightmost) entry.
+        You can also specify the "entity" or "variable" in the selection, which will
+        limit the rule to a specific entity or variable, respectively. For each
+        DataArray in the input_data Dataset, the variable is its name, the entity is
+        the value of the key `entity` in its attrs.
+    result_prio_coords
+        Defines the values for the priority coordinates in the output dataset. As the
+        priority coordinates differ for all input sources, there is no canonical value
+        for the result and it has to be explicitly defined.
+    limit_coords
+        Optional parameter to remove data for coordinate values not needed for the
+        composition from the input data. The time coordinate is treated separately.
+    time_range
+        Optional parameter to limit the time coverage of the input data.
+        Can either be a pandas `DatetimeIndex` or a tuple of `str` or `np.datetime64` in
+        the form (year_from, year_to) where both boundaries are included in the range.
+        Only the overlap of the supplied index or index created from the tuple with
+        the time coordinate of the input dataset will be used.
+    metadata
+        Set metadata values such as title and references.
+    progress_bar
+        By default, show progress bars using the tqdm package during the
+        operation. If None, don't show any progress bars. You can supply a class
+        compatible to tqdm.tqdm's protocol if you want to customize the progress bar.
+ + Returns + ------- + xr.Dataset with composed data according to the given priority and strategy + definitions + """ + # limit input data to these values + if limit_coords is not None: + if "variable" in limit_coords: + variable = limit_coords.pop("variable") + input_ds = input_ds[variable].pr.loc[limit_coords] + else: + input_ds = input_ds.pr.loc[limit_coords] + + # set time range according to input + if time_range is not None: + time_index = create_time_index(time_range) + time_index = time_index.intersection(input_ds.coords["time"]) + input_ds = input_ds.pr.loc[{"time": time_index}] + + # run compose + result_ds = compose( + input_data=input_ds, + priority_definition=priority_definition, + strategy_definition=strategy_definition, + progress_bar=progress_bar, + ) + + # set priority coordinates + result_ds = set_priority_coords(result_ds, result_prio_coords) + + if metadata is not None: + for key in metadata.keys(): + result_ds.attrs[key] = metadata[key] + + result_ds.pr.ensure_valid() + return result_ds + + +def create_time_index( + time_range: tuple[ + str | np.datetime64 | datetime | pd.Timestamp, str | np.datetime64 | datetime | pd.Timestamp + ] + | pd.DatetimeIndex + | None = None, +) -> pd.DatetimeIndex: + """ + Unify different input options for a time range to a `pd.DatetimeIndex`. + + Parameters + ---------- + time_range + Can either be pandas `DatetimeIndex` or a tuple of `str` or datetime-like in + the form (year_from, year_to) where both boundaries are included in the range. + Only the overlap of the supplied index or index created from the tuple with + the time coordinate of the input dataset will be used. 
+ + Returns + ------- + Pandas DatetimeIndex according to the time range input + """ + + if isinstance(time_range, pd.DatetimeIndex): + time_index = time_range + elif isinstance(time_range, tuple): + time_index = pd.date_range(time_range[0], time_range[1], freq="YS", inclusive="both") + else: + raise ValueError("time_range must be a datetime index or a tuple") + + return time_index diff --git a/primap2/pm2io/_GHG_inventory_reading.py b/primap2/pm2io/_GHG_inventory_reading.py index aa2c834c..d7124886 100644 --- a/primap2/pm2io/_GHG_inventory_reading.py +++ b/primap2/pm2io/_GHG_inventory_reading.py @@ -163,7 +163,7 @@ def nir_convert_df_to_long( if header_long is None: header_long = ["category", "orig_cat_name", "entity", "unit", "time", "data"] - df_stacked = df_nir.stack([0, 1], dropna=False).to_frame() + df_stacked = df_nir.stack([0, 1], future_stack=True).to_frame() df_stacked.insert(0, "year", str(year)) df_stacked = df_stacked.reset_index() df_stacked.columns = header_long diff --git a/primap2/pm2io/_data_reading.py b/primap2/pm2io/_data_reading.py index a8e41a2f..ffbeaf15 100644 --- a/primap2/pm2io/_data_reading.py +++ b/primap2/pm2io/_data_reading.py @@ -1286,11 +1286,15 @@ def rename_columns( _special_codes = { "C": np.nan, + "CC": np.nan, + "CH4": np.nan, # TODO: move to user passed codes in CRT reading "nan": np.nan, "NaN": np.nan, "-": 0, "NE0": np.nan, + "NE(1)": np.nan, "": np.nan, + "FX": np.nan, } @@ -1320,7 +1324,7 @@ def find_str_values_in_data( def parse_code(code: str) -> float: """Parse a string code and return 0 or np.nan based on rules to interpret - the codes. + the codes. 
Also remove footnote markers "(X)" """ code = code.strip() if code in _special_codes: @@ -1328,19 +1332,28 @@ def parse_code(code: str) -> float: parts = code.split(",") parts = [x.replace(".", "").strip().upper() for x in parts] + if "FX" in parts: + return np.nan if "IE" in parts or "NO" in parts: return 0 - if "NE" in parts or "NA" in parts: + if "NE" in parts or "NA" in parts or "FX" in parts: return np.nan + + # footnote markers + re_foot = re.compile(r"[\-0-9/.,]+(\([0-9]+\))$") + match = re_foot.findall(code) + if match: + return float(code[0 : -len(match[0])]) + raise ValueError(f"Could not parse code: {code!r}.") def create_str_replacement_dict( strs: list[str], user_str_conv: bool | dict[str, float], -) -> dict[str, str]: +) -> dict[str, float]: """Create a dict for replacement of strings by NaN and 0 based on - general rules and user defined rules + general rules and user defined rules. Also remove footnote markers "(X)" """ if isinstance(user_str_conv, bool): if user_str_conv: diff --git a/primap2/tests/csg/test_wrapper.py b/primap2/tests/csg/test_wrapper.py new file mode 100644 index 00000000..01d7db1c --- /dev/null +++ b/primap2/tests/csg/test_wrapper.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python +"""Tests for csg/_wrapper.py""" + +from datetime import datetime +from pathlib import Path + +import pandas as pd + +import primap2.csg +from primap2.csg import create_composite_source +from primap2.csg._wrapper import create_time_index, set_priority_coords +from primap2.tests.utils import assert_ds_aligned_equal + +DATA_PATH = Path(__file__).parent.parent / "data" + + +def test_set_priority_coords(minimal_ds): + prio_coords = {"scenario": {"value": "HISTORY", "terminology": "PRIMAP"}} + + prio_coord_ds = set_priority_coords(minimal_ds, prio_coords) + + assert "scenario (PRIMAP)" in prio_coord_ds.coords + assert prio_coord_ds.coords["scenario (PRIMAP)"].values == ["HISTORY"] + + +def test_create_time_index(): + start = "1990" + end = "2000" + start_dt = 
datetime.strptime(start, "%Y") + end_dt = datetime.strptime(end, "%Y") + start_ts = pd.Timestamp(start) + end_ts = pd.Timestamp(end) + expected = pd.date_range(start=start, end=end, freq="YS") + + # string tuple + pd.testing.assert_index_equal(create_time_index((start, end)), expected) + + # datatime tuple + pd.testing.assert_index_equal(create_time_index((start_dt, end_dt)), expected) + + # timestamp tuple + pd.testing.assert_index_equal(create_time_index((start_ts, end_ts)), expected) + + # mixed tuple + pd.testing.assert_index_equal(create_time_index((start, end_dt)), expected) + + # DatetimeIndex returned unchanged + pd.testing.assert_index_equal(create_time_index(expected), expected) + + +def test_create_composite_source(): + cat_terminology = "IPCC2006_PRIMAP" + + main_categories = ["1.A", "1.B.2", "2.A", "M.AG.ELV", "M.LULUCF", "4"] + FGAS_categories = ["2"] + + native_entities = ["CO2", "CH4", "N2O", "SF6"] + GWP_entities = ["HFCS"] + GWPs = ["AR6GWP100"] + GWP_variables = [f"{entity} ({GWP})" for entity in GWP_entities for GWP in GWPs] + FGAS_entities = ["SF6", *GWP_entities] + FGAS_variables = ["SF6", *GWP_variables] + variables = native_entities + GWP_variables + # priority + priorities = [ + {"source": "CRF 2023, 240108"}, + {"source": "UNFCCC NAI, 240223"}, + { + "source": "CDIAC 2023, HISTORY", + f"category ({cat_terminology})": ["1.A", "2.A"], + "entity": "CO2", + }, + { + "source": "Andrew cement, HISTORY", + f"category ({cat_terminology})": ["2.A"], + "entity": "CO2", + }, + { + "source": "EI 2023, HISTORY", + f"category ({cat_terminology})": ["1.A", "1.B.2"], + "entity": "CO2", + }, + {"source": "Houghton, HISTORY", f"category ({cat_terminology})": "M.LULUCF"}, + {"source": "FAOSTAT 2023, HISTORY", f"category ({cat_terminology})": ["M.AG.ELV"]}, + {"source": "EDGAR 8.0, HISTORY", "entity": ["CO2", "CH4", "N2O"]}, + { + "source": "EDGAR 7.0, HISTORY", + f"category ({cat_terminology})": FGAS_categories, + "variable": FGAS_variables, + }, + ] + + 
used_sources = [prio["source"] for prio in priorities] + FGAS_sources = [ + "CRF 2023, 240108", + "CRF 2022, 230510", + "UNFCCC NAI, 240223", + "EDGAR 7.0, HISTORY", + ] + + result_prio_coords = { + "source": {"value": "PRIMAP-test"}, + "scenario": {"value": "HISTORY", "terminology": "PRIMAP"}, + } + + metadata = {"references": "test-data", "contact": "test@example.xx"} + + input_data = primap2.open_dataset(DATA_PATH / "primap2_test_data_v2.5.1_final.nc") + + # we use source as priority dimension, everything else are fixed coordinates. + # we have one country-specific exception for each country in the prioritization + # that's likely a bit more than realistic, but let's aim high + priority_definition = primap2.csg.PriorityDefinition( + priority_dimensions=["source"], + priorities=priorities, + exclude_result=[ + { + "entity": ["CO2", "CH4", "N2O"], + f"category ({cat_terminology})": FGAS_categories, + }, + { + "entity": FGAS_entities, + f"category ({cat_terminology})": main_categories, + }, + ], + ) + + strategies_FGAS = [ + ( + { + "source": FGAS_sources, + "entity": FGAS_entities, + f"category ({cat_terminology})": FGAS_categories, + }, + primap2.csg.GlobalLSStrategy(), + ), + ( + { + "source": FGAS_sources, + "entity": FGAS_entities, + f"category ({cat_terminology})": FGAS_categories, + }, + primap2.csg.SubstitutionStrategy(), + ), + ] + + strategies_CO2CH4N2O = [ + ( + { + "source": used_sources, + "entity": ["CO2", "CH4", "N2O"], + f"category ({cat_terminology})": main_categories, + }, + primap2.csg.GlobalLSStrategy(), + ), + ( + { + "source": used_sources, + "entity": ["CO2", "CH4", "N2O"], + f"category ({cat_terminology})": main_categories, + }, + primap2.csg.SubstitutionStrategy(), + ), + ] + + strategy_definition = primap2.csg.StrategyDefinition( + strategies=strategies_CO2CH4N2O + strategies_FGAS + ) + + test_time_range = ("1962", "2022") # cut a few years to make sure that works + # test_limit_coords = {'entity': ['CO2', 'CH4', 'N2O']} + 
test_limit_coords = { + "variable": variables, + "category": main_categories + FGAS_categories, + "source": used_sources, + } + + result = create_composite_source( + input_data, + priority_definition=priority_definition, + strategy_definition=strategy_definition, + result_prio_coords=result_prio_coords, + limit_coords=test_limit_coords, + time_range=test_time_range, + progress_bar=None, + metadata=metadata, + ) + + # remove processing info as following functions can't deal with it yet + # in this case to_netcdf can't deal with the None values in processing info + result = result.pr.remove_processing_info() + + # assert results + # load comparison data + comp_filename = "PRIMAP-csg-test.nc" + file_to_load = DATA_PATH / comp_filename + data_comp = primap2.open_dataset(file_to_load) + + assert_ds_aligned_equal(data_comp, result, equal_nan=True) diff --git a/primap2/tests/data/PRIMAP-csg-test.nc b/primap2/tests/data/PRIMAP-csg-test.nc new file mode 100644 index 00000000..266d1459 Binary files /dev/null and b/primap2/tests/data/PRIMAP-csg-test.nc differ diff --git a/primap2/tests/data/primap2_test_data_v2.5.1_final.nc b/primap2/tests/data/primap2_test_data_v2.5.1_final.nc new file mode 100644 index 00000000..32558781 Binary files /dev/null and b/primap2/tests/data/primap2_test_data_v2.5.1_final.nc differ