diff options
author | Ankur Dave <ankurdave@gmail.com> | 2013-12-15 15:08:08 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2013-12-16 17:37:51 -0800 |
commit | 3ade8be8f218cdec3b90e48069595a3c556e0f27 (patch) | |
tree | ceeb1dd63caabfd8190429e1f5bc406252eb1d24 | |
parent | 0476c84c516dda8c23e66f7796389995a44a0878 (diff) | |
download | spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.tar.gz spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.tar.bz2 spark-3ade8be8f218cdec3b90e48069595a3c556e0f27.zip |
Add clustered index on edges by source vertex
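This allows an efficient edge scan in mapReduceTriplets when many source
vertices are inactive. When the active direction is Out or Both, the scan
switches from a full edge scan to a clustered index scan if the number of active vertices is less than 80% of the number of distinct source vertices in the partition.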
7 files changed, 167 insertions, 65 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala index 1aa1b36b47..7e8ae7c790 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala @@ -41,3 +41,10 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] def relativeDirection(vid: Vid): EdgeDirection = if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } } + +object Edge { + def lexicographicOrdering[ED] = new Ordering[Edge[ED]] { + override def compare(a: Edge[ED], b: Edge[ED]): Int = + Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId)) + } +} diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index eb3fd60d74..bfdafcc542 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -1,29 +1,36 @@ package org.apache.spark.graph.impl import org.apache.spark.graph._ -import org.apache.spark.util.collection.OpenHashMap +import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap /** - * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). + * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are + * clustered by src. * * @param srcIds the source vertex id of each edge * @param dstIds the destination vertex id of each edge * @param data the attribute associated with each edge + * @param index a clustered index on source vertex id * @tparam ED the edge attribute type. 
*/ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( val srcIds: Array[Vid], val dstIds: Array[Vid], - val data: Array[ED]) { + val data: Array[ED], + val index: PrimitiveKeyOpenHashMap[Vid, Int]) { /** * Reverse all the edges in this partition. * - * @note No new data structures are created. - * * @return a new edge partition with all edges reversed. */ - def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data) + def reverse: EdgePartition[ED] = { + val builder = new EdgePartitionBuilder(size) + for (e <- iterator) { + builder.add(e.dstId, e.srcId, e.attr) + } + builder.toEdgePartition + } /** * Construct a new edge partition by applying the function f to all @@ -46,7 +53,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) newData(i) = f(edge) i += 1 } - new EdgePartition(srcIds, dstIds, newData) + new EdgePartition(srcIds, dstIds, newData, index) } /** @@ -54,17 +61,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * * @param f an external state mutating user defined function. 
*/ - def foreach(f: Edge[ED] => Unit) { - val edge = new Edge[ED] - val size = data.size - var i = 0 - while (i < size) { - edge.srcId = srcIds(i) - edge.dstId = dstIds(i) - edge.attr = data(i) - f(edge) - i += 1 - } + def foreach(f: Edge[ED] => Unit) { + iterator.foreach(f) } /** @@ -75,21 +73,29 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return a new edge partition without duplicate edges */ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { - // Aggregate all matching edges in a hashmap - val agg = new OpenHashMap[(Vid,Vid), ED] - foreach { e => agg.setMerge((e.srcId, e.dstId), e.attr, merge) } - // Populate new srcId, dstId, and data, arrays - val newSrcIds = new Array[Vid](agg.size) - val newDstIds = new Array[Vid](agg.size) - val newData = new Array[ED](agg.size) + val builder = new EdgePartitionBuilder[ED] + var firstIter: Boolean = true + var currSrcId: Vid = nullValue[Vid] + var currDstId: Vid = nullValue[Vid] + var currAttr: ED = nullValue[ED] var i = 0 - agg.foreach { kv => - newSrcIds(i) = kv._1._1 - newDstIds(i) = kv._1._2 - newData(i) = kv._2 + while (i < size) { + if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) { + currAttr = merge(currAttr, data(i)) + } else { + if (i > 0) { + builder.add(currSrcId, currDstId, currAttr) + } + currSrcId = srcIds(i) + currDstId = dstIds(i) + currAttr = data(i) + } i += 1 } - new EdgePartition(newSrcIds, newDstIds, newData) + if (size > 0) { + builder.add(currSrcId, currDstId, currAttr) + } + builder.toEdgePartition } /** @@ -99,6 +105,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def size: Int = srcIds.size + /** The number of unique source vertices in the partition. */ + def indexSize: Int = index.size + /** * Get an iterator over the edges in this partition. 
* @@ -118,4 +127,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) edge } } + + /** + * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The + * iterator is generated using an index scan, so it is efficient at skipping edges that don't + * match srcIdPred. + */ + def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] = + index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator)) + + /** + * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The + * cluster must start at position `index`. + */ + private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] { + private[this] val edge = new Edge[ED] + private[this] var pos = index + + override def hasNext: Boolean = { + pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId + } + + override def next(): Edge[ED] = { + assert(srcIds(pos) == srcId) + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) + pos += 1 + edge + } + } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala index 76c11a364c..3876273369 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -1,24 +1,45 @@ package org.apache.spark.graph.impl +import scala.util.Sorting + import org.apache.spark.graph._ -import org.apache.spark.util.collection.PrimitiveVector +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} //private[graph] -class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest] { +class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest](size: Int = 64) { - val srcIds = new PrimitiveVector[Vid] - val dstIds = new PrimitiveVector[Vid] - var 
dataBuilder = new PrimitiveVector[ED] + var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. */ def add(src: Vid, dst: Vid, d: ED) { - srcIds += src - dstIds += dst - dataBuilder += d + edges += Edge(src, dst, d) } def toEdgePartition: EdgePartition[ED] = { - new EdgePartition(srcIds.trim().array, dstIds.trim().array, dataBuilder.trim().array) + val edgeArray = edges.trim().array + Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) + val srcIds = new Array[Vid](edgeArray.size) + val dstIds = new Array[Vid](edgeArray.size) + val data = new Array[ED](edgeArray.size) + val index = new PrimitiveKeyOpenHashMap[Vid, Int] + // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and + // adding them to the index + if (edgeArray.length > 0) { + index.update(srcIds(0), 0) + var currSrcId: Vid = srcIds(0) + var i = 0 + while (i < edgeArray.size) { + srcIds(i) = edgeArray(i).srcId + dstIds(i) = edgeArray(i).dstId + data(i) = edgeArray(i).attr + if (edgeArray(i).srcId != currSrcId) { + currSrcId = edgeArray(i).srcId + index.update(currSrcId, i) + } + i += 1 + } + } + new EdgePartition(srcIds, dstIds, data, index) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 08bef82150..0adc350187 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -245,37 +245,44 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Map and combine. 
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) => - val (_, vertexPartition) = vTableReplicatedIter.next() + val (_, vPart) = vTableReplicatedIter.next() + + // Choose scan method + val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat + val edgeIter = activeDirectionOpt match { + case Some(EdgeDirection.Both) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVid => vPart.isActive(srcVid)) + .filter(e => vPart.isActive(e.dstId)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) + } + case Some(EdgeDirection.Out) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVid => vPart.isActive(srcVid)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) + } + case Some(EdgeDirection.In) => + edgePartition.iterator.filter(e => vPart.isActive(e.dstId)) + case None => + edgePartition.iterator + } - // Iterate over the partition + // Scan edges and run the map function val et = new EdgeTriplet[VD, ED] - val filteredEdges = edgePartition.iterator.flatMap { e => - // Ensure the edge is adjacent to a vertex in activeSet if necessary - val adjacent = activeDirectionOpt match { - case Some(EdgeDirection.In) => - vertexPartition.isActive(e.dstId) - case Some(EdgeDirection.Out) => - vertexPartition.isActive(e.srcId) - case Some(EdgeDirection.Both) => - vertexPartition.isActive(e.srcId) && vertexPartition.isActive(e.dstId) - case None => - true + val mapOutputs = edgeIter.flatMap { e => + et.set(e) + if (mapUsesSrcAttr) { + et.srcAttr = vPart(e.srcId) } - if (adjacent) { - et.set(e) - if (mapUsesSrcAttr) { - et.srcAttr = vertexPartition(e.srcId) - } - if (mapUsesDstAttr) { - et.dstAttr = vertexPartition(e.dstId) - } - mapFunc(et) - } else { - Iterator.empty + if (mapUsesDstAttr) { + et.dstAttr = vPart(e.dstId) } + mapFunc(et) } // Note: This doesn't allow users to send messages to arbitrary vertices. 
- vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator + vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator } // do the final reduction reusing the index map diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala index 161c98f158..b9b2a4705b 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala @@ -115,7 +115,7 @@ class VTableReplicated[VD: ClassManifest]( for (i <- 0 until block.vids.size) { val vid = block.vids(i) val attr = block.attrs(i) - val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK + val ind = vidToIndex.getPos(vid) vertexArray(ind) = attr } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index 1c589c9b72..ccbc83c512 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -54,6 +54,9 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( activeSet.get.contains(vid) } + /** The number of active vertices, if any exist. */ + def numActives: Option[Int] = activeSet.map(_.size) + /** * Pass each vertex attribute along with the vertex id through a map * function and retain the original RDD's partitioning and index. 
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index 68a171b12f..a85a31f79d 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -1,9 +1,12 @@ package org.apache.spark.graph +import scala.util.Random + import org.scalatest.FunSuite import org.apache.spark.SparkContext import org.apache.spark.graph.LocalSparkContext._ +import org.apache.spark.graph.impl.EdgePartitionBuilder import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { @@ -59,6 +62,13 @@ class GraphSuite extends FunSuite with LocalSparkContext { // mapVertices changing type val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length) assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet) + // groupEdges + val doubleStar = Graph.fromEdgeTuples( + sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v") + val star2 = doubleStar.groupEdges { (a, b) => a} + assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === + star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) + assert(star2.vertices.collect.toSet === star.vertices.collect.toSet) } } @@ -206,4 +216,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet) } } + + test("EdgePartition.sort") { + val edgesFrom0 = List(Edge(0, 1, 0)) + val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0)) + val sortedEdges = edgesFrom0 ++ edgesFrom1 + val builder = new EdgePartitionBuilder[Int] + for (e <- Random.shuffle(sortedEdges)) { + builder.add(e.srcId, e.dstId, e.attr) + } + + val edgePartition = builder.toEdgePartition + assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges) + assert(edgePartition.indexIterator(_ == 
0).map(_.copy()).toList === edgesFrom0) + assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) + } } |