Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                     |  2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala |  3
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala             |  2
-rw-r--r--  core/src/test/scala/spark/FileServerSuite.scala              | 24
4 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 7473b40aa3..6ffae8e85f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -40,6 +40,8 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
actorSystem.shutdown()
+ // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
+ Thread.sleep(100)
actorSystem.awaitTermination()
// Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
Thread.sleep(100)
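
The added sleep mirrors the one already present after awaitTermination(): in this Akka generation, awaitTermination() can return before the server socket is actually unbound, so a test that immediately starts a new SparkEnv on the same port may fail to bind. The fixed 100 ms pause is a blunt workaround; a minimal sketch of a more direct check, assuming a known port (the helper name and retry budget below are invented for illustration, not part of the commit):

    import java.net.ServerSocket

    // Hypothetical alternative to a blind Thread.sleep(100): poll until the
    // OS has actually released the port. A successful ServerSocket bind
    // proves the port is free; an IOException means it is still held.
    def waitForPortRelease(port: Int, maxAttempts: Int = 50): Boolean = {
      for (_ <- 1 to maxAttempts) {
        try {
          new ServerSocket(port).close() // bind and immediately release
          return true
        } catch {
          case _: java.io.IOException => Thread.sleep(10)
        }
      }
      false
    }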
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 7043361020..e2a9df275a 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -75,7 +75,8 @@ class ExecutorRunner(
def buildCommandSeq(): Seq[String] = {
val command = jobDesc.command
- val runScript = new File(sparkHome, "run").getCanonicalPath
+ val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run";
+ val runScript = new File(sparkHome, script).getCanonicalPath
Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
}
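
The fix above chooses the launcher script by inspecting the os.name system property, so standalone workers started from a Windows checkout invoke run.cmd instead of the Unix run script. A minimal standalone sketch of the same pattern (scriptFor is a hypothetical helper, not code from the commit):

    import java.io.File

    def isWindows: Boolean =
      System.getProperty("os.name").startsWith("Windows")

    // Resolve the platform-appropriate launcher under a home directory,
    // e.g. scriptFor("/opt/spark", "run") yields ".../run" on Unix and
    // "...\run.cmd" on Windows.
    def scriptFor(home: String, baseName: String): String = {
      val name = if (isWindows) baseName + ".cmd" else baseName
      new File(home, name).getCanonicalPath
    }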
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index b7b8a79327..93b876d205 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
val clusterUrl = "local-cluster[2,1,512]"
- var sc: SparkContext = _
+ @transient var sc: SparkContext = _
after {
if (sc != null) {
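
Marking the field @transient keeps the SparkContext, which is not serializable, from being dragged along when a closure in a test captures the enclosing suite instance; without it, RDD operations that touch suite fields can fail with NotSerializableException. A self-contained sketch of what the annotation does under plain Java serialization (Holder and TransientDemo are illustrative only):

    import java.io._

    // A @transient field is skipped by Java serialization and comes back
    // null after a round trip; ordinary fields are preserved.
    class Holder(h: AnyRef, l: String) extends Serializable {
      @transient var heavy: AnyRef = h
      var label: String = l
    }

    object TransientDemo extends App {
      val bytes = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bytes)
      oos.writeObject(new Holder(new Object, "ok"))
      oos.close()
      val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        .readObject().asInstanceOf[Holder]
      assert(copy.heavy == null) // transient field was dropped
      assert(copy.label == "ok") // regular field survived
    }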
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 500af1eb90..fd7a7bd589 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -3,14 +3,14 @@ package spark
import com.google.common.io.Files
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
import SparkContext._
class FileServerSuite extends FunSuite with BeforeAndAfter {
- var sc: SparkContext = _
- var tmpFile : File = _
- var testJarFile : File = _
+ @transient var sc: SparkContext = _
+ @transient var tmpFile : File = _
+ @transient var testJarFile : File = _
before {
// Create a sample text file
@@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
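
The reader change is not just the new imports: the test now opens the file by its bare name rather than through the driver-side tmpFile path. A file shipped with sc.addFile() is downloaded into each task's working directory, so inside the closure it is visible as "FileServerSuite.txt", while tmpFile's absolute path exists only on the driver. A small sketch of the pattern, assuming (as this fixture does) that the shipped file holds a single integer:

    import java.io.{BufferedReader, FileReader}

    // Illustrative helper: read one integer from a file that sc.addFile()
    // has placed in the task's working directory under its base name.
    def readShippedInt(name: String): Int = {
      val in = new BufferedReader(new FileReader(name))
      try in.readLine().trim.toInt
      finally in.close()
    }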
@@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
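
Reformatting the long line also makes the mechanism easier to see: the jar registered with sc.addJar() is not on the test JVM's classpath, so the class has to be loaded through the thread's context classloader, which Spark points at the jars each executor fetches. The same reflective call in isolation (the wrapper is invented for illustration; the class and method names are the ones the test uses):

    // Load org.uncommons.maths.Maths via the context classloader and invoke
    // its static factorial(int) method reflectively, as the test above does.
    def staticFactorial(n: Int): Long = {
      val cls = Thread.currentThread.getContextClassLoader
        .loadClass("org.uncommons.maths.Maths")
      val method = cls.getDeclaredMethod("factorial", classOf[Int])
      method.invoke(null, n.asInstanceOf[java.lang.Integer]).asInstanceOf[Long]
    }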
@@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
- val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val in = new BufferedReader(new FileReader("FileServerSuite.txt"))
val fileVal = in.readLine().toInt
in.close()
_ * fileVal + _ * fileVal
@@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
-
test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
- val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
a + b
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
-
-}
\ No newline at end of file
+}