Skip to content

Commit

Permalink
Merge pull request #3633 from WikiWatershed/tt/3628/add-celery-task-for-stac-fetching-2
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadain authored Jul 22, 2024
2 parents 36aa715 + fffe8cd commit e0d8de1
Show file tree
Hide file tree
Showing 8 changed files with 392 additions and 4 deletions.
179 changes: 179 additions & 0 deletions src/mmw/apps/geoprocessing_api/stac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
from celery import shared_task
from pystac_client import Client
from rasterio.coords import BoundingBox
from rasterio.enums import Resampling
from rasterio.mask import mask
from rasterio.merge import merge
from rasterio.warp import (
calculate_default_transform,
transform_geom,
reproject,
)
from shapely.geometry import box, mapping, shape
import json
import numpy as np
import os
import rasterio as rio
import tempfile


@shared_task
def query_histogram(geojson, url, collection, asset, filter):
    """
    Count pixel values of a STAC collection's asset within an AoI.

    Searches the STAC catalog at `url` for items in `collection` that
    intersect the GeoJSON AoI, optionally narrowed by a CQL2 `filter`.
    Each matching item's `asset` tiff is clipped to the AoI and
    reprojected to an equal-area CRS, the clips are merged, and a
    histogram of pixel values is computed.

    Returns {'result': {value: count}, 'pixel_size': area-per-pixel in
    square units of the equal-area CRS}, or {'error': message} when no
    items intersect the AoI.
    """
    # TODO Validate inputs

    aoi = shape(json.loads(geojson))

    # Get list of intersecting tiffs from catalog
    client = Client.open(url)
    search = client.search(
        collections=collection,
        intersects=mapping(aoi),
        filter=filter,
    )
    tiffs = [item.assets[asset].href for item in search.items()]

    # Sort the tiffs, since the merging is order sensitive. We use the "first"
    # method, using the first image for overlapping pixels.
    tiffs = sorted(tiffs)

    # With no intersecting tiffs there is nothing to merge. Report the error
    # in the shape downstream tasks already check for.
    if not tiffs:
        return {'error': f'No {asset} items found in {collection} for AoI'}

    # Find the Albers Equal Area CRS for this AoI
    dst_crs = get_albers_crs_for_aoi(aoi)

    clips = []
    datasets = []
    try:
        # Reproject the tiffs and clip to the AoI to make tiles
        for tiff in tiffs:
            clip_data, clip_transform, clip_meta = clip_and_reproject_tile(
                tiff, aoi, dst_crs
            )
            temp_file = tempfile.NamedTemporaryFile(delete=False,
                                                    suffix='.tif')
            # Track the path immediately so it is cleaned up even if the
            # write below fails
            clips.append(temp_file.name)
            with rio.open(temp_file.name, 'w', **clip_meta) as dst:
                dst.write(clip_data)

        # Merge the clipped rasters
        datasets = [rio.open(clip_path) for clip_path in clips]
        merged_data, merged_transform = merge(datasets, method='first')

        # Count how many of each class type there are, coercing numpy
        # scalars to ints so the result is JSON serializable
        values, counts = np.unique(merged_data, return_counts=True)
        histogram = {
            int(value): int(count)
            for value, count in zip(values.tolist(), counts.tolist())
        }

        # Calculate pixel size from dataset resolution
        pixel_size = datasets[0].res[0] * datasets[0].res[1]
    finally:
        # Always close datasets and remove temporary files, even on error
        for ds in datasets:
            ds.close()
        for temp_path in clips:
            os.remove(temp_path)

    return {
        'result': histogram,
        'pixel_size': pixel_size,
    }


@shared_task
def format_as_mmw_geoprocessing(result):
    """
    Return results formatted as MMW Geoprocessing

    To play well with existing patterns, and to allow for switching this out
    for other implementations in the future.
    """
    # Errors are passed through untouched for downstream handling
    if 'error' in result:
        return result

    # Rewrite {value: count} keys into the MMW Geoprocessing
    # 'List(value)' form
    result['result'] = {
        f'List({value})': int(count)
        for value, count in result['result'].items()
    }

    return result


def get_albers_crs_for_aoi(aoi):
    """
    Return the Albers Equal Area Projection for the given AoI

    Since we want to calculate area, we need to use an Equal Area projection,
    but this differs based on where you are in the globe. We use rough bounding
    boxes to see if the shape neatly fits within one of the continents. If not,
    we fall back to a global approximation.
    """
    # NOTE: the 102xxx and 54017 codes are ESRI authority codes, not EPSG.
    # PROJ / pyproj / rasterio resolve them under the 'ESRI:' prefix;
    # strings like 'EPSG:102025' are not valid EPSG codes and fail to parse.
    if aoi.within(box(-170, 15, -50, 75)):  # North America
        return 'EPSG:5070'
    elif aoi.within(box(-10, 34, 40, 72)):  # Europe
        return 'EPSG:3035'
    elif aoi.within(box(25, -10, 180, 60)):  # Asia
        return 'ESRI:102025'
    elif aoi.within(box(-20, -35, 55, 38)):  # Africa
        return 'ESRI:102022'
    elif aoi.within(box(-90, -60, -30, 15)):  # South America
        return 'ESRI:102033'
    elif aoi.within(box(112, -45, 155, -10)):  # Australia
        return 'ESRI:102034'
    else:  # Global
        return 'ESRI:54017'  # Behrmann Equal Area Cylindrical


def clip_and_reproject_tile(tiff, aoi, dst_crs, resampling=Resampling.nearest):
    """
    Clip a tiff to the given AoI and reproject to destination CRS

    We clip the tiff first, and then reproject only the clipped bits to
    minimize transformation. Uses nearest neighbor resampling by default.
    Returns the data, transform, and meta after clipping and reprojecting.

    :param tiff: path or URL of the source raster, openable by rasterio
    :param aoi: the area of interest geometry to clip to
    :param dst_crs: CRS string for the output, e.g. 'EPSG:5070'
    :param resampling: rasterio Resampling method for the reprojection
    :return: (data array, affine transform, raster meta dict)
    """
    # NOTE(review): shapely geometries have no `crs` attribute, so for a
    # shapely AoI this always falls back to 4326 — confirm callers only
    # pass lon/lat geometries
    aoi_crs = aoi.crs if hasattr(aoi, 'crs') else 4326

    with rio.open(tiff) as src:
        # Reproject the AoI into the tiff's CRS, then clip the tiff to
        # the reprojected AoI's bounding box
        reprojected_aoi = transform_geom(aoi_crs, src.crs, aoi)
        reprojected_aoi_bbox = BoundingBox(*shape(reprojected_aoi).bounds)
        # crop=True trims the output window to the AoI's extent
        clip_data, clip_transform = mask(src, [reprojected_aoi], crop=True)

        # Define the output metadata for the reprojected clip.
        # clip_data is (bands, rows, cols), hence shape[2] is width and
        # shape[1] is height
        dst_transform, width, height = calculate_default_transform(
            src.crs,
            dst_crs,
            clip_data.shape[2],
            clip_data.shape[1],
            *reprojected_aoi_bbox,
        )
        dst_meta = src.meta.copy()
        dst_meta.update(
            {
                'crs': dst_crs,
                'transform': dst_transform,
                'width': width,
                'height': height,
            }
        )

        # Reproject the clipped data to the destination CRS, into a
        # buffer matching the source band count and dtype
        reprojected_clip_data = np.empty(
            (src.count, height, width), dtype=src.dtypes[0]
        )
        reproject(
            source=clip_data,
            destination=reprojected_clip_data,
            src_transform=clip_transform,
            src_crs=src.crs,
            dst_transform=dst_transform,
            dst_crs=dst_crs,
            resampling=resampling,
        )

        return reprojected_clip_data, dst_transform, dst_meta
40 changes: 40 additions & 0 deletions src/mmw/apps/geoprocessing_api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,46 @@ def area(dictionary, key, default=0):
}


@shared_task(throws=Exception)
def analyze_global_land(result, year):
    """
    Format a global land-use histogram as an MMW analyze survey.

    Takes the MMW Geoprocessing style result from the STAC histogram
    tasks and converts it into the survey dict the analyze endpoints
    return, with per-class area and coverage fraction.

    Raises Exception if the upstream result carries an error.
    """
    if 'error' in result:
        raise Exception(f'[analyze_global_land_{year}] {result["error"]}')

    histogram = parse(result['result'])
    pixel_size = result['pixel_size']

    # Count up all the legitimate IO LULC classes
    # This excludes NODATA which is IO LULC 0
    total_count = sum(
        count
        for ioclass, count in histogram.items()
        if ioclass in layer_classmaps.IO_LULC
    )

    categories = []

    for ioclass, (code, name) in layer_classmaps.IO_LULC.items():
        count = histogram.get(ioclass, 0)
        categories.append(
            {
                'area': count * pixel_size,
                'code': code,
                # Guard against division by zero when the AoI contains
                # no recognized classes (e.g. all NODATA)
                'coverage': float(count) / total_count if total_count else 0,
                'ioclass': ioclass,
                'type': name,
            }
        )

    return {
        'survey': {
            'name': f'global_land_io_{year}',
            'displayName': f'Global Land Use/Cover {year}',
            'categories': categories,
        }
    }


@shared_task(throws=Exception)
def analyze_soil(result, area_of_interest=None):
if 'error' in result:
Expand Down
3 changes: 3 additions & 0 deletions src/mmw/apps/geoprocessing_api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
name="authtoken"),
re_path(r'analyze/land/(?P<nlcd_year>\w+)/?$', views.start_analyze_land,
name='start_analyze_land'),
re_path(r'analyze/global-land/(?P<year>\w+)/?$',
views.start_analyze_global_land,
name='start_analyze_global_land'),
re_path(r'analyze/soil/$', views.start_analyze_soil,
name='start_analyze_soil'),
re_path(r'analyze/animals/$', views.start_analyze_animals,
Expand Down
132 changes: 131 additions & 1 deletion src/mmw/apps/geoprocessing_api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from apps.modeling.serializers import AoiSerializer
from apps.modeling.views import _initiate_subbasin_gwlfe_job_chain

from apps.geoprocessing_api import exceptions, schemas, tasks
from apps.geoprocessing_api import exceptions, schemas, stac, tasks
from apps.geoprocessing_api.calcs import huc12s_for_huc, huc12_for_point
from apps.geoprocessing_api.permissions import AuthTokenSerializerAuthentication # noqa
from apps.geoprocessing_api.throttling import (BurstRateThrottle,
Expand Down Expand Up @@ -443,6 +443,136 @@ def start_analyze_land(request, nlcd_year, format=None):
], area_of_interest, user, messages=msg)


@swagger_auto_schema(method='post',
                     request_body=schemas.ANALYZE_REQUEST,
                     responses={200: schemas.JOB_STARTED_RESPONSE})
@decorators.api_view(['POST'])
@decorators.authentication_classes((SessionAuthentication,
                                    TokenAuthentication, ))
@decorators.permission_classes((IsAuthenticated, ))
@decorators.throttle_classes([BurstRateThrottle, SustainedRateThrottle])
@log_request
def start_analyze_global_land(request, year, format=None):
    """
    Starts a job to produce a land-use histogram for a given area and year.
    This endpoint supports year values of 2017, 2018, 2019, 2020, 2021, 2022,
    and 2023.
    This endpoint supports global data, as opposed to analyze_land which only
    works with the Continental United States (CONUS). The underlying datasource
    is Impact Observatory's Global Annual LULC layer hosted on AWS Open Registry. # NOQA
    For more information, see the [technical documentation](https://wikiwatershed.org/documentation/mmw-tech/#overlays-tab-coverage). # NOQA
    ## Response
    You can use the URL provided in the response's `Location`
    header to poll for the job's results.
    <summary>
    **Example of a completed job's `result`**
    </summary>
    <details>
    {
        "survey": {
            "displayName": "Global Land Use/Cover 2019",
            "name": "global_land_io_2019",
            "categories": [
                {
                    "area": 14941947.428126818,
                    "code": "water",
                    "coverage": 0.02144532388706757,
                    "ioclass": 1,
                    "type": "Water"
                },
                {
                    "area": 109588951.93420613,
                    "code": "trees",
                    "coverage": 0.15728676465889277,
                    "ioclass": 2,
                    "type": "Trees"
                },
                {
                    "area": 48464.99853478383,
                    "code": "flooded_vegetation",
                    "coverage": 0.00006955904481421345,
                    "ioclass": 4,
                    "type": "Flooded vegetation"
                },
                {
                    "area": 29756139.065063003,
                    "code": "crops",
                    "coverage": 0.042707287182504744,
                    "ioclass": 5,
                    "type": "Crops"
                },
                {
                    "area": 488978611.8600828,
                    "code": "built_area",
                    "coverage": 0.7018030785899226,
                    "ioclass": 7,
                    "type": "Built area"
                },
                {
                    "area": 535341.2912358101,
                    "code": "bare_ground",
                    "coverage": 0.0007683447847676015,
                    "ioclass": 8,
                    "type": "Bare ground"
                },
                {
                    "area": 0,
                    "code": "snow_ice",
                    "coverage": 0,
                    "ioclass": 9,
                    "type": "Snow/ice"
                },
                {
                    "area": 0,
                    "code": "clouds",
                    "coverage": 0,
                    "ioclass": 10,
                    "type": "Clouds"
                },
                {
                    "area": 52896720.20292212,
                    "code": "rangeland",
                    "coverage": 0.07591964185203046,
                    "ioclass": 11,
                    "type": "Rangeland"
                }
            ]
        }
    }
    </details>
    """
    user = request.user if request.user.is_authenticated else None
    area_of_interest, wkaoi, msg = _parse_analyze_input(request)

    # CQL2 filter selecting items whose ids end in the requested year.
    # Named stac_filter to avoid shadowing the builtin `filter`; it is
    # still passed to the task under its `filter` keyword.
    stac_filter = {
        "op": "like",
        "args": [{"property": "id"}, f"%-{year}"],
    }

    # TODO Implement caching

    return start_celery_job([
        stac.query_histogram.s(
            area_of_interest,
            url='https://api.impactobservatory.com/stac-aws',
            collection='io-10m-annual-lulc',
            asset='supercell',
            filter=stac_filter
        ),
        stac.format_as_mmw_geoprocessing.s(),
        tasks.analyze_global_land.s(year),
    ], area_of_interest, user, messages=msg)


@swagger_auto_schema(method='post',
request_body=schemas.ANALYZE_REQUEST,
responses={200: schemas.JOB_STARTED_RESPONSE})
Expand Down
Loading

0 comments on commit e0d8de1

Please sign in to comment.