From 19b0256ae455d8dcfa3b2eafb602633eff9d4f1c Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 15 Apr 2013 19:49:40 +0530 Subject: Added standalone cluster repl suite --- repl/src/test/scala/spark/repl/ReplSuite.scala | 59 +++++----------------- .../src/test/scala/spark/repl/ReplSuiteMixin.scala | 56 ++++++++++++++++++++ .../spark/repl/StandaloneClusterReplSuite.scala | 32 ++++++++++++ 3 files changed, 100 insertions(+), 47 deletions(-) create mode 100644 repl/src/test/scala/spark/repl/ReplSuiteMixin.scala create mode 100644 repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala index a3274c2737..4dfd3127bf 100644 --- a/repl/src/test/scala/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/spark/repl/ReplSuite.scala @@ -1,50 +1,14 @@ package spark.repl -import java.io._ -import java.net.URLClassLoader - -import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ +import java.io.FileWriter import org.scalatest.FunSuite -import com.google.common.io.Files - -class ReplSuite extends FunSuite { - def runInterpreter(master: String, input: String): String = { - val in = new BufferedReader(new StringReader(input + "\n")) - val out = new StringWriter() - val cl = getClass.getClassLoader - var paths = new ArrayBuffer[String] - if (cl.isInstanceOf[URLClassLoader]) { - val urlLoader = cl.asInstanceOf[URLClassLoader] - for (url <- urlLoader.getURLs) { - if (url.getProtocol == "file") { - paths += url.getFile - } - } - } - val interp = new SparkILoop(in, new PrintWriter(out), master) - spark.repl.Main.interp = interp - val separator = System.getProperty("path.separator") - interp.process(Array("-classpath", paths.mkString(separator))) - if (interp != null) - interp.closeInterpreter(); - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") - return out.toString - } - def assertContains(message: String, output: String) { - assert(output contains message, - "Interpreter output did not contain '" + message + "':\n" + output) - } +import com.google.common.io.Files - def assertDoesNotContain(message: String, output: String) { - assert(!(output contains message), - "Interpreter output contained '" + message + "':\n" + output) - } +class ReplSuite extends FunSuite with ReplSuiteMixin { - test ("simple foreach with accumulator") { + test("simple foreach with accumulator") { val output = runInterpreter("local", """ val accum = sc.accumulator(0) sc.parallelize(1 to 10).foreach(x => accum += x) @@ -55,7 +19,7 @@ class ReplSuite extends FunSuite { assertContains("res1: Int = 55", output) } - test ("external vars") { + test("external vars") { val output = runInterpreter("local", """ var v = 7 sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) @@ -68,7 +32,7 @@ class ReplSuite extends FunSuite { assertContains("res1: Int = 100", output) } - test ("external classes") { + test("external classes") { val output = runInterpreter("local", """ class C { def foo = 5 @@ -80,7 +44,7 @@ class ReplSuite extends FunSuite { assertContains("res0: Int = 50", output) } - test ("external functions") { + test("external functions") { val output = runInterpreter("local", """ def double(x: Int) = x + x sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_) @@ -90,7 +54,7 @@ class ReplSuite extends FunSuite { assertContains("res0: Int = 110", output) } - test ("external functions that access vars") { + test("external functions that access vars") { val output = runInterpreter("local", """ var v = 7 def getV() = v @@ -104,7 +68,7 @@ class ReplSuite extends FunSuite { assertContains("res1: Int = 100", output) } - test ("broadcast vars") { + test("broadcast vars") { // Test that the value that a broadcast var had when it was created is used, // even if that variable is then modified in the driver program // TODO: This doesn't actually work for arrays when we run in local mode! @@ -121,7 +85,7 @@ class ReplSuite extends FunSuite { assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output) } - test ("interacting with files") { + test("interacting with files") { val tempDir = Files.createTempDir() val out = new FileWriter(tempDir + "/input") out.write("Hello world!\n") @@ -142,7 +106,7 @@ class ReplSuite extends FunSuite { } if (System.getenv("MESOS_NATIVE_LIBRARY") != null) { - test ("running on Mesos") { + test("running on Mesos") { val output = runInterpreter("localquiet", """ var v = 7 def getV() = v @@ -163,4 +127,5 @@ class ReplSuite extends FunSuite { assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) } } + } diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala new file mode 100644 index 0000000000..35429bf01f --- /dev/null +++ b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala @@ -0,0 +1,56 @@ +package spark.repl + +import java.io.BufferedReader +import java.io.PrintWriter +import java.io.StringReader +import java.io.StringWriter +import java.net.URLClassLoader + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.future + +import spark.deploy.master.Master +import spark.deploy.worker.Worker + +trait ReplSuiteMixin { + def setupStandaloneCluster() { + future { Master.main(Array("-i", "127.0.1.2", "-p", "7089")) } + Thread.sleep(2000) + future { Worker.main(Array("spark://127.0.1.2:7089", "--webui-port", "0")) } + } + + def runInterpreter(master: String, input: String): String = { + val in = new BufferedReader(new StringReader(input + "\n")) + val out = new StringWriter() + val cl = getClass.getClassLoader + var paths = new ArrayBuffer[String] + if (cl.isInstanceOf[URLClassLoader]) { + val urlLoader = cl.asInstanceOf[URLClassLoader] + for (url <- urlLoader.getURLs) { + if (url.getProtocol == "file") { + paths += url.getFile + } + } + } + val interp = new SparkILoop(in, new PrintWriter(out), master) + spark.repl.Main.interp = interp + val separator = System.getProperty("path.separator") + interp.process(Array("-classpath", paths.mkString(separator))) + if (interp != null) + interp.closeInterpreter(); + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port") + return out.toString + } + + def assertContains(message: String, output: String) { + assert(output contains message, + "Interpreter output did not contain '" + message + "':\n" + output) + } + + def assertDoesNotContain(message: String, output: String) { + assert(!(output contains message), + "Interpreter output contained '" + message + "':\n" + output) + } +} \ No newline at end of file diff --git a/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala b/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala new file mode 100644 index 0000000000..a0940e2166 --- /dev/null +++ b/repl/src/test/scala/spark/repl/StandaloneClusterReplSuite.scala @@ -0,0 +1,32 @@ +package spark.repl + +import org.scalatest.FunSuite + +class StandaloneClusterReplSuite extends FunSuite with ReplSuiteMixin { + setupStandaloneCluster + + test("simple collect") { + val output = runInterpreter("spark://127.0.1.2:7089", """ + var x = 123 + val data = sc.parallelize(1 to 3).map(_ + x) + data.take(3) + """) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("124", output) + assertContains("125", output) + assertContains("126", output) + } + + test("simple foreach with accumulator") { + val output = runInterpreter("spark://127.0.1.2:7089", """ + val accum = sc.accumulator(0) + sc.parallelize(1 to 10).foreach(x => accum += x) + accum.value + """) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("res1: Int = 55", output) + } + +} \ No newline at end of file -- cgit v1.2.3