Skip to content

Commit

Permalink
adding point to existing zarr store
Browse files Browse the repository at this point in the history
  • Loading branch information
tobin-ford committed Oct 28, 2024
1 parent 33bb216 commit 7894b0a
Showing 2 changed files with 3,019 additions and 1,434 deletions.
64 changes: 26 additions & 38 deletions pvdeg/store.py
Original file line number Diff line number Diff line change
@@ -75,13 +75,13 @@ def store(weather_ds, meta_df):

combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)


if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")): # no zstore in directory
print("Creating Zarr")

combined_ds.to_zarr(
store=METOROLOGICAL_DOWNLOAD_PATH,
group=f"{group}-{periodicity}",
mode="w-", # only use for first time creating store
)
else: # store already exists
print("adding to store")
@@ -90,6 +90,7 @@ def store(weather_ds, meta_df):
stored_ds = xr.open_zarr(
store=METOROLOGICAL_DOWNLOAD_PATH,
group=f"{group}-{periodicity}",
# consolidated=True
)

lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)
@@ -104,57 +105,42 @@ def store(weather_ds, meta_df):

if lat_exists and lon_exists:
print("(lat, lon) exists already")
stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)

# overwrite previous value at that lat-lon, keeps old gid
raise NotImplementedError

# stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)

# # overwrite previous value at that lat-lon, keeps old gid

# # need to set the gid of the current "sheet" to the stored gid
# updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
# # loc may remove the gid dimension so we might have to add it back with .expand_dims

# # overwrite the current entry at the gid = stored_gid entry of the zarr
# updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')

# will this be a view
# how can we assign the value
# cant slice?
stored_ds.sel(gid=stored_gid)[:] = combined_ds.sel(gid=gid).values()

else: # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
print("add entry to dataset")

# we are trying to save 1 "sheet" of weather (weather at a single gid)
# need to update the index to fit into the stored data after we concatenate
# we want to update the arbitrary gid in the input (combined_ds) to the next index in the gid array (starts at 0, current_gid + 1 = sizes["gid"] = new gid)
new_gid = stored_ds.sizes["gid"]

# combined_ds.sel(gid=gid) = combined_ds.sel(gid=gid).assign_coords(gid=[new_gid]) # we may have the issues with this sel returning a view
updated_entry = combined_ds.sel(gid=gid).assign_coords(gid=[new_gid])

stored_ds = xr.concat([stored_ds, updated_entry], dim="gid")
# this concatenates along the the gid axis
# gid has no guarantee of being unqiue but duplicate gids are fine for xarray
# we slice so we can get a Dataset with dimensions of (gid, time) indexing to grab one gid will drop the gid dimension
new_gid = stored_ds.sizes["gid"]

# trigger rechunking
# should this happen outside of the loop
stored_ds = stored_ds.chunk()
weather_sheet = combined_ds.sel(gid=slice(gid))
updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")

# SAVE DATASET BACK TO STORE
stored_ds.to_zarr(METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w') # test with "a" probably wont work
# new_entry_added_ds = xr.concat([stored_ds, updated_entry], dim="gid")

# new_entry_added_ds.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")

print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")

### THIS NEEDS TO BE DEPRECATED
def _add_entry_to_ds(combined_ds, stored_ds, target_lat, target_lon, gid):

new_gid = stored_ds.sizes["gid"] # zero indexed so the next index will be the current size

# new_entry = combined_ds.sel(gid=gid).expand_dims(gid=new_gid)

# for var in new_entry.data_vars:
# existing_data = stored_ds[var]
# new_data = new_entry[var]

# updated_data = xr.concat([existing_data, new_data], dim='gid')
stored_ds = xr.concat([stored_ds, combined_ds.sel(gid=gid)], dim="gid")

# stored_ds[var] = updated_datag

# stored_ds['latitude'] = xr.concat([stored_ds['latitude'], xr.DataArray([target_lat], dims='gid')], dim='gid')
# stored_ds['longitude'] = xr.concat([stored_ds['longitude'], xr.DataArray([target_lon], dims='gid')], dim='gid')



def check_store():
"""Check if you have a zarr store at the default download path defined in pvdeg.config"""
@@ -189,8 +175,10 @@ def _combine_geo_weather_meta(
):
"""Combine weather dataset and meta dataframe into a single dataset"""

# if meta_df.index.name == 'index':
meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index' : 'gid'})


combined = xr.merge([weather_ds, meta_ds]).assign_coords(
latitude=("gid", meta_ds.latitude.values),
longitude=('gid', meta_ds.longitude.values),
Loading

0 comments on commit 7894b0a

Please sign in to comment.