diff options
author | tien-dungle <tien-dung.le@realimpactanalytics.com> | 2015-07-17 12:11:32 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2015-07-17 12:11:32 -0700 |
commit | 587c315b204f1439f696620543c38166d95f8a3d (patch) | |
tree | 3f9e71eb5b30565dca84e5189de81f75d191ad02 /graphx/src | |
parent | eba6a1af4c8ffb21934a59a61a419d625f37cceb (diff) | |
download | spark-587c315b204f1439f696620543c38166d95f8a3d.tar.gz spark-587c315b204f1439f696620543c38166d95f8a3d.tar.bz2 spark-587c315b204f1439f696620543c38166d95f8a3d.zip |
[SPARK-9109] [GRAPHX] Keep the cached edge in the graph
The change here is to keep the cached RDDs in the graph object so that when the graph.unpersist() is called these RDDs are correctly unpersisted.
```java
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges
graph.unpersist()
sc.getPersistentRDDs.foreach( r => println( r._2.toString))
```
Author: tien-dungle <tien-dung.le@realimpactanalytics.com>
Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:
8d87997 [tien-dungle] Keep the cached edge in the graph
Diffstat (limited to 'graphx/src')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 90a74d23a2..da95314440 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -332,9 +332,9 @@ object GraphImpl { edgeStorageLevel: StorageLevel, vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD]) - .withTargetStorageLevel(edgeStorageLevel).cache() + .withTargetStorageLevel(edgeStorageLevel) val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr) - .withTargetStorageLevel(vertexStorageLevel).cache() + .withTargetStorageLevel(vertexStorageLevel) GraphImpl(vertexRDD, edgeRDD) } @@ -346,9 +346,14 @@ object GraphImpl { def apply[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], edges: EdgeRDD[ED]): GraphImpl[VD, ED] = { + + vertices.cache() + // Convert the vertex partitions in edges to the correct type val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]] .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD]) + .cache() + GraphImpl.fromExistingRDDs(vertices, newEdges) } |