-rwxr-xr-x  R/install-dev.sh                                                           4
-rw-r--r--  R/pkg/inst/tests/packageInAJarTest.R                                      30
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RUtils.scala                   14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala          232
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala             11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala     1
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala           101
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala     156
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala        24
9 files changed, 538 insertions, 35 deletions
diff --git a/R/install-dev.sh b/R/install-dev.sh
index 4972bb9217..59d98c9c7a 100755
--- a/R/install-dev.sh
+++ b/R/install-dev.sh
@@ -42,8 +42,4 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
-# Zip the SparkR package so that it can be distributed to worker nodes on YARN
-cd $LIB_DIR
-jar cfM "$LIB_DIR/sparkr.zip" SparkR
-
popd > /dev/null
diff --git a/R/pkg/inst/tests/packageInAJarTest.R b/R/pkg/inst/tests/packageInAJarTest.R
new file mode 100644
index 0000000000..207a37a0cb
--- /dev/null
+++ b/R/pkg/inst/tests/packageInAJarTest.R
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sc <- sparkR.init()
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.stop()
+
+if(run1 != 6) quit(save = "no", status = 1)
+
+if(run2 != -3) quit(save = "no", status = 1)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index d53abd3408..93b3bea578 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.api.r
import java.io.File
+import scala.collection.JavaConversions._
+
import org.apache.spark.{SparkEnv, SparkException}
private[spark] object RUtils {
@@ -26,7 +28,7 @@ private[spark] object RUtils {
* Get the SparkR package path in the local spark distribution.
*/
def localSparkRPackagePath: Option[String] = {
- val sparkHome = sys.env.get("SPARK_HOME")
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
sparkHome.map(
Seq(_, "R", "lib").mkString(File.separator)
)
@@ -46,8 +48,8 @@ private[spark] object RUtils {
(sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode"))
}
- val isYarnCluster = master.contains("yarn") && deployMode == "cluster"
- val isYarnClient = master.contains("yarn") && deployMode == "client"
+ val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
+ val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
// In YARN mode, the SparkR package is distributed as an archive symbolically
// linked to the "sparkr" file in the current directory. Note that this does not apply
@@ -62,4 +64,10 @@ private[spark] object RUtils {
}
}
}
+
+ /** Check if R is installed before running tests that use R commands. */
+ def isRInstalled: Boolean = {
+ val builder = new ProcessBuilder(Seq("R", "--version"))
+ builder.start().waitFor() == 0
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
new file mode 100644
index 0000000000..ed1e972955
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.util.jar.JarFile
+import java.util.logging.Level
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.io.{ByteStreams, Files}
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.util.{RedirectThread, Utils}
+
+private[deploy] object RPackageUtils extends Logging {
+
+ /** The key in the MANIFEST.MF that we look for to tell whether a jar contains R code. */
+ private final val hasRPackage = "Spark-HasRPackage"
+
+ /** Base of the shell command used in order to install R packages. */
+ private final val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")
+
+ /** R source code should exist under R/pkg in a jar. */
+ private final val RJarEntries = "R/pkg"
+
+ /** Documentation on how the R source files should be laid out inside the jar. */
+ private[deploy] final val RJarDoc =
+ s"""In order for Spark to build R packages that are part of Spark Packages, there are a few
+ |requirements. The R source code must be shipped in a jar, with additional Java/Scala
+ |classes. The jar must be in the following format:
+ | 1- The Manifest (META-INF/MANIFEST.mf) must contain the key-value: $hasRPackage: true
+ | 2- The standard R package layout must be preserved under R/pkg/ inside the jar. More
+ | information on the standard R package layout can be found in:
+ | http://cran.r-project.org/doc/contrib/Leisch-CreatingPackages.pdf
+ | An example layout is given below. After running `jar tf $$JAR_FILE | sort`:
+ |
+ |META-INF/MANIFEST.MF
+ |R/
+ |R/pkg/
+ |R/pkg/DESCRIPTION
+ |R/pkg/NAMESPACE
+ |R/pkg/R/
+ |R/pkg/R/myRcode.R
+ |org/
+ |org/apache/
+ |...
+ """.stripMargin.trim
+
+ /** Internal method for logging. We log to a printStream in tests, for debugging purposes. */
+ private def print(
+ msg: String,
+ printStream: PrintStream,
+ level: Level = Level.FINE,
+ e: Throwable = null): Unit = {
+ if (printStream != null) {
+ // scalastyle:off println
+ printStream.println(msg)
+ // scalastyle:on println
+ if (e != null) {
+ e.printStackTrace(printStream)
+ }
+ } else {
+ level match {
+ case Level.INFO => logInfo(msg)
+ case Level.WARNING => logWarning(msg)
+ case Level.SEVERE => logError(msg, e)
+ case _ => logDebug(msg)
+ }
+ }
+ }
+
+ /**
+ * Checks the manifest of the jar to see whether it bundles any R source code.
+ * Exposed for testing.
+ */
+ private[deploy] def checkManifestForR(jar: JarFile): Boolean = {
+ val manifest = jar.getManifest.getMainAttributes
+ manifest.getValue(hasRPackage) != null && manifest.getValue(hasRPackage).trim == "true"
+ }
+
+ /**
+ * Runs the standard R package installation code to build the R package from source.
+ * Multiple runs don't cause problems.
+ */
+ private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+ // This code should always run on the driver.
+ val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
+ throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
+ val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
+ val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+ if (verbose) {
+ print(s"Building R package with the command: $installCmd", printStream)
+ }
+ try {
+ val builder = new ProcessBuilder(installCmd)
+ builder.redirectErrorStream(true)
+ val env = builder.environment()
+ env.clear()
+ val process = builder.start()
+ new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
+ process.waitFor() == 0
+ } catch {
+ case e: Throwable =>
+ print("Failed to build R package.", printStream, Level.SEVERE, e)
+ false
+ }
+ }
+
+ /**
+ * Extracts the files under /R in the jar to a temporary directory for building.
+ */
+ private def extractRFolder(jar: JarFile, printStream: PrintStream, verbose: Boolean): File = {
+ val tempDir = Utils.createTempDir(null)
+ val jarEntries = jar.entries()
+ while (jarEntries.hasMoreElements) {
+ val entry = jarEntries.nextElement()
+ val entryRIndex = entry.getName.indexOf(RJarEntries)
+ if (entryRIndex > -1) {
+ val entryPath = entry.getName.substring(entryRIndex)
+ if (entry.isDirectory) {
+ val dir = new File(tempDir, entryPath)
+ if (verbose) {
+ print(s"Creating directory: $dir", printStream)
+ }
+ dir.mkdirs
+ } else {
+ val inStream = jar.getInputStream(entry)
+ val outPath = new File(tempDir, entryPath)
+ Files.createParentDirs(outPath)
+ val outStream = new FileOutputStream(outPath)
+ if (verbose) {
+ print(s"Extracting $entry to $outPath", printStream)
+ }
+ Utils.copyStream(inStream, outStream, closeStreams = true)
+ }
+ }
+ }
+ tempDir
+ }
+
+ /**
+ * Checks the given jars for bundled R source code and, if found, extracts and builds
+ * the corresponding R packages.
+ */
+ private[deploy] def checkAndBuildRPackage(
+ jars: String,
+ printStream: PrintStream = null,
+ verbose: Boolean = false): Unit = {
+ jars.split(",").foreach { jarPath =>
+ val file = new File(Utils.resolveURI(jarPath))
+ if (file.exists()) {
+ val jar = new JarFile(file)
+ if (checkManifestForR(jar)) {
+ print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
+ val rSource = extractRFolder(jar, printStream, verbose)
+ try {
+ if (!rPackageBuilder(rSource, printStream, verbose)) {
+ print(s"ERROR: Failed to build R package in $file.", printStream)
+ print(RJarDoc, printStream)
+ }
+ } finally {
+ rSource.delete() // clean up
+ }
+ } else {
+ if (verbose) {
+ print(s"$file doesn't contain R source code, skipping...", printStream)
+ }
+ }
+ } else {
+ print(s"WARN: $file resolved as dependency, but not found.", printStream, Level.WARNING)
+ }
+ }
+ }
+
+ private def listFilesRecursively(dir: File, excludePatterns: Seq[String]): Set[File] = {
+ if (!dir.exists()) {
+ Set.empty[File]
+ } else {
+ if (dir.isDirectory) {
+ val subDir = dir.listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
+ }
+ })
+ subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
+ } else {
+ Set(dir)
+ }
+ }
+ }
+
+ /** Zips all the R libraries (including SparkR) found in the R/lib directory for distribution with YARN. */
+ private[deploy] def zipRLibraries(dir: File, name: String): File = {
+ val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
+ // create a zip file from scratch, do not append to existing file.
+ val zipFile = new File(dir, name)
+ zipFile.delete()
+ val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
+ try {
+ filesToBundle.foreach { file =>
+ // get the relative paths for proper naming in the zip file
+ val relPath = file.getAbsolutePath.replaceFirst(dir.getAbsolutePath, "")
+ val fis = new FileInputStream(file)
+ val zipEntry = new ZipEntry(relPath)
+ zipOutputStream.putNextEntry(zipEntry)
+ ByteStreams.copy(fis, zipOutputStream)
+ zipOutputStream.closeEntry()
+ fis.close()
+ }
+ } finally {
+ zipOutputStream.close()
+ }
+ zipFile
+ }
+}
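
For orientation, here is a minimal sketch of how such a jar could be assembled by hand so that checkManifestForR recognizes it. The file and directory names (manifest.txt, mylib.jar, org/) are placeholders, not part of this patch; the layout follows the RJarDoc description above.

  # manifest.txt must carry the key checkManifestForR looks for:
  #     Spark-HasRPackage: true
  # R/pkg/ holds the standard R package layout (DESCRIPTION, NAMESPACE, R/),
  # org/ holds the compiled Java/Scala classes of the Spark Package.
  jar cfm mylib.jar manifest.txt R/pkg org

  # Inspect the resulting layout, as suggested in RJarDoc:
  jar tf mylib.jar | sort
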
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 31185c8e77..1186bed485 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -292,6 +292,12 @@ object SparkSubmit {
}
}
+ // install any R packages that may have been passed through --jars or --packages.
+ // Spark Packages may contain R source code inside the jar.
+ if (args.isR && !StringUtils.isBlank(args.jars)) {
+ RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+ }
+
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
@@ -361,7 +367,8 @@ object SparkSubmit {
if (rPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
- val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+ val rPackageFile =
+ RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
@@ -987,11 +994,9 @@ private[spark] object SparkSubmitUtils {
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
-
exclusions.foreach { e =>
md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
}
-
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
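
With the hook above, an R application submitted with --packages (or --jars) pointing at such a jar gets its bundled R package built and installed before the application launches. A hypothetical invocation, mirroring the coordinate used in the commented-out SparkSubmitSuite test below (repository URL and script path are placeholders):

  bin/spark-submit \
    --name testApp \
    --master local[2] \
    --packages my.great.lib:mylib:0.1 \
    --repositories file:/path/to/local/repo \
    --verbose \
    myScript.R
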
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 44852ce4e8..3f3c6627c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -611,5 +611,4 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
System.setErr(currentErr)
}
}
-
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 823050b0aa..d93febcfd2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -19,6 +19,10 @@ package org.apache.spark.deploy
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.Attributes.Name
+import java.util.jar.Manifest
+
+import scala.collection.mutable.ArrayBuffer
import com.google.common.io.{Files, ByteStreams}
@@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils {
* Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
* or `pom`.
*/
- private def pathFromCoordinate(
+ private[deploy] def pathFromCoordinate(
artifact: MavenCoordinate,
prefix: File,
ext: String,
@@ -52,7 +56,7 @@ private[deploy] object IvyTestUtils {
}
/** Returns the artifact naming based on standard ivy or maven format. */
- private def artifactName(
+ private[deploy] def artifactName(
artifact: MavenCoordinate,
useIvyLayout: Boolean,
ext: String = ".jar"): String = {
@@ -73,7 +77,7 @@ private[deploy] object IvyTestUtils {
}
/** Write the contents to a file to the supplied directory. */
- private def writeFile(dir: File, fileName: String, contents: String): File = {
+ private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
val outputFile = new File(dir, fileName)
val outputStream = new FileOutputStream(outputFile)
outputStream.write(contents.toCharArray.map(_.toByte))
@@ -90,6 +94,42 @@ private[deploy] object IvyTestUtils {
writeFile(dir, "mylib.py", contents)
}
+ /** Create an example R package that calls the given Java class. */
+ private def createRFiles(
+ dir: File,
+ className: String,
+ packageName: String): Seq[(String, File)] = {
+ val rFilesDir = new File(dir, "R" + File.separator + "pkg")
+ Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R"))
+ val contents =
+ s"""myfunc <- function(x) {
+ | SparkR:::callJStatic("$packageName.$className", "myFunc", x)
+ |}
+ """.stripMargin
+ val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents)
+ val description =
+ """Package: sparkPackageTest
+ |Type: Package
+ |Title: Test for building an R package
+ |Version: 0.1
+ |Date: 2015-07-08
+ |Author: Burak Yavuz
+ |Imports: methods, SparkR
+ |Depends: R (>= 3.1), methods, SparkR
+ |Suggests: testthat
+ |Description: Test for building an R package within a jar
+ |License: Apache License (== 2.0)
+ |Collate: 'mylib.R'
+ """.stripMargin
+ val descFile = writeFile(rFilesDir, "DESCRIPTION", description)
+ val namespace =
+ """import(SparkR)
+ |export("myfunc")
+ """.stripMargin
+ val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace)
+ Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile))
+ }
+
/** Create a simple testable Class. */
private def createJavaClass(dir: File, className: String, packageName: String): File = {
val contents =
@@ -97,17 +137,14 @@ private[deploy] object IvyTestUtils {
|
|import java.lang.Integer;
|
- |class $className implements java.io.Serializable {
- |
- | public $className() {}
- |
- | public Integer myFunc(Integer x) {
+ |public class $className implements java.io.Serializable {
+ | public static Integer myFunc(Integer x) {
| return x + 1;
| }
|}
""".stripMargin
val sourceFile =
- new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents)
+ new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
createCompiledClass(className, dir, sourceFile, Seq.empty)
}
@@ -199,14 +236,25 @@ private[deploy] object IvyTestUtils {
}
/** Create the jar for the given maven coordinate, using the supplied files. */
- private def packJar(
+ private[deploy] def packJar(
dir: File,
artifact: MavenCoordinate,
files: Seq[(String, File)],
- useIvyLayout: Boolean): File = {
+ useIvyLayout: Boolean,
+ withR: Boolean,
+ withManifest: Option[Manifest] = None): File = {
val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
val jarFileStream = new FileOutputStream(jarFile)
- val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+ val manifest = withManifest.getOrElse {
+ val mani = new Manifest()
+ if (withR) {
+ val attr = mani.getMainAttributes
+ attr.put(Name.MANIFEST_VERSION, "1.0")
+ attr.put(new Name("Spark-HasRPackage"), "true")
+ }
+ mani
+ }
+ val jarStream = new JarOutputStream(jarFileStream, manifest)
for (file <- files) {
val jarEntry = new JarEntry(file._1)
@@ -239,7 +287,8 @@ private[deploy] object IvyTestUtils {
dependencies: Option[Seq[MavenCoordinate]] = None,
tempDir: Option[File] = None,
useIvyLayout: Boolean = false,
- withPython: Boolean = false): File = {
+ withPython: Boolean = false,
+ withR: Boolean = false): File = {
// Where the root of the repository exists, and what Ivy will search in
val tempPath = tempDir.getOrElse(Files.createTempDir())
// Create directory if it doesn't exist
@@ -255,14 +304,16 @@ private[deploy] object IvyTestUtils {
val javaClass = createJavaClass(root, className, artifact.groupId)
// A tuple of files representation in the jar, and the file
val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass)
- val allFiles =
- if (withPython) {
- val pythonFile = createPythonFile(root)
- Seq(javaFile, (pythonFile.getName, pythonFile))
- } else {
- Seq(javaFile)
- }
- val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
+ val allFiles = ArrayBuffer[(String, File)](javaFile)
+ if (withPython) {
+ val pythonFile = createPythonFile(root)
+ allFiles.append((pythonFile.getName, pythonFile))
+ }
+ if (withR) {
+ val rFiles = createRFiles(root, className, artifact.groupId)
+ allFiles.append(rFiles: _*)
+ }
+ val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
assert(jarFile.exists(), "Problem creating Jar file")
val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
assert(descriptor.exists(), "Problem creating Pom file")
@@ -286,9 +337,10 @@ private[deploy] object IvyTestUtils {
dependencies: Option[String],
rootDir: Option[File],
useIvyLayout: Boolean = false,
- withPython: Boolean = false): File = {
+ withPython: Boolean = false,
+ withR: Boolean = false): File = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
- val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython)
+ val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
deps.foreach { seq => seq.foreach { dep =>
createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
}}
@@ -311,11 +363,12 @@ private[deploy] object IvyTestUtils {
rootDir: Option[File],
useIvyLayout: Boolean = false,
withPython: Boolean = false,
+ withR: Boolean = false,
ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
purgeLocalIvyCache(artifact, deps, ivySettings)
val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
- withPython)
+ withPython, withR)
try {
f(repo.toURI.toString)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
new file mode 100644
index 0000000000..47a64081e2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{PrintStream, OutputStream, File}
+import java.net.URI
+import java.util.jar.Attributes.Name
+import java.util.jar.{JarFile, Manifest}
+import java.util.zip.{ZipEntry, ZipFile}
+
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+
+class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ private val main = MavenCoordinate("a", "b", "c")
+ private val dep1 = MavenCoordinate("a", "dep1", "c")
+ private val dep2 = MavenCoordinate("a", "dep2", "d")
+
+ private def getJarPath(coord: MavenCoordinate, repo: File): File = {
+ new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
+ IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+ }
+
+ private val lineBuffer = ArrayBuffer[String]()
+
+ private val noOpOutputStream = new OutputStream {
+ def write(b: Int) = {}
+ }
+
+ /** Simple PrintStream that captures printed lines into a buffer. */
+ private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ // scalastyle:off println
+ override def println(line: String) {
+ // scalastyle:on println
+ lineBuffer += line
+ }
+ }
+
+ def beforeAll() {
+ System.setProperty("spark.testing", "true")
+ }
+
+ override def beforeEach(): Unit = {
+ lineBuffer.clear()
+ }
+
+ test("pick which jars to unpack using the manifest") {
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
+ assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ }
+ }
+
+ test("build an R package from a jar end to end") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map { c =>
+ getJarPath(c, new File(new URI(repo)))
+ }.mkString(",")
+ RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+ val firstJar = jars.substring(0, jars.indexOf(","))
+ val output = lineBuffer.mkString("\n")
+ assert(output.contains("Building R package"))
+ assert(output.contains("Extracting"))
+ assert(output.contains(s"$firstJar contains R source code. Now installing package."))
+ assert(output.contains("doesn't contain R source code, skipping..."))
+ }
+ }
+
+ test("jars that don't exist are skipped and print warning") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map { c =>
+ getJarPath(c, new File(new URI(repo))) + "dummy"
+ }.mkString(",")
+ RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+ val individualJars = jars.split(",")
+ val output = lineBuffer.mkString("\n")
+ individualJars.foreach { jarFile =>
+ assert(output.contains(s"$jarFile"))
+ }
+ }
+ }
+
+ test("faulty R package shows documentation") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ IvyTestUtils.withRepository(main, None, None) { repo =>
+ val manifest = new Manifest
+ val attr = manifest.getMainAttributes
+ attr.put(Name.MANIFEST_VERSION, "1.0")
+ attr.put(new Name("Spark-HasRPackage"), "true")
+ val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
+ useIvyLayout = false, withR = false, Some(manifest))
+ RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream,
+ verbose = true)
+ val output = lineBuffer.mkString("\n")
+ assert(output.contains(RPackageUtils.RJarDoc))
+ }
+ }
+
+ test("SparkR zipping works properly") {
+ val tempDir = Files.createTempDir()
+ try {
+ IvyTestUtils.writeFile(tempDir, "test.R", "abc")
+ val fakeSparkRDir = new File(tempDir, "SparkR")
+ assert(fakeSparkRDir.mkdirs())
+ IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
+ IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
+ IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
+ val fakePackageDir = new File(tempDir, "packageTest")
+ assert(fakePackageDir.mkdirs())
+ IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
+ IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
+ val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
+ assert(finalZip.exists())
+ val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+ assert(entries.contains("/test.R"))
+ assert(entries.contains("/SparkR/abc.R"))
+ assert(entries.contains("/SparkR/DESCRIPTION"))
+ assert(!entries.contains("/package.zip"))
+ assert(entries.contains("/packageTest/def.R"))
+ assert(entries.contains("/packageTest/DESCRIPTION"))
+ } finally {
+ FileUtils.deleteDirectory(tempDir)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index aa78bfe309..757e0ce3d2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -362,6 +362,30 @@ class SparkSubmitSuite
}
}
+ test("correctly builds R packages included in a jar with --packages") {
+ // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+ // It's hard to write the test in SparkR (because we can't create the repository dynamically)
+ /*
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+ val rScriptDir =
+ Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
+ assert(new File(rScriptDir).exists)
+ IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
+ val args = Seq(
+ "--name", "testApp",
+ "--master", "local-cluster[2,1,1024]",
+ "--packages", main.toString,
+ "--repositories", repo,
+ "--verbose",
+ "--conf", "spark.ui.enabled=false",
+ rScriptDir)
+ runSparkSubmit(args)
+ }
+ */
+ }
+
test("resolves command line argument paths correctly") {
val jars = "/jar1,/jar2" // --jars
val files = "hdfs:/file1,file2" // --files