diff --git a/build.sbt b/build.sbt
index ba0e3f0..fe99afc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,47 +1,68 @@
-name := "spark-sftp"
+lazy val scala211Version = "2.11.12"
+lazy val scala212Version = "2.12.10"
+lazy val sparkVersion = "3.2.1"
 
-organization := "com.springml"
+lazy val commonSettings = Seq(
+  name := "spark-sftp",
+  organization := "com.springml",
+  version := "1.3.3",
+  scalaVersion := scala212Version,
+  crossScalaVersions := Seq(scala212Version)
+)
+
+javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
 
-scalaVersion := "2.11.8"
+initialize := {
+  val _ = initialize.value
+  val javaVersion = sys.props("java.specification.version")
+  if (javaVersion != "1.8")
+    sys.error("Java 1.8 is required for this project. Found " + javaVersion + " instead")
+}
 
-sparkVersion := "2.3.0"
-spName := "springml/spark-sftp"
+lazy val shaded = (project in file("."))
+  .settings(commonSettings)
 
-version := "1.1.4"
+// Test dependencies
+lazy val commonTestDependencies = Seq(
+  "org.scalatest" %% "scalatest" % "3.0.5" % "test",
+  "org.apache.avro" % "avro-mapred" % "1.7.7" % "test" exclude("org.mortbay.jetty", "servlet-api"),
+  "org.apache.spark" %% "spark-hive" % sparkVersion % "test"
+)
 
 // Dependent libraries
-libraryDependencies ++= Seq(
+libraryDependencies ++= (commonTestDependencies ++ Seq(
+  // Spark libraries
+  "org.apache.spark" %% "spark-core" % sparkVersion,
+  "org.apache.spark" %% "spark-sql" % sparkVersion,
+  // Spark-dependent packages
+  "org.apache.spark" %% "spark-avro" % sparkVersion,
   "com.springml" % "sftp.client" % "1.0.3",
   "org.mockito" % "mockito-core" % "2.0.31-beta",
-  "com.databricks" % "spark-xml_2.11" % "0.4.1"
-)
+  "com.databricks" %% "spark-xml" % "0.5.0"
+))
 
-// used spark components
-sparkComponents += "sql"
 
 // Repositories
 resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
 
-// Spark packages
-spDependencies += "com.databricks/spark-avro_2.11:3.2.0"
-// Test dependencies
-libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
-libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" % "test" exclude("org.mortbay.jetty", "servlet-api")
-libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test"
-spIgnoreProvided := true
+
+
+
 // licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
 
-credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")
+//credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")
+
+credentials += Credentials("Sonatype Nexus Repository Manager", "example.com", "deployment", "deployment_pwd")
 
 publishTo := {
-  val nexus = "https://oss.sonatype.org/"
+  val nexus = "http://example.com:8081/nexus/repository"
   if (version.value.endsWith("SNAPSHOT"))
-    Some("snapshots" at nexus + "content/repositories/snapshots")
+    Some("snapshots" at nexus + "/snapshots")
   else
-    Some("releases" at nexus + "service/local/staging/deploy/maven2")
+    Some("releases" at nexus + "/releases")
 }
 
 pomExtra := (
@@ -64,4 +85,4 @@ pomExtra := (
       <name>Springml</name>
       <url>http://www.springml.com</url>
     </developer>
-  </developers>)
+  </developers>)
\ No newline at end of file
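
Note: with sbt-spark-package removed, the root project is now assembled into a shaded jar via sbt-assembly (added to project/plugins.sbt below). The patch does not add a merge strategy, and one is usually needed when dependency jars carry conflicting META-INF entries. A minimal sketch for build.sbt, assuming sbt-assembly 0.14.x; this block is not part of the diff:

    // Hypothetical build.sbt addition (not in this patch): deduplicate files
    // when sbt-assembly merges the dependency jars into the shaded artifact.
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop jar signatures/manifests
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)                                             // fall back to the plugin default
    }

Marking the org.apache.spark artifacts "provided" would likewise keep Spark itself out of the shaded jar; as written, the diff bundles it.
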
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/" - -addSbtPlugin("org.spark-packages" %% "sbt-spark-package" % "0.2.5") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.1") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") +resolvers += Resolver.mavenLocal +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7") \ No newline at end of file diff --git a/src/main/scala/com/springml/spark/sftp/DatasetRelation.scala b/src/main/scala/com/springml/spark/sftp/DatasetRelation.scala index 60b341c..19cdf58 100644 --- a/src/main/scala/com/springml/spark/sftp/DatasetRelation.scala +++ b/src/main/scala/com/springml/spark/sftp/DatasetRelation.scala @@ -1,6 +1,5 @@ package com.springml.spark.sftp -import com.databricks.spark.avro._ import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} @@ -36,7 +35,7 @@ case class DatasetRelation( var df: DataFrame = null df = fileType match { - case "avro" => dataframeReader.avro(fileLocation) + case "avro" => dataframeReader.format("avro").load(fileLocation) case "txt" => dataframeReader.format("text").load(fileLocation) case "xml" => dataframeReader.format(constants.xmlClass) .option(constants.xmlRowTag, rowTag) diff --git a/src/main/scala/com/springml/spark/sftp/DefaultSource.scala b/src/main/scala/com/springml/spark/sftp/DefaultSource.scala index a62e57a..45bfd7c 100644 --- a/src/main/scala/com/springml/spark/sftp/DefaultSource.scala +++ b/src/main/scala/com/springml/spark/sftp/DefaultSource.scala @@ -152,7 +152,9 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr val hadoopConf = sqlContext.sparkContext.hadoopConfiguration val hdfsPath = new Path(hdfsTemp) val fs = hdfsPath.getFileSystem(hadoopConf) - if ("hdfs".equalsIgnoreCase(fs.getScheme)) { + logger.debug("#### copyFromHdfs-->fs.getScheme: "+fs.getScheme) + if ("dbfs".equalsIgnoreCase(fs.getScheme) || "hdfs".equalsIgnoreCase(fs.getScheme)) { + logger.debug(s"#### Copying from ${fs.getScheme}:$hdfsTemp -->$fileLocation: ") fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation)) fs.deleteOnExit(new Path(hdfsTemp)) return fileLocation @@ -272,6 +274,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr private def copiedFile(tempFileLocation: String) : String = { val baseTemp = new File(tempFileLocation) + print("#### copiedFile-->baseTemp.listFiles(): "+baseTemp.listFiles().mkString("Array(",",",")")) val files = baseTemp.listFiles().filter { x => (!x.isDirectory() && !x.getName.contains("SUCCESS")
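
For reference, the avro branch in DatasetRelation.scala now goes through Spark's external spark-avro module (registered under the short name "avro") instead of the retired com.databricks:spark-avro package. A minimal end-to-end usage sketch, not from the repo; host, credentials, and paths are placeholders, and the option names follow the spark-sftp README:

    import org.apache.spark.sql.SparkSession

    object ReadAvroOverSftp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sftp-avro-read").getOrCreate()
        // "fileType" -> "avro" routes to dataframeReader.format("avro").load(...)
        // in DatasetRelation, which Spark 3.x resolves via org.apache.spark:spark-avro.
        val df = spark.read
          .format("com.springml.spark.sftp")
          .option("host", "sftp.example.com") // placeholder host
          .option("username", "user")         // placeholder credentials
          .option("password", "pwd")
          .option("fileType", "avro")
          .load("/data/sample.avro")          // placeholder remote path
        df.show()
      }
    }
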