diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-03 17:55:58 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-03 18:01:13 -0700 |
commit | cfab8f2062fad9ea400716afce28d200dd714c2b (patch) | |
tree | cef8dfbc0645a7c690e24e83daa1b4d0278c93c7 /graph/src | |
parent | 17311359c44a99541e16489a8842f3013919e832 (diff) | |
download | spark-cfab8f2062fad9ea400716afce28d200dd714c2b.tar.gz spark-cfab8f2062fad9ea400716afce28d200dd714c2b.tar.bz2 spark-cfab8f2062fad9ea400716afce28d200dd714c2b.zip |
Revert "Create VertexPlacement on demand in VTableReplicated"
This reverts commit 32f957f3317bd62768b415da0c0cd9114f59782c.
Conflicts:
graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
Diffstat (limited to 'graph/src')
3 files changed, 109 insertions, 75 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index e7c4b5db82..16d73820f0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -19,19 +19,28 @@ import org.apache.spark.util.ClosureCleaner * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges` * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods, * vertex attributes are replicated to the edge partitions where they appear as sources or - * destinations. `replicatedVertexView` stores a view of the replicated vertex attributes, which are - * co-partitioned with the relevant edges. + * destinations. `routingTable` stores the routing information for shipping vertex attributes to + * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created + * using the routing table. */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], + @transient val routingTable: RoutingTable, @transient val replicatedVertexView: ReplicatedVertexView[VD]) extends Graph[VD, ED] { def this( vertices: VertexRDD[VD], + edges: EdgeRDD[ED], + routingTable: RoutingTable) = { + this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable)) + } + + def this( + vertices: VertexRDD[VD], edges: EdgeRDD[ED]) = { - this(vertices, edges, new ReplicatedVertexView(vertices, edges)) + this(vertices, edges, new RoutingTable(edges, vertices)) } /** Return a RDD that brings edges together with their source and destination vertices. */ @@ -76,8 +85,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } override def statistics: Map[String, Any] = { + // Get the total number of vertices after replication, used to compute the replication ratio. + def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = { + vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble + } + val numVertices = this.ops.numVertices val numEdges = this.ops.numEdges + val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices + val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices + val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices // One entry for each partition, indicate the total number of edges on that partition. val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges) val minLoad = loadArray.min @@ -85,6 +102,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Map( "Num Vertices" -> numVertices, "Num Edges" -> numEdges, + "Replication (both)" -> replicationRatioBoth, + "Replication (src only)" -> replicationRatioSrcOnly, + "Replication (dest only)" -> replicationRatioDstOnly, "Load Array" -> loadArray, "Min Load" -> minLoad, "Max Load" -> maxLoad) @@ -121,13 +141,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( println("\n\nvertices ------------------------------------------") traverseLineage(vertices, " ", visited) visited += (vertices.id -> "vertices") + println("\n\nroutingTable.bothAttrs -------------------------------") + traverseLineage(routingTable.bothAttrs, " ", visited) + visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs") println("\n\ntriplets ----------------------------------------") traverseLineage(triplets, " ", visited) println(visited) } // end of printLineage override def reverse: Graph[VD, ED] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), replicatedVertexView) + new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), routingTable, replicatedVertexView) override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { if (classManifest[VD] equals classManifest[VD2]) { @@ -135,16 +158,17 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val newVerts = vertices.mapVertexPartitions(_.map(f)) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) val newReplicatedVertexView = new ReplicatedVertexView[VD2]( - changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) - new GraphImpl(newVerts, edges, newReplicatedVertexView) + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) } else { // The map does not preserve type, so we must re-replicate all vertices - new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges) + new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable) } } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), replicatedVertexView) + new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), routingTable, replicatedVertexView) override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit @@ -162,7 +186,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } Iterator((pid, newEdgePartition)) } - new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), replicatedVertexView) + new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView) } override def subgraph( @@ -185,7 +209,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been // removed will be ignored, since we only refer to replicated vertices when they are adjacent to // an edge. - new GraphImpl(newVerts, newEdges, replicatedVertexView) + new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView) } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( @@ -195,13 +219,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been // removed will be ignored, since we only refer to replicated vertices when they are adjacent to // an edge. - new GraphImpl(newVerts, newEdges, replicatedVertexView) + new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView) } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) val newEdges = edges.mapEdgePartitions(_.groupEdges(merge)) - new GraphImpl(vertices, newEdges, replicatedVertexView) + new GraphImpl(vertices, newEdges, routingTable, replicatedVertexView) } ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -281,12 +305,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val newVerts = vertices.leftJoin(updates)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) val newReplicatedVertexView = new ReplicatedVertexView[VD2]( - changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) - new GraphImpl(newVerts, edges, newReplicatedVertexView) + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) } else { // updateF does not preserve type, so we must re-replicate all vertices val newVerts = vertices.leftJoin(updates)(updateF) - new GraphImpl(newVerts, edges) + new GraphImpl(newVerts, edges, routingTable) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index bef99810bd..175586b87e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -1,13 +1,11 @@ package org.apache.spark.graph.impl -import org.apache.spark.Partitioner -import scala.collection.mutable - import org.apache.spark.SparkContext._ -import org.apache.spark.graph._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} +import org.apache.spark.graph._ + /** * A view of the vertices after they are shipped to the join sites specified in * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is @@ -22,6 +20,7 @@ private[impl] class ReplicatedVertexView[VD: ClassManifest]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], + routingTable: RoutingTable, prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { /** @@ -50,9 +49,6 @@ class ReplicatedVertexView[VD: ClassManifest]( private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true) private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false) - private val routingTables: mutable.Map[(Boolean, Boolean), RDD[Array[Array[Vid]]]] = - new mutable.HashMap[(Boolean, Boolean), RDD[Array[Array[Vid]]]] - def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = { (includeSrc, includeDst) match { case (true, true) => bothAttrs @@ -70,7 +66,7 @@ class ReplicatedVertexView[VD: ClassManifest]( // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is // also shipped there. - val shippedActives = getRoutingTable(true, true) + val shippedActives = routingTable.get(true, true) .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _)) .partitionBy(edges.partitioner.get) // Update the view with shippedActives, setting activeness flags in the resulting @@ -88,7 +84,7 @@ class ReplicatedVertexView[VD: ClassManifest]( // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD - val shippedVerts = getRoutingTable(includeSrc, includeDst) + val shippedVerts = routingTable.get(includeSrc, includeDst) .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest)) .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. @@ -126,19 +122,6 @@ class ReplicatedVertexView[VD: ClassManifest]( }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) } } - - /** - * Returns an RDD with the locations of edge-partition join sites for each vertex attribute in - * `vertices`; that is, the routing information for shipping vertex attributes to edge - * partitions. The routing information is stored as a compressed bitmap for each vertex partition. - */ - private def getRoutingTable( - includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = { - routingTables.getOrElseUpdate( - (includeSrc, includeDst), - ReplicatedVertexView.createRoutingTable( - edges, updatedVerts.partitioner.get, includeSrc, includeDst)) - } } object ReplicatedVertexView { @@ -188,44 +171,6 @@ object ReplicatedVertexView { (pid, actives.trim().array) } } - - private def createRoutingTable( - edges: EdgeRDD[_], - vertexPartitioner: Partitioner, - includeSrc: Boolean, - includeDst: Boolean): RDD[Array[Array[Vid]]] = { - // Determine which vertices each edge partition needs by creating a mapping from vid to pid. - val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter => - val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() - val numEdges = edgePartition.size - val vSet = new VertexSet - if (includeSrc) { // Add src vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.srcIds(i)) - i += 1 - } - } - if (includeDst) { // Add dst vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.dstIds(i)) - i += 1 - } - } - vSet.iterator.map { vid => (vid, pid) } - } - - val numPartitions = vertexPartitioner.numPartitions - vid2pid.partitionBy(vertexPartitioner).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) - for ((vid, pid) <- iter) { - pid2vid(pid) += vid - } - - Iterator(pid2vid.map(_.trim().array)) - }.cache().setName("VertexPlacement %s %s".format(includeSrc, includeDst)) - } } class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala new file mode 100644 index 0000000000..b6cd048b33 --- /dev/null +++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala @@ -0,0 +1,64 @@ +package org.apache.spark.graph.impl + +import org.apache.spark.SparkContext._ +import org.apache.spark.graph._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.collection.PrimitiveVector + +/** + * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing + * information for shipping vertex attributes to edge partitions. This is always cached because it + * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and + * (possibly) once to ship the active-set information. + */ +class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { + + val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true) + val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false) + val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true) + val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false) + + def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = + (includeSrcAttr, includeDstAttr) match { + case (true, true) => bothAttrs + case (true, false) => srcAttrOnly + case (false, true) => dstAttrOnly + case (false, false) => noAttrs + } + + private def createPid2Vid( + includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { + // Determine which vertices each edge partition needs by creating a mapping from vid to pid. + val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter => + val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() + val numEdges = edgePartition.size + val vSet = new VertexSet + if (includeSrcAttr) { // Add src vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.srcIds(i)) + i += 1 + } + } + if (includeDstAttr) { // Add dst vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.dstIds(i)) + i += 1 + } + } + vSet.iterator.map { vid => (vid, pid) } + } + + val numPartitions = vertices.partitions.size + vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) + for ((vid, pid) <- iter) { + pid2vid(pid) += vid + } + + Iterator(pid2vid.map(_.trim().array)) + }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr)) + } +} |