From 5e20cbaf66359338b01f1bf070fd601d39b3c1fd Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 13 Dec 2013 01:59:52 -0800 Subject: Define localVidMap once per VTableReplicated --- .../apache/spark/graph/impl/VTableReplicated.scala | 30 ++++++++++++---------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index c88c133426..72032aa5b2 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -23,6 +23,22 @@ class VTableReplicated[VD: ClassManifest]( vertexPlacement: VertexPlacement, prevVTableReplicated: Option[VTableReplicated[VD]] = None) { + // Within each edge partition, create a local map from vid to an index into + // the attribute array. Each map contains a superset of the vertices that it + // will receive, because it stores vids from both the source and destination + // of edges. It must always include both source and destination vids because + // some operations, such as GraphImpl.mapReduceTriplets, rely on this. + val localVidMap: RDD[(Int, VertexIdToIndexMap)] = edges.partitionsRDD.mapPartitions(_.map { + case (pid, epart) => + val vidToIndex = new VertexIdToIndexMap + epart.foreach { e => + vidToIndex.add(e.srcId) + vidToIndex.add(e.dstId) + } + (pid, vidToIndex) + }, preservesPartitioning = true).cache() + + val bothAttrs: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, true) val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, false) val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(false, true) @@ -64,20 +80,6 @@ class VTableReplicated[VD: ClassManifest]( }.cache().setName("VTableReplicated delta %s %s".format(includeSrcAttr, includeDstAttr)) case None => - // Within each edge partition, create a local map from vid to an index into - // the attribute array. Each map contains a superset of the vertices that it - // will receive, because it stores vids from both the source and destination - // of edges. It must always include both source and destination vids because - // some operations, such as GraphImpl.mapReduceTriplets, rely on this. - val localVidMap = edges.partitionsRDD.mapPartitions(_.map { - case (pid, epart) => - val vidToIndex = new VertexIdToIndexMap - epart.foreach { e => - vidToIndex.add(e.srcId) - vidToIndex.add(e.dstId) - } - (pid, vidToIndex) - }, preservesPartitioning = true).cache() // Within each edge partition, place the vertex attributes received from // msgsByPartition into the correct locations specified in localVidMap -- cgit v1.2.3