aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/test/scala
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2014-12-06 00:56:51 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-12-06 00:56:51 -0800
commite895e0cbecbbec1b412ff21321e57826d2d0a982 (patch)
tree7f75e1a51a0707a2ebf97c2e0f32c50058a98701 /graphx/src/test/scala
parent6eb1b6f6204ea3c8083af3fb9cd990d9f3dac89d (diff)
downloadspark-e895e0cbecbbec1b412ff21321e57826d2d0a982.tar.gz
spark-e895e0cbecbbec1b412ff21321e57826d2d0a982.tar.bz2
spark-e895e0cbecbbec1b412ff21321e57826d2d0a982.zip
[SPARK-3623][GraphX] GraphX should support the checkpoint operation
Author: GuoQiang Li <witgo@qq.com> Closes #2631 from witgo/SPARK-3623 and squashes the following commits: a70c500 [GuoQiang Li] Remove java related 4d1e249 [GuoQiang Li] Add comments e682724 [GuoQiang Li] Graph should support the checkpoint operation
Diffstat (limited to 'graphx/src/test/scala')
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala21
1 files changed, 21 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1ddb21..9da0064104 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
+import com.google.common.io.Files
+
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("checkpoint") {
+ val checkpointDir = Files.createTempDir()
+ checkpointDir.deleteOnExit()
+ withSpark { sc =>
+ sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
+ val rdd = sc.parallelize(ring)
+ val graph = Graph.fromEdges(rdd, 1.0F)
+ graph.checkpoint()
+ graph.edges.map(_.attr).count()
+ graph.vertices.map(_._2).count()
+
+ val edgesDependencies = graph.edges.partitionsRDD.dependencies
+ val verticesDependencies = graph.vertices.partitionsRDD.dependencies
+ assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ }
+ }
+
}