diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-03 23:50:14 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-03 23:50:14 -0800 |
commit | 10fe23bc34bcc8c25fb5440b71a6be883523e4dc (patch) | |
tree | a327c39fcf9ac53e17fbeb5dfedb11e04f505f3c /core | |
parent | c4d6145f7fde8a516024e886314bf8fecde817ea (diff) | |
parent | 604fad9c39763012d97b404941f7ba7137ec2eed (diff) | |
download | spark-10fe23bc34bcc8c25fb5440b71a6be883523e4dc.tar.gz spark-10fe23bc34bcc8c25fb5440b71a6be883523e4dc.tar.bz2 spark-10fe23bc34bcc8c25fb5440b71a6be883523e4dc.zip |
Merge pull request #329 from pwendell/remove-binaries
SPARK-1002: Remove Binaries from Spark Source
This adds a few changes on top of the work by @scrapcodes.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/test/resources/uncommons-maths-1.2.2.jar | bin | 49019 -> 0 bytes | |||
-rw-r--r-- | core/src/test/scala/org/apache/spark/DriverSuite.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/FileServerSuite.scala | 108 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala | 4 |
5 files changed, 70 insertions, 58 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4d6a97e255..e80e43af6d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -169,10 +169,16 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner - for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING"); + for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS"); value <- Option(System.getenv(key))) { executorEnvs(key) = value } + // Convert java options to env vars as a work around + // since we can't set env vars directly in sbt. + for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing")) + value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { + executorEnvs(envKey) = value + } // Since memory can be set with a system property too, use that executorEnvs("SPARK_MEM") = executorMemory + "m" executorEnvs ++= conf.getExecutorEnv diff --git a/core/src/test/resources/uncommons-maths-1.2.2.jar b/core/src/test/resources/uncommons-maths-1.2.2.jar Binary files differdeleted file mode 100644 index e126001c1c..0000000000 --- a/core/src/test/resources/uncommons-maths-1.2.2.jar +++ /dev/null diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 605588f7f6..fb89537258 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -30,13 +30,15 @@ import org.apache.spark.util.Utils class DriverSuite extends FunSuite with Timeouts { test("driver should exit after finishing") { - assert(System.getenv("SPARK_HOME") != null) + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => failAfter(60 seconds) { - Utils.execute(Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), - new File(System.getenv("SPARK_HOME"))) + Utils.executeAndGetOutput( + Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), + new File(sparkHome), + Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) } } } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index c210dd5c3b..a2eb9a4e84 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -17,33 +17,49 @@ package org.apache.spark +import java.io._ +import java.util.jar.{JarEntry, JarOutputStream} + +import SparkContext._ import com.google.common.io.Files import org.scalatest.FunSuite -import java.io.{File, PrintWriter, FileReader, BufferedReader} -import SparkContext._ class FileServerSuite extends FunSuite with LocalSparkContext { @transient var tmpFile: File = _ - @transient var testJarFile: File = _ - - override def beforeEach() { - super.beforeEach() - // Create a sample text file - val tmpdir = new File(Files.createTempDir(), "test") - tmpdir.mkdir() - tmpFile = new File(tmpdir, "FileServerSuite.txt") - val pw = new PrintWriter(tmpFile) + @transient var tmpJarUrl: String = _ + + override def beforeAll() { + super.beforeAll() + val tmpDir = new File(Files.createTempDir(), "test") + tmpDir.mkdir() + + val textFile = new File(tmpDir, "FileServerSuite.txt") + val pw = new PrintWriter(textFile) pw.println("100") pw.close() - } + + val jarFile = new File(tmpDir, "test.jar") + val jarStream = new FileOutputStream(jarFile) + val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - override def afterEach() { - super.afterEach() - // Clean up downloaded file - if (tmpFile.exists) { - tmpFile.delete() + val jarEntry = new JarEntry(textFile.getName) + jar.putNextEntry(jarEntry) + + val in = new FileInputStream(textFile) + val buffer = new Array[Byte](10240) + var nRead = 0 + while (nRead <= 0) { + nRead = in.read(buffer, 0, buffer.length) + jar.write(buffer, 0, nRead) } + + in.close() + jar.close() + jarStream.close() + + tmpFile = textFile + tmpJarUrl = jarFile.toURI.toURL.toString } test("Distributing files locally") { @@ -77,18 +93,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("Dynamically adding JARS locally") { sc = new SparkContext("local[4]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl) + val testData = Array((1, 1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } test("Distributing files on a standalone cluster") { @@ -107,33 +118,24 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl) + val testData = Array((1,1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } test ("Dynamically adding JARS on a standalone cluster using local: URL") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile.replace("file", "local")) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(tmpJarUrl.replace("file", "local")) + val testData = Array((1,1)) + sc.parallelize(testData).foreach { x => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { + throw new SparkException("jar not added") + } + } } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 4cb4ddc9cd..f58b1ee05a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -18,13 +18,15 @@ package org.apache.spark.deploy.worker import java.io.File + import org.scalatest.FunSuite + import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription} class ExecutorRunnerTest extends FunSuite { test("command includes appId") { def f(s:String) = new File(s) - val sparkHome = sys.env("SPARK_HOME") + val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.env.get("spark.home")).get val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()), sparkHome, "appUiUrl") val appId = "12345-worker321-9876" |