author    Joseph K. Bradley <joseph@databricks.com>  2015-02-02 14:34:48 -0800
committer Xiangrui Meng <meng@databricks.com>        2015-02-02 14:34:48 -0800
commit    842d00032d0b09fb1f9cfc77359b77693e70a614 (patch)
tree      a9212f2760c73aaeebcdec1a9a5781b395750b26 /graphx/src
parent    5a5526164bdf9ecf1306d4570e816eb4df5cfd2b (diff)
[SPARK-5461] [graphx] Add isCheckpointed, getCheckpointFiles methods to Graph

Added the 2 methods to Graph and GraphImpl. Both make calls to the underlying vertex and edge RDDs. This is needed for another PR (for LDA): [https://github.com/apache/spark/pull/4047]

Notes:
* getCheckpointFiles is plural and returns a Seq[String] instead of an Option[String].
* I attempted to test that the methods return the correct values after checkpointing, but it did not work; I guess that checkpointing does not occur quickly enough? I noticed that there are no checkpointing tests for RDDs; is it just hard to test well?

CC: rxin
CC: mengxr (since related to LDA)

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #4253 from jkbradley/graphx-checkpoint and squashes the following commits:
b680148 [Joseph K. Bradley] added class tag to firstParent call in VertexRDDImpl.isCheckpointed, though not needed to compile
250810e [Joseph K. Bradley] In EdgeRDDImpl, VertexRDDImpl, added transient back to partitionsRDD, and made isCheckpointed check firstParent instead of partitionsRDD
695b7a3 [Joseph K. Bradley] changed partitionsRDD in EdgeRDDImpl, VertexRDDImpl to be non-transient
cc00767 [Joseph K. Bradley] added overrides for isCheckpointed, getCheckpointFile in EdgeRDDImpl, VertexRDDImpl. The corresponding Graph methods now work.
188665f [Joseph K. Bradley] improved documentation
235738c [Joseph K. Bradley] Added isCheckpointed and getCheckpointFiles to Graph, GraphImpl
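For readers skimming this commit, a rough usage sketch of the two new Graph methods follows. It is not part of the patch; the checkpoint directory, app name, and toy edge list are illustrative, and it assumes a local SparkContext with this change applied.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Edge, Graph}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("checkpoint-sketch"))
    sc.setCheckpointDir("/tmp/graphx-checkpoints")  // illustrative path

    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    assert(!graph.isCheckpointed)               // nothing has been checkpointed yet
    graph.checkpoint()
    graph.edges.count()                         // checkpointing is lazy; actions force it to happen
    graph.vertices.count()

    assert(graph.isCheckpointed)                // true once both the vertices RDD and edges RDD are checkpointed
    assert(graph.getCheckpointFiles.size == 2)  // one path for the vertices RDD, one for the edges RDD

    sc.stop()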
Diffstat (limited to 'graphx/src')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala               12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala    10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala      11
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala  10
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala           4
5 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index ab56580a3a..8494d06b1c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -105,6 +105,18 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def checkpoint(): Unit
/**
+ * Return whether this Graph has been checkpointed or not.
+ * This returns true iff both the vertices RDD and edges RDD have been checkpointed.
+ */
+ def isCheckpointed: Boolean
+
+ /**
+ * Gets the names of the files to which this Graph was checkpointed.
+ * (The vertices RDD and edges RDD are checkpointed separately.)
+ */
+ def getCheckpointFiles: Seq[String]
+
+ /**
* Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that
* build a new graph in each iteration.
*/
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index f1550ac2e1..6c35d7029e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -73,7 +73,15 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
override def checkpoint() = {
partitionsRDD.checkpoint()
}
-
+
+ override def isCheckpointed: Boolean = {
+ firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
+ }
+
+ override def getCheckpointFile: Option[String] = {
+ partitionsRDD.getCheckpointFile
+ }
+
/** The number of edges in the RDD. */
override def count(): Long = {
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
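The detail worth noting in EdgeRDDImpl (and in VertexRDDImpl below) is that isCheckpointed is answered through firstParent rather than through the @transient partitionsRDD field, as the squashed commit log above explains. Below is a minimal sketch of that delegation pattern against the Spark 1.x RDD API; WrapperRDD is a hypothetical name, not a Spark class, and this illustrates the idea rather than reproducing the actual EdgeRDDImpl.

    import scala.reflect.ClassTag
    import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical wrapper RDD: the wrapped partitionsRDD is @transient, so checkpoint
    // state is read via the dependency chain (firstParent), which is still available
    // after the field itself is dropped during serialization.
    class WrapperRDD[T: ClassTag](@transient val partitionsRDD: RDD[T])
      extends RDD[T](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

      override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        firstParent[T].iterator(split, context)

      override def checkpoint(): Unit = partitionsRDD.checkpoint()

      // Same shape as the overrides added in this commit.
      override def isCheckpointed: Boolean = firstParent[T].isCheckpointed

      override def getCheckpointFile: Option[String] = partitionsRDD.getCheckpointFile
    }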
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 3f4a900d5b..90a74d23a2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -70,6 +70,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
replicatedVertexView.edges.checkpoint()
}
+ override def isCheckpointed: Boolean = {
+ vertices.isCheckpointed && replicatedVertexView.edges.isCheckpointed
+ }
+
+ override def getCheckpointFiles: Seq[String] = {
+ Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
+ case Some(path) => Seq(path)
+ case None => Seq()
+ }
+ }
+
override def unpersist(blocking: Boolean = true): Graph[VD, ED] = {
unpersistVertices(blocking)
replicatedVertexView.edges.unpersist(blocking)
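A small style note on getCheckpointFiles in GraphImpl above: flatMapping each Option into a Seq is equivalent to flattening a Seq of Options directly. A minimal illustrative sketch, with a made-up helper name that is not part of the patch:

    // Collect only the defined checkpoint paths; Seq[Option[String]].flatten drops the Nones.
    def checkpointFiles(vertexFile: Option[String], edgeFile: Option[String]): Seq[String] =
      Seq(vertexFile, edgeFile).flatten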
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b00c..3e4968d6c0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -74,7 +74,15 @@ class VertexRDDImpl[VD] private[graphx] (
override def checkpoint() = {
partitionsRDD.checkpoint()
}
-
+
+ override def isCheckpointed: Boolean = {
+ firstParent[ShippableVertexPartition[VD]].isCheckpointed
+ }
+
+ override def getCheckpointFile: Option[String] = {
+ partitionsRDD.getCheckpointFile
+ }
+
/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index ed9876b8dc..59a57ba7a3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -375,6 +375,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
val rdd = sc.parallelize(ring)
val graph = Graph.fromEdges(rdd, 1.0F)
+ assert(!graph.isCheckpointed)
+ assert(graph.getCheckpointFiles.size === 0)
graph.checkpoint()
graph.edges.map(_.attr).count()
graph.vertices.map(_._2).count()
@@ -383,6 +385,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val verticesDependencies = graph.vertices.partitionsRDD.dependencies
assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
+ assert(graph.isCheckpointed)
+ assert(graph.getCheckpointFiles.size === 2)
}
}
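For context on the test above: Spark checkpointing is lazy, which is why the suite runs count() on the edges and vertices before asserting, and once the checkpoint data has been written the RDD's lineage collapses onto it. A minimal standalone sketch of the same behaviour on a plain RDD; the checkpoint directory is illustrative, and the CheckpointRDD instance check used in the suite is omitted here because that class is package-private:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("checkpoint-laziness"))
    sc.setCheckpointDir("/tmp/rdd-checkpoints")  // illustrative path

    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.checkpoint()
    assert(!rdd.isCheckpointed)         // checkpoint() only marks the RDD; nothing is written yet

    rdd.count()                         // running an action materializes the checkpoint
    assert(rdd.isCheckpointed)
    assert(rdd.getCheckpointFile.isDefined)
    assert(rdd.dependencies.size == 1)  // lineage now points at the checkpointed data

    sc.stop()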