aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorJoey <joseph.e.gonzalez@gmail.com>2013-12-31 13:51:07 -0800
committerJoey <joseph.e.gonzalez@gmail.com>2013-12-31 13:51:07 -0800
commit32d6ae9d9c15bdb7937e7bd69a55afb21631adb6 (patch)
tree4d298702c7a3bf605509c58b2d1f77032da38707 /graph/src
parent44e4205ac579a9a4dfb2f6041d34caea568059ce (diff)
parentcd01539d6f5231f80877add68b9f02147dd616cf (diff)
downloadspark-32d6ae9d9c15bdb7937e7bd69a55afb21631adb6.tar.gz
spark-32d6ae9d9c15bdb7937e7bd69a55afb21631adb6.tar.bz2
spark-32d6ae9d9c15bdb7937e7bd69a55afb21631adb6.zip
Merge pull request #120 from ankurdave/subgraph-reuses-view
Reuse VTableReplicated in GraphImpl.subgraph
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala14
1 files changed, 7 insertions, 7 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e7f975253a..9e44f49113 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -196,14 +196,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ // Filter the vertices, reusing the partitioner and the index from this graph
+ val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
- // Filter the vertices, reusing the partitioner (but not the index) from
- // this graph
- val newVTable = vertices.mapVertexPartitions(_.filter(vpred).reindex())
-
+ // Filter the edges
val edManifest = classManifest[ED]
-
- val newETable = new EdgeRDD[ED](triplets.filter { et =>
+ val newEdges = new EdgeRDD[ED](triplets.filter { et =>
vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
}.mapPartitionsWithIndex( { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]()(edManifest)
@@ -212,7 +210,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- new GraphImpl(newVTable, newETable)
+ // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is
+ // fine.
+ new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (