aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala50
-rw-r--r--repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala50
3 files changed, 63 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a897e53218..6196f7b165 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -103,7 +103,7 @@ private[spark] class Executor(
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
// Set the classloader for serializer
- env.serializer.setDefaultClassLoader(urlClassLoader)
+ env.serializer.setDefaultClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 249f438459..934daaeaaf 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -160,9 +160,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -178,9 +178,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -216,14 +216,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -262,7 +262,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect()
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -275,7 +275,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -287,14 +287,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -309,10 +309,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
- |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
+
+ test("collecting objects of class defined in repl - shuffling") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Foo(i: Int)
+ |val list = List((1, Foo(1)), (1, Foo(2)))
+ |val ret = sc.parallelize(list).groupByKey().collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+ }
}
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index b3bd135548..fbef5b25ba 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -128,9 +128,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -144,7 +144,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
- |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -155,7 +155,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
- |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -167,9 +167,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -185,9 +185,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -224,14 +224,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -270,7 +270,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -283,7 +283,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -295,14 +295,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
- |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+ |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
- |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+ |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -317,10 +317,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
- |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
+
+ test("collecting objects of class defined in repl - shuffling") {
+ val output = runInterpreter("local-cluster[1,1,512]",
+ """
+ |case class Foo(i: Int)
+ |val list = List((1, Foo(1)), (1, Foo(2)))
+ |val ret = sc.parallelize(list).groupByKey().collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
+ }
}