aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2017-01-25 17:17:34 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2017-01-25 17:17:34 -0800
commit47d5d0ddb06c7d2c86515d9556c41dc80081f560 (patch)
tree0ad7f2c384c9aebf547e871aa2ba4efc85ca7461 /graphx
parent965c82d8c4b7f2d4dfbc45ec4d47d6b6588094c3 (diff)
downloadspark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.tar.gz
spark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.tar.bz2
spark-47d5d0ddb06c7d2c86515d9556c41dc80081f560.zip
[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, it's the partitionRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except that the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing, Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internals to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15396 from tdas/SPARK-14804.
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala27
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala26
2 files changed, 53 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e221..7a24e320c3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("checkpointing") {
+ withSpark { sc =>
+ val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+ val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+ sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+ edges.checkpoint()
+
+ // EdgeRDD not yet checkpointed
+ assert(!edges.isCheckpointed)
+ assert(!edges.isCheckpointedAndMaterialized)
+ assert(!edges.partitionsRDD.isCheckpointed)
+ assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+ val data = edges.collect().toSeq // force checkpointing
+
+ // EdgeRDD shows up as checkpointed, but internally it is not.
+ // Only internal partitionsRDD is checkpointed.
+ assert(edges.isCheckpointed)
+ assert(!edges.isCheckpointedAndMaterialized)
+ assert(edges.partitionsRDD.isCheckpointed)
+ assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+ assert(edges.collect().toSeq === data) // test checkpointed RDD
+ }
+ }
+
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a3ea..8e63043527 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("checkpoint") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+ verts.checkpoint()
+
+ // VertexRDD not yet checkpointed
+ assert(!verts.isCheckpointed)
+ assert(!verts.isCheckpointedAndMaterialized)
+ assert(!verts.partitionsRDD.isCheckpointed)
+ assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+ val data = verts.collect().toSeq // force checkpointing
+
+ // VertexRDD shows up as checkpointed, but internally it is not.
+ // Only internal partitionsRDD is checkpointed.
+ assert(verts.isCheckpointed)
+ assert(!verts.isCheckpointedAndMaterialized)
+ assert(verts.partitionsRDD.isCheckpointed)
+ assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+ assert(verts.collect().toSeq === data) // test checkpointed RDD
+ }
+ }
}