aboutsummaryrefslogtreecommitdiff
path: root/repl/scala-2.11/src/test
diff options
context:
space:
mode:
authorKevin (Sangwoo) Kim <sangwookim.me@gmail.com>2015-03-16 23:49:23 -0700
committerReynold Xin <rxin@databricks.com>2015-03-16 23:49:23 -0700
commitf0edeae7f9ab7eae02c227be9162ec69d22c92bd (patch)
tree74b077a59f8d088bf6dde6527349daf9bf5774ad /repl/scala-2.11/src/test
parent9667b9f9c3239f814a0b1120355d9e7bd7a89158 (diff)
downloadspark-f0edeae7f9ab7eae02c227be9162ec69d22c92bd.tar.gz
spark-f0edeae7f9ab7eae02c227be9162ec69d22c92bd.tar.bz2
spark-f0edeae7f9ab7eae02c227be9162ec69d22c92bd.zip
[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL
``` case class ClassA(value: String) val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) )) rdd.groupByKey.collect ``` This code used to be throw exception in spark-shell, because while shuffling ```JavaSerializer```uses ```defaultClassLoader``` which was defined like ```env.serializer.setDefaultClassLoader(urlClassLoader)```. It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like ``` override def run() { val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClassLoader) ``` in TaskRunner. When ```replClassLoader``` cannot be defined, it's identical with ```urlClassLoader``` Author: Kevin (Sangwoo) Kim <sangwookim.me@gmail.com> Closes #5046 from swkimme/master and squashes the following commits: fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() ) 6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() ) d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() ) a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer
Diffstat (limited to 'repl/scala-2.11/src/test')
-rw-r--r--repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala50
1 files changed, 31 insertions, 19 deletions
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index b3bd135548..fbef5b25ba 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -128,9 +128,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -144,7 +144,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -155,7 +155,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -167,9 +167,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -185,9 +185,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -224,14 +224,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -270,7 +270,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -283,7 +283,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -295,14 +295,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -317,10 +317,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
- |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
+
+ test("collecting objects of class defined in repl - shuffling") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Foo(i: Int)
+ |val list = List((1, Foo(1)), (1, Foo(2)))
+ |val ret = sc.parallelize(list).groupByKey().collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+ }
}