aboutsummaryrefslogtreecommitdiff
path: root/graph/src/main
diff options
context:
space:
mode:
author Ankur Dave <ankurdave@gmail.com> 2013-12-13 01:59:52 -0800
committer Ankur Dave <ankurdave@gmail.com> 2013-12-14 15:28:22 -0800
commit 5e20cbaf66359338b01f1bf070fd601d39b3c1fd (patch)
tree 0784ea2a6c8229303191c6bf41062c49ed49ae5e /graph/src/main
parent dc7214790072d57cab45326e4777452cc5740495 (diff)
download spark-5e20cbaf66359338b01f1bf070fd601d39b3c1fd.tar.gz
spark-5e20cbaf66359338b01f1bf070fd601d39b3c1fd.tar.bz2
spark-5e20cbaf66359338b01f1bf070fd601d39b3c1fd.zip
Define localVidMap once per VTableReplicated
Diffstat (limited to 'graph/src/main')
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala 30
1 files changed, 16 insertions, 14 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index c88c133426..72032aa5b2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -23,6 +23,22 @@ class VTableReplicated[VD: ClassManifest](
vertexPlacement: VertexPlacement,
prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
+ // Within each edge partition, create a local map from vid to an index into
+ // the attribute array. Each map contains a superset of the vertices that it
+ // will receive, because it stores vids from both the source and destination
+ // of edges. It must always include both source and destination vids because
+ // some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+ val localVidMap: RDD[(Int, VertexIdToIndexMap)] = edges.partitionsRDD.mapPartitions(_.map {
+ case (pid, epart) =>
+ val vidToIndex = new VertexIdToIndexMap
+ epart.foreach { e =>
+ vidToIndex.add(e.srcId)
+ vidToIndex.add(e.dstId)
+ }
+ (pid, vidToIndex)
+ }, preservesPartitioning = true).cache()
+
+
val bothAttrs: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, true)
val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(true, false)
val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = createVTableReplicated(false, true)
@@ -64,20 +80,6 @@ class VTableReplicated[VD: ClassManifest](
}.cache().setName("VTableReplicated delta %s %s".format(includeSrcAttr, includeDstAttr))
case None =>
- // Within each edge partition, create a local map from vid to an index into
- // the attribute array. Each map contains a superset of the vertices that it
- // will receive, because it stores vids from both the source and destination
- // of edges. It must always include both source and destination vids because
- // some operations, such as GraphImpl.mapReduceTriplets, rely on this.
- val localVidMap = edges.partitionsRDD.mapPartitions(_.map {
- case (pid, epart) =>
- val vidToIndex = new VertexIdToIndexMap
- epart.foreach { e =>
- vidToIndex.add(e.srcId)
- vidToIndex.add(e.dstId)
- }
- (pid, vidToIndex)
- }, preservesPartitioning = true).cache()
// Within each edge partition, place the vertex attributes received from
// msgsByPartition into the correct locations specified in localVidMap