[SPARK-8313] R Spark packages support
shivaram cafreeman Could you please help me test this out? Exposing and running `rPackageBuilder` from inside the shell works, but for some reason I can't get it to work during Spark Submit; it just starts relaunching Spark Submit.

For testing, you may use the R branch with [sbt-spark-package](https://github.com/databricks/sbt-spark-package). You can call `spPackage`, and then pass the jar using `--jars`.
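For reference, a test run might look roughly like the sketch below; the `spPackage` task comes from the sbt-spark-package plugin, and the jar path and script name are illustrative assumptions, not part of this patch:

```sh
# Build the Spark Package jar; it bundles the R/pkg sources alongside the compiled classes.
sbt spPackage

# Submit an R script with the built jar attached (jar path and script name are hypothetical).
$SPARK_HOME/bin/spark-submit \
  --jars target/scala-2.10/my-spark-package_2.10-0.1.0.jar \
  my_script.R
```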

Author: Burak Yavuz <[email protected]>

Closes apache#7139 from brkyvz/r-submit and squashes the following commits:

0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support
brkyvz authored and shivaram committed Aug 5, 2015
1 parent a7fe48f commit c9a4c36
Showing 9 changed files with 538 additions and 35 deletions.
4 changes: 0 additions & 4 deletions R/install-dev.sh
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
 
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
-
 popd > /dev/null
30 changes: 30 additions & 0 deletions R/pkg/inst/tests/packageInAJarTest.R
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(SparkR)
library(sparkPackageTest)

sc <- sparkR.init()

run1 <- myfunc(5L)

run2 <- myfunc(-4L)

sparkR.stop()

if(run1 != 6) quit(save = "no", status = 1)

if(run2 != -3) quit(save = "no", status = 1)
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -19,14 +19,16 @@ package org.apache.spark.api.r
 
 import java.io.File
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.{SparkEnv, SparkException}
 
 private[spark] object RUtils {
   /**
    * Get the SparkR package path in the local spark distribution.
    */
   def localSparkRPackagePath: Option[String] = {
-    val sparkHome = sys.env.get("SPARK_HOME")
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
     sparkHome.map(
       Seq(_, "R", "lib").mkString(File.separator)
     )
@@ -46,8 +48,8 @@ private[spark] object RUtils {
        (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
      }
 
-    val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
-    val isYarnClient = master.contains("yarn") && deployMode == "client"
+    val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
+    val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
 
     // In YARN mode, the SparkR package is distributed as an archive symbolically
     // linked to the "sparkr" file in the current directory. Note that this does not apply
@@ -62,4 +64,10 @@ private[spark] object RUtils {
       }
     }
   }
+
+  /** Check if R is installed before running tests that use R commands. */
+  def isRInstalled: Boolean = {
+    val builder = new ProcessBuilder(Seq("R", "--version"))
+    builder.start().waitFor() == 0
+  }
 }
232 changes: 232 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -0,0 +1,232 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io._
import java.util.jar.JarFile
import java.util.logging.Level
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConversions._

import com.google.common.io.{ByteStreams, Files}

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.api.r.RUtils
import org.apache.spark.util.{RedirectThread, Utils}
private[deploy] object RPackageUtils extends Logging {

  /** The key in the MANIFEST.mf that we look for, in case a jar contains R code. */
  private final val hasRPackage = "Spark-HasRPackage"

  /** Base of the shell command used in order to install R packages. */
  private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")

  /** R source code should exist under R/pkg in a jar. */
  private final val RJarEntries = "R/pkg"

  /** Documentation on how the R source file layout should be in the jar. */
  private[deploy] final val RJarDoc =
    s"""In order for Spark to build R packages that are parts of Spark Packages, there are a few
       |requirements. The R source code must be shipped in a jar, with additional Java/Scala
       |classes. The jar must be in the following format:
       | 1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true
       | 2- The standard R package layout must be preserved under R/pkg/ inside the jar. More
       | information on the standard R package layout can be found in:
       | http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf
       | An example layout is given below. After running `jar tf $$JAR_FILE | sort`:
       |
       |META-INF/MANIFEST.MF
       |R/
       |R/pkg/
       |R/pkg/DESCRIPTION
       |R/pkg/NAMESPACE
       |R/pkg/R/
       |R/pkg/R/myRcode.R
       |org/
       |org/apache/
       |...
    """.stripMargin.trim

  /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */
  private def print(
      msg: String,
      printStream: PrintStream,
      level: Level = Level.FINE,
      e: Throwable = null): Unit = {
    if (printStream != null) {
      // scalastyle:off println
      printStream.println(msg)
      // scalastyle:on println
      if (e != null) {
        e.printStackTrace(printStream)
      }
    } else {
      level match {
        case Level.INFO => logInfo(msg)
        case Level.WARNING => logWarning(msg)
        case Level.SEVERE => logError(msg, e)
        case _ => logDebug(msg)
      }
    }
  }

  /**
   * Checks the manifest of the jar to see whether any R source code is bundled with it.
   * Exposed for testing.
   */
  private[deploy] def checkManifestForR(jar: JarFile): Boolean = {
    val manifest = jar.getManifest.getMainAttributes
    manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true"
  }

  /**
   * Runs the standard R package installation code to build the R package from source.
   * Multiple runs don't cause problems.
   */
  private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
    // this code should always run on the driver.
    val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
      throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
    val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
    val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
    if (verbose) {
      print(s"Building R package with the command: $installCmd", printStream)
    }
    try {
      val builder = new ProcessBuilder(installCmd)
      builder.redirectErrorStream(true)
      val env = builder.environment()
      env.clear()
      val process = builder.start()
      new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
      process.waitFor() == 0
    } catch {
      case e: Throwable =>
        print("Failed to build R package.", printStream, Level.SEVERE, e)
        false
    }
  }

  /**
   * Extracts the files under /R in the jar to a temporary directory for building.
   */
  private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = {
    val tempDir = Utils.createTempDir(null)
    val jarEntries = jar.entries()
    while (jarEntries.hasMoreElements) {
      val entry = jarEntries.nextElement()
      val entryRIndex = entry.getName.indexOf(RJarEntries)
      if (entryRIndex > -1) {
        val entryPath = entry.getName.substring(entryRIndex)
        if (entry.isDirectory) {
          val dir = new File(tempDir, entryPath)
          if (verbose) {
            print(s"Creating directory: $dir", printStream)
          }
          dir.mkdirs
        } else {
          val inStream = jar.getInputStream(entry)
          val outPath = new File(tempDir, entryPath)
          Files.createParentDirs(outPath)
          val outStream = new FileOutputStream(outPath)
          if (verbose) {
            print(s"Extracting $entry to $outPath", printStream)
          }
          Utils.copyStream(inStream, outStream, closeStreams = true)
        }
      }
    }
    tempDir
  }

  /**
   * Checks the supplied jars for bundled R source code and, when found, extracts and
   * builds the R package on the driver.
   */
  private[deploy] def checkAndBuildRPackage(
      jars: String,
      printStream: PrintStream = null,
      verbose: Boolean = false): Unit = {
    jars.split(",").foreach { jarPath =>
      val file = new File(Utils.resolveURI(jarPath))
      if (file.exists()) {
        val jar = new JarFile(file)
        if (checkManifestForR(jar)) {
          print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
          val rSource = extractRFolder(jar, printStream, verbose)
          try {
            if (!rPackageBuilder(rSource, printStream, verbose)) {
              print(s"ERROR: Failed to build R package in $file.", printStream)
              print(RJarDoc, printStream)
            }
          } finally {
            // clean up the extracted temp dir; a plain delete() fails on non-empty directories
            Utils.deleteRecursively(rSource)
          }
        } else {
          if (verbose) {
            print(s"$file doesn't contain R source code, skipping...", printStream)
          }
        }
      } else {
        print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
      }
    }
  }

  private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
    if (!dir.exists()) {
      Set.empty[File]
    } else {
      if (dir.isDirectory) {
        val subDir = dir.listFiles(new FilenameFilter {
          override def accept(dir: File, name: String): Boolean = {
            !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
          }
        })
        subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
      } else {
        Set(dir)
      }
    }
  }

  /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
  private[deploy] def zipRLibraries(dir: File, name: String): File = {
    val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
    // create a zip file from scratch, do not append to existing file.
    val zipFile = new File(dir, name)
    zipFile.delete()
    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
    try {
      filesToBundle.foreach { file =>
        // get the relative paths for proper naming in the zip file
        val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
        val fis = new FileInputStream(file)
        val zipEntry = new ZipEntry(relPath)
        zipOutputStream.putNextEntry(zipEntry)
        ByteStreams.copy(fis, zipOutputStream)
        zipOutputStream.closeEntry()
        fis.close()
      }
    } finally {
      zipOutputStream.close()
    }
    zipFile
  }
}
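
To make the install step above concrete: for a jar whose manifest carries `Spark-HasRPackage: true`, `checkAndBuildRPackage` extracts `R/pkg` into a temporary directory and `rPackageBuilder` then runs the equivalent of the shell command below. Here `/tmp/extracted` is a stand-in for the runtime temp directory, since the real path is generated by `extractRFolder`:

```sh
# Equivalent of baseInstallCmd ++ Seq(pathToSparkR, pathToPkg):
# install the extracted R package into Spark's local R library ($SPARK_HOME/R/lib).
R CMD INSTALL -l "$SPARK_HOME/R/lib" /tmp/extracted/R/pkg
```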
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -292,6 +292,12 @@ object SparkSubmit {
       }
     }
 
+    // install any R packages that may have been passed through --jars or --packages.
+    // Spark Packages may contain R source code inside the jar.
+    if (args.isR && !StringUtils.isBlank(args.jars)) {
+      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+    }
+
     // Require all python files to be local, so we can add them to the PYTHONPATH
     // In YARN cluster mode, python files are distributed as regular files, which can be non-local
     if (args.isPython && !isYarnCluster) {
@@ -361,7 +367,8 @@ object SparkSubmit {
       if (rPackagePath.isEmpty) {
         printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
       }
-      val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      val rPackageFile =
+        RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
       if (!rPackageFile.exists()) {
         printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
       }
@@ -987,11 +994,9 @@ private[spark] object SparkSubmitUtils {
       addExclusionRules(ivySettings, ivyConfName, md)
       // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
-
       exclusions.foreach { e =>
         md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
       }
-
       // resolve dependencies
       val rr: ResolveReport = ivy.resolve(md, resolveOptions)
       if (rr.hasError) {
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -611,5 +611,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
-
 }