diff options
author | GuoQiang Li <witgo@qq.com> | 2014-12-06 00:56:51 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-12-06 00:56:51 -0800 |
commit | e895e0cbecbbec1b412ff21321e57826d2d0a982 (patch) | |
tree | 7f75e1a51a0707a2ebf97c2e0f32c50058a98701 /graphx/src/test | |
parent | 6eb1b6f6204ea3c8083af3fb9cd990d9f3dac89d (diff) | |
download | spark-e895e0cbecbbec1b412ff21321e57826d2d0a982.tar.gz spark-e895e0cbecbbec1b412ff21321e57826d2d0a982.tar.bz2 spark-e895e0cbecbbec1b412ff21321e57826d2d0a982.zip |
[SPARK-3623][GraphX] GraphX should support the checkpoint operation
Author: GuoQiang Li <witgo@qq.com>
Closes #2631 from witgo/SPARK-3623 and squashes the following commits:
a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation
Diffstat (limited to 'graphx/src/test')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 21 |
1 files changed, 21 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index a05d1ddb21..9da0064104 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import com.google.common.io.Files + import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("checkpoint") { + val checkpointDir = Files.createTempDir() + checkpointDir.deleteOnExit() + withSpark { sc => + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} + val rdd = sc.parallelize(ring) + val graph = Graph.fromEdges(rdd, 1.0F) + graph.checkpoint() + graph.edges.map(_.attr).count() + graph.vertices.map(_._2).count() + + val edgesDependencies = graph.edges.partitionsRDD.dependencies + val verticesDependencies = graph.vertices.partitionsRDD.dependencies + assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + } + } + } |