11
11
from pathlib import Path
12
12
from shutil import rmtree
13
13
from tempfile import mkdtemp
14
- from typing import Any , Dict , List , Optional , Union
14
+ from typing import Any , Callable , Dict , List , Optional , Union
15
15
16
16
import fsspec
17
- from pystac import ItemCollection
17
+ from pystac import Item , ItemCollection
18
18
19
19
from .asset_io import (
20
20
download_item_assets ,
27
27
# types
28
28
PathLike = Union [str , Path ]
29
29
"""
30
- Tasks can use parameters provided in a `process` Dictionary that is supplied in the ItemCollection
31
- JSON under the "process" field. An example process definition:
30
+ Tasks can use parameters provided in a `process` Dictionary that is supplied in
31
+ the ItemCollection JSON under the "process" field. An example process
32
+ definition:
32
33
33
34
```
34
35
{
@@ -59,7 +60,7 @@ class Task(ABC):
59
60
60
61
def __init__ (
61
62
self : "Task" ,
62
- payload : Dict ,
63
+ payload : Dict [ str , Any ] ,
63
64
workdir : Optional [PathLike ] = None ,
64
65
save_workdir : bool = False ,
65
66
skip_upload : bool = False ,
@@ -88,18 +89,18 @@ def __init__(
88
89
self ._workdir = Path (workdir )
89
90
makedirs (self ._workdir , exist_ok = True )
90
91
91
- def __del__ (self ):
92
+ def __del__ (self ) -> None :
92
93
# remove work directory if not running locally
93
94
if not self ._save_workdir :
94
95
self .logger .debug ("Removing work directory %s" , self ._workdir )
95
96
rmtree (self ._workdir )
96
97
97
98
@property
98
- def process_definition (self ) -> Dict :
99
+ def process_definition (self ) -> Dict [ str , Any ] :
99
100
return self ._payload .get ("process" , {})
100
101
101
102
@property
102
- def parameters (self ) -> Dict :
103
+ def parameters (self ) -> Dict [ str , Any ] :
103
104
task_configs = self .process_definition .get ("tasks" , [])
104
105
if isinstance (task_configs , List ):
105
106
# tasks is a list
@@ -121,11 +122,11 @@ def parameters(self) -> Dict:
121
122
raise ValueError (f"unexpected value for 'tasks': { task_configs } " )
122
123
123
124
@property
124
- def upload_options (self ) -> Dict :
125
+ def upload_options (self ) -> Dict [ str , Any ] :
125
126
return self .process_definition .get ("upload_options" , {})
126
127
127
128
@property
128
- def items_as_dicts (self ) -> List [Dict ]:
129
+ def items_as_dicts (self ) -> List [Dict [ str , Any ] ]:
129
130
return self ._payload .get ("features" , [])
130
131
131
132
@property
@@ -134,12 +135,12 @@ def items(self) -> ItemCollection:
134
135
return ItemCollection .from_dict (items_dict , preserve_dict = True )
135
136
136
137
@classmethod
137
- def validate (cls , payload : Dict ) -> bool :
138
+ def validate (cls , payload : Dict [ str , Any ] ) -> bool :
138
139
# put validation logic on input Items and process definition here
139
140
return True
140
141
141
142
@classmethod
142
- def add_software_version (cls , items : List [Dict ]) :
143
+ def add_software_version (cls , items : List [Dict [ str , Any ]]) -> List [ Dict [ str , Any ]] :
143
144
processing_ext = (
144
145
"https://stac-extensions.github.io/processing/v1.1.0/schema.json"
145
146
)
@@ -153,7 +154,7 @@ def add_software_version(cls, items: List[Dict]):
153
154
i ["properties" ]["processing:software" ] = {cls .name : cls .version }
154
155
return items
155
156
156
- def assign_collections (self ):
157
+ def assign_collections (self ) -> None :
157
158
"""Assigns new collection names based on"""
158
159
for i , (coll , expr ) in itertools .product (
159
160
self ._payload ["features" ],
@@ -163,13 +164,18 @@ def assign_collections(self):
163
164
i ["collection" ] = coll
164
165
165
166
def download_item_assets (
166
- self , item : Dict , path_template : str = "${collection}/${id}" , ** kwargs
167
- ):
168
- """Download provided asset keys for all items in payload. Assets are saved in workdir in a
169
- directory named by the Item ID, and the items are updated with the new asset hrefs.
167
+ self ,
168
+ item : Item ,
169
+ path_template : str = "${collection}/${id}" ,
170
+ ** kwargs : Any ,
171
+ ) -> Item :
172
+ """Download provided asset keys for all items in payload. Assets are
173
+ saved in workdir in a directory named by the Item ID, and the items are
174
+ updated with the new asset hrefs.
170
175
171
176
Args:
172
- assets (Optional[List[str]], optional): List of asset keys to download. Defaults to all assets.
177
+ assets (Optional[List[str]], optional): List of asset keys to
178
+ download. Defaults to all assets.
173
179
"""
174
180
outdir = str (self ._workdir / path_template )
175
181
loop = asyncio .get_event_loop ()
@@ -179,16 +185,21 @@ def download_item_assets(
179
185
return item
180
186
181
187
def download_items_assets (
182
- self , items : List [Dict ], path_template : str = "${collection}/${id}" , ** kwargs
183
- ):
188
+ self ,
189
+ items : List [Item ],
190
+ path_template : str = "${collection}/${id}" ,
191
+ ** kwargs : Any ,
192
+ ) -> List [Item ]:
184
193
outdir = str (self ._workdir / path_template )
185
194
loop = asyncio .get_event_loop ()
186
195
items = loop .run_until_complete (
187
- download_items_assets (self . items , path_template = outdir , ** kwargs )
196
+ download_items_assets (items , path_template = outdir , ** kwargs )
188
197
)
189
198
return items
190
199
191
- def upload_item_assets_to_s3 (self , item : Dict , assets : Optional [List [str ]] = None ):
200
+ def upload_item_assets_to_s3 (
201
+ self , item : Item , assets : Optional [List [str ]] = None
202
+ ) -> Item :
192
203
if self ._skip_upload :
193
204
self .logger .warning ("Skipping upload of new and modified assets" )
194
205
return item
@@ -197,7 +208,7 @@ def upload_item_assets_to_s3(self, item: Dict, assets: Optional[List[str]] = Non
197
208
198
209
# this should be in PySTAC
199
210
@staticmethod
200
- def create_item_from_item (item ) :
211
+ def create_item_from_item (item : Dict [ str , Any ]) -> Dict [ str , Any ] :
201
212
new_item = deepcopy (item )
202
213
# create a derived output item
203
214
links = [
@@ -216,7 +227,7 @@ def create_item_from_item(item):
216
227
return new_item
217
228
218
229
@abstractmethod
219
- def process (self , ** kwargs ) -> List [Dict ]:
230
+ def process (self , ** kwargs : Any ) -> List [Dict [ str , Any ] ]:
220
231
"""Main task logic - virtual
221
232
222
233
Returns:
@@ -229,7 +240,7 @@ def process(self, **kwargs) -> List[Dict]:
229
240
pass
230
241
231
242
@classmethod
232
- def handler (cls , payload : Dict , ** kwargs ) -> Dict [str , Any ]:
243
+ def handler (cls , payload : Dict [ str , Any ], ** kwargs : Any ) -> Dict [str , Any ]:
233
244
if "href" in payload or "url" in payload :
234
245
# read input
235
246
with fsspec .open (payload .get ("href" , payload .get ("url" ))) as f :
@@ -249,7 +260,7 @@ def handler(cls, payload: Dict, **kwargs) -> Dict[str, Any]:
249
260
raise err
250
261
251
262
@classmethod
252
- def parse_args (cls , args ) :
263
+ def parse_args (cls , args : List [ str ]) -> Dict [ str , Any ] :
253
264
dhf = argparse .ArgumentDefaultsHelpFormatter
254
265
parser0 = argparse .ArgumentParser (description = cls .description )
255
266
parser0 .add_argument (
@@ -297,8 +308,8 @@ def parse_args(cls, args):
297
308
default = False ,
298
309
)
299
310
h = """ Run local mode
300
- (save-workdir = True, skip-upload = True, skip-validation = True,
301
- workdir = 'local-output', output = 'local-output/output-payload.json') """
311
+ (save-workdir = True, skip-upload = True, skip-validation = True,
312
+ workdir = 'local-output', output = 'local-output/output-payload.json') """
302
313
parser .add_argument ("--local" , help = h , action = "store_true" , default = False )
303
314
304
315
# turn Namespace into dictionary
@@ -322,7 +333,7 @@ def parse_args(cls, args):
322
333
return pargs
323
334
324
335
@classmethod
325
- def cli (cls ):
336
+ def cli (cls ) -> None :
326
337
args = cls .parse_args (sys .argv [1 :])
327
338
cmd = args .pop ("command" )
328
339
@@ -364,9 +375,9 @@ def cli(cls):
364
375
from functools import wraps # noqa
365
376
366
377
367
- def silence_event_loop_closed (func ) :
378
+ def silence_event_loop_closed (func : Callable [[ Any ], Any ]) -> Callable [[ Any ], Any ] :
368
379
@wraps (func )
369
- def wrapper (self , * args , ** kwargs ):
380
+ def wrapper (self , * args : Any , ** kwargs : Any ) -> Any : # type: ignore
370
381
try :
371
382
return func (self , * args , ** kwargs )
372
383
except RuntimeError as e :
0 commit comments