Skip to content

Commit 1eb9a37

Browse files
committed
work in progress
1 parent 88ab400 commit 1eb9a37

File tree

1 file changed

+85
-38
lines changed

1 file changed

+85
-38
lines changed

feedstock/recipe.py

+85-38
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,97 @@
1-
"""
2-
A synthetic prototype recipe
3-
"""
1+
import os
2+
from dataclasses import dataclass
43

4+
import aiohttp
55
import apache_beam as beam
6-
from leap_data_management_utils.data_management_transforms import (
7-
get_catalog_store_urls,
8-
)
6+
import fsspec
7+
import numpy as np
8+
import xarray as xr
99
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
10-
from pangeo_forge_recipes.transforms import (
11-
ConsolidateDimensionCoordinates,
12-
ConsolidateMetadata,
13-
OpenURLWithFSSpec,
14-
OpenWithXarray,
15-
StoreToZarr,
10+
from pangeo_forge_recipes.storage import FSSpecTarget
11+
from pangeo_forge_recipes.transforms import StoreToZarr
12+
13+
# Earthdata credentials are read from the environment when this module is
# imported; a missing variable raises KeyError.
username, password = os.environ['EARTHDATA_USERNAME'], os.environ['EARTHDATA_PASSWORD']
# NOTE(review): in the code visible here, client_kwargs is only referenced by the
# commented-out OpenURLWithFSSpec step -- confirm the credentials should still be
# required while testing with local files.
client_kwargs = {
    'auth': aiohttp.BasicAuth(username, password),
    'trust_env': True,
}

# The URLs are a bit hard to construct, so let's try with a few hardcoded ones.
input_urls = [
    'https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/62/MCD06COSP_M3_MODIS/2023/182/MCD06COSP_M3_MODIS.A2023182.062.2023223000656.nc',
    'https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/62/MCD06COSP_M3_MODIS/2023/213/MCD06COSP_M3_MODIS.A2023213.062.2023254000930.nc',
    'https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/62/MCD06COSP_M3_MODIS/2023/244/MCD06COSP_M3_MODIS.A2023244.062.2023285000449.nc',
]


# pattern = pattern_from_file_sequence(input_urls, concat_dim='time')
# pattern = pattern.prune(2)

# Testing with local files for now.
# NOTE(review): these are absolute paths on a single developer machine, so the
# recipe will not run elsewhere until the remote pattern above is restored.
pattern = pattern_from_file_sequence(
    [
        '/Users/nrhagen/Documents/carbonplan/LEAP/feedstocks/MODIS-COSP/feedstock/MCD06COSP_M3_MODIS.A2023182.062.2023223000656.nc',
        '/Users/nrhagen/Documents/carbonplan/LEAP/feedstocks/MODIS-COSP/feedstock/MCD06COSP_M3_MODIS.A2023213.062.2023254000930.nc',
    ],
    concat_dim='time',
)
1738

18-
# Parse the catalog store locations (this is where the data is copied to after a successful write, and maybe testing).
19-
catalog_store_urls = get_catalog_store_urls('feedstock/catalog.yaml')
2039

21-
###########################
22-
## Start Modifying here ###
23-
###########################
40+
def _append_group_name_to_vars(dst: xr.DataTree) -> xr.Dataset:
    """Flatten the direct child groups of a datatree into one dataset.

    Each direct child of ``dst`` is converted to a dataset, given a
    length-one ``time`` dimension taken from the root attribute
    ``time_coverage_start``, and has every data variable renamed to
    ``<group_name>_<variable_name>``. The per-group datasets are then
    combined with ``xr.merge``.

    Args:
        dst: Datatree whose root attributes contain ``time_coverage_start``.

    Returns:
        The merged dataset holding the prefixed variables of all child groups.

    Raises:
        KeyError: If ``time_coverage_start`` is missing from ``dst.attrs``.
    """
    # The timestamp lives on the root node, so it is identical for every
    # group: parse it once instead of once per child.
    time = np.array([np.datetime64(dst.attrs['time_coverage_start'])])

    dataset_list = []
    for node in dst.children:
        child = dst[node]
        ds = child.to_dataset().expand_dims(time=time)

        # NOTE(review): assumes the first entry of `groups` is this child's own
        # path of the form '/<name>' -- confirm against xarray's DataTree.groups.
        group_name = child.groups[0].split('/')[1]
        ds = ds.rename({var: f'{group_name}_{var}' for var in ds.data_vars})
        dataset_list.append(ds)

    return xr.merge(dataset_list)
3153

32-
file_pattern = pattern_from_file_sequence(input_urls_a, concat_dim='time')
3354

55+
@dataclass
class DatatreeToDataset(beam.PTransform):
    """Collapse all groups of a datatree into a single xarray dataset.

    The netCDF file is organized into groups. It is opened as a datatree, every
    variable is prefixed with the name of the group it came from, and the groups
    are merged back into one xarray dataset.
    """

    def _convert(self, dst: xr.DataTree) -> xr.Dataset:
        """Return the merged, group-prefixed dataset for one datatree."""
        merged = _append_group_name_to_vars(dst)
        return merged

    def expand(self, pcoll):
        """Convert the value of each ``(key, value)`` element, keeping the key."""

        def _convert_value(key, tree):
            return key, self._convert(tree)

        return pcoll | '_convert' >> beam.MapTuple(_convert_value)
66+
67+
68+
@dataclass
class OpenDatatreeXarray(beam.PTransform):
    """Open each input path as an xarray datatree."""

    def _open_dt(self, path: str) -> xr.DataTree:
        """Open ``path`` with ``xr.open_datatree`` and return the resulting tree."""
        tree = xr.open_datatree(path)
        return tree

    def expand(self, pcoll):
        """Replace the value of each ``(key, value)`` element with its opened datatree."""

        def _open_value(key, path):
            return key, self._open_dt(path)

        return pcoll | '_open_dt' >> beam.MapTuple(_open_value)
77+
78+
79+
# Target on the local ('file') filesystem rooted at 'modis_cosp'.
# NOTE(review): `target_root` is not referenced again in the code visible here;
# the StoreToZarr step below is given target_root='.' instead -- confirm which
# output location is intended.
fs = fsspec.get_filesystem_class('file')()
target_root = FSSpecTarget(fs, 'modis_cosp')
with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        # Remote-download step, presumably disabled while testing with local files.
        # | OpenURLWithFSSpec(
        #     open_kwargs={'block_size': 0, 'client_kwargs': client_kwargs},
        #     max_concurrency=10,
        # )
        | OpenDatatreeXarray()
        | DatatreeToDataset()
        # | beam.Map(print)
        | StoreToZarr(
            target_root='.',
            store_name='MODIS_COSP.zarr',
            combine_dims=pattern.combine_dim_keys,
        )
    )
48-
| ConsolidateDimensionCoordinates()
49-
| ConsolidateMetadata()
50-
)

0 commit comments

Comments
 (0)