Skip to content

Commit

Permalink
GeoGridFusion modifications
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobin Ford committed Jan 13, 2025
1 parent 79072fc commit 00ae56a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 116 deletions.
233 changes: 117 additions & 116 deletions pvdeg/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,153 +8,154 @@

from pvdeg import METOROLOGICAL_DOWNLOAD_PATH

def get(group):
"""
Extract a weather xarray dataset and metadata pandas dataframe from your zarr store.
`get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage.
This is practical because all datavariables are stored in dask arrays so they are loaded lazily instead of into memmory when this is called.
Choose the points you need after this method is called by using `sel`, `isel`, `loc, `iloc`.
# THESE HAVE BEEN MOVED TO GEOGRIDFUSION
# def get(group):
# """
# Extract a weather xarray dataset and metadata pandas dataframe from your zarr store.
# `get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage.
# This is practical because all datavariables are stored in dask arrays so they are loaded lazily instead of into memory when this is called.
# Choose the points you need after this method is called by using `sel`, `isel`, `loc`, `iloc`.

# `store.get` is meant to match the API of other geospatial weather api's from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data`

`store.get` is meant to match the API of other geospatial weather api's from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data`
# Parameters
# -----------
# group : str
# name of the group to access from your local zarr store.
# Groups are created automatically in your store when you save data using `pvdeg.store.store`.

Parameters
-----------
group : str
name of the group to access from your local zarr store.
Groups are created automatically in your store when you save data using `pvdeg.store.store`.
# *From `pvdeg.store.store` docstring*
# Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

*From `pvdeg.store.store` docstring*
Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"
# Returns
# -------
# weather_ds : xr.Dataset
# Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
# meta_df : pd.DataFrame
# Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
# """

Returns
-------
weather_ds : xr.Dataset
Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
meta_df : pd.DataFrame
Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
"""

combined_ds = xr.open_zarr(
store=METOROLOGICAL_DOWNLOAD_PATH,
group=group
)
# combined_ds = xr.open_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=group
# )

weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds)
# weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds)

return weather_ds, meta_df
# return weather_ds, meta_df

def store(weather_ds, meta_df):
"""
Add geospatial meteorolical data to your zarr store. Data will be saved to the correct group based on its periodicity.
# def store(weather_ds, meta_df):
# """
# Add geospatial meteorological data to your zarr store. Data will be saved to the correct group based on its periodicity.

Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"
# Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

Parameters
-----------
weather_ds : xr.Dataset
Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
meta_df : pd.DataFrame
Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
# Parameters
# -----------
# weather_ds : xr.Dataset
# Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
# meta_df : pd.DataFrame
# Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.

Returns
--------
None
"""
# Returns
# --------
# None
# """

group = meta_df.iloc[0]["Source"]
rows_per_entry = weather_ds.isel(gid=0).time.size
# group = meta_df.iloc[0]["Source"]
# rows_per_entry = weather_ds.isel(gid=0).time.size

if rows_per_entry == 8760:
periodicity = "1hr"
elif rows_per_entry == 17520:
periodicity = "30min"
elif rows_per_entry == 35040:
periodicity = "15min"
else:
raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8670, 17520, 35040 rows")
# if rows_per_entry == 8760:
# periodicity = "1hr"
# elif rows_per_entry == 17520:
# periodicity = "30min"
# elif rows_per_entry == 35040:
# periodicity = "15min"
# else:
# raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8760, 17520, 35040 rows")

combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)
# combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)

if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
print("Creating Zarr")
# if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
# print("Creating Zarr")

combined_ds.to_zarr(
store=METOROLOGICAL_DOWNLOAD_PATH,
group=f"{group}-{periodicity}",
mode="w-", # only use for first time creating store
)
else: # store already exists
print("adding to store")
# combined_ds.to_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=f"{group}-{periodicity}",
# mode="w-", # only use for first time creating store
# )
# else: # store already exists
# print("adding to store")

print("opening store")
stored_ds = xr.open_zarr(
store=METOROLOGICAL_DOWNLOAD_PATH,
group=f"{group}-{periodicity}",
# consolidated=True
)
# print("opening store")
# stored_ds = xr.open_zarr(
# store=METOROLOGICAL_DOWNLOAD_PATH,
# group=f"{group}-{periodicity}",
# # consolidated=True
# )

lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)
# lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)

for gid, values in meta_df.iterrows():
# for gid, values in meta_df.iterrows():

target_lat = values["latitude"]
target_lon = values["longitude"]
# target_lat = values["latitude"]
# target_lon = values["longitude"]

lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)
# lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
# lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)

if lat_exists and lon_exists:
print("(lat, lon) exists already")
# if lat_exists and lon_exists:
# print("(lat, lon) exists already")

raise NotImplementedError
# raise NotImplementedError

# stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)
# # stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)

# # overwrite previous value at that lat-lon, keeps old gid
# # # overwrite previous value at that lat-lon, keeps old gid

# # need to set the gid of the current "sheet" to the stored gid
# updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
# # loc may remove the gid dimension so we might have to add it back with .expand_dims
# # # need to set the gid of the current "sheet" to the stored gid
# # updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
# # # loc may remove the gid dimension so we might have to add it back with .expand_dims

# # overwrite the current entry at the gid = stored_gid entry of the zarr
# updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')
# # # overwrite the current entry at the gid = stored_gid entry of the zarr
# # updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')


else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
print("add entry to dataset")
# else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
# print("add entry to dataset")

# we are trying to save 1 "sheet" of weather (weather at a single gid)
# need to update the index to fit into the stored data after we concatenate
# # we are trying to save 1 "sheet" of weather (weather at a single gid)
# # need to update the index to fit into the stored data after we concatenate

# this concatenates along the the gid axis
# gid has no guarantee of being unqiue but duplicate gids are fine for xarray
# we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
new_gid = stored_ds.sizes["gid"]
# # this concatenates along the the gid axis
# # gid has no guarantee of being unique but duplicate gids are fine for xarray
# # we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
# new_gid = stored_ds.sizes["gid"]

weather_sheet = combined_ds.sel(gid=slice(gid))
updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")
# weather_sheet = combined_ds.sel(gid=slice(gid))
# updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
# updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")

print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
# print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")


def check_store():
"""Check if you have a zarr store at the default download path defined in pvdeg.config"""
if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):
# def check_store():
# """Check if you have a zarr store at the default download path defined in pvdeg.config"""
# if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):

size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())
# size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())

print(f"""
You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}.
# print(f"""
# You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}.

It has {size} bytes.
""")
# It has {size} bytes.
# """)

elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH):
print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
# elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH):
# print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")

else:
raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.")
# else:
# raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.")


def my_path():
Expand All @@ -175,10 +176,10 @@ def _combine_geo_weather_meta(
meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index' : 'gid'})


combined = xr.merge([weather_ds, meta_ds]).assign_coords(
latitude=("gid", meta_ds.latitude.values),
longitude=('gid', meta_ds.longitude.values),
)
combined = xr.merge([weather_ds, meta_ds])#.assign_coords(
# latitude=("gid", meta_ds.latitude.values),
# longitude=('gid', meta_ds.longitude.values),
# )

combined["Source"] = combined["Source"].astype(str) # save as strings

Expand All @@ -198,10 +199,10 @@ def _seperate_geo_weather_meta(
# there may be a more optimal way to do this
data = np.column_stack(
[
ds_from_zarr.gid.to_numpy(),
ds_from_zarr.latitude.to_numpy(),
ds_from_zarr.longitude.to_numpy(),
ds_from_zarr.altitude.to_numpy(),
ds_from_zarr.gid.to_numpy().astype(int),
ds_from_zarr.latitude.to_numpy().astype(float),
ds_from_zarr.longitude.to_numpy().astype(float),
ds_from_zarr.altitude.to_numpy().astype(float),
ds_from_zarr.Source.to_numpy(),
ds_from_zarr.wind_height.to_numpy(),
]
Expand Down
9 changes: 9 additions & 0 deletions pvdeg/weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Collection of classes and functions to obtain spectral parameters.
"""

import dask.distributed
from pvdeg import humidity
from pvdeg.utilities import nrel_kestrel_check

Expand Down Expand Up @@ -1182,6 +1183,13 @@ def weather_distributed(
"""

import dask.delayed
import dask.distributed

try:
client = dask.distributed.get_client()
print("Connected to a Dask scheduler | Dashboard:", client.dashboard_link)
except ValueError:
raise RuntimeError("No Dask scheduler found. Ensure a dask client is running.")

if (database != "PVGIS" and database != "PSM3"):
raise NotImplementedError(f"Only 'PVGIS' and 'PSM3' are implemented, you entered {database}")
Expand All @@ -1190,6 +1198,7 @@ def weather_distributed(
results = dask.compute(futures)[0] # values are returned in a list with one entry

# what is the difference between these two approaches for dask distributed work, how can we schedule differently
# i believe futures might be better for our needs
# futures = [client.submit(weather_distributed, "PVGIS", coord) for coord in coords]
# client.gather(futures)

Expand Down

0 comments on commit 00ae56a

Please sign in to comment.