Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/main/scala/com/springml/spark/sftp/DefaultSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val header = parameters.getOrElse("header", "true")
val delimiter = parameters.getOrElse("delimiter", ",")
val quote = parameters.getOrElse("quote", "\"")
val quoteAll = parameters.getOrElse("quoteAll", "false")
val escape = parameters.getOrElse("escape", "\\")
val multiLine = parameters.getOrElse("multiLine", "false")
val createDF = parameters.getOrElse("createDF", "true")
Expand Down Expand Up @@ -87,7 +88,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
logger.info("Returning an empty dataframe after copying files...")
createReturnRelation(sqlContext, schema)
} else {
DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, escape, multiLine, rowTag, schema,
DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, quoteAll, escape, multiLine, rowTag, schema,
sqlContext)
}
}
Expand All @@ -114,6 +115,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES")
val delimiter = parameters.getOrElse("delimiter", ",")
val quote = parameters.getOrElse("quote", "\"")
val quoteAll = parameters.getOrElse("quoteAll", "false")
val escape = parameters.getOrElse("escape", "\\")
val multiLine = parameters.getOrElse("multiLine", "false")
val codec = parameters.getOrElse("codec", null)
Expand All @@ -127,7 +129,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr

val sftpClient = getSFTPClient(username, password, pemFileLocation, pemPassphrase, host, port,
cryptoKey, cryptoAlgorithm)
val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, escape, multiLine, codec, rowTag, rootTag)
val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, quoteAll, escape, multiLine, codec, rowTag, rootTag)

upload(tempFile, path, sftpClient)
return createReturnRelation(data)
Expand Down Expand Up @@ -234,7 +236,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr

private def writeToTemp(sqlContext: SQLContext, df: DataFrame,
hdfsTemp: String, tempFolder: String, fileType: String, header: String,
delimiter: String, quote: String, escape: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = {
delimiter: String, quote: String, quote: Boolean, escape: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = {
val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID
val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix
val localTempLocation = tempFolder + File.separator + randomSuffix
Expand All @@ -251,6 +253,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
option("header", header).
option("delimiter", delimiter).
option("quote", quote).
option("quoteAll", quoteAll).
option("escape", escape).
option("multiLine", multiLine).
optionNoNull("codec", Option(codec)).
Expand Down