author     Patrick Wendell <pwendell@gmail.com>    2014-01-03 17:32:25 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2014-01-03 18:30:17 -0800
commit     9e6f3bdcda1ab48159afa4f54b64d05e42a8688e (patch)
tree       dbe72e606726f95c725bce3caed8a32a15d74b5c /core/src
parent     bc311bb826b5548b9c4c55320711f3b18dc19397 (diff)
Changes on top of Prashant's patch.
Closes #316
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        3
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala                         3
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala                    77
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala    6
4 files changed, 35 insertions, 54 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c6f6883b01..e80e43af6d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -173,7 +173,8 @@ class SparkContext(
value <- Option(System.getenv(key))) {
executorEnvs(key) = value
}
- // A workaround for SPARK_TESTING and SPARK_HOME
+ // Convert java options to env vars as a work around
+ // since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
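
The hunk above only rewrites the comment; the surrounding loop (from Prashant's patch) copies SPARK_HOME and SPARK_TESTING into the executor environment, falling back to the spark.home and spark.testing system properties when the environment variables are absent. A minimal, self-contained sketch of that fallback pattern (the variable names are illustrative, not part of the commit):

import scala.collection.mutable

// Copy selected settings into the executor environment, preferring the real
// env var and falling back to the -D system property (e.g. set from sbt,
// which cannot set environment variables directly).
val executorEnvs = mutable.Map[String, String]()
for {
  (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
  value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))
} {
  executorEnvs(envKey) = value
}
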
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 89c5631ad8..7e1e55fa3b 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -30,8 +30,7 @@ import org.apache.spark.util.Utils
class DriverSuite extends FunSuite with Timeouts {
test("driver should exit after finishing") {
- val sparkHome = Option(System.getenv("SPARK_HOME"))
- .orElse(Option(System.getProperty("spark.home"))).get
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
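
The one-line replacement above leans on scala.sys: sys.env wraps System.getenv and sys.props wraps System.getProperties, so the old two-step Option chain collapses into one expression. A hedged variant (not in the commit) that fails with a clearer message than the bare NoSuchElementException the trailing .get would throw:

// Prefer the SPARK_HOME env var, fall back to -Dspark.home, and fail loudly
// if neither is set.
val sparkHome: String = sys.env.get("SPARK_HOME")
  .orElse(sys.props.get("spark.home"))
  .getOrElse(sys.error("Neither SPARK_HOME nor spark.home is set"))
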
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 506f7484fb..a2eb9a4e84 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -27,54 +27,39 @@ import org.scalatest.FunSuite
class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
- @transient var testJarFile: String = _
-
+ @transient var tmpJarUrl: String = _
override def beforeAll() {
super.beforeAll()
- val buffer = new Array[Byte](10240)
- val tmpdir = new File(Files.createTempDir(), "test")
- tmpdir.mkdir()
- val tmpJarEntry = new File(tmpdir, "FileServerSuite2.txt")
- val pw = new PrintWriter(tmpJarEntry)
- pw.println("test String in the file named FileServerSuite2.txt")
+ val tmpDir = new File(Files.createTempDir(), "test")
+ tmpDir.mkdir()
+
+ val textFile = new File(tmpDir, "FileServerSuite.txt")
+ val pw = new PrintWriter(textFile)
+ pw.println("100")
pw.close()
- // The ugliest code possible, was translated from java.
- val tmpFile2 = new File(tmpdir, "test.jar")
- val stream = new FileOutputStream(tmpFile2)
- val jar = new JarOutputStream(stream, new java.util.jar.Manifest())
- val jarAdd = new JarEntry(tmpJarEntry.getName)
- jarAdd.setTime(tmpJarEntry.lastModified)
- jar.putNextEntry(jarAdd)
- val in = new FileInputStream(tmpJarEntry)
+
+ val jarFile = new File(tmpDir, "test.jar")
+ val jarStream = new FileOutputStream(jarFile)
+ val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
+
+ val jarEntry = new JarEntry(textFile.getName)
+ jar.putNextEntry(jarEntry)
+
+ val in = new FileInputStream(textFile)
+ val buffer = new Array[Byte](10240)
var nRead = 0
- while (nRead <= 0) {
+ while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jar.write(buffer, 0, nRead)
}
+
in.close()
jar.close()
- stream.close()
- testJarFile = tmpFile2.toURI.toURL.toString
- }
-
- override def beforeEach() {
- super.beforeEach()
- // Create a sample text file
- val tmpdir = new File(Files.createTempDir(), "test")
- tmpdir.mkdir()
- tmpFile = new File(tmpdir, "FileServerSuite.txt")
- val pw = new PrintWriter(tmpFile)
- pw.println("100")
- pw.close()
- }
+ jarStream.close()
- override def afterEach() {
- super.afterEach()
- // Clean up downloaded file
- if (tmpFile.exists) {
- tmpFile.delete()
- }
+ tmpFile = textFile
+ tmpJarUrl = jarFile.toURI.toURL.toString
}
test("Distributing files locally") {
@@ -108,10 +93,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
- sc.addJar(testJarFile)
+ sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
- sc.parallelize(testData).foreach { (x) =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
@@ -133,10 +118,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
- sc.addJar(testJarFile)
+ sc.addJar(tmpJarUrl)
val testData = Array((1,1))
- sc.parallelize(testData).foreach { (x) =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
@@ -144,10 +129,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
- sc.addJar(testJarFile.replace("file", "local"))
+ sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
- sc.parallelize(testData).foreach { (x) =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+ sc.parallelize(testData).foreach { x =>
+ if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
throw new SparkException("jar not added")
}
}
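
All three test hunks follow the same pattern: register the jar with the running SparkContext, then assert from inside a task that an entry packed in it is visible to the context class loader (the local: variant assumes the jar already exists at that path on every node). An illustrative helper capturing the pattern, not part of the commit:

import org.apache.spark.{SparkContext, SparkException}

// Add a jar to the context, then verify on the executors that the given
// resource (a file packed in the jar) is reachable via the class loader.
def assertJarVisible(sc: SparkContext, jarUrl: String, resource: String): Unit = {
  sc.addJar(jarUrl)
  sc.parallelize(1 to 4).foreach { _ =>
    if (Thread.currentThread.getContextClassLoader.getResource(resource) == null) {
      throw new SparkException("jar not added")
    }
  }
}
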
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 894a72284b..f58b1ee05a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -19,18 +19,14 @@ package org.apache.spark.deploy.worker
import java.io.File
-import scala.util.Try
-
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
-
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = Try(sys.env("SPARK_HOME")).toOption
- .orElse(Option(System.getProperty("spark.home"))).get
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.env.get("spark.home")).get
val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"