5 files changed, 196 insertions, 138 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 8320b663a5..f542ec6069 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -241,6 +241,7 @@ object Analytics extends Logging {
         var outFname = ""
         var numVPart = 4
         var numEPart = 4
+        var partitionStrategy: PartitionStrategy = RandomVertexCut()
 
         options.foreach{
           case ("numIter", v) => numIter = v.toInt
@@ -249,6 +250,15 @@ object Analytics extends Logging {
           case ("output", v) => outFname = v
           case ("numVPart", v) => numVPart = v.toInt
           case ("numEPart", v) => numEPart = v.toInt
+          case ("partStrategy", v) => {
+            partitionStrategy = v match {
+              case "RandomVertexCut" => RandomVertexCut()
+              case "EdgePartition1D" => EdgePartition1D()
+              case "EdgePartition2D" => EdgePartition2D()
+              case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut()
+              case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
+            }
+          }
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index baf8099556..6f18e46ab2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.graph.impl._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.util.collection.BitSet
+import org.apache.spark.graph._
 
 class GraphKryoRegistrator extends KryoRegistrator {
 
@@ -19,6 +20,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
+    kryo.register(classOf[PartitionStrategy])
+
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 313737fdbe..4dc33a02ce 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -27,8 +27,8 @@ object GraphLoader {
       path: String,
       edgeParser: Array[String] => ED,
       minEdgePartitions: Int = 1,
-      minVertexPartitions: Int = 1)
-    : GraphImpl[Int, ED] = {
+      minVertexPartitions: Int = 1,
+      partitionStrategy: PartitionStrategy = RandomVertexCut()): GraphImpl[Int, ED] = {
 
     // Parse the edge data table
     val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
@@ -48,13 +48,15 @@ object GraphLoader {
         }
       }.cache()
 
-    val graph = fromEdges(edges)
+    val graph = fromEdges(edges, partitionStrategy)
     graph
   }
 
-  private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+  private def fromEdges[ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      partitionStrategy: PartitionStrategy): GraphImpl[Int, ED] = {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
       .reduceByKey(_ + _)
-    GraphImpl(vertices, edges, 0)
+    GraphImpl(vertices, edges, 0, (a: Int, b: Int) => a, partitionStrategy)
   }
 }
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
new file mode 100644
index 0000000000..cf65f50657
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.graph
+
+
+sealed trait PartitionStrategy extends Serializable {
+  def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid
+}
+
+
+/**
+ * This function implements a classic 2D-Partitioning of a sparse matrix.
+ * Suppose we have a graph with 11 vertices that we want to partition
+ * over 9 machines. We can use the following sparse matrix representation:
+ *
+ *       __________________________________
+ *  v0   | P0 *     | P1       | P2    *  |
+ *  v1   |  ****    |  *       |          |
+ *  v2   |  ******* |      **  |  ****    |
+ *  v3   |  *****   |  *  *    |       *  |
+ *       ----------------------------------
+ *  v4   | P3 *     | P4 ***   | P5 **  * |
+ *  v5   |  *  *    |  *       |          |
+ *  v6   |       *  |      **  |  ****    |
+ *  v7   |  * * *   |  *  *    |       *  |
+ *       ----------------------------------
+ *  v8   | P6   *   | P7    *  | P8  *   *|
+ *  v9   |     *    |  *    *  |          |
+ *  v10  |       *  |      **  |  *  *    |
+ *  v11  | * <-E    |  ***     |       ** |
+ *       ----------------------------------
+ *
+ * The edge denoted by E connects v11 with v1 and is assigned to
+ * processor P6. To get the processor number we divide the matrix
+ * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
+ * adjacent to v11 can only be in the first column of
+ * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
+ * As a consequence we can guarantee that v11 will need to be
+ * replicated to at most 2 * sqrt(numProc) machines.
+ *
+ * Notice that P0 has many edges and as a consequence this
+ * partitioning would lead to poor work balance. To improve
+ * balance we first multiply each vertex id by a large prime
+ * to effectively shuffle the vertex locations.
+ *
+ * One of the limitations of this approach is that the number of
+ * machines must be a perfect square. We partially address
+ * this limitation by computing the machine assignment to the next
+ * largest perfect square and then mapping back down to the actual
+ * number of machines. Unfortunately, this can also lead to work
+ * imbalance and so it is suggested that a perfect square is used.
+ *
+ *
+ */
+case class EdgePartition2D() extends PartitionStrategy {
+  override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
+    val mixingPrime: Vid = 1125899906842597L
+    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+    (col * ceilSqrtNumParts + row) % numParts
+  }
+}
+
+
+case class EdgePartition1D() extends PartitionStrategy {
+  override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    val mixingPrime: Vid = 1125899906842597L
+    (math.abs(src) * mixingPrime).toInt % numParts
+  }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a
+ * random vertex cut.
+ */
+case class RandomVertexCut() extends PartitionStrategy {
+  override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    math.abs((src, dst).hashCode()) % numParts
+  }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a random vertex cut. This
+ * function ensures that edges of opposite direction between the same two vertices
+ * will end up on the same partition.
+ */
+case class CanonicalRandomVertexCut() extends PartitionStrategy {
+  override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    val lower = math.min(src, dst)
+    val higher = math.max(src, dst)
+    math.abs((lower, higher).hashCode()) % numParts
+  }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d16a81d203..6ad0ce60a7 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -8,6 +8,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkContext._
 import org.apache.spark.HashPartitioner
 import org.apache.spark.util.ClosureCleaner
+import org.apache.spark.SparkException
 import org.apache.spark.Partitioner
 import org.apache.spark.graph._
 
@@ -76,10 +77,11 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vTable: VertexSetRDD[VD],
     @transient val vid2pid: Vid2Pid,
     @transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
-    @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
+    @transient val eTable: RDD[(Pid, EdgePartition[ED])],
+    @transient val partitioner: PartitionStrategy)
   extends Graph[VD, ED] {
 
-  def this() = this(null, null, null, null)
+  def this() = this(null, null, null, null, null)
 
   @transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
     new VTableReplicatedValues(vTable, vid2pid, localVidMap)
@@ -189,18 +191,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   override def reverse: Graph[VD, ED] = {
     val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
       preservesPartitioning = true)
-    new GraphImpl(vTable, vid2pid, localVidMap, newEtable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newEtable, partitioner)
   }
 
   override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
     val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
-    new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+    new GraphImpl(newVTable, vid2pid, localVidMap, eTable, partitioner)
   }
 
   override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
     val newETable = eTable.mapPartitions(_.map{ case (pid, epart) => (pid, epart.map(f)) },
       preservesPartitioning = true)
-    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+    new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
   }
 
   override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
@@ -234,7 +236,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
       triplets.filter( t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t) )
-        .map( t => Edge(t.srcId, t.dstId, t.attr) ))
+        .map( t => Edge(t.srcId, t.dstId, t.attr) ), partitioner)
 
     // Construct the Vid2Pid map. Here we assume that the filter operation
     // behaves deterministically.
@@ -242,48 +244,60 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
     val newVidMap = createLocalVidMap(newETable)
 
-    new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
+    new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable, partitioner)
   }
 
   override def groupEdgeTriplets[ED2: ClassManifest](
     f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
-    val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
-      partIter
-      // TODO(crankshaw) toList requires that the entire edge partition
-      // can fit in memory right now.
-        .toList
-      // groups all ETs in this partition that have the same src and dst
-      // Because all ETs with the same src and dst will live on the same
-      // partition due to the EdgePartitioner, this guarantees that these
-      // ET groups will be complete.
-        .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
-        .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
-        .toList
-        .toIterator
-        .map { case ((src, dst), data) => Edge(src, dst, data) }
-        .toIterator
-    }
+    partitioner match {
+      case _: CanonicalRandomVertexCut => {
+        val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
+          partIter
+          // TODO(crankshaw) toList requires that the entire edge partition
+          // can fit in memory right now.
+            .toList
+          // groups all ETs in this partition that have the same src and dst
+          // Because all ETs with the same src and dst will live on the same
+          // partition due to the canonicalRandomVertexCut partitioner, this
+          // guarantees that these ET groups will be complete.
+            .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
+            .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
+            .toList
+            .toIterator
+            .map { case ((src, dst), data) => Edge(src, dst, data) }
+            .toIterator
+        }
+        //TODO(crankshaw) eliminate the need to call createETable
+        val newETable = createETable(newEdges, partitioner)
+        new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
+      }
 
-    //TODO(crankshaw) eliminate the need to call createETable
-    val newETable = createETable(newEdges)
-    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+      case _ => throw new SparkException(partitioner.getClass.getName +
+        " is incompatible with groupEdgeTriplets")
+    }
   }
 
   override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ): Graph[VD,ED2] = {
+    partitioner match {
+      case _: CanonicalRandomVertexCut => {
+        val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
+          partIter.toList
+            .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
+            .mapValues { ts => f(ts.toIterator) }
+            .toList
+            .toIterator
+            .map { case ((src, dst), data) => Edge(src, dst, data) }
+        }
+        // TODO(crankshaw) eliminate the need to call createETable
+        val newETable = createETable(newEdges, partitioner)
 
-    val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
-      partIter.toList
-        .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
-        .mapValues { ts => f(ts.toIterator) }
-        .toList
-        .toIterator
-        .map { case ((src, dst), data) => Edge(src, dst, data) }
-    }
-    // TODO(crankshaw) eliminate the need to call createETable
-    val newETable = createETable(newEdges)
+        new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
+      }
 
-    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+      case _ => throw new SparkException(partitioner.getClass.getName +
+        " is incompatible with groupEdges")
+    }
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////////////
 
@@ -301,7 +315,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     : Graph[VD2, ED] = {
     ClosureCleaner.clean(updateF)
     val newVTable = vTable.leftJoin(updates)(updateF)
-    new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+    new GraphImpl(newVTable, vid2pid, localVidMap, eTable, partitioner)
   }
 
 } // end of class GraphImpl
@@ -309,10 +323,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 object GraphImpl {
 
   def apply[VD: ClassManifest, ED: ClassManifest](
-    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
-    defaultVertexAttr: VD):
-  GraphImpl[VD,ED] = {
-    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a)
+    vertices: RDD[(Vid, VD)],
+    edges: RDD[Edge[ED]],
+    defaultVertexAttr: VD): GraphImpl[VD,ED] = {
+    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
+  }
+
+  def apply[VD: ClassManifest, ED: ClassManifest](
+    vertices: RDD[(Vid, VD)],
+    edges: RDD[Edge[ED]],
+    defaultVertexAttr: VD,
+    partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
+    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
   }
 
   def apply[VD: ClassManifest, ED: ClassManifest](
@@ -320,9 +342,18 @@ object GraphImpl {
     edges: RDD[Edge[ED]],
     defaultVertexAttr: VD,
     mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
+    apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
+  }
+
+  def apply[VD: ClassManifest, ED: ClassManifest](
+    vertices: RDD[(Vid, VD)],
+    edges: RDD[Edge[ED]],
+    defaultVertexAttr: VD,
+    mergeFunc: (VD, VD) => VD,
+    partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
 
     vertices.cache
-    val etable = createETable(edges).cache
+    val etable = createETable(edges, partitionStrategy).cache
 
     // Get the set of all vids, preserving partitions
     val partitioner = Partitioner.defaultPartitioner(vertices)
     val implicitVids = etable.flatMap {
@@ -338,9 +369,10 @@ object GraphImpl {
 
     val vid2pid = new Vid2Pid(etable, vtable.index)
     val localVidMap = createLocalVidMap(etable)
-    new GraphImpl(vtable, vid2pid, localVidMap, etable)
+    new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
   }
 
+
   /**
    * Create the edge table RDD, which is much more efficient for Java heap storage than the
    * normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -349,17 +381,14 @@ object GraphImpl {
    * key-value pair: the key is the partition id, and the value is an EdgePartition object
    * containing all the edges in a partition.
    */
-  protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]])
-    : RDD[(Pid, EdgePartition[ED])] = {
+  protected def createETable[ED: ClassManifest](
+      edges: RDD[Edge[ED]],
+      partitionStrategy: PartitionStrategy): RDD[(Pid, EdgePartition[ED])] = {
     // Get the number of partitions
     val numPartitions = edges.partitions.size
-    val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
+
     edges.map { e =>
-      // Random partitioning based on the source vertex id.
-      // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
-      // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
-      val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
-      //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+      val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
       new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -420,7 +449,7 @@ object GraphImpl {
       }
       Iterator((pid, newEdgePartition))
     }
-    new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable)
+    new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable, g.partitioner)
   }
 
   def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
@@ -491,86 +520,6 @@ object GraphImpl {
     VertexSetRDD.aggregate(preAgg, g.vTable.index, reduceFunc)
   }
 
-  protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
-    val mixingPrime: Vid = 1125899906842597L
-    (math.abs(src) * mixingPrime).toInt % numParts
-  }
-
-  /**
-   * This function implements a classic 2D-Partitioning of a sparse matrix.
-   * Suppose we have a graph with 11 vertices that we want to partition
-   * over 9 machines. We can use the following sparse matrix representation:
-   *
-   *       __________________________________
-   *  v0   | P0 *     | P1       | P2    *  |
-   *  v1   |  ****    |  *       |          |
-   *  v2   |  ******* |      **  |  ****    |
-   *  v3   |  *****   |  *  *    |       *  |
-   *       ----------------------------------
-   *  v4   | P3 *     | P4 ***   | P5 **  * |
-   *  v5   |  *  *    |  *       |          |
-   *  v6   |       *  |      **  |  ****    |
-   *  v7   |  * * *   |  *  *    |       *  |
-   *       ----------------------------------
-   *  v8   | P6   *   | P7    *  | P8  *   *|
-   *  v9   |     *    |  *    *  |          |
-   *  v10  |       *  |      **  |  *  *    |
-   *  v11  | * <-E    |  ***     |       ** |
-   *       ----------------------------------
-   *
-   * The edge denoted by E connects v11 with v1 and is assigned to
-   * processor P6. To get the processor number we divide the matrix
-   * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
-   * adjacent to v11 can only be in the first colum of
-   * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
-   * As a consequence we can guarantee that v11 will need to be
-   * replicated to at most 2 * sqrt(numProc) machines.
-   *
-   * Notice that P0 has many edges and as a consequence this
-   * partitioning would lead to poor work balance. To improve
-   * balance we first multiply each vertex id by a large prime
-   * to effectively shuffle the vertex locations.
-   *
-   * One of the limitations of this approach is that the number of
-   * machines must either be a perfect square. We partially address
-   * this limitation by computing the machine assignment to the next
-   * largest perfect square and then mapping back down to the actual
-   * number of machines. Unfortunately, this can also lead to work
-   * imbalance and so it is suggested that a perfect square is used.
-   *
-   *
-   */
-  protected def edgePartitionFunction2D(src: Vid, dst: Vid,
-    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
-    val mixingPrime: Vid = 1125899906842597L
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
-  }
-
-  /**
-   * Assign edges to an aribtrary machine corresponding to a
-   * random vertex cut.
-   */
-  protected def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
-    math.abs((src, dst).hashCode()) % numParts
-  }
-
-  /**
-   * @todo This will only partition edges to the upper diagonal
-   * of the 2D processor space.
-   */
-  protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
-    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
-    val mixingPrime: Vid = 1125899906842597L
-    // Partitions by canonical edge direction
-    val src = math.min(srcOrig, dstOrig)
-    val dst = math.max(srcOrig, dstOrig)
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
-  }
-
   private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
       closure: AnyRef, attrName: String): Boolean = {
     try {
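To make the 2D-partitioning argument in the EdgePartition2D comment concrete, the following self-contained sketch (ours, not part of the patch) repeats the same arithmetic as EdgePartition2D.getPartition, with plain Long and Int standing in for the Vid and Pid aliases, and checks that all edges incident to one vertex land in at most 2 * sqrt(numParts) partitions when numParts is a perfect square.

object EdgePartition2DSketch {
  // Stand-ins for the Vid and Pid type aliases used in the graph package.
  type Vid = Long
  type Pid = Int

  // Same arithmetic as EdgePartition2D.getPartition in the new PartitionStrategy.scala.
  def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
    val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: Vid = 1125899906842597L
    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

  def main(args: Array[String]): Unit = {
    val numParts = 9  // a 3 x 3 grid of blocks, as in the 11-vertex example above
    val v = 11L
    // Edges where v is the source share one value of col; edges where v is the
    // destination share one value of row, so together they touch at most
    // 2 * sqrt(numParts) blocks.
    val touched = (0L to 10L).flatMap { u =>
      Seq(getPartition(v, u, numParts), getPartition(u, v, numParts))
    }.toSet
    println(s"vertex $v touches partitions $touched (at most ${2 * math.sqrt(numParts).toInt})")
  }
}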
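The patch gates groupEdgeTriplets and groupEdges on CanonicalRandomVertexCut, and the reason is visible in the hash functions themselves: only the canonical variant sends the two directions of an edge between the same pair of vertices to the same partition, which is the co-location property the per-partition grouping relies on. Another small sketch of ours, reusing the same arithmetic as the two case classes above:

object CanonicalCutSketch {
  type Vid = Long
  type Pid = Int

  // Same hashing as RandomVertexCut.getPartition.
  def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid =
    math.abs((src, dst).hashCode()) % numParts

  // Same hashing as CanonicalRandomVertexCut.getPartition: order the endpoints first.
  def canonicalRandomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
    val lower = math.min(src, dst)
    val higher = math.max(src, dst)
    math.abs((lower, higher).hashCode()) % numParts
  }

  def main(args: Array[String]): Unit = {
    val numParts = 16
    // The two directions of the edge between vertices 3 and 7 may land on
    // different partitions under the plain random cut...
    println(s"random:    ${randomVertexCut(3L, 7L, numParts)} vs ${randomVertexCut(7L, 3L, numParts)}")
    // ...but always agree under the canonical cut.
    println(s"canonical: ${canonicalRandomVertexCut(3L, 7L, numParts)} vs ${canonicalRandomVertexCut(7L, 3L, numParts)}")
  }
}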
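Finally, a rough usage sketch (again not part of the patch) of the new GraphImpl.apply overload that accepts a PartitionStrategy, mirroring the choice the Analytics driver now exposes through the partStrategy option. The SparkContext setup and the toy vertex/edge data are assumptions made for the example.

import org.apache.spark.SparkContext
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl

object PartitionStrategyUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "partition-strategy-sketch")

    // Toy data: vertex attributes are Strings, edge attributes are Ints.
    val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))

    // New overload from this patch: the last argument selects how edges are
    // assigned to partitions (RandomVertexCut() remains the default elsewhere).
    val graph = GraphImpl(vertices, edges, "missing", EdgePartition2D())

    println("edge count: " + graph.edges.count())
    sc.stop()
  }
}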