From e895e0cbecbbec1b412ff21321e57826d2d0a982 Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Sat, 6 Dec 2014 00:56:51 -0800 Subject: [SPARK-3623][GraphX] GraphX should support the checkpoint operation Author: GuoQiang Li Closes #2631 from witgo/SPARK-3623 and squashes the following commits: a70c500 [GuoQiang Li] Remove java related 4d1e249 [GuoQiang Li] Add comments e682724 [GuoQiang Li] Graph should support the checkpoint operation --- .../main/scala/org/apache/spark/graphx/Graph.scala | 8 ++++++++ .../org/apache/spark/graphx/impl/GraphImpl.scala | 5 +++++ .../scala/org/apache/spark/graphx/GraphSuite.scala | 21 +++++++++++++++++++++ 3 files changed, 34 insertions(+) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 6377915435..23538b7156 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -96,6 +96,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def cache(): Graph[VD, ED] + /** + * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. It is strongly recommended that this Graph is persisted in + * memory, otherwise saving it on a file will require recomputation. + */ + def checkpoint(): Unit + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. This method can be used to diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 0eae2a6738..a617d84aea 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } + override def checkpoint(): Unit = { + vertices.checkpoint() + replicatedVertexView.edges.checkpoint() + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index a05d1ddb21..9da0064104 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import com.google.common.io.Files + import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("checkpoint") { + val checkpointDir = Files.createTempDir() + checkpointDir.deleteOnExit() + withSpark { sc => + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} + val rdd = sc.parallelize(ring) + val graph = Graph.fromEdges(rdd, 1.0F) + graph.checkpoint() + graph.edges.map(_.attr).count() + graph.vertices.map(_._2).count() + + val edgesDependencies = graph.edges.partitionsRDD.dependencies + val verticesDependencies = graph.vertices.partitionsRDD.dependencies + assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + } + } + } -- cgit v1.2.3