aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala27
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala26
2 files changed, 53 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e221..7a24e320c3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("checkpointing") {
+ withSpark { sc =>
+ val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+ val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+ sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+ edges.checkpoint()
+
+ // EdgeRDD not yet checkpointed
+ assert(!edges.isCheckpointed)
+ assert(!edges.isCheckpointedAndMaterialized)
+ assert(!edges.partitionsRDD.isCheckpointed)
+ assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+ val data = edges.collect().toSeq // force checkpointing
+
+ // EdgeRDD shows up as checkpointed, but internally it is not.
+ // Only internal partitionsRDD is checkpointed.
+ assert(edges.isCheckpointed)
+ assert(!edges.isCheckpointedAndMaterialized)
+ assert(edges.partitionsRDD.isCheckpointed)
+ assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+ assert(edges.collect().toSeq === data) // test checkpointed RDD
+ }
+ }
+
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a3ea..8e63043527 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("checkpoint") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+ verts.checkpoint()
+
+ // VertexRDD not yet checkpointed
+ assert(!verts.isCheckpointed)
+ assert(!verts.isCheckpointedAndMaterialized)
+ assert(!verts.partitionsRDD.isCheckpointed)
+ assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+ val data = verts.collect().toSeq // force checkpointing
+
+ // VertexRDD shows up as checkpointed, but internally it is not.
+ // Only internal partitionsRDD is checkpointed.
+ assert(verts.isCheckpointed)
+ assert(!verts.isCheckpointedAndMaterialized)
+ assert(verts.partitionsRDD.isCheckpointed)
+ assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+ assert(verts.collect().toSeq === data) // test checkpointed RDD
+ }
+ }
}