Skip to content

Commit

Permalink
auto_templating in GeospatialScenario
Browse files Browse the repository at this point in the history
  • Loading branch information
tobin-ford committed Nov 8, 2024
1 parent a4630af commit 1e58f57
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 39 deletions.
79 changes: 79 additions & 0 deletions pvdeg/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
Private API, should only be used in PVDeg implemenation files.
"""

import functools
import inspect
import warnings

def geospatial_quick_shape(numeric_or_timeseries: bool, shape_names: list[str]) -> None:
"""
Expand Down Expand Up @@ -57,3 +60,79 @@ def decorator(func):
return func

return decorator

# Taken from: https://stackoverflow.com/questions/2536307/decorators-in-the-python-standard-lib-deprecated-specifically
# A future Python version (after 3.13) will include the warnings.deprecated decorator
def deprecated(reason):
"""
This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.
"""

string_types = (type(b''), type(u''))

if isinstance(reason, string_types):

# The @deprecated is used with a 'reason'.
#
# .. code-block:: python
#
# @deprecated("please, use another function")
# def old_function(x, y):
# pass

def decorator(func1):

if inspect.isclass(func1):
fmt1 = "Call to deprecated class {name} ({reason})."
else:
fmt1 = "Call to deprecated function {name} ({reason})."

@functools.wraps(func1)
def new_func1(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning)
warnings.warn(
fmt1.format(name=func1.__name__, reason=reason),
category=DeprecationWarning,
stacklevel=2
)
warnings.simplefilter('default', DeprecationWarning)
return func1(*args, **kwargs)

return new_func1

return decorator

elif inspect.isclass(reason) or inspect.isfunction(reason):

# The @deprecated is used without any 'reason'.
#
# .. code-block:: python
#
# @deprecated
# def old_function(x, y):
# pass

func2 = reason

if inspect.isclass(func2):
fmt2 = "Call to deprecated class {name}."
else:
fmt2 = "Call to deprecated function {name}."

@functools.wraps(func2)
def new_func2(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning)
warnings.warn(
fmt2.format(name=func2.__name__),
category=DeprecationWarning,
stacklevel=2
)
warnings.simplefilter('default', DeprecationWarning)
return func2(*args, **kwargs)

return new_func2

else:
raise TypeError(repr(type(reason)))
33 changes: 29 additions & 4 deletions pvdeg/geospatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
humidity,
letid,
utilities,
decorators,
)

import xarray as xr
Expand Down Expand Up @@ -328,6 +329,7 @@ def output_template(

# This has been replaced with pvdeg.geospatial.auto_templates inside of pvdeg.geospatial.analysis.
# it is here for completeness. it can be removed.
@decorators.deprecated(reason="use geospatial.auto_template or create a template with geospatial.output_template")
def template_parameters(func):
"""
Output parameters for xarray template.
Expand Down Expand Up @@ -445,6 +447,28 @@ def zero_template(

return res

def can_auto_template(func) -> None:
"""
Check if we can use `geospatial.auto_template on a given function.
Raise an error if the function was not declared with the `@geospatial_quick_shape` decorator.
No error raised if we can run `geospatial.auto_template` on provided function, `func`.
Parameters
----------
func: callable
function to create template from.
Returns
-------
None
"""
if not (hasattr(func, "numeric_or_timeseries") and hasattr(func, "shape_names")):
raise ValueError(
f"{func.__name__} cannot be autotemplated. create a template manually"
)



def auto_template(func: Callable, ds_gids: xr.Dataset) -> xr.Dataset:
"""
Expand Down Expand Up @@ -484,10 +508,11 @@ def auto_template(func: Callable, ds_gids: xr.Dataset) -> xr.Dataset:
Template for output data.
"""

if not (hasattr(func, "numeric_or_timeseries") and hasattr(func, "shape_names")):
raise ValueError(
f"{func.__name__} cannot be autotemplated. create a template manually"
)
can_auto_template(func=func)
# if not (hasattr(func, "numeric_or_timeseries") and hasattr(func, "shape_names")):
# raise ValueError(
# f"{func.__name__} cannot be autotemplated. create a template manually"
# )

if func.numeric_or_timeseries == 0:
shapes = {datavar: ("gid",) for datavar in func.shape_names}
Expand Down
103 changes: 68 additions & 35 deletions pvdeg/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -1594,49 +1594,88 @@ def set_geospatial_data(self, weather_ds: xr.Dataset, meta_df: pd.DataFrame ) ->
self.weather_data, self.meta_data = weather_ds, meta_df


# def addJob(
# self,
# func: Callable = None,
# func_params: dict = {},
# see_added: bool = False,
# ):
# """
# Add a pvdeg function to the scenario pipeline

# Parameters:
# -----------
# func : function
# pvdeg function to use for geospatial analysis.
# *Note: geospatial analysis is only available with a limited subset of pvdeg
# functions*
# Current supported functions for geospatial analysis: ``pvdeg.standards.standoff``,
# ``pvdeg.humidity.module``, ``pvdeg.letid.calc_letid_outdoors``
# func_params : dict
# job specific keyword argument dictionary to provide to the function
# see_added : bool
# set flag to get a userWarning notifying the user of the job added
# to the pipeline in method call. ``default = False``
# """
# try:
# pvdeg.geospatial.template_parameters(func)
# except ValueError:
# return ValueError(
# f"{func.__name__} does does not have a valid geospatial results template or does not exist"
# )

# geo_job_dict = {"geospatial_job": {"job": func, "params": func_params}}

# self.pipeline = geo_job_dict

# if see_added:
# message = f"{func.__name__} added to pipeline as \n {geo_job_dict}"
# warnings.warn(message, UserWarning)

def addJob(
self,
func: Callable = None,
func: Callable,
template: xr.Dataset = None,
func_params: dict = {},
see_added: bool = False,
):
see_added: bool = False
) -> None:
"""
Add a pvdeg function to the scenario pipeline
Add a pvdeg geospatial function to the scenario pipeline. If no template is provided, `addJob` attempts to use `geospatial.auto_template` this will raise an
Parameters:
-----------
func : function
pvdeg function to use for geospatial analysis.
*Note: geospatial analysis is only available with a limited subset of pvdeg
functions*
Current supported functions for geospatial analysis: ``pvdeg.standards.standoff``,
``pvdeg.humidity.module``, ``pvdeg.letid.calc_letid_outdoors``
pvdeg function to use for geospatial analysis.
template : xarray.Dataset
Template for output data. Only required if a function is not supported by `geospatial.auto_template`.
func_params : dict
job specific keyword argument dictionary to provide to the function
see_added : bool
set flag to get a userWarning notifying the user of the job added
to the pipeline in method call. ``default = False``
to the pipeline in method call. ``default = False``
"""
try:
pvdeg.geospatial.template_parameters(func)
except ValueError:
return ValueError(
f"{func.__name__} does does not have a valid geospatial results template or does not exist"
)

geo_job_dict = {"geospatial_job": {"job": func, "params": func_params}}
if template is None:

# take the weather datapoints specified by metadata and create a template based on them.
geo_weather_sub = self.weather_data.sel(gid=self.meta_data.index)
template = pvdeg.geospatial.auto_template(func=func, ds_gids=self.weather_data)

self.pipeline = geo_job_dict
self.template = template
self.func = func
self.func_params = func_params

if see_added:
message = f"{func.__name__} added to pipeline as \n {geo_job_dict}"
message = f"{func.__name__} added to scenario with arguments {func_params} using template: {template}"
warnings.warn(message, UserWarning)



def run(self, hpc_worker_conf: Optional[dict] = None) -> None:
"""
Run the function in the geospatial pipeline.
GeospatialScenario only supports one geospatial pipeline job at a time
unlike Scenario which supports unlimited conventional pipeline jobs.
Run the geospatial scenario stored in the geospatial scenario object.
Only supports one function at a time. Unlike `Scenario` which supports unlimited conventional pipeline jobs.
Results are stored in the `GeospatialScenario.results` attribute.
Creates a dask cluster or client using the hpc_worker_conf parameter.
Expand Down Expand Up @@ -1675,20 +1714,14 @@ def run(self, hpc_worker_conf: Optional[dict] = None) -> None:
"""
client = pvdeg.geospatial.start_dask(hpc=hpc_worker_conf)

geo_weather_sub = self.weather_data.sel(gid=self.meta_data.index)

func = self.pipeline["geospatial_job"]["job"]

if func == pvdeg.standards.standoff or func == pvdeg.humidity.module:
geo = {
"func": func,
"weather_ds": geo_weather_sub,
"meta_df": self.meta_data,
}

analysis_result = pvdeg.geospatial.analysis(**geo)
analysis_result = pvdeg.geospatial.analysis(
weather_ds=self.weather_data,
meta_df=self.meta_data,
func=self.func,
template=self.template, # provided or generated via autotemplate in GeospatialScenario.addJob
)

self.results = analysis_result
self.results = analysis_result

client.shutdown()

Expand Down

0 comments on commit 1e58f57

Please sign in to comment.