aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala10
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala11
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala10
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala4
-rw-r--r--project/MimaExcludes.scala6
6 files changed, 51 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index ab56580a3a..8494d06b1c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -105,6 +105,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def checkpoint(): Unit
/**
+ * Return whether this Graph has been checkpointed or not.
+ * This returns true iff both the vertices RDD and edges RDD have been checkpointed.
+ */
+ def isCheckpointed: Boolean
+
+ /**
+ * Gets the name of the files to which this Graph was checkpointed.
+ * (The vertices RDD and edges RDD are checkpointed separately.)
+ */
+ def getCheckpointFiles: Seq[String]
+
+ /**
* Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that
* build a new graph in each iteration.
*/
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index f1550ac2e1..6c35d7029e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
override def checkpoint() = {
partitionsRDD.checkpoint()
}
-
+
+ override def isCheckpointed: Boolean = {
+ firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
+ }
+
+ override def getCheckpointFile: Option[String] = {
+ partitionsRDD.getCheckpointFile
+ }
+
/** The number of edges in the RDD. */
override def count(): Long = {
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 3f4a900d5b..90a74d23a2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
replicatedVertexView.edges.checkpoint()
}
+ override def isCheckpointed: Boolean = {
+ vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
+ }
+
+ override def getCheckpointFiles: Seq[String] = {
+ Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
+ case Some(path) => Seq(path)
+ case None => Seq()
+ }
+ }
+
override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
unpersistVertices(blocking)
replicatedVertexView.edges.unpersist(blocking)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b00c..3e4968d6c0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] (
override def checkpoint() = {
partitionsRDD.checkpoint()
}
-
+
+ override def isCheckpointed: Boolean = {
+ firstParent[ShippableVertexPartition[VD]].isCheckpointed
+ }
+
+ override def getCheckpointFile: Option[String] = {
+ partitionsRDD.getCheckpointFile
+ }
+
/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index ed9876b8dc..59a57ba7a3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
val rdd = sc.parallelize(ring)
val graph = Graph.fromEdges(rdd, 1.0F)
+ assert(!graph.isCheckpointed)
+ assert(graph.getCheckpointFiles.size === 0)
graph.checkpoint()
graph.edges.map(_.attr).count()
graph.vertices.map(_._2).count()
@@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val verticesDependencies = graph.vertices.partitionsRDD.dependencies
assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ assert(graph.isCheckpointed)
+ assert(graph.getCheckpointFiles.size === 2)
}
}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 14ba03ed46..45be1db9a5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -127,6 +127,12 @@ object MimaExcludes {
// SPARK-5315 Spark Streaming Java API returns Scala DStream
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
+ ) ++ Seq(
+ // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.graphx.Graph.getCheckpointFiles"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.graphx.Graph.isCheckpointed")
)
case v if v.startsWith("1.2") =>