diff options
author | Ankur Dave <ankurdave@gmail.com> | 2013-12-20 14:51:32 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2013-12-20 14:51:32 -0800 |
commit | 6d1bf0d78d150925c0dbd9adfe6294c89846f9b7 (patch) | |
tree | 7980fe92fd9bb7dc98fc88425796bb6ff7a0c362 /graph/src | |
parent | 4797c227ff7aafcc1e4dcbbaa5281b55361484e6 (diff) | |
parent | cd01539d6f5231f80877add68b9f02147dd616cf (diff) | |
download | spark-6d1bf0d78d150925c0dbd9adfe6294c89846f9b7.tar.gz spark-6d1bf0d78d150925c0dbd9adfe6294c89846f9b7.tar.bz2 spark-6d1bf0d78d150925c0dbd9adfe6294c89846f9b7.zip |
Merge branch 'subgraph-reuses-view' into HEAD
Diffstat (limited to 'graph/src')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index e7f975253a..9e44f49113 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -196,14 +196,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def subgraph( epred: EdgeTriplet[VD, ED] => Boolean = x => true, vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { + // Filter the vertices, reusing the partitioner and the index from this graph + val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) - // Filter the vertices, reusing the partitioner (but not the index) from - // this graph - val newVTable = vertices.mapVertexPartitions(_.filter(vpred).reindex()) - + // Filter the edges val edManifest = classManifest[ED] - - val newETable = new EdgeRDD[ED](triplets.filter { et => + val newEdges = new EdgeRDD[ED](triplets.filter { et => vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et) }.mapPartitionsWithIndex( { (pid, iter) => val builder = new EdgePartitionBuilder[ED]()(edManifest) @@ -212,7 +210,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Iterator((pid, edgePartition)) }, preservesPartitioning = true)).cache() - new GraphImpl(newVTable, newETable) + // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is + // fine. + new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated) } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( |