diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-01-25 17:17:34 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-01-25 17:17:34 -0800 |
commit | 47d5d0ddb06c7d2c86515d9556c41dc80081f560 (patch) | |
tree | 0ad7f2c384c9aebf547e871aa2ba4efc85ca7461 | |
parent | 965c82d8c4b7f2d4dfbc45ec4d47d6b6588094c3 (diff) | |
download | spark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.tar.gz spark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.tar.bz2 spark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.zip |
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
## What changes were proposed in this pull request?
EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, it's the partitionRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed.
This would have been fine except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing, Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to a ClassCastException.
The minimal fix that does not change any public behavior is to modify the RDD internals to not use the public overridable API for internal logic.
## How was this patch tested?
New unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #15396 from tdas/SPARK-14804.
3 files changed, 56 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a7e01f397e..0359508c00 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag]( /** * Return whether this RDD is checkpointed and materialized, either reliably or locally. */ - def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed) + def isCheckpointed: Boolean = isCheckpointedAndMaterialized /** * Return whether this RDD is checkpointed and materialized, either reliably or locally. * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the * return value. Exposed for testing. */ - private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed + private[spark] def isCheckpointedAndMaterialized: Boolean = + checkpointData.exists(_.isCheckpointed) /** * Return whether this RDD is marked for local checkpointing. 
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index f1ecc9e221..7a24e320c3 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx import org.apache.spark.SparkFunSuite import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { @@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("checkpointing") { + withSpark { sc => + val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3))) + val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]])) + sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath) + edges.checkpoint() + + // EdgeRDD not yet checkpointed + assert(!edges.isCheckpointed) + assert(!edges.isCheckpointedAndMaterialized) + assert(!edges.partitionsRDD.isCheckpointed) + assert(!edges.partitionsRDD.isCheckpointedAndMaterialized) + + val data = edges.collect().toSeq // force checkpointing + + // EdgeRDD shows up as checkpointed, but internally it is not. + // Only internal partitionsRDD is checkpointed. 
+ assert(edges.isCheckpointed) + assert(!edges.isCheckpointedAndMaterialized) + assert(edges.partitionsRDD.isCheckpointed) + assert(edges.partitionsRDD.isCheckpointedAndMaterialized) + + assert(edges.collect().toSeq === data) // test checkpointed RDD + } + } + } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 0bb9e0a3ea..8e63043527 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.graphx import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class VertexRDDSuite extends SparkFunSuite with LocalSparkContext { @@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("checkpoint") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath) + verts.checkpoint() + + // VertexRDD not yet checkpointed + assert(!verts.isCheckpointed) + assert(!verts.isCheckpointedAndMaterialized) + assert(!verts.partitionsRDD.isCheckpointed) + assert(!verts.partitionsRDD.isCheckpointedAndMaterialized) + + val data = verts.collect().toSeq // force checkpointing + + // VertexRDD shows up as checkpointed, but internally it is not. + // Only internal partitionsRDD is checkpointed. + assert(verts.isCheckpointed) + assert(!verts.isCheckpointedAndMaterialized) + assert(verts.partitionsRDD.isCheckpointed) + assert(verts.partitionsRDD.isCheckpointedAndMaterialized) + + assert(verts.collect().toSeq === data) // test checkpointed RDD + } + } } |