author: Tathagata Das <tathagata.das1565@gmail.com> 2017-01-25 17:17:34 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2017-01-25 17:17:34 -0800
commit: 47d5d0ddb06c7d2c86515d9556c41dc80081f560
tree: 0ad7f2c384c9aebf547e871aa2ba4efc85ca7461 /core/src/main/scala/org
parent: 965c82d8c4b7f2d4dfbc45ec4d47d6b6588094c3
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these calls to the internal partitionsRDD. So when checkpoint() is called on them, it is the partitionsRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD itself is not actually checkpointed.

This would have been fine, except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, Spark tries to read the checkpoint data of the VertexRDD/EdgeRDD when computing it, even though it is not actually checkpointed. Through a convoluted sequence of call forwarding, it reads the checkpoint data of partitionsRDD and tries to cast it to the element types of VertexRDD/EdgeRDD. This leads to a ClassCastException.

The minimal fix that does not change any public behavior is to modify the RDD internals so that they do not use the public, overridable API for internal logic.

## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15396 from tdas/SPARK-14804.
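The failure mode described in the commit message can be illustrated with a small, self-contained model. This is only a sketch and not Spark code: `SimpleRDD`, `WrapperRDD`, `isCheckpointedInternal`, `source`, and the `useInternalCheck` flag are hypothetical names introduced for illustration. `WrapperRDD` plays the role of VertexRDD/EdgeRDD by forwarding checkpoint calls to an inner object, and the flag switches the internal logic between the overridable public method (pre-fix pattern) and a non-overridable internal check (post-fix pattern).

```scala
object CheckpointSketch {

  /** Hypothetical stand-in for RDD; not Spark's real class. */
  class SimpleRDD(val name: String, useInternalCheck: Boolean) {
    private var checkpointData: Option[String] = None

    def checkpoint(): Unit = { checkpointData = Some(s"data-of-$name") }

    // Public API; subclasses may override it.
    def isCheckpointed: Boolean = isCheckpointedInternal

    // Not overridable; reflects only this object's own checkpoint state.
    final protected def isCheckpointedInternal: Boolean = checkpointData.isDefined

    // Reports where the computation would read its data from.
    final def source: String = {
      val checkpointed =
        if (useInternalCheck) isCheckpointedInternal // post-fix pattern
        else isCheckpointed                          // pre-fix pattern
      if (checkpointed) "own checkpoint" else "lineage"
    }
  }

  /** Hypothetical stand-in for VertexRDD/EdgeRDD: forwards to an inner object. */
  class WrapperRDD(inner: SimpleRDD, fixed: Boolean)
      extends SimpleRDD("wrapper", fixed) {
    override def checkpoint(): Unit = inner.checkpoint()
    override def isCheckpointed: Boolean = inner.isCheckpointed
  }

  def sourceAfterCheckpoint(useInternalCheck: Boolean): String = {
    val inner = new SimpleRDD("partitions", useInternalCheck)
    val wrapper = new WrapperRDD(inner, useInternalCheck)
    wrapper.checkpoint() // only the inner object gets checkpoint data
    wrapper.source
  }

  def main(args: Array[String]): Unit = {
    // Pre-fix pattern: the wrapper believes it has its own checkpoint.
    println(sourceAfterCheckpoint(useInternalCheck = false)) // own checkpoint
    // Post-fix pattern: the wrapper correctly falls back to its lineage.
    println(sourceAfterCheckpoint(useInternalCheck = true))  // lineage
  }
}
```

In the pre-fix pattern the wrapper would go on to read checkpoint data it never wrote, which is the situation that surfaces as a ClassCastException in the real VertexRDD/EdgeRDD case. Routing the internal decision through a non-overridable method leaves the public `isCheckpointed` override untouched while keeping the internal logic tied to the object's own state.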
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala 5
1 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a7e01f397e..0359508c00 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = isCheckpointedAndMaterialized

   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
    * return value. Exposed for testing.
    */
-  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+  private[spark] def isCheckpointedAndMaterialized: Boolean =
+    checkpointData.exists(_.isCheckpointed)

   /**
    * Return whether this RDD is marked for local checkpointing.