author     Burak Yavuz <brkyvz@gmail.com>  2015-08-04 18:20:12 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-08-04 18:20:12 -0700
commit     c9a4c36d052456c2dd1f7e0a871c6b764b5064d2 (patch)
tree       63a219ba9757343f09e22caf975dc864d066aaa4 /core/src/test
parent     a7fe48f68727d5c0247698cff329fb12faff1d50 (diff)
[SPARK-8313] R Spark packages support
shivaram cafreeman Could you please help me test this out? Exposing and running `rPackageBuilder` from inside the shell works, but for some reason I can't get it to work during Spark Submit; it just keeps relaunching Spark Submit. For testing, you may use the R branch with [sbt-spark-package](https://github.com/databricks/sbt-spark-package). You can call spPackage, and then pass the jar using `--jars`.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #7139 from brkyvz/r-submit and squashes the following commits:

0de384f [Burak Yavuz] remove unused imports 2
d253708 [Burak Yavuz] removed unused imports
6603d0d [Burak Yavuz] addressed comments
4258ffe [Burak Yavuz] merged master
ddfcc06 [Burak Yavuz] added zipping test
3a1be7d [Burak Yavuz] don't zip
77995df [Burak Yavuz] fix URI
ac45527 [Burak Yavuz] added zipping of all libs
e6bf7b0 [Burak Yavuz] add println ignores
1bc5554 [Burak Yavuz] add assumes for tests
9778e03 [Burak Yavuz] addressed comments
b42b300 [Burak Yavuz] merged master
ffd134e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
d867756 [Burak Yavuz] add apache header
eff5ba1 [Burak Yavuz] ready for review
8838edb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into r-submit
e5b5a06 [Burak Yavuz] added doc
bb751ce [Burak Yavuz] fix null bug
0226768 [Burak Yavuz] fixed issues
8810beb [Burak Yavuz] R packages support
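The mechanism this patch exercises: a jar advertises bundled R sources through a "Spark-HasRPackage" manifest attribute, which spark-submit inspects before attempting to build and install the package. A minimal sketch of that check, assuming only the attribute semantics visible in packJar and the tests below (RPackageUtils.checkManifestForR is the real entry point):

    import java.util.jar.JarFile
    import java.util.jar.Attributes.Name

    // Sketch: true when the jar's main manifest carries the
    // Spark-HasRPackage attribute that packJar sets below.
    def hasRPackage(jar: JarFile): Boolean = {
      val manifest = jar.getManifest
      manifest != null &&
        "true" == manifest.getMainAttributes.getValue(new Name("Spark-HasRPackage"))
    }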
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala       | 101
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala | 156
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala   |  24
3 files changed, 257 insertions(+), 24 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index 823050b0aa..d93febcfd2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -19,6 +19,10 @@ package org.apache.spark.deploy
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.Attributes.Name
+import java.util.jar.Manifest
+
+import scala.collection.mutable.ArrayBuffer
import com.google.common.io.{Files, ByteStreams}
@@ -35,7 +39,7 @@ private[deploy] object IvyTestUtils {
* Create the path for the jar and pom from the maven coordinate. Extension should be `jar`
* or `pom`.
*/
- private def pathFromCoordinate(
+ private[deploy] def pathFromCoordinate(
artifact: MavenCoordinate,
prefix: File,
ext: String,
@@ -52,7 +56,7 @@ private[deploy] object IvyTestUtils {
}
/** Returns the artifact naming based on standard ivy or maven format. */
- private def artifactName(
+ private[deploy] def artifactName(
artifact: MavenCoordinate,
useIvyLayout: Boolean,
ext: String = ".jar"): String = {
@@ -73,7 +77,7 @@ private[deploy] object IvyTestUtils {
}
/** Write the contents to a file to the supplied directory. */
- private def writeFile(dir: File, fileName: String, contents: String): File = {
+ private[deploy] def writeFile(dir: File, fileName: String, contents: String): File = {
val outputFile = new File(dir, fileName)
val outputStream = new FileOutputStream(outputFile)
outputStream.write(contents.toCharArray.map(_.toByte))
@@ -90,6 +94,42 @@ private[deploy] object IvyTestUtils {
writeFile(dir, "mylib.py", contents)
}
+ /** Create an example R package that calls the given Java class. */
+ private def createRFiles(
+ dir: File,
+ className: String,
+ packageName: String): Seq[(String, File)] = {
+ val rFilesDir = new File(dir, "R" + File.separator + "pkg")
+ Files.createParentDirs(new File(rFilesDir, "R" + File.separator + "mylib.R"))
+ val contents =
+ s"""myfunc <- function(x) {
+ | SparkR:::callJStatic("$packageName.$className", "myFunc", x)
+ |}
+ """.stripMargin
+ val source = writeFile(new File(rFilesDir, "R"), "mylib.R", contents)
+ val description =
+ """Package: sparkPackageTest
+ |Type: Package
+ |Title: Test for building an R package
+ |Version: 0.1
+ |Date: 2015-07-08
+ |Author: Burak Yavuz
+ |Imports: methods, SparkR
+ |Depends: R (>= 3.1), methods, SparkR
+ |Suggests: testthat
+ |Description: Test for building an R package within a jar
+ |License: Apache License (== 2.0)
+ |Collate: 'mylib.R'
+ """.stripMargin
+ val descFile = writeFile(rFilesDir, "DESCRIPTION", description)
+ val namespace =
+ """import(SparkR)
+ |export("myfunc")
+ """.stripMargin
+ val nameFile = writeFile(rFilesDir, "NAMESPACE", namespace)
+ Seq(("R/pkg/R/mylib.R", source), ("R/pkg/DESCRIPTION", descFile), ("R/pkg/NAMESPACE", nameFile))
+ }
+
/** Create a simple testable Class. */
private def createJavaClass(dir: File, className: String, packageName: String): File = {
val contents =
@@ -97,17 +137,14 @@ private[deploy] object IvyTestUtils {
|
|import java.lang.Integer;
|
- |class $className implements java.io.Serializable {
- |
- | public $className() {}
- |
- | public Integer myFunc(Integer x) {
+ |public class $className implements java.io.Serializable {
+ | public static Integer myFunc(Integer x) {
| return x + 1;
| }
|}
""".stripMargin
val sourceFile =
- new JavaSourceFromString(new File(dir, className + ".java").getAbsolutePath, contents)
+ new JavaSourceFromString(new File(dir, className).getAbsolutePath, contents)
createCompiledClass(className, dir, sourceFile, Seq.empty)
}
@@ -199,14 +236,25 @@ private[deploy] object IvyTestUtils {
}
/** Create the jar for the given maven coordinate, using the supplied files. */
- private def packJar(
+ private[deploy] def packJar(
dir: File,
artifact: MavenCoordinate,
files: Seq[(String, File)],
- useIvyLayout: Boolean): File = {
+ useIvyLayout: Boolean,
+ withR: Boolean,
+ withManifest: Option[Manifest] = None): File = {
val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
val jarFileStream = new FileOutputStream(jarFile)
- val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())
+ val manifest = withManifest.getOrElse {
+ val mani = new Manifest()
+ if (withR) {
+ val attr = mani.getMainAttributes
+ attr.put(Name.MANIFEST_VERSION, "1.0")
+ attr.put(new Name("Spark-HasRPackage"), "true")
+ }
+ mani
+ }
+ val jarStream = new JarOutputStream(jarFileStream, manifest)
for (file <- files) {
val jarEntry = new JarEntry(file._1)
@@ -239,7 +287,8 @@ private[deploy] object IvyTestUtils {
dependencies: Option[Seq[MavenCoordinate]] = None,
tempDir: Option[File] = None,
useIvyLayout: Boolean = false,
- withPython: Boolean = false): File = {
+ withPython: Boolean = false,
+ withR: Boolean = false): File = {
// Where the root of the repository exists, and what Ivy will search in
val tempPath = tempDir.getOrElse(Files.createTempDir())
// Create directory if it doesn't exist
@@ -255,14 +304,16 @@ private[deploy] object IvyTestUtils {
val javaClass = createJavaClass(root, className, artifact.groupId)
// A tuple of files representation in the jar, and the file
val javaFile = (artifact.groupId.replace(".", "/") + "/" + javaClass.getName, javaClass)
- val allFiles =
- if (withPython) {
- val pythonFile = createPythonFile(root)
- Seq(javaFile, (pythonFile.getName, pythonFile))
- } else {
- Seq(javaFile)
- }
- val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout)
+ val allFiles = ArrayBuffer[(String, File)](javaFile)
+ if (withPython) {
+ val pythonFile = createPythonFile(root)
+ allFiles.append((pythonFile.getName, pythonFile))
+ }
+ if (withR) {
+ val rFiles = createRFiles(root, className, artifact.groupId)
+ allFiles.append(rFiles: _*)
+ }
+ val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
assert(jarFile.exists(), "Problem creating Jar file")
val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
assert(descriptor.exists(), "Problem creating Pom file")
@@ -286,9 +337,10 @@ private[deploy] object IvyTestUtils {
dependencies: Option[String],
rootDir: Option[File],
useIvyLayout: Boolean = false,
- withPython: Boolean = false): File = {
+ withPython: Boolean = false,
+ withR: Boolean = false): File = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
- val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython)
+ val mainRepo = createLocalRepository(artifact, deps, rootDir, useIvyLayout, withPython, withR)
deps.foreach { seq => seq.foreach { dep =>
createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, withPython = false)
}}
@@ -311,11 +363,12 @@ private[deploy] object IvyTestUtils {
rootDir: Option[File],
useIvyLayout: Boolean = false,
withPython: Boolean = false,
+ withR: Boolean = false,
ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
purgeLocalIvyCache(artifact, deps, ivySettings)
val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, useIvyLayout,
- withPython)
+ withPython, withR)
try {
f(repo.toURI.toString)
} finally {
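Taken together, the new withR and withManifest parameters make it cheap to spin up a throwaway repository whose main artifact ships R code. A hedged usage sketch matching the signatures above (the coordinate values are placeholders):

    val main = SparkSubmitUtils.MavenCoordinate("my.group", "mylib", "0.1")
    IvyTestUtils.withRepository(main, dependencies = None, rootDir = None, withR = true) { repo =>
      // `repo` is the URI string of the temporary local repository,
      // ready to hand to --repositories in a test.
      assert(repo.startsWith("file:"))
    }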
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
new file mode 100644
index 0000000000..47a64081e2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{PrintStream, OutputStream, File}
+import java.net.URI
+import java.util.jar.Attributes.Name
+import java.util.jar.{JarFile, Manifest}
+import java.util.zip.{ZipEntry, ZipFile}
+
+import org.scalatest.BeforeAndAfterEach
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.api.r.RUtils
+import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+
+class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ private val main = MavenCoordinate("a", "b", "c")
+ private val dep1 = MavenCoordinate("a", "dep1", "c")
+ private val dep2 = MavenCoordinate("a", "dep2", "d")
+
+ private def getJarPath(coord: MavenCoordinate, repo: File): File = {
+ new File(IvyTestUtils.pathFromCoordinate(coord, repo, "jar", useIvyLayout = false),
+ IvyTestUtils.artifactName(coord, useIvyLayout = false, ".jar"))
+ }
+
+ private val lineBuffer = ArrayBuffer[String]()
+
+ private val noOpOutputStream = new OutputStream {
+ def write(b: Int) = {}
+ }
+
+ /** Simple PrintStream that reads data into a buffer */
+ private class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ // scalastyle:off println
+ override def println(line: String) {
+ // scalastyle:on println
+ lineBuffer += line
+ }
+ }
+
+ def beforeAll() {
+ System.setProperty("spark.testing", "true")
+ }
+
+ override def beforeEach(): Unit = {
+ lineBuffer.clear()
+ }
+
+ test("pick which jars to unpack using the manifest") {
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
+ assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+ assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+ }
+ }
+
+ test("build an R package from a jar end to end") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map { c =>
+ getJarPath(c, new File(new URI(repo)))
+ }.mkString(",")
+ RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+ val firstJar = jars.substring(0, jars.indexOf(","))
+ val output = lineBuffer.mkString("\n")
+ assert(output.contains("Building R package"))
+ assert(output.contains("Extracting"))
+ assert(output.contains(s"$firstJar contains R source code. Now installing package."))
+ assert(output.contains("doesn't contain R source code, skipping..."))
+ }
+ }
+
+ test("jars that don't exist are skipped and print warning") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val deps = Seq(dep1, dep2).mkString(",")
+ IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
+ val jars = Seq(main, dep1, dep2).map { c =>
+ getJarPath(c, new File(new URI(repo))) + "dummy"
+ }.mkString(",")
+ RPackageUtils.checkAndBuildRPackage(jars, new BufferPrintStream, verbose = true)
+ val individualJars = jars.split(",")
+ val output = lineBuffer.mkString("\n")
+ individualJars.foreach { jarFile =>
+ assert(output.contains(s"$jarFile"))
+ }
+ }
+ }
+
+ test("faulty R package shows documentation") {
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ IvyTestUtils.withRepository(main, None, None) { repo =>
+ val manifest = new Manifest
+ val attr = manifest.getMainAttributes
+ attr.put(Name.MANIFEST_VERSION, "1.0")
+ attr.put(new Name("Spark-HasRPackage"), "true")
+ val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
+ useIvyLayout = false, withR = false, Some(manifest))
+ RPackageUtils.checkAndBuildRPackage(jar.getAbsolutePath, new BufferPrintStream,
+ verbose = true)
+ val output = lineBuffer.mkString("\n")
+ assert(output.contains(RPackageUtils.RJarDoc))
+ }
+ }
+
+ test("SparkR zipping works properly") {
+ val tempDir = Files.createTempDir()
+ try {
+ IvyTestUtils.writeFile(tempDir, "test.R", "abc")
+ val fakeSparkRDir = new File(tempDir, "SparkR")
+ assert(fakeSparkRDir.mkdirs())
+ IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
+ IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
+ IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
+ val fakePackageDir = new File(tempDir, "packageTest")
+ assert(fakePackageDir.mkdirs())
+ IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
+ IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
+ val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
+ assert(finalZip.exists())
+ val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName)
+ assert(entries.contains("/test.R"))
+ assert(entries.contains("/SparkR/abc.R"))
+ assert(entries.contains("/SparkR/DESCRIPTION"))
+ assert(!entries.contains("/package.zip"))
+ assert(entries.contains("/packageTest/def.R"))
+ assert(entries.contains("/packageTest/DESCRIPTION"))
+ } finally {
+ FileUtils.deleteDirectory(tempDir)
+ }
+ }
+}
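The zipping test above walks the produced archive through scala.collection.JavaConversions; an equivalent helper using the explicit JavaConverters API, offered as a standalone sketch rather than part of the patch:

    import java.io.File
    import java.util.zip.ZipFile
    import scala.collection.JavaConverters._

    // Sketch: list entry names in an archive such as the sparkr.zip built above.
    def zipEntryNames(zip: File): Seq[String] =
      new ZipFile(zip).entries().asScala.map(_.getName).toSeq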
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index aa78bfe309..757e0ce3d2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -362,6 +362,30 @@ class SparkSubmitSuite
}
}
+ test("correctly builds R packages included in a jar with --packages") {
+ // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
+ // It's hard to write the test in SparkR (because we can't create the repository dynamically)
+ /*
+ assume(RUtils.isRInstalled, "R isn't installed on this machine.")
+ val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+ val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+ val rScriptDir =
+ Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
+ assert(new File(rScriptDir).exists)
+ IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
+ val args = Seq(
+ "--name", "testApp",
+ "--master", "local-cluster[2,1,1024]",
+ "--packages", main.toString,
+ "--repositories", repo,
+ "--verbose",
+ "--conf", "spark.ui.enabled=false",
+ rScriptDir)
+ runSparkSubmit(args)
+ }
+ */
+ }
+
test("resolves command line argument paths correctly") {
val jars = "/jar1,/jar2" // --jars
val files = "hdfs:/file1,file2" // --files
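A closing note on the disabled test: the `--packages` value it would pass (`main.toString`, i.e. "my.great.lib:mylib:0.1") is parsed by SparkSubmitUtils.extractMavenCoordinates, the same helper these suites already call. A small sketch of that round trip, assuming only the groupId:artifactId:version format used above:

    val coords = SparkSubmitUtils.extractMavenCoordinates("my.great.lib:mylib:0.1,a:dep1:c")
    coords.foreach { c => println(s"${c.groupId}:${c.artifactId}:${c.version}") }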