author     tien-dungle <tien-dung.le@realimpactanalytics.com>   2015-07-17 12:11:32 -0700
committer  Ankur Dave <ankurdave@gmail.com>                      2015-07-17 12:11:32 -0700
commit     587c315b204f1439f696620543c38166d95f8a3d (patch)
tree       3f9e71eb5b30565dca84e5189de81f75d191ad02 /graphx
parent     eba6a1af4c8ffb21934a59a61a419d625f37cceb (diff)
[SPARK-9109] [GRAPHX] Keep the cached edge in the graph
The change keeps the cached RDDs in the graph object so that they are correctly unpersisted when graph.unpersist() is called.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for the edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges
graph.unpersist()
sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```

Author: tien-dungle <tien-dung.le@realimpactanalytics.com>

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph
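The pitfall behind the bug is general: if a builder caches an intermediate RDD but returns only a derived RDD, the cached one is orphaned, because unpersist() on the derived RDD does not reach its parents. A minimal sketch with plain RDDs (the names here are illustrative, not from the patch):

```scala
// Sketch of the orphaned-cache pattern this patch removes (illustrative names).
val intermediate = sc.parallelize(1 to 10).cache()  // cached internally, reference then dropped
val retained = intermediate.map(_ * 2)              // only `retained` reaches the caller
retained.unpersist()                                // no-op: `retained` itself was never persisted
// `intermediate` still appears in sc.getPersistentRDDs, with no handle left to unpersist it.
```

In the diff below, the first hunk stops caching the intermediate edge and vertex RDDs, and the second hunk caches the transformed edge RDD that the graph actually retains.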
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d23a2..da95314440 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
GraphImpl(vertexRDD, edgeRDD)
}
@@ -346,9 +346,14 @@ object GraphImpl {
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+ vertices.cache()
+
// Convert the vertex partitions in edges to the correct type
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ .cache()
+
GraphImpl.fromExistingRDDs(vertices, newEdges)
}