Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ This library requires following options:
* `delimiter`: (Optional) Set the field delimiter. Applicable only for csv fileType. Default is comma.
* `quote`: (Optional) Set the quote character. Applicable only for csv fileType. Default is `"`.
* `escape`: (Optional) Set the escape character. Applicable only for csv fileType. Default is `\`.
* `encoding`: (Optional) Set the file encoding (e.g. UTF-8, UTF-32BE). Applicable only for csv fileType. Default is UTF-8.
* `multiLine`: (Optional) Set the multiline. Applicable only for csv fileType. Default is false.
* `codec`: (Optional) Applicable only for csv fileType. Compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of case-insensitive shorten names (bzip2, gzip, lz4, and snappy). Defaults to no compression when a codec is not specified.

Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/springml/spark/sftp/DatasetRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ case class DatasetRelation(
delimiter: String,
quote: String,
escape: String,
encoding: String,
multiLine: String,
rowTag: String,
customSchema: StructType,
Expand Down Expand Up @@ -46,6 +47,7 @@ case class DatasetRelation(
option("delimiter", delimiter).
option("quote", quote).
option("escape", escape).
option("encoding", encoding).
option("multiLine", multiLine).
option("inferSchema", inferSchema).
csv(fileLocation)
Expand Down
9 changes: 6 additions & 3 deletions src/main/scala/com/springml/spark/sftp/DefaultSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val header = parameters.getOrElse("header", "true")
val delimiter = parameters.getOrElse("delimiter", ",")
val quote = parameters.getOrElse("quote", "\"")
val encoding = parameters.getOrElse("encoding", "UTF-8")
val escape = parameters.getOrElse("escape", "\\")
val multiLine = parameters.getOrElse("multiLine", "false")
val createDF = parameters.getOrElse("createDF", "true")
Expand Down Expand Up @@ -87,7 +88,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
logger.info("Returning an empty dataframe after copying files...")
createReturnRelation(sqlContext, schema)
} else {
DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, escape, multiLine, rowTag, schema,
DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, escape, encoding, multiLine, rowTag, schema,
sqlContext)
}
}
Expand Down Expand Up @@ -115,6 +116,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val delimiter = parameters.getOrElse("delimiter", ",")
val quote = parameters.getOrElse("quote", "\"")
val escape = parameters.getOrElse("escape", "\\")
val encoding = parameters.getOrElse("encoding", "UTF-8")
val multiLine = parameters.getOrElse("multiLine", "false")
val codec = parameters.getOrElse("codec", null)
val rowTag = parameters.getOrElse(constants.xmlRowTag, null)
Expand All @@ -127,7 +129,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr

val sftpClient = getSFTPClient(username, password, pemFileLocation, pemPassphrase, host, port,
cryptoKey, cryptoAlgorithm)
val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, escape, multiLine, codec, rowTag, rootTag)
val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, escape, encoding, multiLine, codec, rowTag, rootTag)

upload(tempFile, path, sftpClient)
return createReturnRelation(data)
Expand Down Expand Up @@ -234,7 +236,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr

private def writeToTemp(sqlContext: SQLContext, df: DataFrame,
hdfsTemp: String, tempFolder: String, fileType: String, header: String,
delimiter: String, quote: String, escape: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = {
delimiter: String, quote: String, escape: String, encoding: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = {
val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
val localTempLocation = tempFolder + File.separator + randomSuffix
Expand All @@ -252,6 +254,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
option("delimiter", delimiter).
option("quote", quote).
option("escape", escape).
option("encoding", encoding).
option("multiLine", multiLine).
optionNoNull("codec", Option(codec)).
csv(hdfsTempLocation)
Expand Down
Binary file added src/test/resources/sample_utf-32be.csv
Binary file not shown.
4 changes: 2 additions & 2 deletions src/test/scala/com/springml/spark/sftp/CustomSchemaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CustomSchemaTest extends FunSuite with BeforeAndAfterEach {
val expectedSchema = StructType(columnStruct)

val fileLocation = getClass.getResource("/sample.csv").getPath
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "false", null, expectedSchema, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, expectedSchema, ss.sqlContext)
val rdd = dsr.buildScan()

assert(dsr.schema.fields.length == columnStruct.length)
Expand All @@ -55,7 +55,7 @@ class CustomSchemaTest extends FunSuite with BeforeAndAfterEach {
val expectedSchema = StructType(columnStruct)

val fileLocation = getClass.getResource("/people.json").getPath
val dsr = DatasetRelation(fileLocation, "json", "false", "true", ",", "\"", "\\", "false", null, expectedSchema, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "json", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, expectedSchema, ss.sqlContext)
val rdd = dsr.buildScan()

assert(dsr.schema.fields.length == columnStruct.length)
Expand Down
37 changes: 28 additions & 9 deletions src/test/scala/com/springml/spark/sftp/TestDatasetRelation.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.springml.spark.sftp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.scalatest.{BeforeAndAfterEach, FunSuite}

/**
Expand All @@ -15,63 +16,81 @@ class TestDatasetRelation extends FunSuite with BeforeAndAfterEach {

test ("Read CSV") {
val fileLocation = getClass.getResource("/sample.csv").getPath
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "UTF-8","false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(3 == rdd.count())
}

test ("Read CSV using custom delimiter") {
val fileLocation = getClass.getResource("/sample.csv").getPath
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ";", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ";", "\"", "\\", "UTF-8", "false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(3 == rdd.count())
}

test ("Read multiline CSV using custom quote and escape") {
val fileLocation = getClass.getResource("/sample_quoted_multiline.csv").getPath
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "true", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "UTF-8", "true", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(3 == rdd.count())
}

test ("Read CSV encoded as UTF-32be") {
val schema = StructType( Array(
StructField("ProposalId", StringType,true),
StructField("OpportunityId", StringType,true),
StructField("Clicks", StringType,true),
StructField("Impressions", StringType,true),
StructField("Currency", StringType,true)
))

val fileLocation = getClass.getResource("/sample_utf-32be.csv").getPath
val dsr = DatasetRelation(fileLocation, "csv", "false", "true", ",", "\"", "\\", "UTF-32BE", "true", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
val df = ss.createDataFrame(rdd, schema)
assert(5 == df.columns.size)
assert("£" == df.head.getString(4))
assert(3 == rdd.count())
}


test ("Read JSON") {
val fileLocation = getClass.getResource("/people.json").getPath
val dsr = DatasetRelation(fileLocation, "json", "false", "true", ",", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "json", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(3 == rdd.count())
}

test ("Read AVRO") {
val fileLocation = getClass.getResource("/users.avro").getPath
val dsr = DatasetRelation(fileLocation, "avro", "false", "true", ",", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "avro", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(2 == rdd.count())
}

test ("Read parquet") {
val fileLocation = getClass.getResource("/users.parquet").getPath
val dsr = DatasetRelation(fileLocation, "parquet", "false", "true", ",", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "parquet", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(2 == rdd.count())
}

test ("Read text file") {
val fileLocation = getClass.getResource("/plaintext.txt").getPath
val dsr = DatasetRelation(fileLocation, "txt", "false", "true", ",", "\"", "\\", "false", null, null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "txt", "false", "true", ",", "\"", "\\", "UTF-8", "false", null, null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(3 == rdd.count())
}

test ("Read xml file") {
val fileLocation = getClass.getResource("/books.xml").getPath
val dsr = DatasetRelation(fileLocation, "xml", "false", "true", ",", "\"", "\\", "false", "book", null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "xml", "false", "true", ",", "\"", "\\", "UTF-8", "false", "book", null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(12 == rdd.count())
}
test ("Read orc file") {
val fileLocation = getClass.getResource("/books.orc").getPath
val dsr = DatasetRelation(fileLocation, "orc", "false", "true", ",", "\"", "\\", "false", "book", null, ss.sqlContext)
val dsr = DatasetRelation(fileLocation, "orc", "false", "true", ",", "\"", "\\", "UTF-8", "false", "book", null, ss.sqlContext)
val rdd = dsr.buildScan()
assert(12 == rdd.count())
}
Expand Down