aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-04-15 19:49:40 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-04-15 19:49:40 +0530
commit19b0256ae455d8dcfa3b2eafb602633eff9d4f1c (patch)
treecd2d5b8cb3f2038f408e605b1a7b4333b05d5aa4 /repl
parentf31e41c2709368d1275f23be79231fbc0eb54f57 (diff)
downloadspark-19b0256ae455d8dcfa3b2eafb602633eff9d4f1c.tar.gz
spark-19b0256ae455d8dcfa3b2eafb602633eff9d4f1c.tar.bz2
spark-19b0256ae455d8dcfa3b2eafb602633eff9d4f1c.zip
Added standalone cluster repl suite
Diffstat (limited to 'repl')
-rw-r--r--repl/src/test/scala/spark/repl/ReplSuite.scala59
-rw-r--r--repl/src/test/scala/spark/repl/ReplSuiteMixin.scala56
-rw-r--r--repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala32
3 files changed, 100 insertions, 47 deletions
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
index a3274c2737..4dfd3127bf 100644
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuite.scala
@@ -1,50 +1,14 @@
package spark.repl
-import java.io._
-import java.net.URLClassLoader
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
+import java.io.FileWriter
import org.scalatest.FunSuite
-import com.google.common.io.Files
-
-class ReplSuite extends FunSuite {
- def runInterpreter(master: String, input: String): String = {
- val in = new BufferedReader(new StringReader(input + "\n"))
- val out = new StringWriter()
- val cl = getClass.getClassLoader
- var paths = new ArrayBuffer[String]
- if (cl.isInstanceOf[URLClassLoader]) {
- val urlLoader = cl.asInstanceOf[URLClassLoader]
- for (url <- urlLoader.getURLs) {
- if (url.getProtocol == "file") {
- paths += url.getFile
- }
- }
- }
- val interp = new SparkILoop(in, new PrintWriter(out), master)
- spark.repl.Main.interp = interp
- val separator = System.getProperty("path.separator")
- interp.process(Array("-classpath", paths.mkString(separator)))
- if (interp != null)
- interp.closeInterpreter();
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- return out.toString
- }
- def assertContains(message: String, output: String) {
- assert(output contains message,
- "Interpreter output did not contain '" + message + "':\n" + output)
- }
+import com.google.common.io.Files
- def assertDoesNotContain(message: String, output: String) {
- assert(!(output contains message),
- "Interpreter output contained '" + message + "':\n" + output)
- }
+class ReplSuite extends FunSuite with ReplSuiteMixin {
- test ("simple foreach with accumulator") {
+ test("simple foreach with accumulator") {
val output = runInterpreter("local", """
val accum = sc.accumulator(0)
sc.parallelize(1 to 10).foreach(x => accum += x)
@@ -55,7 +19,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 55", output)
}
- test ("external vars") {
+ test("external vars") {
val output = runInterpreter("local", """
var v = 7
sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
@@ -68,7 +32,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 100", output)
}
- test ("external classes") {
+ test("external classes") {
val output = runInterpreter("local", """
class C {
def foo = 5
@@ -80,7 +44,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Int = 50", output)
}
- test ("external functions") {
+ test("external functions") {
val output = runInterpreter("local", """
def double(x: Int) = x + x
sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
@@ -90,7 +54,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Int = 110", output)
}
- test ("external functions that access vars") {
+ test("external functions that access vars") {
val output = runInterpreter("local", """
var v = 7
def getV() = v
@@ -104,7 +68,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 100", output)
}
- test ("broadcast vars") {
+ test("broadcast vars") {
// Test that the value that a broadcast var had when it was created is used,
// even if that variable is then modified in the driver program
// TODO: This doesn't actually work for arrays when we run in local mode!
@@ -121,7 +85,7 @@ class ReplSuite extends FunSuite {
assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
}
- test ("interacting with files") {
+ test("interacting with files") {
val tempDir = Files.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
@@ -142,7 +106,7 @@ class ReplSuite extends FunSuite {
}
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
- test ("running on Mesos") {
+ test("running on Mesos") {
val output = runInterpreter("localquiet", """
var v = 7
def getV() = v
@@ -163,4 +127,5 @@ class ReplSuite extends FunSuite {
assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
}
}
+
}
diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
new file mode 100644
index 0000000000..35429bf01f
--- /dev/null
+++ b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
@@ -0,0 +1,56 @@
+package spark.repl
+
+import java.io.BufferedReader
+import java.io.PrintWriter
+import java.io.StringReader
+import java.io.StringWriter
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.future
+
+import spark.deploy.master.Master
+import spark.deploy.worker.Worker
+
+trait ReplSuiteMixin {
+ def setupStandaloneCluster() {
+ future { Master.main(Array("-i", "127.0.1.2", "-p", "7089")) }
+ Thread.sleep(2000)
+ future { Worker.main(Array("spark://127.0.1.2:7089", "--webui-port", "0")) }
+ }
+
+ def runInterpreter(master: String, input: String): String = {
+ val in = new BufferedReader(new StringReader(input + "\n"))
+ val out = new StringWriter()
+ val cl = getClass.getClassLoader
+ var paths = new ArrayBuffer[String]
+ if (cl.isInstanceOf[URLClassLoader]) {
+ val urlLoader = cl.asInstanceOf[URLClassLoader]
+ for (url <- urlLoader.getURLs) {
+ if (url.getProtocol == "file") {
+ paths += url.getFile
+ }
+ }
+ }
+ val interp = new SparkILoop(in, new PrintWriter(out), master)
+ spark.repl.Main.interp = interp
+ val separator = System.getProperty("path.separator")
+ interp.process(Array("-classpath", paths.mkString(separator)))
+ if (interp != null)
+ interp.closeInterpreter();
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ return out.toString
+ }
+
+ def assertContains(message: String, output: String) {
+ assert(output contains message,
+ "Interpreter output did not contain '" + message + "':\n" + output)
+ }
+
+ def assertDoesNotContain(message: String, output: String) {
+ assert(!(output contains message),
+ "Interpreter output contained '" + message + "':\n" + output)
+ }
+} \ No newline at end of file
diff --git a/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala b/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala
new file mode 100644
index 0000000000..a0940e2166
--- /dev/null
+++ b/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala
@@ -0,0 +1,32 @@
+package spark.repl
+
+import org.scalatest.FunSuite
+
+class StandaloneClusterReplSuite extends FunSuite with ReplSuiteMixin {
+ setupStandaloneCluster
+
+ test("simple collect") {
+ val output = runInterpreter("spark://127.0.1.2:7089", """
+ var x = 123
+ val data = sc.parallelize(1 to 3).map(_ + x)
+ data.take(3)
+ """)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("124", output)
+ assertContains("125", output)
+ assertContains("126", output)
+ }
+
+ test("simple foreach with accumulator") {
+ val output = runInterpreter("spark://127.0.1.2:7089", """
+ val accum = sc.accumulator(0)
+ sc.parallelize(1 to 10).foreach(x => accum += x)
+ accum.value
+ """)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res1: Int = 55", output)
+ }
+
+} \ No newline at end of file