aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-08-22 16:32:14 -0700
committerReynold Xin <rxin@databricks.com>2016-08-22 16:32:14 -0700
commit8e223ea67acf5aa730ccf688802f17f6fc10907c (patch)
treeb60988fb0166c27148e44636326ed20399327d94 /repl
parent71afeeea4ec8e67edc95b5d504c557c88a2598b9 (diff)
downloadspark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.gz
spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.bz2
spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.zip
[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication
## What changes were proposed in this pull request?

This is a straightforward clone of JoshRosen's original patch. I have follow-up changes to fix block replication for repl-defined classes as well, but those appear to be flaky tests, so I'm going to leave that for SPARK-17042.

## How was this patch tested?

End-to-end test in ReplSuite (also more tests in DistributedSuite from the original patch).

Author: Eric Liang <ekl@databricks.com>

Closes #14311 from ericl/spark-16550.
Diffstat (limited to 'repl')
-rw-r--r--repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala14
1 file changed, 14 insertions, 0 deletions
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index c10db947bc..06b09f3158 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -396,6 +396,20 @@ class ReplSuite extends SparkFunSuite {
assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
}
+ test("replicating blocks of object with class defined in repl") {
+ val output = runInterpreter("local-cluster[2,1,1024]",
+ """
+ |import org.apache.spark.storage.StorageLevel._
+ |case class Foo(i: Int)
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY_2)
+ |ret.count()
+ |sc.getExecutorStorageStatus.map(s => s.rddBlocksById(ret.id).size).sum
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains(": Int = 20", output)
+ }
+
test("line wrapper only initialized once when used as encoder outer scope") {
val output = runInterpreter("local",
"""