AWS 2-4: Add Celery Task for Global Land Cover Data Fetching #3633

Merged · 6 commits · Jul 22, 2024
179 changes: 179 additions & 0 deletions src/mmw/apps/geoprocessing_api/stac.py
@@ -0,0 +1,179 @@
from celery import shared_task
from pystac_client import Client
from rasterio.coords import BoundingBox
from rasterio.enums import Resampling
from rasterio.mask import mask
from rasterio.merge import merge
from rasterio.warp import (
calculate_default_transform,
transform_geom,
reproject,
)
from shapely.geometry import box, mapping, shape
import json
import numpy as np
import os
import rasterio as rio
import tempfile


@shared_task
def query_histogram(geojson, url, collection, asset, filter):
# TODO Validate inputs

aoi = shape(json.loads(geojson))

# Get list of intersecting tiffs from catalog
client = Client.open(url)
search = client.search(
collections=collection,
intersects=mapping(aoi),
filter=filter,
)
tiffs = [item.assets[asset].href for item in search.items()]

# Sort the tiffs, since the merging is order sensitive. We use the "first"
# method, which takes the value from the first image for overlapping pixels.
tiffs = sorted(tiffs)

# TODO Handle empty tiff list

# Find the Albers Equal Area CRS for this AoI
dst_crs = get_albers_crs_for_aoi(aoi)

# Reproject the tiffs and clip to the AoI to make tiles
clips = []
for tiff in tiffs:
clip_data, clip_transform, clip_meta = clip_and_reproject_tile(
tiff, aoi, dst_crs
)
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.tif')
with rio.open(temp_file.name, 'w', **clip_meta) as dst:
dst.write(clip_data)
clips.append(temp_file.name)

# Merge the clipped rasters
datasets = [rio.open(clip_path) for clip_path in clips]
merged_data, merged_transform = merge(datasets, method='first')

# Count how many of each class type there are
values, counts = np.unique(merged_data, return_counts=True)
histogram = list(zip(values.tolist(), counts.tolist()))

# Ensure the result is JSON serializable
histogram = {int(value): int(count) for value, count in histogram}

# Calculate the area of a single pixel from the dataset resolution
pixel_size = datasets[0].res[0] * datasets[0].res[1]

# Close datasets
for ds in datasets:
ds.close()

# Clean up temporary files
for temp_file in clips:
os.remove(temp_file)

return {
'result': histogram,
'pixel_size': pixel_size,
}


@shared_task
def format_as_mmw_geoprocessing(result):
"""
Return results formatted as MMW Geoprocessing output

This matches existing patterns, and allows for switching this
implementation out for others in the future.
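
For example (values illustrative), a result of

    {'result': {1: 100, 2: 50}, 'pixel_size': 100.0}

would be reformatted as

    {'result': {'List(1)': 100, 'List(2)': 50}, 'pixel_size': 100.0}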
"""
# Pass errors on
if 'error' in result:
return result

formatted_histogram = {
f'List({v})': int(c) for v, c in result['result'].items()
}
result['result'] = formatted_histogram

return result


def get_albers_crs_for_aoi(aoi):
"""
Return the Albers Equal Area Projection for the given AoI

Since we want to calculate area, we need to use an Equal Area projection,
but the appropriate one differs based on where you are on the globe. We use rough bounding
boxes to see if the shape neatly fits within one of the continents. If not,
we fall back to a global approximation.
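
For example (AoI illustrative), a small box around Philadelphia falls
within the North America bounds:

    >>> get_albers_crs_for_aoi(box(-75.3, 39.8, -74.9, 40.1))
    'EPSG:5070'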
"""

if aoi.within(box(-170, 15, -50, 75)): # North America
return 'EPSG:5070'
elif aoi.within(box(-10, 34, 40, 72)): # Europe
return 'EPSG:3035'
elif aoi.within(box(25, -10, 180, 60)): # Asia
return 'EPSG:102025'
elif aoi.within(box(-20, -35, 55, 38)): # Africa
return 'EPSG:102022'
elif aoi.within(box(-90, -60, -30, 15)): # South America
return 'EPSG:102033'
elif aoi.within(box(112, -45, 155, -10)): # Australia
return 'EPSG:102034'
else: # Global
return 'EPSG:54017' # Behrmann Equal Area Cylindrical


def clip_and_reproject_tile(tiff, aoi, dst_crs, resampling=Resampling.nearest):
"""
Clip a tiff to the given AoI and reproject to destination CRS

We clip the tiff first, and then reproject only the clipped bits to
minimize transformation. Uses nearest neighbor resampling by default.

Returns the data, transform, and meta after clipping and reprojecting.
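
Illustrative usage (the file path and AoI here are hypothetical):

    data, transform, meta = clip_and_reproject_tile(
        '/tmp/tile.tif', aoi, 'EPSG:5070')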
"""
aoi_crs = aoi.crs if hasattr(aoi, 'crs') else 4326

with rio.open(tiff) as src:
# Reproject the AoI into the tiff's CRS, then clip the tiff to
# the reprojected AoI's bounding box
reprojected_aoi = transform_geom(aoi_crs, src.crs, aoi)
reprojected_aoi_bbox = BoundingBox(*shape(reprojected_aoi).bounds)
clip_data, clip_transform = mask(src, [reprojected_aoi], crop=True)

# Define the output metadata for the reprojected clip
dst_transform, width, height = calculate_default_transform(
src.crs,
dst_crs,
clip_data.shape[2],
clip_data.shape[1],
*reprojected_aoi_bbox,
)
dst_meta = src.meta.copy()
dst_meta.update(
{
'crs': dst_crs,
'transform': dst_transform,
'width': width,
'height': height,
}
)

# Reproject the clipped data to the destination CRS
reprojected_clip_data = np.empty(
(src.count, height, width), dtype=src.dtypes[0]
)
reproject(
source=clip_data,
destination=reprojected_clip_data,
src_transform=clip_transform,
src_crs=src.crs,
dst_transform=dst_transform,
dst_crs=dst_crs,
resampling=resampling,
)

return reprojected_clip_data, dst_transform, dst_meta
40 changes: 40 additions & 0 deletions src/mmw/apps/geoprocessing_api/tasks.py
@@ -184,6 +184,46 @@ def area(dictionary, key, default=0):
}


@shared_task(throws=Exception)
def analyze_global_land(result, year):
if 'error' in result:
raise Exception(f'[analyze_global_land_{year}] {result["error"]}')

histogram = parse(result['result'])
pixel_size = result['pixel_size']

# Count up all the legitimate IO LULC classes
# This excludes NODATA which is IO LULC 0
total_count = sum(
[
count
for ioclass, count in histogram.items()
if ioclass in layer_classmaps.IO_LULC
]
)

categories = []

for ioclass, (code, name) in layer_classmaps.IO_LULC.items():
categories.append(
{
'area': histogram.get(ioclass, 0) * pixel_size,
'code': code,
'coverage': float(histogram.get(ioclass, 0)) / total_count,
'ioclass': ioclass,
'type': name,
}
)

return {
'survey': {
'name': f'global_land_io_{year}',
'displayName': f'Global Land Use/Cover {year}',
'categories': categories,
}
}


@shared_task(throws=Exception)
def analyze_soil(result, area_of_interest=None):
if 'error' in result:
3 changes: 3 additions & 0 deletions src/mmw/apps/geoprocessing_api/urls.py
@@ -12,6 +12,9 @@
name="authtoken"),
re_path(r'analyze/land/(?P<nlcd_year>\w+)/?$', views.start_analyze_land,
name='start_analyze_land'),
re_path(r'analyze/global-land/(?P<year>\w+)/?$',
views.start_analyze_global_land,
name='start_analyze_global_land'),
re_path(r'analyze/soil/$', views.start_analyze_soil,
name='start_analyze_soil'),
re_path(r'analyze/animals/$', views.start_analyze_animals,
132 changes: 131 additions & 1 deletion src/mmw/apps/geoprocessing_api/views.py
@@ -37,7 +37,7 @@
from apps.modeling.serializers import AoiSerializer
from apps.modeling.views import _initiate_subbasin_gwlfe_job_chain

from apps.geoprocessing_api import exceptions, schemas, tasks
from apps.geoprocessing_api import exceptions, schemas, stac, tasks
from apps.geoprocessing_api.calcs import huc12s_for_huc, huc12_for_point
from apps.geoprocessing_api.permissions import AuthTokenSerializerAuthentication # noqa
from apps.geoprocessing_api.throttling import (BurstRateThrottle,
@@ -443,6 +443,136 @@ def start_analyze_land(request, nlcd_year, format=None):
], area_of_interest, user, messages=msg)


@swagger_auto_schema(method='post',
request_body=schemas.ANALYZE_REQUEST,
responses={200: schemas.JOB_STARTED_RESPONSE})
@decorators.api_view(['POST'])
@decorators.authentication_classes((SessionAuthentication,
TokenAuthentication, ))
@decorators.permission_classes((IsAuthenticated, ))
@decorators.throttle_classes([BurstRateThrottle, SustainedRateThrottle])
@log_request
def start_analyze_global_land(request, year, format=None):
"""
Starts a job to produce a land-use histogram for a given area and year.

This endpoint supports year values of 2017, 2018, 2019, 2020, 2021, 2022,
and 2023.

This endpoint supports global data, as opposed to analyze_land which only
works with the Continental United States (CONUS). The underlying datasource
is Impact Observatory's Global Annual LULC layer hosted on AWS Open Registry. # NOQA

For more information, see the [technical documentation](https://wikiwatershed.org/documentation/mmw-tech/#overlays-tab-coverage). # NOQA
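
## Request

A hypothetical request body (the accepted fields are defined by
`schemas.ANALYZE_REQUEST`; the geometry here is illustrative):

    {
        "area_of_interest": {
            "type": "MultiPolygon",
            "coordinates": [[[
                [-75.3, 39.8], [-74.9, 39.8], [-74.9, 40.1],
                [-75.3, 40.1], [-75.3, 39.8]
            ]]]
        }
    }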

## Response

You can use the URL provided in the response's `Location`
header to poll for the job's results.

<details>
<summary>
**Example of a completed job's `result`**
</summary>

{
"survey": {
"displayName": "Global Land Use/Cover 2019",
"name": "global_land_io_2019",
"categories": [
{
"area": 14941947.428126818,
"code": "water",
"coverage": 0.02144532388706757,
"ioclass": 1,
"type": "Water"
},
{
"area": 109588951.93420613,
"code": "trees",
"coverage": 0.15728676465889277,
"ioclass": 2,
"type": "Trees"
},
{
"area": 48464.99853478383,
"code": "flooded_vegetation",
"coverage": 0.00006955904481421345,
"ioclass": 4,
"type": "Flooded vegetation"
},
{
"area": 29756139.065063003,
"code": "crops",
"coverage": 0.042707287182504744,
"ioclass": 5,
"type": "Crops"
},
{
"area": 488978611.8600828,
"code": "built_area",
"coverage": 0.7018030785899226,
"ioclass": 7,
"type": "Built area"
},
{
"area": 535341.2912358101,
"code": "bare_ground",
"coverage": 0.0007683447847676015,
"ioclass": 8,
"type": "Bare ground"
},
{
"area": 0,
"code": "snow_ice",
"coverage": 0,
"ioclass": 9,
"type": "Snow/ice"
},
{
"area": 0,
"code": "clouds",
"coverage": 0,
"ioclass": 10,
"type": "Clouds"
},
{
"area": 52896720.20292212,
"code": "rangeland",
"coverage": 0.07591964185203046,
"ioclass": 11,
"type": "Rangeland"
}
]
}
}

</details>
"""
user = request.user if request.user.is_authenticated else None
area_of_interest, wkaoi, msg = _parse_analyze_input(request)

# Filter STAC items to those whose id ends with the given year
filter = {
    "op": "like",
    "args": [{"property": "id"}, f"%-{year}"],
}

# TODO Implement caching
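
# These task signatures run in sequence (via start_celery_job), with each
# task's return value passed as the first argument to the next: the STAC
# histogram flows through the MMW formatter into analyze_global_land.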

return start_celery_job([
stac.query_histogram.s(
area_of_interest,
url='https://api.impactobservatory.com/stac-aws',
collection='io-10m-annual-lulc',
asset='supercell',
filter=filter
),
stac.format_as_mmw_geoprocessing.s(),
tasks.analyze_global_land.s(year),
], area_of_interest, user, messages=msg)


@swagger_auto_schema(method='post',
request_body=schemas.ANALYZE_REQUEST,
responses={200: schemas.JOB_STARTED_RESPONSE})