author    Shixiong Zhu <shixiong@databricks.com>    2016-02-01 11:02:17 -0800
committer Andrew Or <andrew@databricks.com>         2016-02-01 11:02:17 -0800
commit    6075573a93176ee8c071888e4525043d9e73b061 (patch)
tree      45cdc80c2f00b52ac5b5f4aaabb04e3e822557fe /core/src/test/scala/org/apache
parent    c1da4d421ab78772ffa52ad46e5bdfb4e5268f47 (diff)
[SPARK-6847][CORE][STREAMING] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream
Add a local property to indicate whether to checkpoint all RDDs that are marked with the checkpoint flag, and enable it in Streaming.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10934 from zsxwing/recursive-checkpoint.
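For context, here is a minimal driver-side sketch of how the new flag is exercised. It mirrors the test added in the diff below; the object name and checkpoint directory are hypothetical, and RDD.CHECKPOINT_ALL_MARKED_ANCESTORS is assumed to be a Spark-internal constant, which is why the sketch lives in the org.apache.spark package like the test does.

    package org.apache.spark

    import org.apache.spark.rdd.RDD

    // Hypothetical sketch, not part of this commit.
    object CheckpointAllMarkedAncestorsSketch {
      def run(sc: SparkContext): Unit = {
        sc.setCheckpointDir("/tmp/checkpoint-dir")  // any reliable directory

        // Ask Spark to checkpoint every RDD in the lineage that was marked with
        // checkpoint(), not only the last one, for jobs submitted from this thread.
        sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
        try {
          val rdd1 = sc.parallelize(1 to 10)
          rdd1.checkpoint()            // marked for checkpointing
          val rdd2 = rdd1.map(_ + 1)
          rdd2.checkpoint()            // also marked

          // With the property set, both rdd1 and rdd2 are checkpointed when this
          // job runs; without it, only rdd2 would be.
          rdd2.count()
        } finally {
          sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, null)  // restore default
        }
      }
    }

Streaming enables this property on the jobs it submits, so state RDDs produced by updateStateByKey get checkpointed (and their lineage truncated) even when a downstream dstream is also checkpointed, which appears to be the stack-overflow scenario described in the title.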
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 21
1 file changed, 21 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 390764ba24..ce35856dce 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -512,6 +512,27 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
assert(rdd.isCheckpointedAndMaterialized === true)
assert(rdd.partitions.size === 0)
}
+
+ runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean =>
+ testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = true)
+ testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = false)
+ }
+
+ private def testCheckpointAllMarkedAncestors(
+ reliableCheckpoint: Boolean, checkpointAllMarkedAncestors: Boolean): Unit = {
+ sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, checkpointAllMarkedAncestors.toString)
+ try {
+ val rdd1 = sc.parallelize(1 to 10)
+ checkpoint(rdd1, reliableCheckpoint)
+ val rdd2 = rdd1.map(_ + 1)
+ checkpoint(rdd2, reliableCheckpoint)
+ rdd2.count()
+ assert(rdd1.isCheckpointed === checkpointAllMarkedAncestors)
+ assert(rdd2.isCheckpointed === true)
+ } finally {
+ sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, null)
+ }
+ }
}
/** RDD partition that has large serialized size. */