diff --git a/src/main/scala/com/springml/spark/sftp/DefaultSource.scala b/src/main/scala/com/springml/spark/sftp/DefaultSource.scala index a62e57a..1eb25f9 100644 --- a/src/main/scala/com/springml/spark/sftp/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/sftp/DefaultSource.scala @@ -57,6 +57,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val header = parameters.getOrElse("header", "true") val delimiter = parameters.getOrElse("delimiter", ",") val quote = parameters.getOrElse("quote", "\"") + val quoteAll = parameters.getOrElse("quoteAll", "false") val escape = parameters.getOrElse("escape", "\\") val multiLine = parameters.getOrElse("multiLine", "false") val createDF = parameters.getOrElse("createDF", "true") @@ -87,7 +88,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr logger.info("Returning an empty dataframe after copying files...") createReturnRelation(sqlContext, schema) } else { - DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, escape, multiLine, rowTag, schema, + DatasetRelation(fileLocation, fileType, inferSchemaFlag, header, delimiter, quote, quoteAll, escape, multiLine, rowTag, schema, sqlContext) } } @@ -114,6 +115,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val cryptoAlgorithm = parameters.getOrElse("cryptoAlgorithm", "AES") val delimiter = parameters.getOrElse("delimiter", ",") val quote = parameters.getOrElse("quote", "\"") + val quoteAll = parameters.getOrElse("quoteAll", "false") val escape = parameters.getOrElse("escape", "\\") val multiLine = parameters.getOrElse("multiLine", "false") val codec = parameters.getOrElse("codec", null) @@ -127,7 +129,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val sftpClient = getSFTPClient(username, password, pemFileLocation, pemPassphrase, host, port, cryptoKey, cryptoAlgorithm) - val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, escape, multiLine, codec, rowTag, rootTag) + val tempFile = writeToTemp(sqlContext, data, hdfsTemp, tmpFolder, fileType, header, delimiter, quote, quoteAll, escape, multiLine, codec, rowTag, rootTag) upload(tempFile, path, sftpClient) return createReturnRelation(data) @@ -234,7 +236,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr private def writeToTemp(sqlContext: SQLContext, df: DataFrame, hdfsTemp: String, tempFolder: String, fileType: String, header: String, - delimiter: String, quote: String, escape: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = { + delimiter: String, quote: String, quote: Boolean, escape: String, multiLine: String, codec: String, rowTag: String, rootTag: String) : String = { val randomSuffix = "spark_sftp_connection_temp_" + UUID.randomUUID val hdfsTempLocation = hdfsTemp + File.separator + randomSuffix val localTempLocation = tempFolder + File.separator + randomSuffix @@ -251,6 +253,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr option("header", header). option("delimiter", delimiter). option("quote", quote). + option("quoteAll", quoteAll). option("escape", escape). option("multiLine", multiLine). optionNoNull("codec", Option(codec)).