From 842d00032d0b09fb1f9cfc77359b77693e70a614 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 2 Feb 2015 14:34:48 -0800 Subject: [SPARK-5461] [graphx] Add isCheckpointed, getCheckpointedFiles methods to Graph Added the 2 methods to Graph and GraphImpl. Both make calls to the underlying vertex and edge RDDs. This is needed for another PR (for LDA): [https://github.com/apache/spark/pull/4047] Notes: * getCheckpointedFiles is plural and returns a Seq[String] instead of an Option[String]. * I attempted to test to make sure the methods returned the correct values after checkpointing. It did not work; I guess that checkpointing does not occur quickly enough? I noticed that there are not checkpointing tests for RDDs; is it just hard to test well? CC: rxin CC: mengxr (since related to LDA) Author: Joseph K. Bradley Closes #4253 from jkbradley/graphx-checkpoint and squashes the following commits: b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile 250810e [Joseph K. Bradley] In EdgeRDDImple, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD 695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work. 188665f [Joseph K. Bradley] improved documentation 235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl --- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 12 ++++++++++++ .../scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 10 +++++++++- .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 11 +++++++++++ .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 10 +++++++++- .../src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 4 ++++ project/MimaExcludes.scala | 6 ++++++ 6 files changed, 51 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index ab56580a3a..8494d06b1c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -104,6 +104,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def checkpoint(): Unit + /** + * Return whether this Graph has been checkpointed or not. + * This returns true iff both the vertices RDD and edges RDD have been checkpointed. + */ + def isCheckpointed: Boolean + + /** + * Gets the name of the files to which this Graph was checkpointed. + * (The vertices RDD and edges RDD are checkpointed separately.) + */ + def getCheckpointFiles: Seq[String] + /** * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that * build a new graph in each iteration. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index f1550ac2e1..6c35d7029e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( override def checkpoint() = { partitionsRDD.checkpoint() } - + + override def isCheckpointed: Boolean = { + firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed + } + + override def getCheckpointFile: Option[String] = { + partitionsRDD.getCheckpointFile + } + /** The number of edges in the RDD. */ override def count(): Long = { partitionsRDD.map(_._2.size.toLong).reduce(_ + _) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 3f4a900d5b..90a74d23a2 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( replicatedVertexView.edges.checkpoint() } + override def isCheckpointed: Boolean = { + vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed + } + + override def getCheckpointFiles: Seq[String] = { + Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap { + case Some(path) => Seq(path) + case None => Seq() + } + } + override def unpersist(blocking: Boolean = true): Graph[VD, ED] = { unpersistVertices(blocking) replicatedVertexView.edges.unpersist(blocking) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 9732c5b00c..3e4968d6c0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] ( override def checkpoint() = { partitionsRDD.checkpoint() } - + + override def isCheckpointed: Boolean = { + firstParent[ShippableVertexPartition[VD]].isCheckpointed + } + + override def getCheckpointFile: Option[String] = { + partitionsRDD.getCheckpointFile + } + /** The number of vertices in the RDD. */ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index ed9876b8dc..59a57ba7a3 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} val rdd = sc.parallelize(ring) val graph = Graph.fromEdges(rdd, 1.0F) + assert(!graph.isCheckpointed) + assert(graph.getCheckpointFiles.size === 0) graph.checkpoint() graph.edges.map(_.attr).count() graph.vertices.map(_._2).count() @@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { val verticesDependencies = graph.vertices.partitionsRDD.dependencies assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]])) + assert(graph.isCheckpointed) + assert(graph.getCheckpointFiles.size === 2) } } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 14ba03ed46..45be1db9a5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -127,6 +127,12 @@ object MimaExcludes { // SPARK-5315 Spark Streaming Java API returns Scala DStream ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow") + ) ++ Seq( + // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.graphx.Graph.getCheckpointFiles"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.graphx.Graph.isCheckpointed") ) case v if v.startsWith("1.2") => -- cgit v1.2.3