aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2014-12-06 00:56:51 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-12-06 00:57:02 -0800
commit27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0 (patch)
treea2b3b2a0779328f755463351d645bf64080feb45
parent11446a6488fa95aca75e94f8fbecea80dc8f5331 (diff)
downloadspark-27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0.tar.gz
spark-27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0.tar.bz2
spark-27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0.zip
[SPARK-3623][GraphX] GraphX should support the checkpoint operation
Author: GuoQiang Li <witgo@qq.com> Closes #2631 from witgo/SPARK-3623 and squashes the following commits: a70c500 [GuoQiang Li] Remove java related 4d1e249 [GuoQiang Li] Add comments e682724 [GuoQiang Li] Graph should support the checkpoint operation (cherry picked from commit e895e0cbecbbec1b412ff21321e57826d2d0a982) Signed-off-by: Ankur Dave <ankurdave@gmail.com>
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala8
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala5
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala21
3 files changed, 34 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 6377915435..23538b7156 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def cache(): Graph[VD, ED]
/**
+ * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
+ * directory set with SparkContext.setCheckpointDir() and all references to its parent
+ * RDDs will be removed. It is strongly recommended that this Graph is persisted in
+ * memory, otherwise saving it on a file will require recomputation.
+ */
+ def checkpoint(): Unit
+
+ /**
* Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This method can be used to
* uncache the vertex attributes of previous iterations once they are no longer needed, improving
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 0eae2a6738..a617d84aea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
this
}
+ override def checkpoint(): Unit = {
+ vertices.checkpoint()
+ replicatedVertexView.edges.checkpoint()
+ }
+
override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
// TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index a05d1ddb21..9da0064104 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
+import com.google.common.io.Files
+
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("checkpoint") {
+ val checkpointDir = Files.createTempDir()
+ checkpointDir.deleteOnExit()
+ withSpark { sc =>
+ sc.setCheckpointDir(checkpointDir.getAbsolutePath)
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
+ val rdd = sc.parallelize(ring)
+ val graph = Graph.fromEdges(rdd, 1.0F)
+ graph.checkpoint()
+ graph.edges.map(_.attr).count()
+ graph.vertices.map(_._2).count()
+
+ val edgesDependencies = graph.edges.partitionsRDD.dependencies
+ val verticesDependencies = graph.vertices.partitionsRDD.dependencies
+ assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ }
+ }
+
}