Merge pull request #64 from WikiWatershed/ki/raster-grouped-count
Implement RasterGroupedCount

Connects #51
kellyi authored Aug 16, 2017
2 parents 22efbf2 + b657757 commit f55a6eb
Showing 5 changed files with 212 additions and 97 deletions.
1 change: 1 addition & 0 deletions api/src/main/resources/application.conf
@@ -1,4 +1,5 @@
 geoprocessing {
   port = 8090
   hostname = "0.0.0.0"
+  s3bucket = "datahub-catalogs-us-east-1"
 }
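The new `s3bucket` key is read once at startup by the `Utils` trait below. As an aside (standard Typesafe Config behavior, not something this PR adds), a JVM system property such as `-Dgeoprocessing.s3bucket=some-other-bucket` takes precedence over `application.conf`, which is handy for pointing the service at a different catalog. A minimal sketch of the lookup:

import com.typesafe.config.ConfigFactory

// Prints the configured bucket, or the -D override when one is supplied.
object ConfigSketch extends App {
  val bucket = ConfigFactory.load().getString("geoprocessing.s3bucket")
  println(s"Reading layer catalogs from s3://$bucket")
}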
63 changes: 63 additions & 0 deletions api/src/main/scala/Geoprocessing.scala
@@ -0,0 +1,63 @@
package org.wikiwatershed.mmw.geoprocessing

import java.util.concurrent.atomic.LongAdder

import collection.concurrent.TrieMap

import geotrellis.raster._
import geotrellis.raster.rasterize._
import geotrellis.vector._
import geotrellis.vector.io._

import geotrellis.spark._

trait Geoprocessing extends Utils {
  /**
   * For an InputData object, return the grouped count of raster values
   * within its area of interest.
   *
   * @param input The InputData
   * @return A Result wrapping a map of pixel-value groups to counts
   */
  def getRasterGroupedCount(input: InputData): Result = {
    val aoi = createAOIFromInput(input)
    val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
    Result(rasterGroupedCount(rasterLayers, aoi))
  }

  /**
   * From a sequence of rasterLayers and a shape, return a map of pixel
   * counts, keyed by the group of values the layers hold at each cell.
   *
   * @param rasterLayers A sequence of TileLayerCollections
   * @param multiPolygon The AOI as a MultiPolygon
   * @return A Map of cell counts
   */
  private def rasterGroupedCount(
    rasterLayers: Seq[TileLayerCollection[SpatialKey]],
    multiPolygon: MultiPolygon
  ): Map[String, Int] = {
    val init = () => new LongAdder
    val update = (_: LongAdder).increment()
    // assume all the layouts are the same
    val metadata = rasterLayers.head.metadata

    val pixelGroups: TrieMap[List[Int], LongAdder] = TrieMap.empty

    joinCollectionLayers(rasterLayers).par
      .foreach({ case (key, tiles) =>
        val extent: Extent = metadata.mapTransform(key)
        val re: RasterExtent = RasterExtent(extent, metadata.layout.tileCols,
          metadata.layout.tileRows)

        Rasterizer.foreachCellByMultiPolygon(multiPolygon, re) { case (col, row) =>
          val pixelGroup: List[Int] = tiles.map(_.get(col, row)).toList
          val acc = pixelGroups.getOrElseUpdate(pixelGroup, init())
          update(acc)
        }
      })

    pixelGroups
      .mapValues(_.sum().toInt)
      .map { case (k, v) => k.toString -> v }
      .toMap
  }
}
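The counting core pairs a lock-free TrieMap with one LongAdder per pixel group, so the parallel (.par) traversal increments counters without a global lock. Here is a minimal, self-contained sketch of that pattern, with plain lists standing in for the per-cell pixel groups (names and values below are illustrative, not from this PR):

import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap

object GroupedCountSketch extends App {
  // Stand-ins for the per-cell value groups read from two rasters.
  val cells: Seq[List[Int]] =
    Seq(List(21, 3), List(21, 3), List(41, 4), List(21, 3), List(41, 4))

  val counts: TrieMap[List[Int], LongAdder] = TrieMap.empty

  // getOrElseUpdate is atomic on TrieMap, and LongAdder keeps hot keys
  // cheap to increment under contention.
  cells.par.foreach { group =>
    counts.getOrElseUpdate(group, new LongAdder).increment()
  }

  counts.foreach { case (k, v) => println(s"$k -> ${v.sum}") }
  // List(21, 3) -> 3
  // List(41, 4) -> 2
}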
118 changes: 118 additions & 0 deletions api/src/main/scala/Utils.scala
@@ -0,0 +1,118 @@
package org.wikiwatershed.mmw.geoprocessing

import spray.json._
import spray.json.DefaultJsonProtocol._

import geotrellis.proj4.{CRS, ConusAlbers, LatLng, WebMercator}

import geotrellis.raster._
import geotrellis.raster.rasterize._
import geotrellis.vector._
import geotrellis.vector.io._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.s3._

import com.typesafe.config.ConfigFactory

trait Utils {
  val s3bucket = ConfigFactory.load().getString("geoprocessing.s3bucket")
  val baseLayerReader = S3CollectionLayerReader(s3bucket, "")

  /**
   * Given a zoom level and an area of interest, transform a list of raster
   * filenames into a sequence of [[TileLayerCollection[SpatialKey]]]s
   * cropped to the AOI.
   *
   * @param rasterIds A list of raster filenames
   * @param zoom The input zoom level
   * @param aoi A MultiPolygon area of interest
   * @return A Seq of [[TileLayerCollection[SpatialKey]]]
   */
  def cropRastersToAOI(
    rasterIds: List[String],
    zoom: Int,
    aoi: MultiPolygon
  ): Seq[TileLayerCollection[SpatialKey]] =
    rasterIds
      .map { str => LayerId(str, zoom) }
      .map { layer => fetchCroppedLayer(layer, aoi) }

  /**
   * Given input data containing a polygonCRS and a rasterCRS, transform the
   * input polygons into a single unioned MultiPolygon AOI in the raster CRS.
   *
   * @param input InputData including polygons, polygonCRS, and rasterCRS
   * @return A MultiPolygon
   */
  def createAOIFromInput(input: InputData): MultiPolygon =
    input.polygon.map { str =>
      parseGeometry(str, getCRS(input.polygonCRS), getCRS(input.rasterCRS))
        .buffer(0)
        .asMultiPolygon
        .get
    }.unionGeometries.asMultiPolygon.get

  /**
   * Transform the incoming GeoJSON into a [[MultiPolygon]] in the
   * destination CRS.
   *
   * @param geoJson The incoming geometry
   * @param srcCRS The CRS that the incoming geometry is in
   * @param destCRS The CRS that the outgoing geometry should be in
   * @return A MultiPolygon
   */
  def parseGeometry(geoJson: String, srcCRS: CRS, destCRS: CRS): MultiPolygon = {
    geoJson.parseJson.convertTo[Geometry] match {
      case p: Polygon => MultiPolygon(p.reproject(srcCRS, destCRS))
      case mp: MultiPolygon => mp.reproject(srcCRS, destCRS)
      case _ => MultiPolygon()
    }
  }

  /**
   * For a given CRS name, return one of several recognized
   * [[geotrellis.proj4.CRS]]s, or raise an error.
   *
   * @param crs The CRS name (e.g. "LatLng", "WebMercator", or "ConusAlbers")
   * @return A CRS
   */
  def getCRS(crs: String): CRS = crs match {
    case "LatLng" => LatLng
    case "WebMercator" => WebMercator
    case "ConusAlbers" => ConusAlbers
    case s: String => throw new Exception(s"Unknown CRS: $s")
  }

  /**
   * From a sequence of layers, return a Map of Tile sequences.
   *
   * @param layers A sequence of TileLayerCollections
   * @return A map of Tile sequences, keyed by SpatialKey
   */
  def joinCollectionLayers(
    layers: Seq[TileLayerCollection[SpatialKey]]
  ): Map[SpatialKey, Seq[Tile]] = {
    val maps: Seq[Map[SpatialKey, Tile]] = layers.map((_: Seq[(SpatialKey, Tile)]).toMap)
    val keySet: Array[SpatialKey] = maps.map(_.keySet).reduce(_ union _).toArray
    for (key: SpatialKey <- keySet) yield {
      val tiles: Seq[Tile] = maps.map(_.apply(key))
      key -> tiles
    }
  }.toMap

  /**
   * For a layerId and a shape, retrieve an intersecting TileLayerCollection.
   *
   * @param layerId The LayerId
   * @param shape The shape as a MultiPolygon
   * @return A TileLayerCollection intersecting the shape
   */
  def fetchCroppedLayer(
    layerId: LayerId,
    shape: MultiPolygon
  ): TileLayerCollection[SpatialKey] =
    baseLayerReader
      .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)
      .where(Intersects(shape))
      .result
}
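Taken together, these helpers parse and reproject the request geometry, then query only the catalog tiles that intersect it. A rough usage sketch, assuming it lives alongside Utils in the same package; the GeoJSON and layer name are made up for illustration, and actually fetching tiles requires the configured S3 catalog to be reachable:

package org.wikiwatershed.mmw.geoprocessing

import geotrellis.vector._

object UtilsSketch extends App with Utils {
  val geoJson =
    """{"type": "Polygon", "coordinates":
      |  [[[-75.16, 39.95], [-75.15, 39.95], [-75.15, 39.96], [-75.16, 39.95]]]}""".stripMargin

  // Reproject the LatLng polygon into the rasters' ConusAlbers CRS...
  val aoi: MultiPolygon = parseGeometry(geoJson, getCRS("LatLng"), getCRS("ConusAlbers"))

  // ...then fetch only the tiles of each layer that intersect it.
  val layers = cropRastersToAOI(List("nlcd-2011"), 0, aoi)
  println(s"Fetched ${layers.map(_.length).sum} intersecting tiles")
}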
123 changes: 27 additions & 96 deletions api/src/main/scala/WebServer.scala
@@ -9,9 +9,27 @@ import DefaultJsonProtocol._
 import com.typesafe.config.ConfigFactory
 import com.typesafe.scalalogging.LazyLogging
 
-import geotrellis.raster.NODATA
+case class InputData(
+  operationType: String,
+  rasters: List[String],
+  zoom: Int,
+  polygonCRS: String,
+  rasterCRS: String,
+  polygon: List[String]
+)
+
+case class PostRequest(input: InputData)
+case class Result(result: Map[String, Int])
+
+object PostRequestProtocol extends DefaultJsonProtocol {
+  implicit val inputFormat = jsonFormat6(InputData)
+  implicit val postFormat = jsonFormat1(PostRequest)
+  implicit val resultFormat = jsonFormat1(Result)
+}
+
+object WebServer extends HttpApp with App with LazyLogging with Geoprocessing {
+  import PostRequestProtocol._
 
-object WebServer extends HttpApp with App with LazyLogging {
   def routes: Route =
     get {
       path("ping") {
@@ -20,100 +38,13 @@ object WebServer extends HttpApp with App with LazyLogging {
     } ~
     post {
       path("run") {
-        entity(as[String]) { input =>
-          println(input)
-          val output = Map(
-            List(90, 1) -> 1,
-            List(31, 3) -> 57,
-            List(81, 7) -> 514,
-            List(52, 7) -> 416,
-            List(43, 7) -> 36,
-            List(21, 3) -> 8134,
-            List(43, 2) -> 46,
-            List(23, 2) -> 670,
-            List(24, 1) -> 62,
-            List(23, 1) -> 512,
-            List(23, 7) -> 311,
-            List(41, 1) -> 50,
-            List(21, 6) -> 176,
-            List(21, NODATA) -> 16027,
-            List(81, 3) -> 2647,
-            List(31, 2) -> 21,
-            List(71, 4) -> 126,
-            List(42, 3) -> 72,
-            List(52, 1) -> 13,
-            List(43, 4) -> 50,
-            List(31, 4) -> 165,
-            List(71, NODATA) -> 72,
-            List(22, 7) -> 969,
-            List(22, NODATA) -> 16279,
-            List(31, 7) -> 58,
-            List(24, 7) -> 33,
-            List(22, 1) -> 603,
-            List(81, 6) -> 10,
-            List(82, 4) -> 2133,
-            List(41, 4) -> 5379,
-            List(82, NODATA) -> 268,
-            List(22, 2) -> 1636,
-            List(21, 1) -> 1601,
-            List(81, 2) -> 1956,
-            List(90, 6) -> 19,
-            List(41, 2) -> 3008,
-            List(41, 7) -> 4232,
-            List(81, 1) -> 28,
-            List(95, 3) -> 14,
-            List(23, 6) -> 10,
-            List(82, 3) -> 1889,
-            List(42, 2) -> 13,
-            List(21, 4) -> 6982,
-            List(43, NODATA) -> 106,
-            List(52, 4) -> 971,
-            List(82, 7) -> 306,
-            List(90, 4) -> 509,
-            List(95, 4) -> 27,
-            List(21, 7) -> 3241,
-            List(81, NODATA) -> 1086,
-            List(52, NODATA) -> 585,
-            List(71, 6) -> 7,
-            List(11, 1) -> 2,
-            List(71, 2) -> 157,
-            List(90, NODATA) -> 399,
-            List(11, NODATA) -> 32,
-            List(41, 3) -> 4419,
-            List(24, 3) -> 372,
-            List(42, 4) -> 43,
-            List(11, 4) -> 5,
-            List(95, 7) -> 20,
-            List(22, 4) -> 2876,
-            List(90, 7) -> 2500,
-            List(24, 4) -> 100,
-            List(41, NODATA) -> 2068,
-            List(82, 2) -> 1716,
-            List(52, 3) -> 960,
-            List(42, NODATA) -> 25,
-            List(95, 2) -> 2,
-            List(90, 3) -> 404,
-            List(52, 2) -> 357,
-            List(22, 6) -> 47,
-            List(31, NODATA) -> 63,
-            List(95, NODATA) -> 49,
-            List(23, 3) -> 1188,
-            List(23, NODATA) -> 7223,
-            List(41, 6) -> 62,
-            List(24, NODATA) -> 3148,
-            List(24, 2) -> 78,
-            List(21, 2) -> 4397,
-            List(22, 3) -> 2820,
-            List(52, 6) -> 7,
-            List(90, 2) -> 108,
-            List(43, 3) -> 91,
-            List(71, 7) -> 101,
-            List(81, 4) -> 2681,
-            List(71, 3) -> 221,
-            List(23, 4) -> 1062,
-            List(82, 1) -> 33
-          ) map { case (k, v) => k.toString -> v}
-          complete(output.toJson)
+        entity(as[PostRequest]) { data =>
+          data.input.operationType match {
+            case "RasterGroupedCount" =>
+              complete(getRasterGroupedCount(data.input))
+            case _ =>
+              throw new Exception(s"Unknown operationType: ${data.input.operationType}")
+          }
         }
       }
     }
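With the hardcoded stub removed, the handler now derives everything from the request body. For reference, a round trip against the new endpoint would look roughly like this; the layer name, zoom, and geometry are placeholders, and the response values only show the shape of the output (keys are stringified pixel-value groups):

POST /run
{
  "input": {
    "operationType": "RasterGroupedCount",
    "rasters": ["nlcd-2011"],
    "zoom": 0,
    "polygonCRS": "LatLng",
    "rasterCRS": "ConusAlbers",
    "polygon": ["{\"type\": \"Polygon\", \"coordinates\": [[[-75.16, 39.95], [-75.15, 39.95], [-75.15, 39.96], [-75.16, 39.95]]]}"]
  }
}

HTTP/1.1 200 OK
{
  "result": {
    "List(21, 3)": 8134,
    "List(41, 4)": 5379
  }
}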
4 changes: 3 additions & 1 deletion project/build.scala
@@ -21,6 +21,7 @@ object Version {
   lazy val akkaVersion = "2.4.16"
   lazy val akkaHttpCorsVersion = "0.2.1"
   lazy val scalaLoggingVersion = "3.7.2"
+  lazy val sparkCoreVersion = "2.1.1"
 }
 
 object Geoprocessing extends Build {
@@ -102,7 +103,8 @@ object Geoprocessing extends Build {
       "com.typesafe.akka" %% "akka-stream" % Version.akkaVersion,
       "com.typesafe.akka" %% "akka-http-spray-json" % Version.akkaHttpVersion,
       "org.scalatest" %% "scalatest" % Version.scalatest % "test",
-      "com.typesafe.scala-logging" %% "scala-logging" % Version.scalaLoggingVersion
+      "com.typesafe.scala-logging" %% "scala-logging" % Version.scalaLoggingVersion,
+      "org.apache.spark" %% "spark-core" % Version.sparkCoreVersion
     )
   ) ++
   defaultAssemblySettings