Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,47 +1,68 @@
name := "spark-sftp"
lazy val scala211Version = "2.11.12"
lazy val scala212Version = "2.12.10"
lazy val sparkVersion = "3.2.1"

organization := "com.springml"
lazy val commonSettings = Seq(
name := "spark-sftp",
organization := "com.springml",
version := "1.3.3",
scalaVersion := scala212Version,
crossScalaVersions := Seq(scala212Version)
)

javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

scalaVersion := "2.11.8"
initialize := {
val _ = initialize.value
val javaVersion = sys.props("java.specification.version")
if (javaVersion != "1.8")
sys.error("Java 1.8 is required for this project. Found " + javaVersion + " instead")
}

sparkVersion := "2.3.0"

spName := "springml/spark-sftp"
lazy val shaded = (project in file("."))
.settings(commonSettings)

version := "1.1.4"
// Test dependencies
lazy val commonTestDependencies = Seq(
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.apache.avro" % "avro-mapred" % "1.7.7" % "test" exclude("org.mortbay.jetty", "servlet-api"),
"org.apache.spark" %% "spark-hive" % sparkVersion % "test"
)

// Dependent libraries
libraryDependencies ++= Seq(
libraryDependencies ++= (commonTestDependencies ++ Seq(
//spark libs
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
// spark-dependent libraries
"org.apache.spark" %% "spark-avro" % sparkVersion,
"com.springml" % "sftp.client" % "1.0.3",
"org.mockito" % "mockito-core" % "2.0.31-beta",
"com.databricks" % "spark-xml_2.11" % "0.4.1"
)
"com.databricks" %% "spark-xml" % "0.5.0"
))

// used spark components
sparkComponents += "sql"

// Repositories
resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"

// Spark packages
spDependencies += "com.databricks/spark-avro_2.11:3.2.0"

// Test dependencies
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" % "test" exclude("org.mortbay.jetty", "servlet-api")
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test"

spIgnoreProvided := true



// licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))

credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")
//credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")

credentials += Credentials("Sonatype Nexus Repository Manager", "example.com", "deployment", "deployment_pwd")

publishTo := {
val nexus = "https://oss.sonatype.org/"
val nexus = "http://example.com:8081/nexus/repository"
if (version.value.endsWith("SNAPSHOT"))
Some("snapshots" at nexus + "content/repositories/snapshots")
Some("snapshots" at nexus + "/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
Some("releases" at nexus + "/releases")
}

pomExtra := (
Expand All @@ -64,4 +85,4 @@ pomExtra := (
<name>Springml</name>
<url>http://www.springml.com</url>
</developer>
</developers>)
</developers>)
7 changes: 2 additions & 5 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
// You may use this file to add plugin dependencies for sbt.
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"

addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.2.5")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
resolvers += Resolver.mavenLocal
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")
3 changes: 1 addition & 2 deletions src/main/scala/com/springml/spark/sftp/DatasetRelation.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.springml.spark.sftp

import com.databricks.spark.avro._
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
Expand Down Expand Up @@ -36,7 +35,7 @@ case class DatasetRelation(
var df: DataFrame = null

df = fileType match {
case "avro" => dataframeReader.avro(fileLocation)
case "avro" => dataframeReader.format("avro").load(fileLocation)
case "txt" => dataframeReader.format("text").load(fileLocation)
case "xml" => dataframeReader.format(constants.xmlClass)
.option(constants.xmlRowTag, rowTag)
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/com/springml/spark/sftp/DefaultSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val hdfsPath = new Path(hdfsTemp)
val fs = hdfsPath.getFileSystem(hadoopConf)
if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
logger.debug("#### copyFromHdfs-->fs.getScheme: "+fs.getScheme)
if ("dbfs".equalsIgnoreCase(fs.getScheme) || "hdfs".equalsIgnoreCase(fs.getScheme)) {
logger.debug(s"#### Copying from ${fs.getScheme}:$hdfsTemp -->$fileLocation: ")
fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
fs.deleteOnExit(new Path(hdfsTemp))
return fileLocation
Expand Down Expand Up @@ -272,6 +274,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr

private def copiedFile(tempFileLocation: String) : String = {
val baseTemp = new File(tempFileLocation)
print("#### copiedFile-->baseTemp.listFiles(): "+baseTemp.listFiles().mkString("Array(",",",")"))
val files = baseTemp.listFiles().filter { x =>
(!x.isDirectory()
&& !x.getName.contains("SUCCESS")
Expand Down