author     Ankur Dave <ankurdave@gmail.com>    2013-12-15 15:08:08 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2013-12-16 17:37:51 -0800
commit     3ade8be8f218cdec3b90e48069595a3c556e0f27 (patch)
tree       ceeb1dd63caabfd8190429e1f5bc406252eb1d24
parent     0476c84c516dda8c23e66f7796389995a44a0878 (diff)
Add clustered index on edges by source vertex
This allows efficient edge scans in mapReduceTriplets when many source vertices are inactive. The scan method switches from a full edge scan to a clustered index scan when less than 80% of the source vertices are active.
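
For intuition, here is a minimal standalone sketch of the layout and scan this commit introduces, in plain Scala with an ordinary Map standing in for the internal PrimitiveKeyOpenHashMap; the sample ids and the activeEdges helper are illustrative only, not the committed API:

    // Edges clustered (sorted) by source vertex id, stored in columnar arrays.
    val srcIds = Array(1L, 1L, 4L, 4L, 4L, 7L)
    val dstIds = Array(2L, 3L, 0L, 2L, 9L, 5L)

    // The clustered index maps each source id to the offset of its first edge.
    val index: Map[Long, Int] = Map(1L -> 0, 4L -> 2, 7L -> 5)

    // An index scan visits only the clusters of active sources, skipping the
    // rest of the edge array entirely.
    def activeEdges(isActive: Long => Boolean): Iterator[(Long, Long)] =
      index.iterator
        .filter { case (srcId, _) => isActive(srcId) }
        .flatMap { case (srcId, start) =>
          Iterator.from(start)
            .takeWhile(i => i < srcIds.length && srcIds(i) == srcId)
            .map(i => (srcIds(i), dstIds(i)))
        }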
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Edge.scala                      |  7
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala        | 99
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala | 39
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala            | 57
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala     |  2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala      |  3
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala                | 25
7 files changed, 167 insertions(+), 65 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
index 1aa1b36b47..7e8ae7c790 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -41,3 +41,10 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
def relativeDirection(vid: Vid): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
}
+
+object Edge {
+ def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
+ override def compare(a: Edge[ED], b: Edge[ED]): Int =
+ Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
+ }
+}
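
A quick usage sketch for the new ordering (this is exactly how EdgePartitionBuilder below applies it; the sample edges are made up):

    import scala.util.Sorting

    val edges = Array(Edge(4L, 2L, "b"), Edge(1L, 9L, "a"), Edge(1L, 3L, "c"))
    Sorting.quickSort(edges)(Edge.lexicographicOrdering)
    // edges is now ordered by (srcId, dstId): (1,3), (1,9), (4,2)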
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index eb3fd60d74..bfdafcc542 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,29 +1,36 @@
package org.apache.spark.graph.impl
import org.apache.spark.graph._
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
/**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute).
+ * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
+ * clustered by src.
*
* @param srcIds the source vertex id of each edge
* @param dstIds the destination vertex id of each edge
* @param data the attribute associated with each edge
+ * @param index a clustered index on source vertex id
* @tparam ED the edge attribute type.
*/
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
val srcIds: Array[Vid],
val dstIds: Array[Vid],
- val data: Array[ED]) {
+ val data: Array[ED],
+ val index: PrimitiveKeyOpenHashMap[Vid, Int]) {
/**
* Reverse all the edges in this partition.
*
- * @note No new data structures are created.
- *
* @return a new edge partition with all edges reversed.
*/
- def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
+ def reverse: EdgePartition[ED] = {
+ val builder = new EdgePartitionBuilder(size)
+ for (e <- iterator) {
+ builder.add(e.dstId, e.srcId, e.attr)
+ }
+ builder.toEdgePartition
+ }
/**
* Construct a new edge partition by applying the function f to all
@@ -46,7 +53,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
newData(i) = f(edge)
i += 1
}
- new EdgePartition(srcIds, dstIds, newData)
+ new EdgePartition(srcIds, dstIds, newData, index)
}
/**
@@ -54,17 +61,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*
* @param f an external state mutating user defined function.
*/
- def foreach(f: Edge[ED] => Unit) {
- val edge = new Edge[ED]
- val size = data.size
- var i = 0
- while (i < size) {
- edge.srcId = srcIds(i)
- edge.dstId = dstIds(i)
- edge.attr = data(i)
- f(edge)
- i += 1
- }
+ def foreach(f: Edge[ED] => Unit) {
+ iterator.foreach(f)
}
/**
@@ -75,21 +73,29 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
- // Aggregate all matching edges in a hashmap
- val agg = new OpenHashMap[(Vid,Vid), ED]
- foreach { e => agg.setMerge((e.srcId, e.dstId), e.attr, merge) }
- // Populate new srcId, dstId, and data, arrays
- val newSrcIds = new Array[Vid](agg.size)
- val newDstIds = new Array[Vid](agg.size)
- val newData = new Array[ED](agg.size)
+ val builder = new EdgePartitionBuilder[ED]
+ var firstIter: Boolean = true
+ var currSrcId: Vid = nullValue[Vid]
+ var currDstId: Vid = nullValue[Vid]
+ var currAttr: ED = nullValue[ED]
var i = 0
- agg.foreach { kv =>
- newSrcIds(i) = kv._1._1
- newDstIds(i) = kv._1._2
- newData(i) = kv._2
+ while (i < size) {
+ if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
+ currAttr = merge(currAttr, data(i))
+ } else {
+ if (i > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ currSrcId = srcIds(i)
+ currDstId = dstIds(i)
+ currAttr = data(i)
+ }
i += 1
}
- new EdgePartition(newSrcIds, newDstIds, newData)
+ if (size > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ builder.toEdgePartition
}
/**
@@ -99,6 +105,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def size: Int = srcIds.size
+ /** The number of unique source vertices in the partition. */
+ def indexSize: Int = index.size
+
/**
* Get an iterator over the edges in this partition.
*
@@ -118,4 +127,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
edge
}
}
+
+ /**
+ * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
+ * iterator is generated using an index scan, so it is efficient at skipping edges that don't
+ * match srcIdPred.
+ */
+ def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] =
+ index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
+
+ /**
+ * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
+ * cluster must start at position `index`.
+ */
+ private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] {
+ private[this] val edge = new Edge[ED]
+ private[this] var pos = index
+
+ override def hasNext: Boolean = {
+ pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
+ }
+
+ override def next(): Edge[ED] = {
+ assert(srcIds(pos) == srcId)
+ edge.srcId = srcIds(pos)
+ edge.dstId = dstIds(pos)
+ edge.attr = data(pos)
+ pos += 1
+ edge
+ }
+ }
}
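
The rewritten groupEdges depends on the clustering invariant: duplicate (src, dst) pairs are now adjacent, so a single linear pass with a running accumulator replaces the old hash-map aggregation. The same one-pass merge, sketched over a plain sorted list (mergeRuns is illustrative, not part of the commit):

    // Merge adjacent duplicates in a (src, dst)-sorted edge list.
    def mergeRuns[ED](sorted: List[(Long, Long, ED)],
                      merge: (ED, ED) => ED): List[(Long, Long, ED)] =
      sorted.foldLeft(List.empty[(Long, Long, ED)]) {
        case ((ps, pd, pa) :: rest, (s, d, a)) if ps == s && pd == d =>
          (ps, pd, merge(pa, a)) :: rest  // same edge: merge the attributes
        case (acc, e) =>
          e :: acc                        // a new (src, dst) run begins
      }.reverse

    // mergeRuns(List((0L, 1L, 1), (0L, 1L, 2), (0L, 2L, 5)), (_: Int) + (_: Int))
    //   == List((0L, 1L, 3), (0L, 2L, 5))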
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
index 76c11a364c..3876273369 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -1,24 +1,45 @@
package org.apache.spark.graph.impl
+import scala.util.Sorting
+
import org.apache.spark.graph._
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
//private[graph]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest] {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest](size: Int = 64) {
- val srcIds = new PrimitiveVector[Vid]
- val dstIds = new PrimitiveVector[Vid]
- var dataBuilder = new PrimitiveVector[ED]
+ var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
- srcIds += src
- dstIds += dst
- dataBuilder += d
+ edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
- new EdgePartition(srcIds.trim().array, dstIds.trim().array, dataBuilder.trim().array)
+ val edgeArray = edges.trim().array
+ Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+ val srcIds = new Array[Vid](edgeArray.size)
+ val dstIds = new Array[Vid](edgeArray.size)
+ val data = new Array[ED](edgeArray.size)
+ val index = new PrimitiveKeyOpenHashMap[Vid, Int]
+ // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
+ // adding them to the index
+ if (edgeArray.length > 0) {
+ index.update(srcIds(0), 0)
+ var currSrcId: Vid = srcIds(0)
+ var i = 0
+ while (i < edgeArray.size) {
+ srcIds(i) = edgeArray(i).srcId
+ dstIds(i) = edgeArray(i).dstId
+ data(i) = edgeArray(i).attr
+ if (edgeArray(i).srcId != currSrcId) {
+ currSrcId = edgeArray(i).srcId
+ index.update(currSrcId, i)
+ }
+ i += 1
+ }
+ }
+ new EdgePartition(srcIds, dstIds, data, index)
}
}
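
Usage of the builder in its new form; edges may be added in any order, and toEdgePartition sorts and indexes them. The expected contents in the comments follow from the construction loop above (sample values are illustrative):

    val builder = new EdgePartitionBuilder[Double]
    builder.add(7L, 5L, 1.0)
    builder.add(1L, 3L, 2.0)
    builder.add(1L, 9L, 3.0)
    val part = builder.toEdgePartition
    // part.srcIds == Array(1L, 1L, 7L), part.dstIds == Array(3L, 9L, 5L),
    // and part.index maps 1L -> 0 and 7L -> 2 (first offset of each cluster)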
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 08bef82150..0adc350187 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -245,37 +245,44 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Map and combine.
val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
- val (_, vertexPartition) = vTableReplicatedIter.next()
+ val (_, vPart) = vTableReplicatedIter.next()
+
+ // Choose scan method
+ val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+ val edgeIter = activeDirectionOpt match {
+ case Some(EdgeDirection.Both) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ .filter(e => vPart.isActive(e.dstId))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+ }
+ case Some(EdgeDirection.Out) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+ }
+ case Some(EdgeDirection.In) =>
+ edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+ case None =>
+ edgePartition.iterator
+ }
- // Iterate over the partition
+ // Scan edges and run the map function
val et = new EdgeTriplet[VD, ED]
- val filteredEdges = edgePartition.iterator.flatMap { e =>
- // Ensure the edge is adjacent to a vertex in activeSet if necessary
- val adjacent = activeDirectionOpt match {
- case Some(EdgeDirection.In) =>
- vertexPartition.isActive(e.dstId)
- case Some(EdgeDirection.Out) =>
- vertexPartition.isActive(e.srcId)
- case Some(EdgeDirection.Both) =>
- vertexPartition.isActive(e.srcId) && vertexPartition.isActive(e.dstId)
- case None =>
- true
+ val mapOutputs = edgeIter.flatMap { e =>
+ et.set(e)
+ if (mapUsesSrcAttr) {
+ et.srcAttr = vPart(e.srcId)
}
- if (adjacent) {
- et.set(e)
- if (mapUsesSrcAttr) {
- et.srcAttr = vertexPartition(e.srcId)
- }
- if (mapUsesDstAttr) {
- et.dstAttr = vertexPartition(e.dstId)
- }
- mapFunc(et)
- } else {
- Iterator.empty
+ if (mapUsesDstAttr) {
+ et.dstAttr = vPart(e.dstId)
}
+ mapFunc(et)
}
// Note: This doesn't allow users to send messages to arbitrary vertices.
- vertexPartition.aggregateUsingIndex(filteredEdges, reduceFunc).iterator
+ vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
}
// do the final reduction reusing the index map
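
The scan choice reduces to a single ratio: active vertices over distinct source vertices in the partition (indexSize), not over the total edge count. A hypothetical standalone helper that makes the threshold explicit (useIndexScan is not part of the commit):

    // Sketch of the heuristic used above: numActives counts active vertices,
    // indexSize counts distinct source vertices in the edge partition.
    def useIndexScan(numActives: Option[Int], indexSize: Int): Boolean = {
      val activeFraction = numActives.getOrElse(0) / indexSize.toFloat
      activeFraction < 0.8  // below 80% active, the clustered index scan wins
    }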
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index 161c98f158..b9b2a4705b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -115,7 +115,7 @@ class VTableReplicated[VD: ClassManifest](
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
val attr = block.attrs(i)
- val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
+ val ind = vidToIndex.getPos(vid)
vertexArray(ind) = attr
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 1c589c9b72..ccbc83c512 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -54,6 +54,9 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
activeSet.get.contains(vid)
}
+ /** The number of active vertices, if any exist. */
+ def numActives: Option[Int] = activeSet.map(_.size)
+
/**
* Pass each vertex attribute along with the vertex id through a map
* function and retain the original RDD's partitioning and index.
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 68a171b12f..a85a31f79d 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -1,9 +1,12 @@
package org.apache.spark.graph
+import scala.util.Random
+
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
@@ -59,6 +62,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// mapVertices changing type
val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
+ // groupEdges
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
}
}
@@ -206,4 +216,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
+
+ test("EdgePartition.sort") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
}