aboutsummaryrefslogtreecommitdiff
path: root/repl/scala-2.10/src/test/scala/org/apache
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-11-11 21:36:48 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-11 21:36:48 -0800
commitdaaca14c16dc2c1abc98f15ab8c6f7c14761b627 (patch)
treede60da38655f8a7d4b0712872b08a7aedf73460f /repl/scala-2.10/src/test/scala/org/apache
parent2ddb1415e2bea94004947506ded090c2e8ff8dad (diff)
downloadspark-daaca14c16dc2c1abc98f15ab8c6f7c14761b627.tar.gz
spark-daaca14c16dc2c1abc98f15ab8c6f7c14761b627.tar.bz2
spark-daaca14c16dc2c1abc98f15ab8c6f7c14761b627.zip
Support cross building for Scala 2.11
Let's give this another go using a version of Hive that shades its JLine dependency. Author: Prashant Sharma <prashant.s@imaginea.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #3159 from pwendell/scala-2.11-prashant and squashes the following commits: e93aa3e [Patrick Wendell] Restoring -Phive-thriftserver profile and cleaning up build script. f65d17d [Patrick Wendell] Fixing build issue due to merge conflict a8c41eb [Patrick Wendell] Reverting dev/run-tests back to master state. 7a6eb18 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into scala-2.11-prashant 583aa07 [Prashant Sharma] REVERT ME: removed hive thirftserver 3680e58 [Prashant Sharma] Revert "REVERT ME: Temporarily removing some Cli tests." 935fb47 [Prashant Sharma] Revert "Fixed by disabling a few tests temporarily." 925e90f [Prashant Sharma] Fixed by disabling a few tests temporarily. 2fffed3 [Prashant Sharma] Exclude groovy from sbt build, and also provide a way for such instances in future. 8bd4e40 [Prashant Sharma] Switched to gmaven plus, it fixes random failures observer with its predecessor gmaven. 5272ce5 [Prashant Sharma] SPARK_SCALA_VERSION related bugs. 2121071 [Patrick Wendell] Migrating version detection to PySpark b1ed44d [Patrick Wendell] REVERT ME: Temporarily removing some Cli tests. 1743a73 [Patrick Wendell] Removing decimal test that doesn't work with Scala 2.11 f5cad4e [Patrick Wendell] Add Scala 2.11 docs 210d7e1 [Patrick Wendell] Revert "Testing new Hive version with shaded jline" 48518ce [Patrick Wendell] Remove association of Hive and Thriftserver profiles. e9d0a06 [Patrick Wendell] Revert "Enable thritfserver for Scala 2.10 only" 67ec364 [Patrick Wendell] Guard building of thriftserver around Scala 2.10 check 8502c23 [Patrick Wendell] Enable thritfserver for Scala 2.10 only e22b104 [Patrick Wendell] Small fix in pom file ec402ab [Patrick Wendell] Various fixes 0be5a9d [Patrick Wendell] Testing new Hive version with shaded jline 4eaec65 [Prashant Sharma] Changed scripts to ignore target. 5167bea [Prashant Sharma] small correction a4fcac6 [Prashant Sharma] Run against scala 2.11 on jenkins. 80285f4 [Prashant Sharma] MAven equivalent of setting spark.executor.extraClasspath during tests. 034b369 [Prashant Sharma] Setting test jars on executor classpath during tests from sbt. d4874cb [Prashant Sharma] Fixed Python Runner suite. null check should be first case in scala 2.11. 6f50f13 [Prashant Sharma] Fixed build after rebasing with master. We should use ${scala.binary.version} instead of just 2.10 e56ca9d [Prashant Sharma] Print an error if build for 2.10 and 2.11 is spotted. 937c0b8 [Prashant Sharma] SCALA_VERSION -> SPARK_SCALA_VERSION cb059b0 [Prashant Sharma] Code review 0476e5e [Prashant Sharma] Scala 2.11 support with repl and all build changes.
Diffstat (limited to 'repl/scala-2.10/src/test/scala/org/apache')
-rw-r--r--repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala318
1 files changed, 318 insertions, 0 deletions
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
new file mode 100644
index 0000000000..91c9c52c3c
--- /dev/null
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.util.Utils
+
+
+class ReplSuite extends FunSuite {
+
+ def runInterpreter(master: String, input: String): String = {
+ val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
+
+ val in = new BufferedReader(new StringReader(input + "\n"))
+ val out = new StringWriter()
+ val cl = getClass.getClassLoader
+ var paths = new ArrayBuffer[String]
+ if (cl.isInstanceOf[URLClassLoader]) {
+ val urlLoader = cl.asInstanceOf[URLClassLoader]
+ for (url <- urlLoader.getURLs) {
+ if (url.getProtocol == "file") {
+ paths += url.getFile
+ }
+ }
+ }
+ val classpath = paths.mkString(File.pathSeparator)
+
+ val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
+ System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
+
+ val interp = new SparkILoop(in, new PrintWriter(out), master)
+ org.apache.spark.repl.Main.interp = interp
+ interp.process(Array("-classpath", classpath))
+ org.apache.spark.repl.Main.interp = null
+ if (interp.sparkContext != null) {
+ interp.sparkContext.stop()
+ }
+ if (oldExecutorClasspath != null) {
+ System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
+ } else {
+ System.clearProperty(CONF_EXECUTOR_CLASSPATH)
+ }
+ return out.toString
+ }
+
+ def assertContains(message: String, output: String) {
+ val isContain = output.contains(message)
+ assert(isContain,
+ "Interpreter output did not contain '" + message + "':\n" + output)
+ }
+
+ def assertDoesNotContain(message: String, output: String) {
+ val isContain = output.contains(message)
+ assert(!isContain,
+ "Interpreter output contained '" + message + "':\n" + output)
+ }
+
+ test("propagation of local properties") {
+ // A mock ILoop that doesn't install the SIGINT handler.
+ class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
+ settings = new scala.tools.nsc.Settings
+ settings.usejavacp.value = true
+ org.apache.spark.repl.Main.interp = this
+ override def createInterpreter() {
+ intp = new SparkILoopInterpreter
+ intp.setContextClassLoader()
+ }
+ }
+
+ val out = new StringWriter()
+ val interp = new ILoop(new PrintWriter(out))
+ interp.sparkContext = new SparkContext("local", "repl-test")
+ interp.createInterpreter()
+ interp.intp.initialize()
+ interp.sparkContext.setLocalProperty("someKey", "someValue")
+
+ // Make sure the value we set in the caller to interpret is propagated in the thread that
+ // interprets the command.
+ interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")")
+ assert(out.toString.contains("someValue"))
+
+ interp.sparkContext.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ test("simple foreach with accumulator") {
+ val output = runInterpreter("local",
+ """
+ |val accum = sc.accumulator(0)
+ |sc.parallelize(1 to 10).foreach(x => accum += x)
+ |accum.value
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res1: Int = 55", output)
+ }
+
+ test("external vars") {
+ val output = runInterpreter("local",
+ """
+ |var v = 7
+ |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 70", output)
+ assertContains("res1: Int = 100", output)
+ }
+
+ test("external classes") {
+ val output = runInterpreter("local",
+ """
+ |class C {
+ |def foo = 5
+ |}
+ |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 50", output)
+ }
+
+ test("external functions") {
+ val output = runInterpreter("local",
+ """
+ |def double(x: Int) = x + x
+ |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 110", output)
+ }
+
+ test("external functions that access vars") {
+ val output = runInterpreter("local",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 70", output)
+ assertContains("res1: Int = 100", output)
+ }
+
+ test("broadcast vars") {
+ // Test that the value that a broadcast var had when it was created is used,
+ // even if that variable is then modified in the driver program
+ // TODO: This doesn't actually work for arrays when we run in local mode!
+ val output = runInterpreter("local",
+ """
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
+ }
+
+ test("interacting with files") {
+ val tempDir = Utils.createTempDir()
+ val out = new FileWriter(tempDir + "/input")
+ out.write("Hello world!\n")
+ out.write("What's up?\n")
+ out.write("Goodbye\n")
+ out.close()
+ val output = runInterpreter("local",
+ """
+ |var file = sc.textFile("%s").cache()
+ |file.count()
+ |file.count()
+ |file.count()
+ """.stripMargin.format(StringEscapeUtils.escapeJava(
+ tempDir.getAbsolutePath + File.separator + "input")))
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Long = 3", output)
+ assertContains("res1: Long = 3", output)
+ assertContains("res2: Long = 3", output)
+ Utils.deleteRecursively(tempDir)
+ }
+
+ test("local-cluster mode") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 70", output)
+ assertContains("res1: Int = 100", output)
+ assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ }
+
+ test("SPARK-1199 two instances of same class don't type check.") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Sum(exp: String, exp2: String)
+ |val a = Sum("A", "B")
+ |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }
+ |b(a)
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ }
+
+ test("SPARK-2452 compound statements.") {
+ val output = runInterpreter("local",
+ """
+ |val x = 4 ; def f() = x
+ |f()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ }
+
+ test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+ // We need to use local-cluster to test this case.
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ |import sqlContext.createSchemaRDD
+ |case class TestCaseClass(value: Int)
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ }
+
+ test("SPARK-2632 importing a method from non serializable class and not using it.") {
+ val output = runInterpreter("local",
+ """
+ |class TestClass() { def testMethod = 3 }
+ |val t = new TestClass
+ |import t.testMethod
+ |case class TestCaseClass(value: Int)
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ }
+
+ if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
+ test("running on Mesos") {
+ val output = runInterpreter("localquiet",
+ """
+ |var v = 7
+ |def getV() = v
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |v = 10
+ |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |var array = new Array[Int](5)
+ |val broadcastArray = sc.broadcast(array)
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |array(0) = 5
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("res0: Int = 70", output)
+ assertContains("res1: Int = 100", output)
+ assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+ }
+ }
+
+ test("collecting objects of class defined in repl") {
+ val output = runInterpreter("local[2]",
+ """
+ |case class Foo(i: Int)
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+ }
+}