From cb49ea350e24c756171e2d1d1bd0909bf6b2a30a Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 17 Oct 2024 17:40:44 -0600 Subject: [PATCH 01/14] distributed get pvgis using dask --- pvdeg/weather.py | 152 ++++++- scripts/load_pvgis_distributed.ipynb | 622 +++++++++++++++++++++++++++ 2 files changed, 773 insertions(+), 1 deletion(-) create mode 100644 scripts/load_pvgis_distributed.ipynb diff --git a/pvdeg/weather.py b/pvdeg/weather.py index dbe06f7..91b1e42 100644 --- a/pvdeg/weather.py +++ b/pvdeg/weather.py @@ -930,4 +930,154 @@ def get_anywhere(database = "PSM3", id=None, **kwargs): meta = {'result': 'This location was not found in either the NSRDB or PVGIS'} weather_db = {'result': 'NA'} - return weather_db, meta \ No newline at end of file + return weather_db, meta + + +def process_pvgis_distributed(weather_df): + """Create an xarray.Dataset using numpy array backend from a pvgis weather dataframe""" + + import dask.array as da + + weather_df.index.rename('time', inplace=True) + weather_ds = weather_df.to_xarray().drop_vars('time').copy() + + for var in weather_ds.data_vars: + dask_array = da.from_array(weather_ds[var].values, chunks='auto') + + weather_ds[var] = (weather_ds[var].dims, dask_array) + + return weather_ds + +def _weather_distributed_vec( + database: str, + coord: tuple[float], + ): + """ + Distributed weather calculation for use with dask futures/delayed + + Returns ds, dict, None if unsucessfull + Returns None, None, Exception if unsucessfull + """ + + try: + weather_df, meta_dict = get(database, coord) + except Exception as e: + return None, None, e + + weather_ds = process_pvgis_distributed(weather_df=weather_df) + + return weather_ds, meta_dict, None + +def pvgis_empty_weather_ds(gids_size): + """ + Create an empty weather dataset for pvgis hourly TMY data + + Parameters + ---------- + gids_size: int + number of gids, equivalent to number of unique locations + + Returns + ------- + weather_ds: xarray.Dataset + Weather dataset of the same format/shapes given by a `pvdeg.weather.get` geospatial call or `pvdeg.weather.weather_distributed` call or `GeosptialScenario.get_geospatial_data`. + """ + import dask.array as da + + shapes = { + "temp_air": ("gid", "time"), + "relative_humidity": ("gid", "time"), + "ghi": ("gid", "time"), + "dni": ("gid", "time"), + "dhi": ("gid", "time"), + "IR(h)": ("gid", "time"), + "wind_speed": ("gid", "time"), + "wind_direction": ("gid", "time"), + "pressure": ("gid", "time"), + } + attrs = {} + global_attrs = {} + + dims = {'gid', 'time'} + dims_size = {'time': 8760, 'gid': gids_size} + + weather_ds = xr.Dataset( + data_vars={ + var: (dim, da.empty([dims_size[d] for d in dim]), attrs.get(var)) + for var, dim in shapes.items() + }, + coords={'time': pd.date_range("2022-01-01", freq="h", periods=365 * 24), + 'gid': np.linspace(0, gids_size-1, gids_size, dtype=int)}, + attrs=global_attrs, + ) + + return weather_ds + + +def weather_distributed(database, coords): + """ + Grab weather using pvgis for all of the following locations using dask for parallelization. + You must create a dask client with multiple processes before calling this function, otherwise no speedup will be offered. + + PVGIS supports up to 30 requests per second so your dask client should not have more than $x$ workers/threads + that would put you over this limit. + + Parameters + ---------- + database : (str) + 'PVGIS' or 'NSRDB' (not implemented yet) + coords: list[tuple] + list of tuples containing (latitude, longitude) coordinates + + >>> [ + (49.95, 1.5), + (51.95, -9.5), + (51.95, -8.5), + (51.95, -4.5), + (51.95, -3.5), + ] + + Returns + -------- + weather_ds : xr.Dataset + Weather data for all locations requested in an xarray.Dataset using a dask array backend. + meta_df : pd.DataFrame + Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds. + gids_failed: list + list of index failed coordinates in input `coords` + """ + + import dask.delayed + + if (database != "PVGIS"): + raise NotImplementedError(f"Only 'PVGIS' is implemented, you entered {database}") + + futures = [dask.delayed(_weather_distributed_vec)("PVGIS", coord) for coord in coords] + results = dask.compute(futures)[0] # values are returned in a list with one entry + + # what is the difference between these two approaches for dask distributed work, how can we schedule differently + # futures = [client.submit(weather_distributed, "PVGIS", coord) for coord in coords] + # client.gather(futures) + + # results is a 2d list + # results[0] is the weather_ds with dask backend + # results[1] is meta_dict + weather_ds_collection = [row[0] for row in results] + meta_dict_collection = [row[1] for row in results] + + gids_failed = [] + + weather_ds = pvgis_empty_weather_ds(len(results)) # create empty weather xr.dataset + meta_df = pd.DataFrame.from_dict(meta_dict_collection) # create populated meta pd.DataFrame + + # these gids will be spatially meaningless, they will only show corresponding entries between weather_ds and meta_df + for i, row in enumerate(results): # this loop can be refactored, kinda gross + + if row[2]: + gids_failed.append(i) + continue + + # weather_ds[dict(gid=i)] = weather_ds_collection[i].to_xarray().drop_vars('time') + weather_ds[dict(gid=i)] = weather_ds_collection[i] + + return weather_ds, meta_df, gids_failed \ No newline at end of file diff --git a/scripts/load_pvgis_distributed.ipynb b/scripts/load_pvgis_distributed.ipynb new file mode 100644 index 0000000..447e56a --- /dev/null +++ b/scripts/load_pvgis_distributed.ipynb @@ -0,0 +1,622 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import pvdeg\n", + "from global_land_mask import globe\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "from mpl_toolkits.basemap import Basemap\n", + "import pandas as pd\n", + "import xarray as xr\n", + "import dask.array as da\n", + "import dask.dataframe as dd\n", + "from dask.distributed import LocalCluster, Client, Lock\n", + "import dask.delayed" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# decrease the arange step size for more fine resolution\n", + "# increase the arange step size for increased granularity\n", + "lon_UK = np.arange(-10.5, 1.76, 1)\n", + "lat_UK = np.arange(49.95, 60, 2)\n", + "lon_grid_UK, lat_grid_UK = np.meshgrid(lon_UK,lat_UK)\n", + "land_UK = globe.is_land(lat_grid_UK, lon_grid_UK)\n", + "\n", + "lon_land_UK = lon_grid_UK[land_UK]\n", + "lat_land_UK = lat_grid_UK[land_UK]\n", + "\n", + "lon_Scan = np.arange(-10.5, 31.6, 0.3)\n", + "lat_Scan = np.arange(60, 71.2, 0.3)\n", + "lon_grid_Scan, lat_grid_Scan = np.meshgrid(lon_Scan,lat_Scan)\n", + "land_Scan = globe.is_land(lat_grid_Scan, lon_grid_Scan)\n", + "\n", + "lon_land_Scan = lon_grid_Scan[land_Scan]\n", + "lat_land_Scan = lat_grid_Scan[land_Scan]" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "c:\\Users\\tford\\AppData\\Local\\miniconda3\\envs\\deg\\lib\\site-packages\\distributed\\node.py:182: UserWarning: Port 8787 is already in use.\n", + "Perhaps you already have a cluster running?\n", + "Hosting the HTTP server on port 60223 instead\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "http://127.0.0.1:60223/status\n" + ] + } + ], + "source": [ + "workers = 4\n", + "\n", + "cluster = LocalCluster(\n", + " n_workers=workers,\n", + " processes=True, \n", + " # number of threads may matter\n", + " )\n", + "\n", + "client = Client(cluster)\n", + "\n", + "print(client.dashboard_link)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "coords = list(zip(lat_land_UK, lon_land_UK)) # easiest way to make a list of the right shape\n", + "\n", + "weather_ds, meta_df, failed_gids = pvdeg.weather.weather_distributed(database=\"PVGIS\", coords=coords)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "c:\\Users\\tford\\AppData\\Local\\miniconda3\\envs\\deg\\lib\\site-packages\\distributed\\client.py:3362: UserWarning: Sending large graph of size 13.27 MiB.\n", + "This may cause some slowdown.\n", + "Consider loading the data with Dask directly\n", + " or using futures or delayed objects to embed the data into the graph without repetition.\n", + "See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.\n", + " warnings.warn(\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 14MB\n",
+       "Dimensions:            (gid: 22, time: 8760)\n",
+       "Coordinates:\n",
+       "  * time               (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n",
+       "  * gid                (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n",
+       "Data variables:\n",
+       "    temp_air           (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n",
+       "    relative_humidity  (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n",
+       "    ghi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
+       "    dni                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
+       "    dhi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
+       "    IR(h)              (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n",
+       "    wind_speed         (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n",
+       "    wind_direction     (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n",
+       "    pressure           (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04
" + ], + "text/plain": [ + " Size: 14MB\n", + "Dimensions: (gid: 22, time: 8760)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n", + " * gid (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n", + "Data variables:\n", + " temp_air (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n", + " relative_humidity (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n", + " ghi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " dni (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " dhi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " IR(h) (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n", + " wind_speed (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n", + " wind_direction (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n", + " pressure (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "weather_ds.compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "deg", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 74b745c507e74be8853d71fc309b501f1aa20f5e Mon Sep 17 00:00:00 2001 From: Ford Date: Thu, 24 Oct 2024 15:53:15 -0600 Subject: [PATCH 02/14] local db skeleton --- pvdeg/__init__.py | 1 + pvdeg/config.py | 3 + pvdeg/store.py | 185 ++++ pvdeg/weather.py | 9 +- scripts/load_pvgis_distributed.ipynb | 1533 ++++++++++++++++++++++++-- 5 files changed, 1613 insertions(+), 118 deletions(-) create mode 100644 pvdeg/store.py diff --git a/pvdeg/__init__.py b/pvdeg/__init__.py index 0b770fc..adf1fa4 100644 --- a/pvdeg/__init__.py +++ b/pvdeg/__init__.py @@ -14,6 +14,7 @@ from . import montecarlo from .scenario import Scenario, GeospatialScenario from . import spectral +from . import store from . import symbolic from . import standards from . import temperature diff --git a/pvdeg/config.py b/pvdeg/config.py index d40706a..14afbf3 100644 --- a/pvdeg/config.py +++ b/pvdeg/config.py @@ -11,6 +11,9 @@ DATA_DIR = PVDEG_DIR / "data" TEST_DIR = PVDEG_DIR.parent / "tests" TEST_DATA_DIR = PVDEG_DIR.parent / "tests" / "data" + +# downloader target directory +METOROLOGICAL_DOWNLOAD_PATH = Path.home() / "PVDeg-Meteorological" # DATA_LIBRARY = PVDEG_DIR.parent / "DataLibrary" # if not os.path.isdir(DATA_LIBRARY): diff --git a/pvdeg/store.py b/pvdeg/store.py new file mode 100644 index 0000000..518d8ba --- /dev/null +++ b/pvdeg/store.py @@ -0,0 +1,185 @@ +from pathlib import Path +import xarray as xr +import pandas as pd +import numpy as np +import zarr +import os + +from pvdeg import METOROLOGICAL_DOWNLOAD_PATH + +def get(group): + """ + Extract a weather xarray dataset and metadata pandas dataframe from your zarr store. + `get` pulls the entire datastore into these objects. PVDeg does not make indexing available at this stage. + This is practical because all datavariables are stored in dask arrays so they are loaded lazily instead of into memmory when this is called. + Choose the points you need after this method is called by using `sel`, `isel`, `loc, `iloc`. + + `store.get` is meant to match the API of other geospatial weather api's from pvdeg like `pvdeg.weather.get`, `pvdeg.weather.distributed_weather`, `GeospatialScenario.get_geospatial_data` + + Parameters + ----------- + group : str + name of the group to access from your local zarr store. + Groups are created automatically in your store when you save data using `pvdeg.store.store`. + + *From `pvdeg.store.store` docstring* + Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min" + + Returns + ------- + weather_ds : xr.Dataset + Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory. + meta_df : pd.DataFrame + Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds. + """ + + combined_ds = xr.open_zarr( + store=METOROLOGICAL_DOWNLOAD_PATH, + group=group + ) + + weather_ds, meta_df = _seperate_geo_weather_meta(ds_from_zarr=combined_ds) + + return weather_ds, meta_df + +def store(weather_ds, meta_df): + """ + Add geospatial meteorolical data to your zarr store. Data will be saved to the correct group based on its periodicity. + + Hourly PVGIS data will be saved to "PVGIS-1hr", 30 minute PVGIS to "PVGIS-30min", similarly 15 minute PVGIS will be saved to "PVGIS-15min" + + Parameters + ----------- + weather_ds : xr.Dataset + Weather data for all locations requested in an xarray.Dataset using a dask array backend. This may be larger than memory. + meta_df : pd.DataFrame + Pandas DataFrame containing metadata for all requested locations. Each row maps to a single entry in the weather_ds. + + Returns + -------- + None + """ + + group = meta_df.iloc[0]["Source"] + rows_per_entry = weather_ds.isel(gid=0).time.size + + if rows_per_entry == 8760: + periodicity = "1hr" + elif rows_per_entry == 17520: + periodicity = "30min" + elif rows_per_entry == 35040: + periodicity = "15min" + else: + raise ValueError(f"first location to store has {rows_per_entry} rows, must have 8670, 17520, 35040 rows") + + combined_ds = _combine_geo_weather_meta(weather_ds, meta_df) + + # what mode should this be + # we want to add to indexes if need be or overwrite old ones + combined_ds.to_zarr( + store=METOROLOGICAL_DOWNLOAD_PATH, + group=f"{group}-{periodicity}" + ) + + print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}") + + + +def check_store(): + """Check if you have a zarr store at the default download path defined in pvdeg.config""" + if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zattrs")): + + size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file()) + + print(f""" + You have a zarr store at {METOROLOGICAL_DOWNLOAD_PATH}. + + It has {size} bytes. + """) + + elif os.path.exists(METOROLOGICAL_DOWNLOAD_PATH): + print(f"You have a directory but no zarr store at {METOROLOGICAL_DOWNLOAD_PATH}") + + else: + raise FileNotFoundError(f"No Directory exists at {METOROLOGICAL_DOWNLOAD_PATH}. Data has not been saved here.") + + +def my_path(): + """Finds path to your zarr store of data if it exists""" + if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zattrs")): + print(METOROLOGICAL_DOWNLOAD_PATH) + + else: + print("Directory not found") + +def _combine_geo_weather_meta( + weather_ds: xr.Dataset, + meta_df: pd.DataFrame + ): + """Combine weather dataset and meta dataframe into a single dataset""" + + meta_ds = xr.Dataset.from_dataframe(meta_df) + # we could do some encoding scheme here, dont need to store source? unless the zarr compression handles it for us + + meta_ds['gid'] = meta_ds['index'].values.astype(np.int32) + meta_ds = meta_ds.drop_vars(["index"]) + + combined = xr.merge([weather_ds, meta_ds]).assign_coords( + latitude=("gid", meta_ds.latitude.values), + longitude=('gid', meta_ds.longitude.values), + ) + + return combined + + +def _seperate_geo_weather_meta( + ds_from_zarr: xr.Dataset, +): + """ + Take loaded dataset in the zarr store schema (weather and meta combined) + and seperate it into `weather_ds` and `meta_df`. + """ + + # there may be a more optimal way to do this + data = np.column_stack( + [ + ds_from_zarr.gid.to_numpy(), + ds_from_zarr.latitude.to_numpy(), + ds_from_zarr.longitude.to_numpy(), + ds_from_zarr.altitude.to_numpy(), + ds_from_zarr.Source.to_numpy(), + ds_from_zarr.wind_height.to_numpy(), + ] + ) + + seperated_meta_df = pd.DataFrame(data, columns=["gid", "latitude", "longitude", "altitude", "Source", "wind_height"]).set_index("gid") + + seperated_weather_ds = ds_from_zarr.drop_vars(("latitude", "longitude", "altitude", "Source", "wind_height")) + + return seperated_weather_ds, seperated_meta_df + +def _make_coords_to_gid_da( + ds_from_zarr: xr.Dataset + ): + """Create a 2D indexable array that maps coordinates (lat and lon) to gid stored in zarr store""" + + import dask.array as da + + # only want to do this if the arrays are dask arrays + lats = ds_from_zarr.latitude.to_numpy() + lons = ds_from_zarr.longitude.to_numpy() + + gids = -1 * da.empty((len(lats), len(lons))) + + points = xr.DataArray( + data=gids, + coords={ + "latitude": lats, + "longitude": lons + }, + dims=["latitude", "longitude"], + ) + + points.set_index(latitude="latitude", longitude="longitude") + + return points \ No newline at end of file diff --git a/pvdeg/weather.py b/pvdeg/weather.py index 91b1e42..59f7365 100644 --- a/pvdeg/weather.py +++ b/pvdeg/weather.py @@ -1014,13 +1014,20 @@ def pvgis_empty_weather_ds(gids_size): return weather_ds +# add some check to see if a dask client exists +# can force user to pass dask client to ensure it exists + +# if called without dask client we will return a xr.Dataset +# with dask backend that does not appear as if it failed until we compute it def weather_distributed(database, coords): """ Grab weather using pvgis for all of the following locations using dask for parallelization. - You must create a dask client with multiple processes before calling this function, otherwise no speedup will be offered. + You must create a dask client with multiple processes before calling this function, otherwise results will not be properly calculated. PVGIS supports up to 30 requests per second so your dask client should not have more than $x$ workers/threads that would put you over this limit. + + With eventual NSRDB implementation API keys will be an issue, each key is rate limited. Parameters ---------- diff --git a/scripts/load_pvgis_distributed.ipynb b/scripts/load_pvgis_distributed.ipynb index 447e56a..5dc552e 100644 --- a/scripts/load_pvgis_distributed.ipynb +++ b/scripts/load_pvgis_distributed.ipynb @@ -46,27 +46,9 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "c:\\Users\\tford\\AppData\\Local\\miniconda3\\envs\\deg\\lib\\site-packages\\distributed\\node.py:182: UserWarning: Port 8787 is already in use.\n", - "Perhaps you already have a cluster running?\n", - "Hosting the HTTP server on port 60223 instead\n", - " warnings.warn(\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "http://127.0.0.1:60223/status\n" - ] - } - ], + "outputs": [], "source": [ "workers = 4\n", "\n", @@ -94,21 +76,88 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "weather_ds.compute()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Saving Geospatial Data Locally\n", + "\n", + "The goal of `pvdeg.store` is to create a living local database of meteoroligical data that grows overtime as your geospatial data needs grow. To do this `PVDeg` will save to a folder called `PVDeg-Meteorological` your user home directory. For me this is located at `C:\\Users\\tford\\PVDeg-Meteorological`. This directory will contain a `zarr` store, this is a popular format for storing multi-dimensional array data, not dissimilar to `h5` files. It was chosen over `h5` because `zarr` stores arrays in chunked compressed files that make access very easy without opening an entire file like `h5`. This is an oversimplification of the design process but we felt `zarr` was a better fit.\n", + "\n", + "## Store\n", + "\n", + "We can use `pvdeg.store.store` to save geospatial data to our living dataset in the common form provided by `pvdeg`. The data is stored in various groups and subfolders but they will be arranged based on the *source* and *periodicity*.\n", + "\n", + "For example: \n", + " - Hourly PVGIS data will be saved to a group called \"PVGIS-1hr\" \n", + " - 30 minute PVGIS to a group called \"PVGIS-30min\" \n", + " - 15 minute PVGIS will be saved to a group called \"PVGIS-15min\" " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pvdeg.store.store(weather_ds, meta_df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load\n", + "\n", + "`PVDeg` makes use of `dask` to handle larger than memory datasets. Trandionally, this was only useful in our HPC environment but as your local database grows overtime, it will eventually surpass the limits of your computer's volatile memory. Additionally, `dask` allows us to parallelize geospatial calculations via `pvdeg.geospatial.analysis`. This ability can be utilized on local machines or HPC clusters alike. \n", + "\n", + "`PVDeg` implements the ability to access your local living database via `pvdeg.store.get`. This method takes a string called `group`. Groups are created automatically in your store when you save data using `pvdeg.store.store`. As described in the `pvdeg.store.store` docstring and the *Store* section above, NSRDB will follow a similar scheme but it not implemented yet. \n", + " - Hourly PVGIS data will be saved to a group called \"PVGIS-1hr\" \n", + " - 30 minute PVGIS to a group called \"PVGIS-30min\" \n", + " - 15 minute PVGIS will be saved to a group called \"PVGIS-15min\" " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load PVGIS-1hr Data\n", + "\n", + "The example below shows us loading the hourly tmy data from PVGIS that we gathered and saved to our zarr store in the above cells. This gets us the form of a weather xarray.Dataset (`geo_weather` in this example) and a metadata dataframe (`geo_meta` in this example).\n", + "\n", + "These can be treated like any other geospatial data shown in the `pvdeg` tutorials and tools or documentation." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "geo_weather, geo_meta = pvdeg.store.get(group=\"PVGIS-1hr\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Inspecting the Results\n", + "\n", + "explain *.compute() here*" + ] + }, + { + "cell_type": "code", + "execution_count": 3, "metadata": {}, "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "c:\\Users\\tford\\AppData\\Local\\miniconda3\\envs\\deg\\lib\\site-packages\\distributed\\client.py:3362: UserWarning: Sending large graph of size 13.27 MiB.\n", - "This may cause some slowdown.\n", - "Consider loading the data with Dask directly\n", - " or using futures or delayed objects to embed the data into the graph without repetition.\n", - "See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.\n", - " warnings.warn(\n" - ] - }, { "data": { "text/html": [ @@ -479,76 +528,519 @@ "
<xarray.Dataset> Size: 14MB\n",
        "Dimensions:            (gid: 22, time: 8760)\n",
        "Coordinates:\n",
-       "  * time               (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n",
        "  * gid                (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n",
+       "  * time               (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n",
        "Data variables:\n",
-       "    temp_air           (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n",
-       "    relative_humidity  (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n",
-       "    ghi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
-       "    dni                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
-       "    dhi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
-       "    IR(h)              (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n",
-       "    wind_speed         (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n",
-       "    wind_direction     (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n",
-       "    pressure           (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04
  • " ], "text/plain": [ " Size: 14MB\n", "Dimensions: (gid: 22, time: 8760)\n", "Coordinates:\n", - " * time (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n", " * gid (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n", + " * time (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n", "Data variables:\n", - " temp_air (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n", - " relative_humidity (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n", - " ghi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", - " dni (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", - " dhi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", - " IR(h) (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n", - " wind_speed (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n", - " wind_direction (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n", - " pressure (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04" + " IR(h) (gid, time) float64 2MB dask.array\n", + " dhi (gid, time) float64 2MB dask.array\n", + " dni (gid, time) float64 2MB dask.array\n", + " ghi (gid, time) float64 2MB dask.array\n", + " pressure (gid, time) float64 2MB dask.array\n", + " relative_humidity (gid, time) float64 2MB dask.array\n", + " temp_air (gid, time) float64 2MB dask.array\n", + " wind_direction (gid, time) float64 2MB dask.array\n", + " wind_speed (gid, time) float64 2MB dask.array" ] }, - "execution_count": 7, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "weather_ds.compute()" + "geo_weather" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], - "source": [] + "outputs": [ + { + "data": { + "text/html": [ + "
    \n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
    latitudelongitudealtitudeSourcewind_height
    gid
    049.951.5176.0PVGIS10
    151.95-9.5330.0PVGIS10
    251.95-8.5137.0PVGIS10
    351.95-4.5184.0PVGIS10
    451.95-3.5160.0PVGIS10
    551.95-2.586.0PVGIS10
    651.95-1.5217.0PVGIS10
    751.95-0.594.0PVGIS10
    851.950.564.0PVGIS10
    953.95-9.5119.0PVGIS10
    1053.95-8.5129.0PVGIS10
    1153.95-7.597.0PVGIS10
    1253.95-6.533.0PVGIS10
    1353.95-2.5136.0PVGIS10
    1453.95-1.5115.0PVGIS10
    1553.95-0.533.0PVGIS10
    1655.95-5.5205.0PVGIS10
    1755.95-4.5248.0PVGIS10
    1855.95-3.5122.0PVGIS10
    1955.95-2.5150.0PVGIS10
    2057.95-6.5137.0PVGIS10
    2157.95-4.559.0PVGIS10
    \n", + "
    " + ], + "text/plain": [ + " latitude longitude altitude Source wind_height\n", + "gid \n", + "0 49.95 1.5 176.0 PVGIS 10\n", + "1 51.95 -9.5 330.0 PVGIS 10\n", + "2 51.95 -8.5 137.0 PVGIS 10\n", + "3 51.95 -4.5 184.0 PVGIS 10\n", + "4 51.95 -3.5 160.0 PVGIS 10\n", + "5 51.95 -2.5 86.0 PVGIS 10\n", + "6 51.95 -1.5 217.0 PVGIS 10\n", + "7 51.95 -0.5 94.0 PVGIS 10\n", + "8 51.95 0.5 64.0 PVGIS 10\n", + "9 53.95 -9.5 119.0 PVGIS 10\n", + "10 53.95 -8.5 129.0 PVGIS 10\n", + "11 53.95 -7.5 97.0 PVGIS 10\n", + "12 53.95 -6.5 33.0 PVGIS 10\n", + "13 53.95 -2.5 136.0 PVGIS 10\n", + "14 53.95 -1.5 115.0 PVGIS 10\n", + "15 53.95 -0.5 33.0 PVGIS 10\n", + "16 55.95 -5.5 205.0 PVGIS 10\n", + "17 55.95 -4.5 248.0 PVGIS 10\n", + "18 55.95 -3.5 122.0 PVGIS 10\n", + "19 55.95 -2.5 150.0 PVGIS 10\n", + "20 57.95 -6.5 137.0 PVGIS 10\n", + "21 57.95 -4.5 59.0 PVGIS 10" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "geo_meta" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Geospatial Calculations from Locally Stored Data\n", + "\n", + "As shown above we can load from our `zarr` store and treat it like any other geospatial data in `pvdeg`.\n", + "\n", + "For demonstration we can run the analysis below to estimate effective standoff height and operating temperatures for the provided data. It may look like the `geo_res` contains empty results but that is because we did not have input data for all of the points in the input grid (think of this as a 2D plane formed between the latitude and longitude axes). Clicking on the stack of three circles in the bottom cell will expand the datavariable (like an attribute of the multidimensional array structure) and show the results.\n", + "\n", + "Additionally, we can interpolate and plot the results." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The array tilt angle was not provided, therefore the latitude tilt of 50.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 52.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 54.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 56.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 56.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 56.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 56.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 58.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n", + "The array tilt angle was not provided, therefore the latitude tilt of 58.0 was used.\n", + "The array azimuth was not provided, therefore an azimuth of 180.0 was used.\n" + ] + } + ], + "source": [ + "func=pvdeg.standards.standoff\n", + "\n", + "template = pvdeg.geospatial.auto_template(\n", + " func=func,\n", + " ds_gids=geo_weather\n", + ")\n", + "\n", + "geo_res = pvdeg.geospatial.analysis(\n", + " weather_ds=geo_weather,\n", + " meta_df=geo_meta,\n", + " func=func,\n", + " template=template\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
    \n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
    <xarray.Dataset> Size: 2kB\n",
    +       "Dimensions:    (latitude: 5, longitude: 12)\n",
    +       "Coordinates:\n",
    +       "  * latitude   (latitude) float64 40B 49.95 51.95 53.95 55.95 57.95\n",
    +       "  * longitude  (longitude) float64 96B -9.5 -8.5 -7.5 -6.5 ... -1.5 -0.5 0.5 1.5\n",
    +       "Data variables:\n",
    +       "    x          (latitude, longitude) float64 480B nan nan nan ... nan nan nan\n",
    +       "    T98_0      (latitude, longitude) float64 480B nan nan nan ... nan nan nan\n",
    +       "    T98_inf    (latitude, longitude) float64 480B nan nan nan ... nan nan nan
    " + ], + "text/plain": [ + " Size: 2kB\n", + "Dimensions: (latitude: 5, longitude: 12)\n", + "Coordinates:\n", + " * latitude (latitude) float64 40B 49.95 51.95 53.95 55.95 57.95\n", + " * longitude (longitude) float64 96B -9.5 -8.5 -7.5 -6.5 ... -1.5 -0.5 0.5 1.5\n", + "Data variables:\n", + " x (latitude, longitude) float64 480B nan nan nan ... nan nan nan\n", + " T98_0 (latitude, longitude) float64 480B nan nan nan ... nan nan nan\n", + " T98_inf (latitude, longitude) float64 480B nan nan nan ... nan nan nan" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "geo_res" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This plot lacks information on the area and does not include political boundary lines. For more information on plotting look at the `Scenario - Non-uniform Mountain Downselect.ipynb` tutorial in the tutorials and tools folder." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(
    ,\n", + " )" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
    " + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pvdeg.geospatial.plot_sparse_analysis(geo_res, data_var='T98_0', method=\"nearest\")" + ] } ], "metadata": { From 33bb216e034c2c5c369191200c7f0fcf9a9b1022 Mon Sep 17 00:00:00 2001 From: Ford Date: Sat, 26 Oct 2024 22:07:26 -0600 Subject: [PATCH 03/14] add new PVGIS location to existing store --- pvdeg/store.py | 96 +++- pvdeg/weather.py | 4 +- scripts/load_pvgis_distributed.ipynb | 772 +++++++++++++++++++++++---- 3 files changed, 758 insertions(+), 114 deletions(-) diff --git a/pvdeg/store.py b/pvdeg/store.py index 518d8ba..864882b 100644 --- a/pvdeg/store.py +++ b/pvdeg/store.py @@ -2,6 +2,7 @@ import xarray as xr import pandas as pd import numpy as np +import dask.array as da import zarr import os @@ -74,20 +75,90 @@ def store(weather_ds, meta_df): combined_ds = _combine_geo_weather_meta(weather_ds, meta_df) - # what mode should this be - # we want to add to indexes if need be or overwrite old ones - combined_ds.to_zarr( - store=METOROLOGICAL_DOWNLOAD_PATH, - group=f"{group}-{periodicity}" - ) + + if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory + print("Creating Zarr") + + combined_ds.to_zarr( + store=METOROLOGICAL_DOWNLOAD_PATH, + group=f"{group}-{periodicity}", + ) + else: # store already exists + print("adding to store") + + print("opening store") + stored_ds = xr.open_zarr( + store=METOROLOGICAL_DOWNLOAD_PATH, + group=f"{group}-{periodicity}", + ) + + lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds) + + for gid, values in meta_df.iterrows(): + + target_lat = values["latitude"] + target_lon = values["longitude"] + + lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat) + lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon) + + if lat_exists and lon_exists: + print("(lat, lon) exists already") + stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon) + + # overwrite previous value at that lat-lon, keeps old gid + + # will this be a view + # how can we assign the value + # cant slice? + stored_ds.sel(gid=stored_gid)[:] = combined_ds.sel(gid=gid).values() + + else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation + print("add entry to dataset") + + # we are trying to save 1 "sheet" of weather (weather at a single gid) + # need to update the index to fit into the stored data after we concatenate + # we want to update the arbitrary gid in the input (combined_ds) to the next index in the gid array (starts at 0, current_gid + 1 = sizes["gid"] = new gid) + new_gid = stored_ds.sizes["gid"] + + # combined_ds.sel(gid=gid) = combined_ds.sel(gid=gid).assign_coords(gid=[new_gid]) # we may have the issues with this sel returning a view + updated_entry = combined_ds.sel(gid=gid).assign_coords(gid=[new_gid]) + + stored_ds = xr.concat([stored_ds, updated_entry], dim="gid") + + # trigger rechunking + # should this happen outside of the loop + stored_ds = stored_ds.chunk() + + # SAVE DATASET BACK TO STORE + stored_ds.to_zarr(METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w') # test with "a" probably wont work print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}") +### THIS NEEDS TO BE DEPRECATED +def _add_entry_to_ds(combined_ds, stored_ds, target_lat, target_lon, gid): + + new_gid = stored_ds.sizes["gid"] # zero indexed so the next index will be the current size + + # new_entry = combined_ds.sel(gid=gid).expand_dims(gid=new_gid) + + # for var in new_entry.data_vars: + # existing_data = stored_ds[var] + # new_data = new_entry[var] + + # updated_data = xr.concat([existing_data, new_data], dim='gid') + stored_ds = xr.concat([stored_ds, combined_ds.sel(gid=gid)], dim="gid") + + # stored_ds[var] = updated_datag + + # stored_ds['latitude'] = xr.concat([stored_ds['latitude'], xr.DataArray([target_lat], dims='gid')], dim='gid') + # stored_ds['longitude'] = xr.concat([stored_ds['longitude'], xr.DataArray([target_lon], dims='gid')], dim='gid') + def check_store(): """Check if you have a zarr store at the default download path defined in pvdeg.config""" - if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zattrs")): + if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file()) @@ -118,17 +189,15 @@ def _combine_geo_weather_meta( ): """Combine weather dataset and meta dataframe into a single dataset""" - meta_ds = xr.Dataset.from_dataframe(meta_df) - # we could do some encoding scheme here, dont need to store source? unless the zarr compression handles it for us - - meta_ds['gid'] = meta_ds['index'].values.astype(np.int32) - meta_ds = meta_ds.drop_vars(["index"]) + meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index' : 'gid'}) combined = xr.merge([weather_ds, meta_ds]).assign_coords( latitude=("gid", meta_ds.latitude.values), longitude=('gid', meta_ds.longitude.values), ) + combined["Source"] = combined["Source"].astype(str) # save as strings + return combined @@ -140,6 +209,8 @@ def _seperate_geo_weather_meta( and seperate it into `weather_ds` and `meta_df`. """ + ds_from_zarr["Source"] = ds_from_zarr["Source"].astype(object) # geospatial.mapblocks needs this to be an object + # there may be a more optimal way to do this data = np.column_stack( [ @@ -163,7 +234,6 @@ def _make_coords_to_gid_da( ): """Create a 2D indexable array that maps coordinates (lat and lon) to gid stored in zarr store""" - import dask.array as da # only want to do this if the arrays are dask arrays lats = ds_from_zarr.latitude.to_numpy() diff --git a/pvdeg/weather.py b/pvdeg/weather.py index 59f7365..4917283 100644 --- a/pvdeg/weather.py +++ b/pvdeg/weather.py @@ -968,7 +968,7 @@ def _weather_distributed_vec( return weather_ds, meta_dict, None -def pvgis_empty_weather_ds(gids_size): +def pvgis_hourly_empty_weather_ds(gids_size): """ Create an empty weather dataset for pvgis hourly TMY data @@ -1074,7 +1074,7 @@ def weather_distributed(database, coords): gids_failed = [] - weather_ds = pvgis_empty_weather_ds(len(results)) # create empty weather xr.dataset + weather_ds = pvgis_hourly_empty_weather_ds(len(results)) # create empty weather xr.dataset meta_df = pd.DataFrame.from_dict(meta_dict_collection) # create populated meta pd.DataFrame # these gids will be spatially meaningless, they will only show corresponding entries between weather_ds and meta_df diff --git a/scripts/load_pvgis_distributed.ipynb b/scripts/load_pvgis_distributed.ipynb index 5dc552e..edc99d7 100644 --- a/scripts/load_pvgis_distributed.ipynb +++ b/scripts/load_pvgis_distributed.ipynb @@ -46,16 +46,23 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "http://127.0.0.1:8787/status\n" + ] + } + ], "source": [ "workers = 4\n", "\n", "cluster = LocalCluster(\n", " n_workers=workers,\n", " processes=True, \n", - " # number of threads may matter\n", " )\n", "\n", "client = Client(cluster)\n", @@ -76,9 +83,486 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
    \n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
    <xarray.Dataset> Size: 14MB\n",
    +       "Dimensions:            (gid: 22, time: 8760)\n",
    +       "Coordinates:\n",
    +       "  * time               (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n",
    +       "  * gid                (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n",
    +       "Data variables:\n",
    +       "    temp_air           (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n",
    +       "    relative_humidity  (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n",
    +       "    ghi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
    +       "    dni                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
    +       "    dhi                (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n",
    +       "    IR(h)              (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n",
    +       "    wind_speed         (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n",
    +       "    wind_direction     (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n",
    +       "    pressure           (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04
    " + ], + "text/plain": [ + " Size: 14MB\n", + "Dimensions: (gid: 22, time: 8760)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 70kB 2022-01-01 ... 2022-12-31T2...\n", + " * gid (gid) int32 88B 0 1 2 3 4 5 6 7 ... 15 16 17 18 19 20 21\n", + "Data variables:\n", + " temp_air (gid, time) float64 2MB 4.11 4.7 5.28 ... 0.35 0.37 0.38\n", + " relative_humidity (gid, time) float64 2MB 87.27 87.03 86.79 ... 86.37 86.14\n", + " ghi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " dni (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " dhi (gid, time) float64 2MB 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0\n", + " IR(h) (gid, time) float64 2MB 314.4 317.4 320.4 ... 271.9 274.1\n", + " wind_speed (gid, time) float64 2MB 3.11 3.1 3.08 ... 3.53 3.45 3.37\n", + " wind_direction (gid, time) float64 2MB 269.0 274.0 279.0 ... 245.0 242.0\n", + " pressure (gid, time) float64 2MB 1.014e+05 1.014e+05 ... 9.965e+04" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "weather_ds.compute()" ] @@ -103,9 +587,36 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating Zarr\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "c:\\Users\\tford\\AppData\\Local\\miniconda3\\envs\\deg\\lib\\site-packages\\distributed\\client.py:3362: UserWarning: Sending large graph of size 13.27 MiB.\n", + "This may cause some slowdown.\n", + "Consider loading the data with Dask directly\n", + " or using futures or delayed objects to embed the data into the graph without repetition.\n", + "See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "dataset saved to zarr store at C:\\Users\\tford\\PVDeg-Meteorological\n" + ] + } + ], "source": [ "pvdeg.store.store(weather_ds, meta_df)" ] @@ -137,7 +648,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ @@ -150,12 +661,12 @@ "source": [ "### Inspecting the Results\n", "\n", - "explain *.compute() here*" + "explain *.compute() and dask here*" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -539,11 +1050,11 @@ " relative_humidity (gid, time) float64 2MB dask.array<chunksize=(22, 8760), meta=np.ndarray>\n", " temp_air (gid, time) float64 2MB dask.array<chunksize=(22, 8760), meta=np.ndarray>\n", " wind_direction (gid, time) float64 2MB dask.array<chunksize=(22, 8760), meta=np.ndarray>\n", - " wind_speed (gid, time) float64 2MB dask.array<chunksize=(22, 8760), meta=np.ndarray>