diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-02-01 11:02:17 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-02-01 11:02:17 -0800 |
commit | 6075573a93176ee8c071888e4525043d9e73b061 (patch) | |
tree | 45cdc80c2f00b52ac5b5f4aaabb04e3e822557fe /core | |
parent | c1da4d421ab78772ffa52ad46e5bdfb4e5268f47 (diff) | |
download | spark-6075573a93176ee8c071888e4525043d9e73b061.tar.gz spark-6075573a93176ee8c071888e4525043d9e73b061.tar.bz2 spark-6075573a93176ee8c071888e4525043d9e73b061.zip |
[SPARK-6847][CORE][STREAMING] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream
Add a local property to indicate if checkpointing all RDDs that are marked with the checkpoint flag, and enable it in Streaming
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10934 from zsxwing/recursive-checkpoint.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 19 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 21 |
2 files changed, 40 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index be47172581..e8157cf4eb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1542,6 +1542,15 @@ abstract class RDD[T: ClassTag]( private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None + // Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default, + // we stop as soon as we find the first such RDD, an optimization that allows us to write + // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both + // an RDD and its parent in every batch, in which case the parent may never be checkpointed + // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847). + private val checkpointAllMarkedAncestors = + Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)) + .map(_.toBoolean).getOrElse(false) + /** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassTag]: RDD[U] = { dependencies.head.rdd.asInstanceOf[RDD[U]] @@ -1585,6 +1594,13 @@ abstract class RDD[T: ClassTag]( if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { + if (checkpointAllMarkedAncestors) { + // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint + // them in parallel. + // Checkpoint parents first because our lineage will be truncated after we + // checkpoint ourselves + dependencies.foreach(_.rdd.doCheckpoint()) + } checkpointData.get.checkpoint() } else { dependencies.foreach(_.rdd.doCheckpoint()) @@ -1704,6 +1720,9 @@ abstract class RDD[T: ClassTag]( */ object RDD { + private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS = + "spark.checkpoint.checkpointAllMarkedAncestors" + // The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, we still keep the old functions in SparkContext for backward diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 390764ba24..ce35856dce 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -512,6 +512,27 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.isCheckpointedAndMaterialized === true) assert(rdd.partitions.size === 0) } + + runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => + testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = true) + testCheckpointAllMarkedAncestors(reliableCheckpoint, checkpointAllMarkedAncestors = false) + } + + private def testCheckpointAllMarkedAncestors( + reliableCheckpoint: Boolean, checkpointAllMarkedAncestors: Boolean): Unit = { + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, checkpointAllMarkedAncestors.toString) + try { + val rdd1 = sc.parallelize(1 to 10) + checkpoint(rdd1, reliableCheckpoint) + val rdd2 = rdd1.map(_ + 1) + checkpoint(rdd2, reliableCheckpoint) + rdd2.count() + assert(rdd1.isCheckpointed === checkpointAllMarkedAncestors) + assert(rdd2.isCheckpointed === true) + } finally { + sc.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, null) + } + } } /** RDD partition that has large serialized size. */ |