Commit cae8e91

Initial implementation of tiles writer.
1 parent 92b7f7f commit cae8e91

File tree

12 files changed: +481 -297 lines

Diff for: build.sbt

+2-1
@@ -125,7 +125,8 @@ lazy val datasource = project
     geotrellis("s3").value excludeAll ExclusionRule(organization = "com.github.mpilquist"),
     spark("core").value % Provided,
     spark("mllib").value % Provided,
-    spark("sql").value % Provided
+    spark("sql").value % Provided,
+    `better-files`
   ),
   Compile / console / scalacOptions ~= { _.filterNot(Set("-Ywarn-unused-import", "-Ywarn-unused:imports")) },
   Test / console / scalacOptions ~= { _.filterNot(Set("-Ywarn-unused-import", "-Ywarn-unused:imports")) },
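The `better-files` identifier above references a dependency `val` defined elsewhere in the build, which this diff does not show. As a hedged sketch only, it presumably points at the pathikrit/better-files library; the coordinates and version below are assumptions, not part of this commit:

```scala
// Hypothetical definition -- the actual val lives elsewhere in the build;
// group/artifact/version here are assumptions for illustration.
lazy val `better-files` = "com.github.pathikrit" %% "better-files" % "3.9.1"
```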

Diff for: core/src/main/scala/org/apache/spark/sql/rf/package.scala

+1-1
@@ -56,7 +56,7 @@ package object rf {

   /** Lookup the registered Catalyst UDT for the given Scala type. */
   def udtOf[T >: Null: TypeTag]: UserDefinedType[T] =
-    UDTRegistration.getUDTFor(typeTag[T].tpe.toString).map(_.newInstance().asInstanceOf[UserDefinedType[T]])
+    UDTRegistration.getUDTFor(typeTag[T].tpe.toString).map(_.getDeclaredConstructor().newInstance().asInstanceOf[UserDefinedType[T]])
       .getOrElse(throw new IllegalArgumentException(typeTag[T].tpe + " doesn't have a corresponding UDT"))

   /** Creates a Catalyst expression for flattening the fields in a struct into columns. */
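The one-line change above swaps `Class.newInstance`, deprecated since Java 9 because it bypasses compile-time checked-exception handling, for the equivalent `Constructor`-based call. A minimal sketch of the two forms (the class used is illustrative only):

```scala
// Both construct an instance via the zero-argument constructor.
val clazz: Class[java.lang.StringBuilder] = classOf[java.lang.StringBuilder]
val viaDeprecated  = clazz.newInstance()                           // deprecated since JDK 9
val viaConstructor = clazz.getDeclaredConstructor().newInstance()  // preferred replacement
```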

Diff for: datasource/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

+1
@@ -4,3 +4,4 @@ org.locationtech.rasterframes.datasource.geotrellis.GeoTrellisCatalog
 org.locationtech.rasterframes.datasource.raster.RasterSourceDataSource
 org.locationtech.rasterframes.datasource.geojson.GeoJsonDataSource
 org.locationtech.rasterframes.datasource.stac.api.StacApiDataSource
+org.locationtech.rasterframes.datasource.tiles.TilesDataSource
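Registering `TilesDataSource` in this `META-INF/services` file is what lets Spark resolve the `"tiles"` short name: Spark discovers `DataSourceRegister` implementations via `java.util.ServiceLoader` and matches on `shortName()`. A sketch of that lookup for illustration; Spark performs the equivalent internally:

```scala
import java.util.ServiceLoader
import org.apache.spark.sql.sources.DataSourceRegister
import scala.collection.JavaConverters._

// Find the provider that registered itself under the "tiles" short name.
val tilesProvider = ServiceLoader.load(classOf[DataSourceRegister])
  .iterator().asScala
  .find(_.shortName() == "tiles")
```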

Diff for: datasource/src/main/scala/org/locationtech/rasterframes/datasource/Poke.scala

-15

This file was deleted.

Diff for: datasource/src/main/scala/org/locationtech/rasterframes/datasource/tiles/TilesDataSource.scala

+269

@@ -0,0 +1,269 @@
/*
 * This software is licensed under the Apache 2 license, quoted below.
 *
 * Copyright 2021 Astraea, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * [http://www.apache.org/licenses/LICENSE-2.0]
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 */
package org.locationtech.rasterframes.datasource.tiles

import com.typesafe.scalalogging.Logger
import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.compression.DeflateCompression
import geotrellis.raster.io.geotiff.tags.codes.ColorSpace
import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tags, Tiled}
import geotrellis.raster.render.ColorRamps
import geotrellis.raster.{MultibandTile, Tile}
import geotrellis.store.hadoop.{SerializableConfiguration, _}
import geotrellis.vector.Extent
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoders, Row, SQLContext, SaveMode, functions => F}
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.encoders.SparkBasicEncoders
import org.locationtech.rasterframes.util._
import org.slf4j.LoggerFactory

import java.io.IOException
import java.net.URI
import scala.util.Try

class TilesDataSource extends DataSourceRegister with CreatableRelationProvider {
  import TilesDataSource._
  @transient protected lazy val logger = Logger(LoggerFactory.getLogger(getClass.getName))
  override def shortName(): String = SHORT_NAME

  /**
   * Credit: https://stackoverflow.com/a/50545815/296509
   */
  def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
  ): Boolean = {

    if (dstFS.exists(dstFile))
      throw new IOException(s"Target $dstFile already exists")

    // Source path is expected to be a directory:
    if (srcFS.getFileStatus(srcDir).isDirectory()) {

      val outputFile = dstFS.create(dstFile)
      Try {
        srcFS
          .listStatus(srcDir)
          .sortBy(_.getPath.getName)
          .collect {
            case status if status.isFile() =>
              val inputFile = srcFS.open(status.getPath())
              Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
              inputFile.close()
          }
      }
      outputFile.close()

      if (deleteSource) srcFS.delete(srcDir, true) else true
    }
    else false
  }
  private def writeCatalog(pipeline: Dataset[Row], pathURI: URI, conf: SerializableConfiguration) = {
    // A bit of a hack here. First we write the CSV using Spark's CSV writer, then we clean up all the Hadoop noise.
    val fName = "catalog.csv"
    val hPath = new Path(new Path(pathURI), "_" + fName)
    pipeline
      .coalesce(1)
      .write
      .option("header", "true")
      .csv(hPath.toString)

    val fs = FileSystem.get(pathURI, conf.value)
    val localPath = new Path(new Path(pathURI), fName)
    copyMerge(fs, hPath, fs, localPath, true, conf.value)
  }

  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    val pathURI = parameters.path.getOrElse(throw new IllegalArgumentException("Valid URI 'path' parameter required."))
    require(pathURI.getScheme == "file" || pathURI.getScheme == null, "Currently only 'file://' destinations are supported")

    val tileCols = data.tileColumns
    require(tileCols.nonEmpty, "Could not find any tile columns.")

    val filenameCol = parameters.filenameColumn
      .map(F.col)
      .getOrElse(F.monotonically_increasing_id().cast(StringType))

    val SpatialComponents(crsCol, extentCol, _, _) = projectSpatialComponents(data) match {
      case Some(parts) => parts
      case _ => throw new IllegalArgumentException("Could not find extent and/or CRS data.")
    }

    val tags = Tags(Map.empty,
      tileCols.map(c => Map("source_column" -> c.columnName)).toList
    )

    // We make some assumptions here.... eventually have column metadata encode this.
    val colorSpace = tileCols.size match {
      case 3 | 4 => ColorSpace.RGB
      case _ => ColorSpace.BlackIsZero
    }

    val metadataCols = parameters.metadataColumns

    // Default format options.
    val tiffOptions = GeoTiffOptions(Tiled, DeflateCompression, colorSpace)

    val outRowEnc = RowEncoder(StructType(
      StructField("filename", StringType) +:
      StructField("bbox", StringType) +:
      StructField("crs", StringType) +:
      metadataCols.map(n =>
        StructField(n, StringType)
      )
    ))

    val hconf = SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

    // Spark ceremony for reifying row contents.
    import SparkBasicEncoders._
    val inRowEnc = Encoders.tuple(
      stringEnc, crsExpressionEncoder, extentEncoder, arrayEnc[Tile], arrayEnc[String])
    type RowStuff = (String, CRS, Extent, Array[Tile], Array[String])
    val pipeline = data
      .select(filenameCol, crsCol, extentCol, F.array(tileCols.map(rf_tile): _*),
        F.array(metadataCols.map(data.apply).map(_.cast(StringType)): _*))
      .na.drop()
      .as[RowStuff](inRowEnc)
      .mapPartitions { rows =>
        for ((filename, crs, extent, tiles, metadata) <- rows) yield {
          val md = metadataCols.zip(metadata).toMap

          val finalFilename = if (parameters.asPNG) {
            val fnl = filename.toLowerCase()
            if (!fnl.endsWith("png")) filename + ".png" else filename
          }
          else {
            val fnl = filename.toLowerCase()
            if (!(fnl.endsWith("tiff") || fnl.endsWith("tif"))) filename + ".tif" else filename
          }

          val finalPath = new Path(new Path(pathURI), finalFilename)

          if (parameters.asPNG) {
            // `Try` below is due to https://github.com/locationtech/geotrellis/issues/2621
            val scaled = tiles.map(t => Try(t.rescale(0, 255)).getOrElse(t))
            if (scaled.length > 1)
              MultibandTile(scaled).renderPng().write(finalPath, hconf.value)
            else
              scaled.head.renderPng(ColorRamps.greyscale(255)).write(finalPath, hconf.value)
          }
          else {
            val chipTags = tags.copy(headTags = md.updated("base_filename", filename))
            val geotiff = new MultibandGeoTiff(MultibandTile(tiles), extent, crs, chipTags, tiffOptions)
            geotiff.write(finalPath, hconf.value)
          }
          // Ordering:
          //   bbox = left,bottom,right,top
          //   bbox = min Longitude, min Latitude, max Longitude, max Latitude
          // Avoiding commas with this format:
          //   [0.489|51.28|0.236|51.686]
          val bbox = s"[${extent.xmin}|${extent.ymin}|${extent.xmax}|${extent.ymax}]"
          Row(finalFilename +: bbox +: crs.toProj4String +: metadata: _*)
        }
      }(outRowEnc)

    if (parameters.withCatalog)
      writeCatalog(pipeline, pathURI, hconf)
    else
      pipeline.foreach(_ => ())

    // The `createRelation` function here is called by
    // `org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand`, which
    // ignores the return value. It in turn returns `Seq.empty[Row]` (which is then also ignored)...
    // ¯\_(ツ)_/¯
    null
  }
}

object TilesDataSource {
  final val SHORT_NAME = "tiles"
  // writing
  final val PATH_PARAM = "path"
  final val FILENAME_COL_PARAM = "filename"
  final val CATALOG_PARAM = "catalog"
  final val METADATA_PARAM = "metadata"
  final val AS_PNG_PARAM = "png"

  case class SpatialComponents(crsColumn: Column,
                               extentColumn: Column,
                               dimensionColumn: Column,
                               cellTypeColumn: Column)

  object SpatialComponents {
    def apply(tileColumn: Column, crsColumn: Column, extentColumn: Column): SpatialComponents = {
      val dim = rf_dimensions(tileColumn) as "dims"
      val ct = rf_cell_type(tileColumn) as "cellType"
      SpatialComponents(crsColumn, extentColumn, dim, ct)
    }
    def apply(prColumn: Column): SpatialComponents = {
      SpatialComponents(
        rf_crs(prColumn) as "crs",
        rf_extent(prColumn) as "extent",
        rf_dimensions(prColumn) as "dims",
        rf_cell_type(prColumn) as "cellType"
      )
    }
  }

  protected[rasterframes]
  implicit class TilesDictAccessors(val parameters: Map[String, String]) extends AnyVal {
    def filenameColumn: Option[String] =
      parameters.get(FILENAME_COL_PARAM)

    def path: Option[URI] =
      datasource.uriParam(PATH_PARAM, parameters)

    def withCatalog: Boolean =
      parameters.get(CATALOG_PARAM).exists(_.toBoolean)

    def metadataColumns: Seq[String] =
      parameters.get(METADATA_PARAM).toSeq.flatMap(_.split(','))

    def asPNG: Boolean =
      parameters.get(AS_PNG_PARAM).exists(_.toBoolean)
  }
  /**
   * If the given DataFrame has tile, extent and CRS columns, build a `SpatialComponents` from them.
   * Otherwise, look for a `ProjectedRaster` column and derive the CRS and extent columns from it.
   *
   * @param d DataFrame to process.
   * @return the CRS, extent, dimension and cell type columns, if they could be determined.
   */
  def projectSpatialComponents(d: DataFrame): Option[SpatialComponents] =
    d.tileColumns.headOption.zip(d.crsColumns.headOption.zip(d.extentColumns.headOption)).headOption
      .map { case (tile, (crs, extent)) => SpatialComponents(tile, crs, extent) }
      .orElse(
        d.projRasterColumns.headOption
          .map(pr => SpatialComponents(pr))
      )
}
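The catalog's `bbox` column uses the pipe-delimited `[xmin|ymin|xmax|ymax]` format above precisely to stay CSV-friendly, so reading it back requires a small parser. A hedged sketch of the inverse transformation (the helper name is illustrative, not part of this commit):

```scala
import geotrellis.vector.Extent

// Hypothetical helper: parse "[0.489|51.28|0.236|51.686]" back into an Extent.
def parseBbox(s: String): Extent = {
  val Array(xmin, ymin, xmax, ymax) =
    s.stripPrefix("[").stripSuffix("]").split('|').map(_.toDouble)
  Extent(xmin, ymin, xmax, ymax)
}
```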
Diff for: datasource/src/main/scala/org/locationtech/rasterframes/datasource/tiles/package.scala

+85

@@ -0,0 +1,85 @@
/*
 * This software is licensed under the Apache 2 license, quoted below.
 *
 * Copyright (c) 2021. Astraea, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * [http://www.apache.org/licenses/LICENSE-2.0]
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

package org.locationtech.rasterframes.datasource

import org.apache.spark.sql.DataFrameWriter
import shapeless.tag.@@

package object tiles {
  trait TilesDataFrameReaderTag
  trait TilesDataFrameWriterTag

  type TilesDataFrameWriter[T] = DataFrameWriter[T] @@ TilesDataFrameWriterTag

  /** Adds `tiles` format specifier to `DataFrameWriter` */
  implicit class DataFrameWriterHasTilesWriter[T](val writer: DataFrameWriter[T]) {
    def tiles: TilesDataFrameWriter[T] =
      shapeless.tag[TilesDataFrameWriterTag][DataFrameWriter[T]](
        writer.format(TilesDataSource.SHORT_NAME))
  }

  /** Options for `tiles` format writer. */
  implicit class TilesWriterOps[T](val writer: TilesDataFrameWriter[T]) extends TilesWriterOptionsSupport[T]

  trait TilesWriterOptionsSupport[T] {
    val writer: TilesDataFrameWriter[T]

    /**
     * Provide the name of a column whose row value will be used as the output filename.
     * Generated value may have path components in it. Appropriate filename extension will be automatically added.
     *
     * @param colName name of column to use.
     */
    def withFilenameColumn(colName: String): TilesDataFrameWriter[T] = {
      shapeless.tag[TilesDataFrameWriterTag][DataFrameWriter[T]](
        writer.option(TilesDataSource.FILENAME_COL_PARAM, colName)
      )
    }
    /**
     * Enable generation of a `catalog.csv` file along with the tile files, listing the file paths relative to
     * the base directory along with any metadata values specified via `withMetadataColumns`.
     */
    def withCatalog: TilesDataFrameWriter[T] = {
      shapeless.tag[TilesDataFrameWriterTag][DataFrameWriter[T]](
        writer.option(TilesDataSource.CATALOG_PARAM, true.toString)
      )
    }
    /**
     * Specify column values to add to chip metadata and catalog (when written).
     *
     * @param colNames names of columns to add. Values are automatically cast to `String`.
     */
    def withMetadataColumns(colNames: String*): TilesDataFrameWriter[T] = {
      shapeless.tag[TilesDataFrameWriterTag][DataFrameWriter[T]](
        writer.option(TilesDataSource.METADATA_PARAM, colNames.mkString(","))
      )
    }

    /** Request Tiles be written out in PNG format. GeoTIFF is the default. */
    def asPNG: TilesDataFrameWriter[T] = {
      shapeless.tag[TilesDataFrameWriterTag][DataFrameWriter[T]](
        writer.option(TilesDataSource.AS_PNG_PARAM, true.toString)
      )
    }
  }
}
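Putting the pieces together, the tagged-writer API above is meant to be used fluently. A sketch of end-to-end usage, assuming a DataFrame `df` with a tile column plus `id` and `date` columns (the DataFrame, column names, and output path are illustrative):

```scala
import org.locationtech.rasterframes.datasource.tiles._

// Writes one GeoTIFF per row (named from the `id` column), records `date`
// in the chip metadata, and emits a catalog.csv alongside the chips.
df.write.tiles
  .withFilenameColumn("id")
  .withMetadataColumns("date")
  .withCatalog
  .save("file:///tmp/chips")
```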
