aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerryLead <JerryLead@163.com>2014-12-02 17:08:02 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-12-02 17:11:05 -0800
commitf1859fc189d9657381fbe82795420de34cad4025 (patch)
tree194fe39a97b77e8c5f3060001081795f014a3ce4
parent5e026a3e647f077cf9aef15d80cd1fdfa5aad3cd (diff)
downloadspark-f1859fc189d9657381fbe82795420de34cad4025.tar.gz
spark-f1859fc189d9657381fbe82795420de34cad4025.tar.bz2
spark-f1859fc189d9657381fbe82795420de34cad4025.zip
[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage
The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 Iterative GraphX applications always have long lineage, while checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we perform checkpoint() on their ParitionsRDD, the long lineage can be cut off. Moreover, the existing operations such as cache() in this code is performed on the PartitionsRDD, so checkpoint() should do the same way. More details and explanation can be found in the JIRA. Author: JerryLead <JerryLead@163.com> Author: Lijie Xu <csxulijie@gmail.com> Closes #3549 from JerryLead/my_graphX_checkpoint and squashes the following commits: d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master (cherry picked from commit fc0a1475ef7c8b33363d88adfe8e8f28def5afc7) Signed-off-by: Ankur Dave <ankurdave@gmail.com>
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala4
2 files changed, 8 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index a8169613b4..504559da97 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
this
}
+ override def checkpoint() = {
+ partitionsRDD.checkpoint()
+ }
+
/** The number of edges in the RDD. */
override def count(): Long = {
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index d92a55a189..c8898b1369 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] (
this
}
+ override def checkpoint() = {
+ partitionsRDD.checkpoint()
+ }
+
/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)