diff options
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 114 | ||||
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala) | 51 |
2 files changed, 77 insertions, 88 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index ebeebd4c65..e7c4b5db82 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -16,27 +16,22 @@ import org.apache.spark.util.ClosureCleaner * A Graph RDD that supports computation on graphs. * * Graphs are represented using two classes of data: vertex-partitioned and - * edge-partitioned. `vTable` contains vertex attributes, which are - * vertex-partitioned. `eTable` contains edge attributes, which are - * edge-partitioned. For operations on vertex neighborhoods, vertex attributes - * are replicated to the edge partitions where they appear as sources or - * destinations. `vertexPlacement` specifies where each vertex will be - * replicated. `vTableReplicated` stores the replicated vertex attributes, which - * are co-partitioned with the relevant edges. - * - * mask in vertices means filter - * mask in vTableReplicated means active + * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges` + * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods, + * vertex attributes are replicated to the edge partitions where they appear as sources or + * destinations. `replicatedVertexView` stores a view of the replicated vertex attributes, which are + * co-partitioned with the relevant edges. */ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], - @transient val vTableReplicated: VTableReplicated[VD]) + @transient val replicatedVertexView: ReplicatedVertexView[VD]) extends Graph[VD, ED] { def this( vertices: VertexRDD[VD], edges: EdgeRDD[ED]) = { - this(vertices, edges, new VTableReplicated(vertices, edges)) + this(vertices, edges, new ReplicatedVertexView(vertices, edges)) } /** Return a RDD that brings edges together with their source and destination vertices. */ @@ -44,7 +39,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val vdManifest = classManifest[VD] val edManifest = classManifest[ED] - edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) => + edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePart, vPartIter) => val (_, vPart) = vPartIter.next() new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest) } @@ -120,28 +115,28 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) } } - println("eTable ------------------------------------------") + println("edges ------------------------------------------") traverseLineage(edges, " ") - var visited = Map(edges.id -> "eTable") - println("\n\nvTable ------------------------------------------") + var visited = Map(edges.id -> "edges") + println("\n\nvertices ------------------------------------------") traverseLineage(vertices, " ", visited) - visited += (vertices.id -> "vTable") + visited += (vertices.id -> "vertices") println("\n\ntriplets ----------------------------------------") traverseLineage(triplets, " ", visited) println(visited) } // end of printLineage override def reverse: Graph[VD, ED] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), replicatedVertexView) override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { if (classManifest[VD] equals classManifest[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, newVTableReplicated) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, newReplicatedVertexView) } else { // The map does not preserve type, so we must re-replicate all vertices new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges) @@ -149,15 +144,15 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = - new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vTableReplicated) + new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), replicatedVertexView) override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit // manifest from GraphImpl (which would require serializing GraphImpl). val vdManifest = classManifest[VD] - val newETable = - edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) => - val (pid, vPart) = vTableReplicatedIter.next() + val newEdgePartitions = + edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (edgePartition, vPartIter) => + val (pid, vPart) = vPartIter.next() val et = new EdgeTriplet[VD, ED] val newEdgePartition = edgePartition.map { e => et.set(e) @@ -167,7 +162,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } Iterator((pid, newEdgePartition)) } - new GraphImpl(vertices, new EdgeRDD(newETable), vTableReplicated) + new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), replicatedVertexView) } override def subgraph( @@ -187,26 +182,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Iterator((pid, edgePartition)) }, preservesPartitioning = true)).cache() - // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been // removed will be ignored, since we only refer to replicated vertices when they are adjacent to // an edge. - new GraphImpl(newVerts, newEdges, vTableReplicated) + new GraphImpl(newVerts, newEdges, replicatedVertexView) } // end of subgraph override def mask[VD2: ClassManifest, ED2: ClassManifest] ( other: Graph[VD2, ED2]): Graph[VD, ED] = { val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } - // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been // removed will be ignored, since we only refer to replicated vertices when they are adjacent to // an edge. - new GraphImpl(newVerts, newEdges, vTableReplicated) + new GraphImpl(newVerts, newEdges, replicatedVertexView) } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { ClosureCleaner.clean(merge) - val newETable = edges.mapEdgePartitions(_.groupEdges(merge)) - new GraphImpl(vertices, newETable, vTableReplicated) + val newEdges = edges.mapEdgePartitions(_.groupEdges(merge)) + new GraphImpl(vertices, newEdges, replicatedVertexView) } ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -226,14 +221,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr") val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr") val vs = activeSetOpt match { - case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) - case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr) + case Some((activeSet, _)) => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) + case None => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr) } val activeDirectionOpt = activeSetOpt.map(_._2) // Map and combine. - val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) => - val (_, vPart) = vTableReplicatedIter.next() + val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vPartIter) => + val (_, vPart) = vPartIter.next() // Choose scan method val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat @@ -283,9 +280,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // updateF preserves type, so we can use incremental replication val newVerts = vertices.leftJoin(updates)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newVTableReplicated = new VTableReplicated[VD2]( - changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]])) - new GraphImpl(newVerts, edges, newVTableReplicated) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, newReplicatedVertexView) } else { // updateF does not preserve type, so we must re-replicate all vertices val newVerts = vertices.leftJoin(updates)(updateF) @@ -309,13 +306,13 @@ object GraphImpl { edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createETable(edges), defaultVertexAttr) + fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) } def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])], + edgePartitions: RDD[(Pid, EdgePartition[ED])], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr) + fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) } def apply[VD: ClassManifest, ED: ClassManifest]( @@ -323,44 +320,39 @@ object GraphImpl { edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { - val etable = createETable(edges).cache() + val edgeRDD = createEdgeRDD(edges).cache() // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVidsFromEdges(etable, partitioner) + val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } - val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr) + val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr) - new GraphImpl(vtable, etable) + new GraphImpl(vertexRDD, edgeRDD) } /** - * Create the edge table RDD, which is much more efficient for Java heap storage than the - * normal edges data structure (RDD[(Vid, Vid, ED)]). + * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges + * data structure (RDD[(Vid, Vid, ED)]). * - * The edge table contains multiple partitions, and each partition contains only one RDD - * key-value pair: the key is the partition id, and the value is an EdgePartition object - * containing all the edges in a partition. + * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value + * pair: the key is the partition id, and the value is an EdgePartition object containing all the + * edges in a partition. */ - private def createETable[ED: ClassManifest]( + private def createEdgeRDD[ED: ClassManifest]( edges: RDD[Edge[ED]]): EdgeRDD[ED] = { - val eTable = edges.mapPartitionsWithIndex { (pid, iter) => + val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[ED] iter.foreach { e => builder.add(e.srcId, e.dstId, e.attr) } Iterator((pid, builder.toEdgePartition)) } - new EdgeRDD(eTable) - } - - private def createETableFromEdgePartitions[ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = { - new EdgeRDD(edges) + new EdgeRDD(edgePartitions) } private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( @@ -370,8 +362,8 @@ object GraphImpl { // Get the set of all vids val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size)) // Create the VertexRDD. - val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr)) - new GraphImpl(vtable, edges) + val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) + new GraphImpl(vertices, edges) } /** Collects all vids mentioned in edges and partitions them by partitioner. */ diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index fc708da3d9..bef99810bd 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -10,19 +10,19 @@ import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} /** * A view of the vertices after they are shipped to the join sites specified in - * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated` - * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, - * a fresh view is created. + * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is + * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a + * fresh view is created. * * The view is always cached (i.e., once it is created, it remains materialized). This avoids * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for * example. */ private[impl] -class VTableReplicated[VD: ClassManifest]( +class ReplicatedVertexView[VD: ClassManifest]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], - prevVTableReplicated: Option[VTableReplicated[VD]] = None) { + prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { /** * Within each edge partition, create a local map from vid to an index into the attribute @@ -30,9 +30,9 @@ class VTableReplicated[VD: ClassManifest]( * vids from both the source and destination of edges. It must always include both source and * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. */ - private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match { - case Some(prev) => - prev.localVidMap + private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + case Some(prevView) => + prevView.localVidMap case None => edges.partitionsRDD.mapPartitions(_.map { case (pid, epart) => @@ -42,7 +42,7 @@ class VTableReplicated[VD: ClassManifest]( vidToIndex.add(e.dstId) } (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap") + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap") } private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true) @@ -66,15 +66,14 @@ class VTableReplicated[VD: ClassManifest]( includeSrc: Boolean, includeDst: Boolean, actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = { - // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is // also shipped there. val shippedActives = getRoutingTable(true, true) - .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _)) + .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _)) .partitionBy(edges.partitioner.get) - // Update vTableReplicated with shippedActives, setting activeness flags in the resulting + // Update the view with shippedActives, setting activeness flags in the resulting // VertexPartitions get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => val (pid, vPart) = viewIter.next() @@ -90,22 +89,20 @@ class VTableReplicated[VD: ClassManifest]( // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD val shippedVerts = getRoutingTable(includeSrc, includeDst) - .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest)) + .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest)) .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. - prevVTableReplicated match { - case Some(vTableReplicated) => - val prevView: RDD[(Pid, VertexPartition[VD])] = - vTableReplicated.get(includeSrc, includeDst) - - // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting + prevViewOpt match { + case Some(prevView) => + // Update prevView with shippedVerts, setting staleness flags in the resulting // VertexPartitions - prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) => - val (pid, prevVPart) = prevViewIter.next() - val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) - Iterator((pid, newVPart)) - }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst)) + prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) { + (prevViewIter, shippedVertsIter) => + val (pid, prevVPart) = prevViewIter.next() + val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) + Iterator((pid, newVPart)) + }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst)) case None => // Within each edge partition, place the shipped vertex attributes into the correct @@ -126,7 +123,7 @@ class VTableReplicated[VD: ClassManifest]( val newVPart = new VertexPartition( vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest) Iterator((pid, newVPart)) - }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst)) + }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) } } @@ -139,12 +136,12 @@ class VTableReplicated[VD: ClassManifest]( includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = { routingTables.getOrElseUpdate( (includeSrc, includeDst), - VTableReplicated.createRoutingTable( + ReplicatedVertexView.createRoutingTable( edges, updatedVerts.partitioner.get, includeSrc, includeDst)) } } -object VTableReplicated { +object ReplicatedVertexView { protected def buildBuffer[VD: ClassManifest]( pid2vidIter: Iterator[Array[Array[Vid]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { |