author     Liang-Chi Hsieh <viirya@appier.com>    2015-10-19 16:16:31 -0700
committer  Andrew Or <andrew@databricks.com>      2015-10-19 16:16:31 -0700
commit     a1413b3662250dd5e980e8b1f7c3dc4585ab4766 (patch)
tree       18c4ed4a2c15d44ae770aceb8e64572ad6b5592b /core
parent     7ab0ce6501c37f0fc3a49e3332573ae4e4def3e8 (diff)
[SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed
JIRA: https://issues.apache.org/jira/browse/SPARK-11051

When an `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we then allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`, an error is thrown the next time the RDD is materialized.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9072 from viirya/no-localcheckpoint-after-checkpoint.
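For illustration, a minimal sketch of the sequence described above, using only public RDD APIs; the local[2] master and the checkpoint directory are assumptions for the sketch, not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}

object LocalCheckpointAfterReliableCheckpoint {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SPARK-11051 sketch")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // assumed scratch directory

    val rdd = sc.parallelize(1 to 100)
    rdd.checkpoint()   // mark for reliable checkpointing
    rdd.count()        // materialize: the checkpoint is written and the lineage is truncated

    // Before this patch, the call below replaced the reliable `checkpointData`
    // with `LocalRDDCheckpointData`, so the next action failed; with the patch
    // it only logs a warning and keeps the existing checkpoint.
    rdd.localCheckpoint()
    rdd.count()        // still served from the reliable checkpoint

    sc.stop()
  }
}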
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala          | 35
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala  |  4
2 files changed, 32 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a56e542242..a97bb17443 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag](
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
- if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+ if (isCheckpointedAndMaterialized) {
+ firstParent[T].iterator(split, context)
+ } else {
+ compute(split, context)
+ }
}
/**
@@ -1520,21 +1524,38 @@ abstract class RDD[T: ClassTag](
persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
}
- checkpointData match {
- case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
- "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
- case _ =>
+ // If this RDD is already checkpointed and materialized, its lineage is already truncated.
+ // We must not override our `checkpointData` in this case because it is needed to recover
+ // the checkpointed data. If it is overridden, the next materialization of this RDD will
+ // cause an error.
+ if (isCheckpointedAndMaterialized) {
+ logWarning("Not marking RDD for local checkpoint because it was already " +
+ "checkpointed and materialized")
+ } else {
+ // Lineage is not truncated yet, so just override any existing checkpoint data with ours
+ checkpointData match {
+ case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
+ "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
+ case _ =>
+ }
+ checkpointData = Some(new LocalRDDCheckpointData(this))
}
- checkpointData = Some(new LocalRDDCheckpointData(this))
this
}
/**
- * Return whether this RDD is marked for checkpointing, either reliably or locally.
+ * Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
/**
+ * Return whether this RDD is checkpointed and materialized, either reliably or locally.
+ * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
+ * return value. Exposed for testing.
+ */
+ private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+
+ /**
* Return whether this RDD is marked for local checkpointing.
* Exposed for testing.
*/
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 4d70bfed90..119e5fc28e 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
val rdd = new BlockRDD[Int](sc, Array[BlockId]())
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
+ assert(rdd.isCheckpointedAndMaterialized === false)
checkpoint(rdd, reliableCheckpoint)
+ assert(rdd.isCheckpointed === false)
+ assert(rdd.isCheckpointedAndMaterialized === false)
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
+ assert(rdd.isCheckpointedAndMaterialized === true)
assert(rdd.partitions.size === 0)
}
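As a usage note, here is a condensed sketch of the state transitions the updated test checks, assuming an existing SparkContext `sc` with a configured checkpoint directory; `isCheckpointedAndMaterialized` itself is `private[spark]`, so user code only sees `isCheckpointed`:

val rdd = sc.parallelize(1 to 10)
rdd.checkpoint()            // marked for reliable checkpointing; nothing is written yet
assert(!rdd.isCheckpointed) // false until an action materializes the RDD
rdd.count()                 // runs a job, writes the checkpoint, truncates the lineage
assert(rdd.isCheckpointed)  // true only once the data is checkpointed and materialized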