diff --git a/pvdeg/store.py b/pvdeg/store.py
index cc7644fa..16e8ee32 100644
--- a/pvdeg/store.py
+++ b/pvdeg/store.py
@@ -8,153 +8,154 @@
 from pvdeg import METOROLOGICAL_DOWNLOAD_PATH

-def get(group):
-    """
-    Extract a weather xarray dataset and metadata pandas dataframe from your zarr store.
-    `get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage.
-    This is practical because all datavariables are stored in dask arrays so they are loaded lazily instead of into memmory when this is called.
-    Choose the points you need after this method is called by using `sel`, `isel`, `loc, `iloc`.
+# THESE HAVE BEEN MOVED TO GEOGRIDFUSION
+# def get(group):
+#     """
+#     Extract a weather xarray dataset and metadata pandas dataframe from your zarr store.
+#     `get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage.
+#     This is practical because all datavariables are stored in dask arrays so they are loaded lazily instead of into memmory when this is called.
+#     Choose the points you need after this method is called by using `sel`, `isel`, `loc, `iloc`.
+
+#     `store.get` is meant to match the API of other geospatial weather api's from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data`

-    `store.get` is meant to match the API of other geospatial weather api's from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data`
+#     Parameters
+#     -----------
+#     group : str
+#         name of the group to access from your local zarr store.
+#         Groups are created automatically in your store when you save data using `pvdeg.store.store`.

-    Parameters
-    -----------
-    group : str
-        name of the group to access from your local zarr store.
-        Groups are created automatically in your store when you save data using `pvdeg.store.store`.
+#     *From `pvdeg.store.store` docstring*
+#     Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

-    *From `pvdeg.store.store` docstring*
-    Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"
+#     Returns
+#     -------
+#     weather_ds : xr.Dataset
+#         Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
+#     meta_df : pd.DataFrame
+#         Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
+#     """

-    Returns
-    -------
-    weather_ds : xr.Dataset
-        Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
-    meta_df : pd.DataFrame
-        Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
-    """
-
-    combined_ds = xr.open_zarr(
-        store=METOROLOGICAL_DOWNLOAD_PATH,
-        group=group
-    )
+#     combined_ds = xr.open_zarr(
+#         store=METOROLOGICAL_DOWNLOAD_PATH,
+#         group=group
+#     )

-    weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds)
+#     weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds)

-    return weather_ds, meta_df
+#     return weather_ds, meta_df
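The `get` docstring above leans on zarr's lazy loading: `xr.open_zarr` returns dask-backed variables, so nothing is read into memory until computed. A minimal sketch of the select-then-load pattern it recommends; the group name, time window, and `temp_air` variable here are illustrative assumptions, not guaranteed by the store schema:

import xarray as xr
from pvdeg import METOROLOGICAL_DOWNLOAD_PATH

# open lazily; data variables stay as dask arrays until computed
combined_ds = xr.open_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group="PVGIS-1hr")

# subset first, then load only the slice you need
site = combined_ds.isel(gid=0)                        # first stored location
june = site.sel(time=slice("2022-06-01", "2022-06-30"))
june_temp = june["temp_air"].compute()                # data is read from disk here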
-def store(weather_ds, meta_df):
-    """
-    Add geospatial meteorolical data to your zarr store. Data will be saved to the correct group based on its periodicity.
+# def store(weather_ds, meta_df):
+#     """
+#     Add geospatial meteorolical data to your zarr store. Data will be saved to the correct group based on its periodicity.

-    Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"
+#     Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min"

-    Parameters
-    -----------
-    weather_ds : xr.Dataset
-        Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
-    meta_df : pd.DataFrame
-        Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.
+#     Parameters
+#     -----------
+#     weather_ds : xr.Dataset
+#         Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory.
+#     meta_df : pd.DataFrame
+#         Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds.

-    Returns
-    --------
-    None
-    """
+#     Returns
+#     --------
+#     None
+#     """

-    group = meta_df.iloc[0]["Source"]
-    rows_per_entry = weather_ds.isel(gid=0).time.size
+#     group = meta_df.iloc[0]["Source"]
+#     rows_per_entry = weather_ds.isel(gid=0).time.size

-    if rows_per_entry == 8760:
-        periodicity = "1hr"
-    elif rows_per_entry == 17520:
-        periodicity = "30min"
-    elif rows_per_entry == 35040:
-        periodicity = "15min"
-    else:
-        raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8670, 17520, 35040 rows")
+#     if rows_per_entry == 8760:
+#         periodicity = "1hr"
+#     elif rows_per_entry == 17520:
+#         periodicity = "30min"
+#     elif rows_per_entry == 35040:
+#         periodicity = "15min"
+#     else:
+#         raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8670, 17520, 35040 rows")

-    combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)
+#     combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)
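The if/elif chain above is a fixed mapping from rows-per-gid to a group suffix; note its error message says 8670 where the check itself uses 8760. The same dispatch written as a dict lookup, as a sketch (the helper name is hypothetical):

# rows per gid for one year of data at each sampling period
PERIODICITY_BY_ROWS = {8760: "1hr", 17520: "30min", 35040: "15min"}

def _periodicity_from_rows(rows_per_entry: int) -> str:
    # e.g. 8760 hourly rows per year maps to the "PVGIS-1hr" group suffix
    try:
        return PERIODICITY_BY_ROWS[rows_per_entry]
    except KeyError:
        raise ValueError(
            f"first location to store has {rows_per_entry} rows, "
            "must have 8760, 17520, or 35040 rows"
        )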
-    if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
-        print("Creating Zarr")
+#     if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
+#         print("Creating Zarr")

-        combined_ds.to_zarr(
-            store=METOROLOGICAL_DOWNLOAD_PATH,
-            group=f"{group}-{periodicity}",
-            mode="w-", # only use for first time creating store
-        )
-    else: # store already exists
-        print("adding to store")
+#         combined_ds.to_zarr(
+#             store=METOROLOGICAL_DOWNLOAD_PATH,
+#             group=f"{group}-{periodicity}",
+#             mode="w-", # only use for first time creating store
+#         )
+#     else: # store already exists
+#         print("adding to store")

-        print("opening store")
-        stored_ds = xr.open_zarr(
-            store=METOROLOGICAL_DOWNLOAD_PATH,
-            group=f"{group}-{periodicity}",
-            # consolidated=True
-        )
+#         print("opening store")
+#         stored_ds = xr.open_zarr(
+#             store=METOROLOGICAL_DOWNLOAD_PATH,
+#             group=f"{group}-{periodicity}",
+#             # consolidated=True
+#         )

-        lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)
+#         lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)

-        for gid, values in meta_df.iterrows():
+#         for gid, values in meta_df.iterrows():

-            target_lat = values["latitude"]
-            target_lon = values["longitude"]
+#             target_lat = values["latitude"]
+#             target_lon = values["longitude"]

-            lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
-            lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)
+#             lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
+#             lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)

-            if lat_exists and lon_exists:
-                print("(lat, lon) exists already")
+#             if lat_exists and lon_exists:
+#                 print("(lat, lon) exists already")

-                raise NotImplementedError
+#                 raise NotImplementedError

-                # stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)
+#                 # stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)

-                # # overwrite previous value at that lat-lon, keeps old gid
+#                 # # overwrite previous value at that lat-lon, keeps old gid

-                # # need to set the gid of the current "sheet" to the stored gid
-                # updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
-                # # loc may remove the gid dimension so we might have to add it back with .expand_dims
+#                 # # need to set the gid of the current "sheet" to the stored gid
+#                 # updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
+#                 # # loc may remove the gid dimension so we might have to add it back with .expand_dims

-                # # overwrite the current entry at the gid = stored_gid entry of the zarr
-                # updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')
+#                 # # overwrite the current entry at the gid = stored_gid entry of the zarr
+#                 # updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')

-            else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
-                print("add entry to dataset")
+#             else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
+#                 print("add entry to dataset")

-                # we are trying to save 1 "sheet" of weather (weather at a single gid)
-                # need to update the index to fit into the stored data after we concatenate
+#                 # we are trying to save 1 "sheet" of weather (weather at a single gid)
+#                 # need to update the index to fit into the stored data after we concatenate

-                # this concatenates along the the gid axis
-                # gid has no guarantee of being unqiue but duplicate gids are fine for xarray
-                # we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
-                new_gid = stored_ds.sizes["gid"]
+#                 # this concatenates along the the gid axis
+#                 # gid has no guarantee of being unqiue but duplicate gids are fine for xarray
+#                 # we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
+#                 new_gid = stored_ds.sizes["gid"]

-                weather_sheet = combined_ds.sel(gid=slice(gid))
-                updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
-                updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")
+#                 weather_sheet = combined_ds.sel(gid=slice(gid))
+#                 updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
+#                 updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")

-    print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
+#     print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
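The else branch above appends one location per write: it relabels the sheet's `gid` to the next free index, then writes with `append_dim="gid"`. A self-contained toy version of that create-then-append pattern; the store path, variable name, and sizes are illustrative, not the pvdeg schema:

import numpy as np
import xarray as xr

def toy_sheet(gid: int) -> xr.Dataset:
    # one "sheet" of weather: a single gid with an hourly year of data
    return xr.Dataset(
        {"temp_air": (("gid", "time"), np.random.rand(1, 8760))},
        coords={"gid": [gid], "time": np.arange(8760)},
    )

toy_sheet(0).to_zarr("toy.zarr", mode="w-")           # first write creates the store

stored = xr.open_zarr("toy.zarr")
new_gid = stored.sizes["gid"]                         # next free index along gid
sheet = toy_sheet(7).assign_coords(gid=[new_gid])     # relabel before appending
sheet.to_zarr("toy.zarr", mode="a", append_dim="gid")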
-def check_store():
-    """Check if you have a zarr store at the default download path defined in pvdeg.config"""
-    if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):
+# def check_store():
+#     """Check if you have a zarr store at the default download path defined in pvdeg.config"""
+#     if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):

-        size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())
+#         size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())

-        print(f"""
-        You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}.
+#         print(f"""
+#         You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}.

-        It has {size} bytes.
-        """)
+#         It has {size} bytes.
+#         """)

-    elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH):
-        print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
+#     elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH):
+#         print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")

-    else:
-        raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.")
+#     else:
+#         raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.")

 def my_path():
@@ -175,10 +176,10 @@ def _combine_geo_weather_meta(
     meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index' : 'gid'})

-    combined = xr.merge([weather_ds, meta_ds]).assign_coords(
-        latitude=("gid", meta_ds.latitude.values),
-        longitude=('gid', meta_ds.longitude.values),
-    )
+    combined = xr.merge([weather_ds, meta_ds])#.assign_coords(
+    #     latitude=("gid", meta_ds.latitude.values),
+    #     longitude=('gid', meta_ds.longitude.values),
+    # )

     combined["Source"] = combined["Source"].astype(str)  # save as strings

@@ -198,10 +199,10 @@ def _seperate_geo_weather_meta(
     # there may be a more optimal way to do this
     data = np.column_stack(
         [
-            ds_from_zarr.gid.to_numpy(),
-            ds_from_zarr.latitude.to_numpy(),
-            ds_from_zarr.longitude.to_numpy(),
-            ds_from_zarr.altitude.to_numpy(),
+            ds_from_zarr.gid.to_numpy().astype(int),
+            ds_from_zarr.latitude.to_numpy().astype(float),
+            ds_from_zarr.longitude.to_numpy().astype(float),
+            ds_from_zarr.altitude.to_numpy().astype(float),
             ds_from_zarr.Source.to_numpy(),
             ds_from_zarr.wind_height.to_numpy(),
         ]
diff --git a/pvdeg/weather.py b/pvdeg/weather.py
index a567869d..eaf5ca5e 100644
--- a/pvdeg/weather.py
+++ b/pvdeg/weather.py
@@ -2,6 +2,7 @@
 Collection of classes and functions to obtain spectral parameters.
 """

+import dask.distributed
 from pvdeg import humidity
 from pvdeg.utilities import nrel_kestrel_check

@@ -1182,6 +1183,13 @@ def weather_distributed(
     """

     import dask.delayed
+    import dask.distributed
+
+    try:
+        client = dask.distributed.get_client()
+        print("Connected to a Dask scheduler | Dashboard:", client.dashboard_link)
+    except ValueError:
+        raise RuntimeError("No Dask scheduler found. Ensure a dask client is running.")

     if (database != "PVGIS" and database != "PSM3"):
         raise NotImplementedError(f"Only 'PVGIS' and 'PSM3' are implemented, you entered {database}")
@@ -1190,6 +1198,7 @@ def weather_distributed(
     results = dask.compute(futures)[0]  # values are returned in a list with one entry

     # what is the difference between these two approaches for dask distributed work, how can we schedule differently
+    # i believe futures might be better for our needs
     # futures = [client.submit(weather_distributed, "PVGIS", coord) for coord in coords]
     # client.gather(futures)
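The new guard works because `dask.distributed.get_client()` raises `ValueError` when no client exists in the current process, which the except clause converts into a clearer `RuntimeError`. A sketch of satisfying the check before calling the function; the cluster sizing and coordinate list are illustrative, and the call signature of `weather_distributed` is assumed from the commented-out lines above:

from dask.distributed import Client, LocalCluster

import pvdeg

# creating a Client registers it in the current process, so
# dask.distributed.get_client() inside weather_distributed() will find it
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

coords = [(39.74, -105.17), (40.01, -105.26)]  # (lat, lon) pairs
results = pvdeg.weather.weather_distributed("PVGIS", coords)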