Move to Collections API and optimize count algorithm #47
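```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.s3._
import geotrellis.vector._

val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")
val queryPoly: MultiPolygon = ???
val nlcdLayerId = LayerId("nlcd-2011-30m-epsg5070-int8", 0)
val soilLayerId = LayerId("ssurgo-hydro-groups-30m-epsg5070-int8", 0)

// Both Futures are created here, before the for-comprehension,
// so the two layer queries run concurrently
val nlcdFetch =
  Future {
    layerReader
      .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](nlcdLayerId)
      .where(Intersects(queryPoly)).result
  }

val soilFetch =
  Future {
    layerReader
      .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](soilLayerId)
      .where(Intersects(queryPoly)).result
  }

for {
  nlcdLayer <- nlcdFetch
  soilLayer <- soilFetch
} yield {
  histograms(
    nlcdLayer,
    soilLayer,
    queryPoly
  )
}
```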
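The following is a dependency-free sketch of why the snippet above creates `nlcdFetch` and `soilFetch` before the `for`. A `Future` starts running as soon as it is created, so both queries are already in flight before the comprehension waits on either one. Writing `Future { ... }` directly inside the `for` would start the second query only after the first had finished. `fetchLayer` and its values are hypothetical stand-ins for the layer queries. The pair counting only illustrates the kind of result `histograms` returns and is not its implementation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelFetchSketch {
  // Hypothetical stand-in for a layer query: returns a few cell values.
  def fetchLayer(name: String): Seq[Int] = name match {
    case "nlcd" => Seq(11, 21, 21, 41)
    case "soil" => Seq(1, 2, 2, 3)
    case _      => Seq.empty
  }

  def pairCounts(): Map[(Int, Int), Int] = {
    // Both Futures start here, so the two "queries" overlap in time.
    val nlcdFetch = Future(fetchLayer("nlcd"))
    val soilFetch = Future(fetchLayer("soil"))

    // The for-comprehension only waits for results that are already running.
    val combined = for {
      nlcd <- nlcdFetch
      soil <- soilFetch
    } yield nlcd.zip(soil).groupBy(identity).map { case (pair, hits) => pair -> hits.size }

    Await.result(combined, 10.seconds)
  }
}
```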
(I apologize in advance for the length of this comment.)

Hi @lossyrob,

In our production code we have to support 1-3 layers, so we cannot hard-code specific layers the way the code above does. To support a dynamic number of layers, we usually do something like:

```scala
import geotrellis.raster._
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.s3._
import geotrellis.vector._

val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")

def layerFetch(
  layerId: LayerId,
  multiPolygon: MultiPolygon
): TileLayerCollection[SpatialKey] =
  layerReader
    .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)
    .where(Intersects(multiPolygon))
    .result

def doTheThing(
  input: InputJson
) = {
  val layerIds = input.layerIds.map(lid => LayerId(lid, input.zoom))
  val multiPolygon = input.polygons.unionGeometries.asMultiPolygon.get
  // One collection per requested layer, fetched one after another
  val layers = layerIds.map(layerFetch(_, multiPolygon))
  rasterGroupedCount(layers, multiPolygon)
}
```

where we adapted your `rasterGroupedCount`:

```scala
import scala.collection.mutable
import spire.syntax.cfor._
import geotrellis.raster.rasterize._
import geotrellis.raster.rasterize.Rasterizer.Options

def rasterGroupedCount(
  layers: Seq[TileLayerCollection[SpatialKey]],
  multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
  val mapTransform = layers.head.metadata.mapTransform
  joinCollectionLayers(layers)
    .par
    .map { case (key, tiles) =>
      // Per-tile accumulator, so no synchronization is needed within a task
      val tileResult = mutable.Map[Seq[Int], Long]()
      def countCell(col: Int, row: Int): Unit = {
        val vs = tiles.map(_.get(col, row))
        tileResult(vs) = tileResult.getOrElse(vs, 0L) + 1
      }
      val re = RasterExtent(mapTransform(key), tiles.head.cols, tiles.head.rows)
      if (multiPolygon.contains(re.extent)) {
        // Tile lies entirely inside the AOI: visit every cell without rasterizing
        cfor(0)(_ < re.cols, _ + 1) { col =>
          cfor(0)(_ < re.rows, _ + 1) { row =>
            countCell(col, row)
          }
        }
      } else {
        // Tile straddles the AOI boundary: visit only the cells the polygon covers
        multiPolygon.foreach(re, Options(includePartial = true, sampleType = PixelIsArea)) { (col, row) =>
          countCell(col, row)
        }
      }
      tileResult.toMap
    }
    // fold (rather than reduce) returns an empty Map instead of throwing
    // when the query yields no tiles
    .fold(Map.empty[Seq[Int], Long]) { (m1, m2) =>
      (m1.toSeq ++ m2.toSeq)
        .groupBy(_._1)
        .map { case (key, values) => key -> values.map(_._2).sum }
    }
}
```

Here is an alternative implementation of `rasterGroupedCount`:

```scala
import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap
import geotrellis.raster.rasterize.Rasterizer

def rasterGroupedCount(
  layers: Seq[TileLayerCollection[SpatialKey]],
  multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
  val init = () => new LongAdder
  val update = (_: LongAdder).increment()
  val metadata = layers.head.metadata
  // Shared, thread-safe map with one LongAdder counter per distinct pixel-value group
  val pixelGroups: TrieMap[Seq[Int], LongAdder] = TrieMap.empty
  joinCollectionLayers(layers)
    .par
    .foreach { case (key, tiles) =>
      val re = RasterExtent(metadata.mapTransform(key), metadata.layout.tileCols, metadata.layout.tileRows)
      Rasterizer.foreachCellByMultiPolygon(multiPolygon, re) { case (col, row) =>
        val pixelGroup: Seq[Int] = tiles.map(_.get(col, row))
        val acc = pixelGroups.getOrElseUpdate(pixelGroup, init())
        update(acc)
      }
    }
  // TrieMap#mapValues returns a scala.collection.Map view, not the immutable
  // Map in the signature, so materialize the totals explicitly
  pixelGroups.map { case (group, acc) => group -> acc.sum() }.toMap
}
```

where `joinCollectionLayers` is:

```scala
def joinCollectionLayers(layers: Seq[TileLayerCollection[SpatialKey]]): Map[SpatialKey, Seq[Tile]] = {
  val maps: Seq[Map[SpatialKey, Tile]] = layers.map((_: Seq[(SpatialKey, Tile)]).toMap)
  // Only keys present in every layer can be joined; with `union`, looking up
  // a key that is missing from any layer would throw NoSuchElementException
  val keySet: Set[SpatialKey] = maps.map(_.keySet).reduce(_ intersect _)
  keySet.map { key => key -> maps.map(_.apply(key)) }.toMap
}
```

In our benchmarks, we found the fastest implementations, quickest first, to be:
Given that we do want to operate on a list of layers, my questions are:
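The adapted `rasterGroupedCount` counts each tile into its own local map and then merges the per-tile maps. Below is a minimal, dependency-free sketch of that strategy. It assumes a simplified model in which each layer's tile is a flat `Array[Int]` and a Boolean `mask` stands in for rasterizing the AOI over the tile. `countTile`, `merge`, and `groupedCount` are hypothetical names. The merge here sums counts with `updated` instead of concatenating both maps and regrouping them.

```scala
import scala.collection.mutable

object GroupedCountSketch {
  type Counts = Map[Seq[Int], Long]

  // Count each distinct combination of values across layers within one tile.
  // `tiles` holds one flat Array[Int] per layer; `mask` marks cells inside the AOI.
  def countTile(tiles: Seq[Array[Int]], mask: Array[Boolean]): Counts = {
    val local = mutable.Map.empty[Seq[Int], Long]
    for (i <- mask.indices if mask(i)) {
      val group = tiles.map(_(i))
      local(group) = local.getOrElse(group, 0L) + 1L
    }
    local.toMap
  }

  // Merge two partial results by summing the counts of shared groups.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (group, n)) =>
      acc.updated(group, acc.getOrElse(group, 0L) + n)
    }

  // Count every tile independently, then combine the per-tile maps.
  def groupedCount(tiles: Seq[(Seq[Array[Int]], Array[Boolean])]): Counts =
    tiles
      .map { case (layerTiles, mask) => countTile(layerTiles, mask) }
      .foldLeft(Map.empty[Seq[Int], Long])(merge)
}
```

Because `merge` is associative and `Map.empty` is its identity, the per-tile step could run in parallel and the results could be combined with `fold` in any grouping.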
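The alternative implementation instead shares one concurrent map of `LongAdder` counters across all tiles. Below is a dependency-free sketch of that pattern. It assumes a simplified tile model (one `Array[Int]` per layer, every cell inside the AOI) and uses `Future.traverse` in place of `.par`. `SharedCounterSketch` and `counterFor` are hypothetical names. New counters are inserted with `putIfAbsent`, whose atomicity is part of the `scala.collection.concurrent.Map` contract. As a result, two threads that meet the same new group end up incrementing the same adder.

```scala
import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SharedCounterSketch {
  // Simplified model: a "tile" is one Array[Int] per layer, all of equal length.
  def groupedCount(tiles: Seq[Seq[Array[Int]]]): Map[Seq[Int], Long] = {
    // One shared map; each distinct value group gets its own LongAdder.
    val groups = TrieMap.empty[Seq[Int], LongAdder]

    def counterFor(group: Seq[Int]): LongAdder =
      groups.get(group) match {
        case Some(adder) => adder
        case None =>
          // putIfAbsent is atomic: if another thread inserted first, use its adder.
          val fresh = new LongAdder
          groups.putIfAbsent(group, fresh).getOrElse(fresh)
      }

    // Process tiles concurrently; LongAdder spreads contended increments across cells.
    val work = Future.traverse(tiles) { layerTiles =>
      Future {
        for (i <- layerTiles.head.indices)
          counterFor(layerTiles.map(_(i))).increment()
      }
    }
    Await.result(work, 30.seconds)

    // Materialize an immutable snapshot of the totals.
    groups.map { case (group, adder) => group -> adder.sum() }.toMap
  }
}
```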
I tried some optimizations, but nothing I did significantly (or insignificantly) improved on the current implementation.

My suggestion for further optimization is to use Futures to fetch the layers in parallel. This would mean changing the method that fetches the layers to return a `Future`. The refactor would start by moving that method to something like:

```scala
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

def cropRastersToAOI(
  rasterIds: List[String],
  zoom: Int,
  aoi: MultiPolygon
): Future[Seq[TileLayerCollection[SpatialKey]]] =
  Future.sequence {
    // One Future per layer, so all layer queries are in flight at once
    val futures: Seq[Future[TileLayerCollection[SpatialKey]]] =
      rasterIds
        .map { str =>
          Future {
            cropSingleRasterToAOI(str, zoom, aoi)
          }
        }
    futures
  }
```

You would then fix the resulting compiler errors by changing callers to work with the `Future`, mapping over it instead of using the layers directly. For example:

```scala
def getRasterGroupedCount(input: InputData): Future[ResultInt] = {
  val aoi = createAOIFromInput(input)
  val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
  // rasterGroupedCount runs once all layers have been fetched
  rasterLayers.map { layers => ResultInt(rasterGroupedCount(layers, aoi)) }
}
```
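For a list of layer ids of any length, `Future.traverse` does in a single call what `Future.sequence` does over a mapped list. Below is a minimal sketch. `cropOne` is a hypothetical stand-in for `cropSingleRasterToAOI` that returns a tuple instead of a `TileLayerCollection`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelCropSketch {
  // Hypothetical stand-in for cropSingleRasterToAOI: tags the id with the zoom
  // level instead of querying a catalog.
  def cropOne(rasterId: String, zoom: Int): (String, Int) = (rasterId, zoom)

  // Starts one Future per id and gathers the results in input order; equivalent to
  // Future.sequence(rasterIds.map(id => Future(cropOne(id, zoom)))).
  def cropAll(rasterIds: List[String], zoom: Int): Future[List[(String, Int)]] =
    Future.traverse(rasterIds)(id => Future(cropOne(id, zoom)))

  // Callers map over the Future rather than blocking on it.
  def layerCount(rasterIds: List[String], zoom: Int): Future[Int] =
    cropAll(rasterIds, zoom).map(_.size)
}
```

Results come back in the order of `rasterIds`, whichever query finishes first.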
This code provides a starting point for the Collections API optimizations. It is essentially an optimized version of

```scala
def histograms(nlcd: TileLayerRDD[SpatialKey], soil: TileLayerRDD[SpatialKey], multiPolygons: Seq[MultiPolygon]): Seq[Map[(Int, Int), Int]]
```

that uses the Collections API, where `histograms` is: