diff options
author | Joey <joseph.e.gonzalez@gmail.com> | 2014-01-04 12:54:41 -0800 |
---|---|---|
committer | Joey <joseph.e.gonzalez@gmail.com> | 2014-01-04 12:54:41 -0800 |
commit | 280ddf64bd597cd287e6f41b71fbc3c63a0d7e68 (patch) | |
tree | 9169bf85e5ed648212f9c7fad18e27c843d4a693 /graph/src | |
parent | dc9cb83e8661d075d8f3b47c27b2f423287e598d (diff) | |
parent | cfab8f2062fad9ea400716afce28d200dd714c2b (diff) | |
download | spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.tar.gz spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.tar.bz2 spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.zip |
Merge pull request #121 from ankurdave/more-simplify
Simplify GraphImpl internals further
Diffstat (limited to 'graph/src')
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 146 | ||||
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala) | 55 | ||||
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala) | 18 |
3 files changed, 106 insertions, 113 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 9e44f49113..16d73820f0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -16,35 +16,31 @@ import org.apache.spark.util.ClosureCleaner * A Graph RDD that supports computation on graphs. * * Graphs are represented using two classes of data: vertex-partitioned and - * edge-partitioned. `vTable` contains vertex attributes, which are - * vertex-partitioned. `eTable` contains edge attributes, which are - * edge-partitioned. For operations on vertex neighborhoods, vertex attributes - * are replicated to the edge partitions where they appear as sources or - * destinations. `vertexPlacement` specifies where each vertex will be - * replicated. `vTableReplicated` stores the replicated vertex attributes, which - * are co-partitioned with the relevant edges. - * - * mask in vertices means filter - * mask in vTableReplicated means active + * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges` + * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods, + * vertex attributes are replicated to the edge partitions where they appear as sources or + * destinations. `routingTable` stores the routing information for shipping vertex attributes to + * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created + * using the routing table. */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], - @transient val vertexPlacement: VertexPlacement, - @transient val vTableReplicated: VTableReplicated[VD]) + @transient val routingTable: RoutingTable, + @transient val replicatedVertexView: ReplicatedVertexView[VD]) extends Graph[VD, ED] { def this( vertices: VertexRDD[VD], edges: EdgeRDD[ED], - vertexPlacement: VertexPlacement) = { - this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement)) + routingTable: RoutingTable) = { + this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable)) } def this( vertices: VertexRDD[VD], edges: EdgeRDD[ED]) = { - this(vertices, edges, new VertexPlacement(edges, vertices)) + this(vertices, edges, new RoutingTable(edges, vertices)) } /** Return a RDD that brings edges together with their source and destination vertices. */ @@ -52,7 +48,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val vdManifest = classManifest[VD] val edManifest = classManifest[ED] - edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) => + edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePart, vPartIter) => val (_, vPart) = vPartIter.next() new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest) } @@ -96,9 +92,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val numVertices = this.ops.numVertices val numEdges = this.ops.numEdges - val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices - val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices - val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices + val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices + val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices + val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices // One entry for each partition, indicate the total number of edges on that partition. val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges) val minLoad = loadArray.min @@ -139,48 +135,48 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) } } - println("eTable ------------------------------------------") + println("edges ------------------------------------------") traverseLineage(edges, " ") - var visited = Map(edges.id -> "eTable") - println("\n\nvTable ------------------------------------------") + var visited = Map(edges.id -> "edges") + println("\n\nvertices ------------------------------------------") traverseLineage(vertices, " ", visited) - visited += (vertices.id -> "vTable") - println("\n\nvertexPlacement.bothAttrs -------------------------------") - traverseLineage(vertexPlacement.bothAttrs, " ", visited) - visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs") + visited += (vertices.id -> "vertices") + println("\n\nroutingTable.bothAttrs -------------------------------") + traverseLineage(routingTable.bothAttrs, " ", visited) + visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs") println("\n\ntriplets ----------------------------------------") traverseLineage(triplets, " ", visited) println(visited) } // end of printLineage override def reverse: Graph[VD, ED] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), routingTable, replicatedVertexView) override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { if (classManifest[VD] equals classManifest[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, vertexPlacement, - Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) } else { // The map does not preserve type, so we must re-replicate all vertices - new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement) + new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable) } } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), routingTable, replicatedVertexView) override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit // manifest from GraphImpl (which would require serializing GraphImpl). val vdManifest = classManifest[VD] - val newETable = - edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) => - val (pid, vPart) = vTableReplicatedIter.next() + val newEdgePartitions = + edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (edgePartition, vPartIter) => + val (pid, vPart) = vPartIter.next() val et = new EdgeTriplet[VD, ED] val newEdgePartition = edgePartition.map { e => et.set(e) @@ -190,7 +186,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } Iterator((pid, newEdgePartition)) } - new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView) } override def subgraph( @@ -210,23 +206,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Iterator((pid, edgePartition)) }, preservesPartitioning = true)).cache() - // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is - // fine. - new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated) + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView) } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( other: Graph[VD2, ED2]): Graph[VD, ED] = { val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } - new GraphImpl(newVerts, newEdges) - + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView) } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) - val newETable = edges.mapEdgePartitions(_.groupEdges(merge)) - new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated) + val newEdges = edges.mapEdgePartitions(_.groupEdges(merge)) + new GraphImpl(vertices, newEdges, routingTable, replicatedVertexView) } ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -246,14 +245,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr") val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr") val vs = activeSetOpt match { - case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) - case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr) + case Some((activeSet, _)) => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) + case None => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr) } val activeDirectionOpt = activeSetOpt.map(_._2) // Map and combine. - val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) => - val (_, vPart) = vTableReplicatedIter.next() + val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vPartIter) => + val (_, vPart) = vPartIter.next() // Choose scan method val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat @@ -303,14 +304,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // updateF preserves type, so we can use incremental replication val newVerts = vertices.leftJoin(updates)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, vertexPlacement, - Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) } else { // updateF does not preserve type, so we must re-replicate all vertices val newVerts = vertices.leftJoin(updates)(updateF) - new GraphImpl(newVerts, edges, vertexPlacement) + new GraphImpl(newVerts, edges, routingTable) } } @@ -330,13 +331,13 @@ object GraphImpl { edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createETable(edges), defaultVertexAttr) + fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) } def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])], + edgePartitions: RDD[(Pid, EdgePartition[ED])], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr) + fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) } def apply[VD: ClassManifest, ED: ClassManifest]( @@ -344,44 +345,39 @@ object GraphImpl { edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - val etable = createETable(edges).cache() + val edgeRDD = createEdgeRDD(edges).cache() // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVidsFromEdges(etable, partitioner) + val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } - val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr) + val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr) - new GraphImpl(vtable, etable) + new GraphImpl(vertexRDD, edgeRDD) } /** - * Create the edge table RDD, which is much more efficient for Java heap storage than the - * normal edges data structure (RDD[(Vid, Vid, ED)]). + * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges + * data structure (RDD[(Vid, Vid, ED)]). * - * The edge table contains multiple partitions, and each partition contains only one RDD - * key-value pair: the key is the partition id, and the value is an EdgePartition object - * containing all the edges in a partition. + * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value + * pair: the key is the partition id, and the value is an EdgePartition object containing all the + * edges in a partition. */ - private def createETable[ED: ClassManifest]( + private def createEdgeRDD[ED: ClassManifest]( edges: RDD[Edge[ED]]): EdgeRDD[ED] = { - val eTable = edges.mapPartitionsWithIndex { (pid, iter) => + val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[ED] iter.foreach { e => builder.add(e.srcId, e.dstId, e.attr) } Iterator((pid, builder.toEdgePartition)) } - new EdgeRDD(eTable) - } - - private def createETableFromEdgePartitions[ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = { - new EdgeRDD(edges) + new EdgeRDD(edgePartitions) } private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( @@ -391,8 +387,8 @@ object GraphImpl { // Get the set of all vids val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size)) // Create the VertexRDD. - val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr)) - new GraphImpl(vtable, edges) + val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) + new GraphImpl(vertices, edges) } /** Collects all vids mentioned in edges and partitions them by partitioner. */ diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index b9b2a4705b..175586b87e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -8,20 +8,20 @@ import org.apache.spark.graph._ /** * A view of the vertices after they are shipped to the join sites specified in - * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated` - * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, - * a fresh view is created. + * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is + * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a + * fresh view is created. * * The view is always cached (i.e., once it is created, it remains materialized). This avoids * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for * example. */ private[impl] -class VTableReplicated[VD: ClassManifest]( +class ReplicatedVertexView[VD: ClassManifest]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], - vertexPlacement: VertexPlacement, - prevVTableReplicated: Option[VTableReplicated[VD]] = None) { + routingTable: RoutingTable, + prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { /** * Within each edge partition, create a local map from vid to an index into the attribute @@ -29,9 +29,9 @@ class VTableReplicated[VD: ClassManifest]( * vids from both the source and destination of edges. It must always include both source and * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. */ - private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match { - case Some(prev) => - prev.localVidMap + private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + case Some(prevView) => + prevView.localVidMap case None => edges.partitionsRDD.mapPartitions(_.map { case (pid, epart) => @@ -41,7 +41,7 @@ class VTableReplicated[VD: ClassManifest]( vidToIndex.add(e.dstId) } (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap") + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap") } private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true) @@ -62,15 +62,14 @@ class VTableReplicated[VD: ClassManifest]( includeSrc: Boolean, includeDst: Boolean, actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = { - // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is // also shipped there. - val shippedActives = vertexPlacement.get(true, true) - .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _)) + val shippedActives = routingTable.get(true, true) + .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _)) .partitionBy(edges.partitioner.get) - // Update vTableReplicated with shippedActives, setting activeness flags in the resulting + // Update the view with shippedActives, setting activeness flags in the resulting // VertexPartitions get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => val (pid, vPart) = viewIter.next() @@ -85,23 +84,21 @@ class VTableReplicated[VD: ClassManifest]( // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD - val shippedVerts = vertexPlacement.get(includeSrc, includeDst) - .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest)) + val shippedVerts = routingTable.get(includeSrc, includeDst) + .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest)) .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. - prevVTableReplicated match { - case Some(vTableReplicated) => - val prevView: RDD[(Pid, VertexPartition[VD])] = - vTableReplicated.get(includeSrc, includeDst) - - // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting + prevViewOpt match { + case Some(prevView) => + // Update prevView with shippedVerts, setting staleness flags in the resulting // VertexPartitions - prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) => - val (pid, prevVPart) = prevViewIter.next() - val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) - Iterator((pid, newVPart)) - }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst)) + prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) { + (prevViewIter, shippedVertsIter) => + val (pid, prevVPart) = prevViewIter.next() + val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) + Iterator((pid, newVPart)) + }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst)) case None => // Within each edge partition, place the shipped vertex attributes into the correct @@ -122,12 +119,12 @@ class VTableReplicated[VD: ClassManifest]( val newVPart = new VertexPartition( vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest) Iterator((pid, newVPart)) - }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst)) + }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) } } } -object VTableReplicated { +object ReplicatedVertexView { protected def buildBuffer[VD: ClassManifest]( pid2vidIter: Iterator[Array[Array[Vid]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala index 44a0a05f74..b6cd048b33 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala @@ -7,12 +7,12 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.collection.PrimitiveVector /** - * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is, - * the routing information for shipping vertex attributes to edge partitions. This is always cached - * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes - * and (possibly) once to ship the active-set information. + * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing + * information for shipping vertex attributes to edge partitions. This is always cached because it + * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and + * (possibly) once to ship the active-set information. */ -class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { +class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true) val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false) @@ -30,7 +30,7 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { private def createPid2Vid( includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. - val vid2pid: RDD[(Vid, Pid)] = eTable.partitionsRDD.mapPartitions { iter => + val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter => val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() val numEdges = edgePartition.size val vSet = new VertexSet @@ -51,14 +51,14 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { vSet.iterator.map { vid => (vid, pid) } } - val numPartitions = vTable.partitions.size - vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter => + val numPartitions = vertices.partitions.size + vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } Iterator(pid2vid.map(_.trim().array)) - }.cache().setName("VertexPlacement %s %s".format(includeSrcAttr, includeDstAttr)) + }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr)) } } |