Parallel Weather Downloads (NSRDB, PVGIS) #143

Open
wants to merge 15 commits into base: development
3 changes: 3 additions & 0 deletions .gitignore
@@ -47,6 +47,9 @@ coverage.xml
.hypothesis/
.pytest_cache/

# pvdeg scenario output files generated during testing
tests/pvd_*

# Translations
*.mo
*.pot
6 changes: 6 additions & 0 deletions docs/source/_autosummary/pvdeg.weather.emtpy_weather_ds.rst
@@ -0,0 +1,6 @@
pvdeg.weather.emtpy\_weather\_ds
================================

.. currentmodule:: pvdeg.weather

.. autofunction:: emtpy_weather_ds
6 changes: 6 additions & 0 deletions docs/source/_autosummary/pvdeg.weather.weather_distributed.rst
@@ -0,0 +1,6 @@
pvdeg.weather.weather\_distributed
==================================

.. currentmodule:: pvdeg.weather

.. autofunction:: weather_distributed
1 change: 1 addition & 0 deletions pvdeg/__init__.py
@@ -14,6 +14,7 @@
from . import montecarlo
from .scenario import Scenario, GeospatialScenario
from . import spectral
from . import store
from . import symbolic
from . import standards
from . import temperature
3 changes: 3 additions & 0 deletions pvdeg/config.py
@@ -11,6 +11,9 @@
DATA_DIR = PVDEG_DIR / "data"
TEST_DIR = PVDEG_DIR.parent / "tests"
TEST_DATA_DIR = PVDEG_DIR.parent / "tests" / "data"

# downloader target directory
METOROLOGICAL_DOWNLOAD_PATH = Path.home() / "PVDeg-Meteorological"

# DATA_LIBRARY = PVDEG_DIR.parent / "DataLibrary"
# if not os.path.isdir(DATA_LIBRARY):
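For context, a minimal sketch of how downstream code can use the new constant (the `mkdir` call is illustrative, not part of this diff):

    from pvdeg import METOROLOGICAL_DOWNLOAD_PATH

    # ensure the downloader target directory exists before fetching data
    METOROLOGICAL_DOWNLOAD_PATH.mkdir(parents=True, exist_ok=True)
    print(METOROLOGICAL_DOWNLOAD_PATH)  # e.g. ~/PVDeg-Meteorological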
4 changes: 2 additions & 2 deletions pvdeg/geospatial.py
@@ -618,7 +618,7 @@ def meta_KDtree(meta_df, leaf_size=40, fp=None):
Create a sklearn.neighbors.KDTree for fast geospatial lookup operations.
Requires the scikit-learn library. Not included in the pvdeg dependency list.

- Parameters:
+ Parameters
-----------
meta_df: pd.DataFrame
Dataframe of metadata as generated by pvdeg.weather.get for geospatial
@@ -629,7 +629,7 @@ def meta_KDtree(meta_df, leaf_size=40, fp=None):
If None, no file is saved. Must be ``.pkl`` file extension. Open saved
pkl file with joblib (sklearn dependency).

- Returns:
+ Returns
--------
kdtree: sklearn.neighbors.KDTree
kdtree containing latitude-longitude pairs for quick lookups
21 changes: 21 additions & 0 deletions pvdeg/scenario.py
@@ -1401,6 +1401,27 @@ def classify_mountains_weights(
self.meta_data["mountain"] = (self.meta_data.index).isin(gids)
return

def meta_KDtree(self, leaf_size=40, fp=None):
"""
Create a KDTree from this scenario's current metadata.

Parameters
----------
leaf_size: int, default 40
Number of points at which to switch to brute-force. See scikit-learn docs.
fp: str, optional
Location to save the pickled kdtree so the tree does not have to be rebuilt.
If None, no file is saved. Must have a ``.pkl`` file extension. Open the saved
pkl file with joblib (a sklearn dependency).

Returns
--------
kdtree: sklearn.neighbors.KDTree
kdtree containing latitude-longitude pairs for quick lookups
"""

return pvdeg.geospatial.meta_KDtree(meta_df=self.meta_data, leaf_size=leaf_size, fp=fp)

def classify_feature(
self,
kdtree=None,
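A minimal usage sketch for the new method (the populated scenario and query point are hypothetical; scikit-learn must be installed separately since it is not a pvdeg dependency):

    import numpy as np

    # scenario: a GeospatialScenario whose meta_data is already populated
    kdtree = scenario.meta_KDtree(leaf_size=40)

    # find the three stored locations nearest a hypothetical point
    dist, idx = kdtree.query(np.array([[39.74, -105.17]]), k=3)
    nearest = scenario.meta_data.iloc[idx[0]]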
288 changes: 288 additions & 0 deletions pvdeg/store.py
@@ -0,0 +1,288 @@
from pathlib import Path
import xarray as xr
import pandas as pd
import numpy as np
import dask.array as da
import zarr
import os

from pvdeg import METOROLOGICAL_DOWNLOAD_PATH
from pvdeg.weather import pvgis_hourly_empty_weather_ds  # assumed location of this helper; needed by _create_sample_sheet below

# THESE HAVE BEEN MOVED TO GEOGRIDFUSION
# def get(group):
# """
# Extract a weather xarray dataset and metadata pandas dataframe from your zarr store.
# `get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage.
# This is practical because all data variables are stored in dask arrays, so they are loaded lazily instead of into memory when this is called.
# Choose the points you need after this method is called by using `sel`, `isel`, `loc`, `iloc`.

# `store.get` is meant to match the API of other geospatial weather APIs from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data`

# Parameters
# -----------
# group : str
# name of the group to access from your local zarr store.
# Groups are created automatically in your store when you save data using `pvdeg.store.store`.

# *From `pvdeg.store.store` docstring*
# Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

# Returns
# -------
# weather_ds : xr.Dataset
# Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
# meta_df : pd.DataFrame
# Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
# """

# combined_ds = xr.open_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=group
# )

# weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds)

# return weather_ds, meta_df

# def store(weather_ds, meta_df):
# """
# Add geospatial meteorological data to your zarr store. Data will be saved to the correct group based on its periodicity.

# Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

# Parameters
# -----------
# weather_ds : xr.Dataset
# Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
# meta_df : pd.DataFrame
# Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.

# Returns
# --------
# None
# """

# group = meta_df.iloc[0]["Source"]
# rows_per_entry = weather_ds.isel(gid=0).time.size

# if rows_per_entry == 8760:
# periodicity = "1hr"
# elif rows_per_entry == 17520:
# periodicity = "30min"
# elif rows_per_entry == 35040:
# periodicity = "15min"
# else:
# raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8760, 17520, or 35040 rows")

# combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)

# if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
# print("Creating Zarr")

# combined_ds.to_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=f"{group}-{periodicity}",
# mode="w-", # only use for first time creating store
# )
# else: # store already exists
# print("adding to store")

# print("opening store")
# stored_ds = xr.open_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=f"{group}-{periodicity}",
# # consolidated=True
# )

# lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)

# for gid, values in meta_df.iterrows():

# target_lat = values["latitude"]
# target_lon = values["longitude"]

# lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
# lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)

# if lat_exists and lon_exists:
# print("(lat, lon) exists already")

# raise NotImplementedError

# # stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)

# # # overwrite previous value at that lat-lon, keeps old gid

# # # need to set the gid of the current "sheet" to the stored gid
# # updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
# # # loc may remove the gid dimension so we might have to add it back with .expand_dims

# # # overwrite the current entry at the gid = stored_gid entry of the zarr
# # updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')


# else: # coordinate pair doesn't exist and it needs to be added, this will be a HEAVY operation
# print("add entry to dataset")

# # we are trying to save 1 "sheet" of weather (weather at a single gid)
# # need to update the index to fit into the stored data after we concatenate

# this concatenates along the gid axis
# gid has no guarantee of being unique but duplicate gids are fine for xarray
# # we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
# new_gid = stored_ds.sizes["gid"]

# weather_sheet = combined_ds.sel(gid=slice(gid))
# updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
# updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")

# print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")


# def check_store():
# """Check if you have a zarr store at the default download path defined in pvdeg.config"""
# if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):

# size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())

# print(f"""
# You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}.

# It has {size} bytes.
# """)

# elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH):
# print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")

# else:
# raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.")


def my_path():
"""Finds path to your zarr store of data if it exists"""
if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zattrs")):
print(METOROLOGICAL_DOWNLOAD_PATH)

else:
print("Directory not found")

def _combine_geo_weather_meta(
weather_ds: xr.Dataset,
meta_df: pd.DataFrame
):
"""Combine weather dataset and meta dataframe into a single dataset"""

meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index': 'gid'})

combined = xr.merge([weather_ds, meta_ds])

combined["Source"] = combined["Source"].astype(str) # save as strings

return combined


def _seperate_geo_weather_meta(
ds_from_zarr: xr.Dataset,
):
"""
Take loaded dataset in the zarr store schema (weather and meta combined)
and separate it into `weather_ds` and `meta_df`.
"""

ds_from_zarr["Source"] = ds_from_zarr["Source"].astype(object) # geospatial.mapblocks needs this to be an object

# there may be a more optimal way to do this
data = np.column_stack(
[
ds_from_zarr.gid.to_numpy().astype(int),
ds_from_zarr.latitude.to_numpy().astype(float),
ds_from_zarr.longitude.to_numpy().astype(float),
ds_from_zarr.altitude.to_numpy().astype(float),
ds_from_zarr.Source.to_numpy(),
ds_from_zarr.wind_height.to_numpy(),
]
)

seperated_meta_df = pd.DataFrame(data, columns=["gid", "latitude", "longitude", "altitude", "Source", "wind_height"]).set_index("gid")

seperated_weather_ds = ds_from_zarr.drop_vars(("latitude", "longitude", "altitude", "Source", "wind_height"))

return seperated_weather_ds, seperated_meta_df
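# A round-trip sketch for the two helpers above (hypothetical: weather_ds and
# meta_df as produced by a geospatial pvdeg.weather.get call):
#
#     combined = _combine_geo_weather_meta(weather_ds, meta_df)
#     weather_back, meta_back = _seperate_geo_weather_meta(combined)
#
# weather_back keeps only the time-series variables, while meta_back recovers
# one metadata row per gid, matching the store schema described above.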

def _make_coords_to_gid_da(
ds_from_zarr: xr.Dataset
):
"""Create a 2D indexable array that maps coordinates (lat and lon) to gid stored in zarr store"""


# only want to do this if the arrays are dask arrays
lats = ds_from_zarr.latitude.to_numpy()
lons = ds_from_zarr.longitude.to_numpy()

# use a -1 sentinel; da.empty would leave the values uninitialized
gids = da.full((len(lats), len(lons)), fill_value=-1)

points = xr.DataArray(
data=gids,
coords={
"latitude": lats,
"longitude": lons
},
dims=["latitude", "longitude"],
)

# set_index returns a new object rather than modifying in place
points = points.set_index(latitude="latitude", longitude="longitude")

return points
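# A lookup sketch for the map above (coordinates are hypothetical): once the
# sentinel values are replaced with real gids, a coordinate pair resolves to
# its gid without scanning the metadata:
#
#     lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)
#     gid = lat_lon_gid_2d_map.sel(latitude=39.74, longitude=-105.17).item()
#     # -1 means nothing is stored at that coordinate pair yet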

def _create_sample_sheet(fill_value, latitude: float=999, longitude: float=999, altitude: float=-1, wind_height: int=-1, Source: str="SampleSheet"):
"""
Create a dummy sample dataset containing weather for one gid. This will be called a sheet: a single location of weather data from the dataset with the gid coordinate still present.

The sizes of the dimensions of the sheet will be {"gid": 1, "time": 8760}

Parameters
-----------
fill_value: numeric
value to populate weather_ds single sheet with
latitude: float
dummy latitude [WGS84]
longitude: float
dummy longitude [WGS84]
altitude: float
dummy altitude of measured data [m]
wind_height: int
dummy height of the sample dataset's wind measurement [m]
Source: str
dummy source name for the metadata entry

Returns
--------
sheet_ds : xr.Dataset
Dummy weather data sheet for a single location using a dask array backend. As mentioned above, this will maintain the gid coordinate.
meta_df : pd.DataFrame
Dummy metadata for test location in pandas.DataFrame.
"""

meta_dict = {
'latitude': latitude,
'longitude': longitude,
'altitude': altitude,
'wind_height': wind_height,
'Source': Source
}

meta_df = pd.DataFrame(meta_dict, index=[0])

sheet_ds = pvgis_hourly_empty_weather_ds(gids_size=1)

dummy_da = da.full(shape=(1,sheet_ds.sizes["time"]), fill_value=fill_value)

for var in sheet_ds.data_vars:

dim = sheet_ds[var].dims
sheet_ds[var] = (dim, dummy_da)

return sheet_ds, meta_df
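A minimal usage sketch for the sample-sheet helper above (the fill value is arbitrary):

    sheet_ds, meta_df = _create_sample_sheet(fill_value=0.5)

    assert dict(sheet_ds.sizes) == {"gid": 1, "time": 8760}
    assert meta_df.loc[0, "Source"] == "SampleSheet"

Because each data variable is backed by a dask array, the sheet stays lazy until it is computed or written to a store.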