diff options
author | Ankur Dave <ankurdave@gmail.com> | 2013-12-20 15:37:33 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2013-12-20 15:37:33 -0800 |
commit | 32f957f3317bd62768b415da0c0cd9114f59782c (patch) | |
tree | 2924f25ff4854caa19247d05dcf9ea535327a5f8 | |
parent | 6d1bf0d78d150925c0dbd9adfe6294c89846f9b7 (diff) | |
download | spark-32f957f3317bd62768b415da0c0cd9114f59782c.tar.gz spark-32f957f3317bd62768b415da0c0cd9114f59782c.tar.bz2 spark-32f957f3317bd62768b415da0c0cd9114f59782c.zip |
Create VertexPlacement on demand in VTableReplicated
3 files changed, 79 insertions, 109 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 9e44f49113..ebeebd4c65 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -30,21 +30,13 @@ import org.apache.spark.util.ClosureCleaner class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], - @transient val vertexPlacement: VertexPlacement, @transient val vTableReplicated: VTableReplicated[VD]) extends Graph[VD, ED] { def this( vertices: VertexRDD[VD], - edges: EdgeRDD[ED], - vertexPlacement: VertexPlacement) = { - this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement)) - } - - def this( - vertices: VertexRDD[VD], edges: EdgeRDD[ED]) = { - this(vertices, edges, new VertexPlacement(edges, vertices)) + this(vertices, edges, new VTableReplicated(vertices, edges)) } /** Return a RDD that brings edges together with their source and destination vertices. */ @@ -89,16 +81,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } override def statistics: Map[String, Any] = { - // Get the total number of vertices after replication, used to compute the replication ratio. - def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = { - vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble - } - val numVertices = this.ops.numVertices val numEdges = this.ops.numEdges - val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices - val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices - val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices // One entry for each partition, indicate the total number of edges on that partition. val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges) val minLoad = loadArray.min @@ -106,9 +90,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Map( "Num Vertices" -> numVertices, "Num Edges" -> numEdges, - "Replication (both)" -> replicationRatioBoth, - "Replication (src only)" -> replicationRatioSrcOnly, - "Replication (dest only)" -> replicationRatioDstOnly, "Load Array" -> loadArray, "Min Load" -> minLoad, "Max Load" -> maxLoad) @@ -145,16 +126,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( println("\n\nvTable ------------------------------------------") traverseLineage(vertices, " ", visited) visited += (vertices.id -> "vTable") - println("\n\nvertexPlacement.bothAttrs -------------------------------") - traverseLineage(vertexPlacement.bothAttrs, " ", visited) - visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs") println("\n\ntriplets ----------------------------------------") traverseLineage(triplets, " ", visited) println(visited) } // end of printLineage override def reverse: Graph[VD, ED] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vTableReplicated) override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { if (classManifest[VD] equals classManifest[VD2]) { @@ -162,17 +140,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val newVerts = vertices.mapVertexPartitions(_.map(f)) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, vertexPlacement, - Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) + new GraphImpl(newVerts, edges, newVTableReplicated) } else { // The map does not preserve type, so we must re-replicate all vertices - new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement) + new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges) } } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vTableReplicated) override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit @@ -190,7 +167,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } Iterator((pid, newEdgePartition)) } - new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated) + new GraphImpl(vertices, new EdgeRDD(newETable), vTableReplicated) } override def subgraph( @@ -210,23 +187,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Iterator((pid, edgePartition)) }, preservesPartitioning = true)).cache() - // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is - // fine. - new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated) + // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, vTableReplicated) } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( other: Graph[VD2, ED2]): Graph[VD, ED] = { val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } - new GraphImpl(newVerts, newEdges) - + // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, vTableReplicated) } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) val newETable = edges.mapEdgePartitions(_.groupEdges(merge)) - new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated) + new GraphImpl(vertices, newETable, vTableReplicated) } ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -304,13 +284,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val newVerts = vertices.leftJoin(updates)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, vertexPlacement, - Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) + changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) + new GraphImpl(newVerts, edges, newVTableReplicated) } else { // updateF does not preserve type, so we must re-replicate all vertices val newVerts = vertices.leftJoin(updates)(updateF) - new GraphImpl(newVerts, edges, vertexPlacement) + new GraphImpl(newVerts, edges) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index b9b2a4705b..fc708da3d9 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -1,11 +1,13 @@ package org.apache.spark.graph.impl +import org.apache.spark.Partitioner +import scala.collection.mutable + import org.apache.spark.SparkContext._ +import org.apache.spark.graph._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} -import org.apache.spark.graph._ - /** * A view of the vertices after they are shipped to the join sites specified in * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated` @@ -20,7 +22,6 @@ private[impl] class VTableReplicated[VD: ClassManifest]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], - vertexPlacement: VertexPlacement, prevVTableReplicated: Option[VTableReplicated[VD]] = None) { /** @@ -49,6 +50,9 @@ class VTableReplicated[VD: ClassManifest]( private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true) private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false) + private val routingTables: mutable.Map[(Boolean, Boolean), RDD[Array[Array[Vid]]]] = + new mutable.HashMap[(Boolean, Boolean), RDD[Array[Array[Vid]]]] + def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = { (includeSrc, includeDst) match { case (true, true) => bothAttrs @@ -67,7 +71,7 @@ class VTableReplicated[VD: ClassManifest]( // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is // also shipped there. - val shippedActives = vertexPlacement.get(true, true) + val shippedActives = getRoutingTable(true, true) .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _)) .partitionBy(edges.partitioner.get) // Update vTableReplicated with shippedActives, setting activeness flags in the resulting @@ -85,7 +89,7 @@ class VTableReplicated[VD: ClassManifest]( // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD - val shippedVerts = vertexPlacement.get(includeSrc, includeDst) + val shippedVerts = getRoutingTable(includeSrc, includeDst) .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest)) .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. @@ -125,6 +129,19 @@ class VTableReplicated[VD: ClassManifest]( }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst)) } } + + /** + * Returns an RDD with the locations of edge-partition join sites for each vertex attribute in + * `vertices`; that is, the routing information for shipping vertex attributes to edge + * partitions. The routing information is stored as a compressed bitmap for each vertex partition. + */ + private def getRoutingTable( + includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = { + routingTables.getOrElseUpdate( + (includeSrc, includeDst), + VTableReplicated.createRoutingTable( + edges, updatedVerts.partitioner.get, includeSrc, includeDst)) + } } object VTableReplicated { @@ -174,6 +191,44 @@ object VTableReplicated { (pid, actives.trim().array) } } + + private def createRoutingTable( + edges: EdgeRDD[_], + vertexPartitioner: Partitioner, + includeSrc: Boolean, + includeDst: Boolean): RDD[Array[Array[Vid]]] = { + // Determine which vertices each edge partition needs by creating a mapping from vid to pid. + val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter => + val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() + val numEdges = edgePartition.size + val vSet = new VertexSet + if (includeSrc) { // Add src vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.srcIds(i)) + i += 1 + } + } + if (includeDst) { // Add dst vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.dstIds(i)) + i += 1 + } + } + vSet.iterator.map { vid => (vid, pid) } + } + + val numPartitions = vertexPartitioner.numPartitions + vid2pid.partitionBy(vertexPartitioner).mapPartitions { iter => + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) + for ((vid, pid) <- iter) { + pid2vid(pid) += vid + } + + Iterator(pid2vid.map(_.trim().array)) + }.cache().setName("VertexPlacement %s %s".format(includeSrc, includeDst)) + } } class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala deleted file mode 100644 index 44a0a05f74..0000000000 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.spark.graph.impl - -import org.apache.spark.SparkContext._ -import org.apache.spark.graph._ -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.collection.PrimitiveVector - -/** - * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is, - * the routing information for shipping vertex attributes to edge partitions. This is always cached - * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes - * and (possibly) once to ship the active-set information. - */ -class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) { - - val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true) - val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false) - val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true) - val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false) - - def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = - (includeSrcAttr, includeDstAttr) match { - case (true, true) => bothAttrs - case (true, false) => srcAttrOnly - case (false, true) => dstAttrOnly - case (false, false) => noAttrs - } - - private def createPid2Vid( - includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { - // Determine which vertices each edge partition needs by creating a mapping from vid to pid. - val vid2pid: RDD[(Vid, Pid)] = eTable.partitionsRDD.mapPartitions { iter => - val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() - val numEdges = edgePartition.size - val vSet = new VertexSet - if (includeSrcAttr) { // Add src vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.srcIds(i)) - i += 1 - } - } - if (includeDstAttr) { // Add dst vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.dstIds(i)) - i += 1 - } - } - vSet.iterator.map { vid => (vid, pid) } - } - - val numPartitions = vTable.partitions.size - vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) - for ((vid, pid) <- iter) { - pid2vid(pid) += vid - } - - Iterator(pid2vid.map(_.trim().array)) - }.cache().setName("VertexPlacement %s %s".format(includeSrcAttr, includeDstAttr)) - } -} |