From 38c6f5f6122ba32e1ef5d8b8a48ec99e6446d7e1 Mon Sep 17 00:00:00 2001 From: Akihiro Matsukawa Date: Wed, 4 Dec 2013 18:17:14 -0800 Subject: add a predicate to GraphLab to indicate active vertices at start --- graph/src/main/scala/org/apache/spark/graph/GraphLab.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index bf1f4168da..799c0fc901 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -24,6 +24,8 @@ object GraphLab { * @param scatterFunc Executed after the apply function the scatter function takes * a triplet and signals whether the neighboring vertex program * must be recomputed. + * @param startVertices predicate to determine which vertices to start the computation on. + * these will be the active vertices in the first iteration. * @param numIter The maximum number of iterations to run. * @param gatherDirection The direction of edges to consider during the gather phase * @param scatterDirection The direction of edges to consider during the scatter phase @@ -40,12 +42,13 @@ object GraphLab { (gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, mergeFunc: (A, A) => A, applyFunc: (Vid, VD, Option[A]) => VD, - scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = { + scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, + startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = { // Add an active attribute to all vertices to track convergence. var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices { - case (id, data) => (true, data) + case (id, data) => (startVertices(id, data), data) }.cache() // The gather function wrapper strips the active attribute and -- cgit v1.2.3 From c49a6a7954e9e1abf2377c5e6121819aa27f6e68 Mon Sep 17 00:00:00 2001 From: Akihiro Matsukawa Date: Wed, 4 Dec 2013 18:18:28 -0800 Subject: vertex should be inactive if no vertices scatter to it --- graph/src/main/scala/org/apache/spark/graph/GraphLab.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index 799c0fc901..aa35f9a746 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -89,9 +89,9 @@ object GraphLab { } // Used to set the active status of vertices for the next round - def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = { + def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = { val (prevActive, vData) = data - (newActive, vData) + (newActiveOpt.getOrElse(false), vData) } // Main Loop --------------------------------------------------------------------- @@ -113,7 +113,7 @@ object GraphLab { val scattered: RDD[(Vid, Boolean)] = activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) - activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache() + activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() // Calculate the number of active vertices numActive = activeGraph.vertices.map{ -- cgit v1.2.3 From 1e98840128f3cffbe6566b384e742b3a52cdaa9f Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 6 Dec 2013 22:32:47 -0800 Subject: Load edges in columnar format In GraphLoader.edgeListFile, load edges directly into EdgePartitions, avoiding repartitioning. --- .../scala/org/apache/spark/graph/EdgeRDD.scala | 11 ++- .../scala/org/apache/spark/graph/GraphLoader.scala | 55 +++++++++----- .../org/apache/spark/graph/impl/GraphImpl.scala | 88 +++++++++++++--------- 3 files changed, 101 insertions(+), 53 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 24844262bc..a34113b1eb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -1,6 +1,7 @@ package org.apache.spark.graph +import org.apache.spark.Partitioner import org.apache.spark.{TaskContext, Partition, OneToOneDependency} import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.rdd.RDD @@ -13,10 +14,16 @@ class EdgeRDD[@specialized ED: ClassManifest]( partitionsRDD.setName("EdgeRDD") - override val partitioner = partitionsRDD.partitioner - override protected def getPartitions: Array[Partition] = partitionsRDD.partitions + /** + * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in + * partitionsRDD correspond to the actual partitions and create a new partitioner that allows + * co-partitioning with partitionsRDD. + */ + override val partitioner = + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = { val edgePartition = partitionsRDD.compute(split, context).next()._2 edgePartition.iterator diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index 29d14452de..b00c7c4afe 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -1,9 +1,12 @@ package org.apache.spark.graph -import org.apache.spark.SparkContext +import java.util.{Arrays => JArrays} +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.graph.impl.{EdgePartition, GraphImpl} +import org.apache.spark.util.collection.PrimitiveVector -object GraphLoader { +object GraphLoader extends Logging { /** * Load an edge list from file initializing the Graph @@ -77,24 +80,42 @@ object GraphLoader { minEdgePartitions: Int = 1, partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[Int, Int] = { + val startTime = System.currentTimeMillis + // Parse the edge data table - val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => - iter.filter(line => !line.isEmpty && line(0) != '#').map { line => - val lineArray = line.split("\\s+") - if(lineArray.length < 2) { - println("Invalid line: " + line) - assert(false) + val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (index, iter) => + val srcIds = new PrimitiveVector[Long] + val dstIds = new PrimitiveVector[Long] + iter.foreach { line => + if (!line.isEmpty && line(0) != '#') { + val lineArray = line.split("\\s+") + if (lineArray.length < 2) { + logWarning("Invalid line: " + line) + } + val srcId = lineArray(0).toLong + val dstId = lineArray(1).toLong + if (canonicalOrientation && dstId > srcId) { + srcIds += dstId + dstIds += srcId + } else { + srcIds += srcId + dstIds += dstId + } } - val source = lineArray(0).trim.toLong - val target = lineArray(1).trim.toLong - if (canonicalOrientation && target > source) { - Edge(target, source, 1) - } else { - Edge(source, target, 1) - } - }) + } + val srcIdArray = srcIds.trim().array + val dstIdArray = dstIds.trim().array + val data = new Array[Int](srcIdArray.length) + JArrays.fill(data, 1) + + Iterator((index, new EdgePartition[Int](srcIdArray, dstIdArray, data))) + }.cache() + edges.count() + + logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) + val defaultVertexAttr = 1 - Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy) + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr, partitionStrategy) } // end of edgeListFile } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 771c460345..1e17fd5a67 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -253,25 +253,11 @@ object GraphImpl { def apply[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], - defaultValue: VD, + defaultVertexAttr: VD, partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { val etable = createETable(edges, partitionStrategy).cache() - - // Get the set of all vids - val vids = etable.flatMap { e => - Iterator((e.srcId, 0), (e.dstId, 0)) - } - - // Shuffle the vids and create the VertexRDD. - // TODO: Consider doing map side distinct before shuffle. - val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( - vids, new HashPartitioner(edges.partitions.size)) - shuffled.setSerializer(classOf[VidMsgSerializer].getName) - val vtable = VertexRDD(shuffled.mapValues(x => defaultValue)) - - val vertexPlacement = new VertexPlacement(etable, vtable) - new GraphImpl(vtable, etable, vertexPlacement) + fromEdgeRDD(etable, defaultVertexAttr) } def apply[VD: ClassManifest, ED: ClassManifest]( @@ -303,6 +289,14 @@ object GraphImpl { new GraphImpl(vtable, etable, vertexPlacement) } + def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])], + defaultVertexAttr: VD, + partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { + val etable = createETableFromEdgePartitions(edges, partitionStrategy) + fromEdgeRDD(etable, defaultVertexAttr) + } + /** * Create the edge table RDD, which is much more efficient for Java heap storage than the * normal edges data structure (RDD[(Vid, Vid, ED)]). @@ -313,29 +307,55 @@ object GraphImpl { */ protected def createETable[ED: ClassManifest]( edges: RDD[Edge[ED]], - partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { - // Get the number of partitions - val numPartitions = edges.partitions.size + partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { + // Get the number of partitions + val numPartitions = edges.partitions.size - val eTable = edges.map { e => - val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) + val eTable = edges.map { e => + val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) - // Should we be using 3-tuple or an optimized class - new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) - } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED] - iter.foreach { message => - val data = message.data - builder.add(data._1, data._2, data._3) - } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true).cache() + // Should we be using 3-tuple or an optimized class + new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).cache() new EdgeRDD(eTable) } + protected def createETableFromEdgePartitions[ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])], + partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { + // TODO(ankurdave): provide option to repartition edges using partitionStrategy + new EdgeRDD(edges) + } + + private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( + edges: EdgeRDD[ED], + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + // Get the set of all vids + val vids = edges.flatMap { e => + Iterator((e.srcId, 0), (e.dstId, 0)) + } + + // Shuffle the vids and create the VertexRDD. + // TODO: Consider doing map side distinct before shuffle. + val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( + vids, new HashPartitioner(edges.partitions.size)) + shuffled.setSerializer(classOf[VidMsgSerializer].getName) + val vtable = VertexRDD(shuffled.mapValues(x => defaultVertexAttr)) + + val vertexPlacement = new VertexPlacement(edges, vtable) + new GraphImpl(vtable, edges, vertexPlacement) + } + private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { try { BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName) -- cgit v1.2.3 From a8c7ebf0edc8161debc3f84b0008638321ddd568 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sat, 7 Dec 2013 15:10:32 -0800 Subject: Don't partition edges by default; refactor Instead, expose Graph.partitionBy(PartitionStrategy). --- .../scala/org/apache/spark/graph/Analytics.scala | 82 ++++++------ .../scala/org/apache/spark/graph/EdgeRDD.scala | 4 + .../main/scala/org/apache/spark/graph/Graph.scala | 52 ++++---- .../scala/org/apache/spark/graph/GraphLoader.scala | 34 ++--- .../spark/graph/algorithms/TriangleCount.scala | 4 +- .../org/apache/spark/graph/impl/GraphImpl.scala | 148 ++++++++++----------- .../org/apache/spark/graph/AnalyticsSuite.scala | 2 +- 7 files changed, 162 insertions(+), 164 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index ac50e9a388..2012dadb2f 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -58,14 +58,14 @@ object Analytics extends Logging { var outFname = "" var numVPart = 4 var numEPart = 4 - var partitionStrategy: PartitionStrategy = RandomVertexCut + var partitionStrategy: Option[PartitionStrategy] = None options.foreach{ case ("tol", v) => tol = v.toFloat case ("output", v) => outFname = v case ("numVPart", v) => numVPart = v.toInt case ("numEPart", v) => numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } @@ -75,8 +75,9 @@ object Analytics extends Logging { val sc = new SparkContext(host, "PageRank(" + fname + ")") - val graph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache() + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) println("GRAPHX: Number of vertices " + graph.vertices.count) println("GRAPHX: Number of edges " + graph.edges.count) @@ -96,44 +97,47 @@ object Analytics extends Logging { case "cc" => { - var numIter = Int.MaxValue - var numVPart = 4 - var numEPart = 4 - var isDynamic = false - var partitionStrategy: PartitionStrategy = RandomVertexCut - - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("numEPart", v) => numEPart = v.toInt - case ("numVPart", v) => numVPart = v.toInt - case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } - println("======================================") - println("| Connected Components |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - println(" \tNumIter: " + numIter) - println("======================================") - - val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") - val graph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache() - val cc = ConnectedComponents.run(graph) - println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) - sc.stop() - } + var numIter = Int.MaxValue + var numVPart = 4 + var numEPart = 4 + var isDynamic = false + var partitionStrategy: Option[PartitionStrategy] = None + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + sc.stop() + } case "triangles" => { var numVPart = 4 var numEPart = 4 + // TriangleCount requires the graph to be partitioned var partitionStrategy: PartitionStrategy = RandomVertexCut options.foreach{ @@ -147,7 +151,7 @@ object Analytics extends Logging { println("--------------------------------------") val sc = new SparkContext(host, "TriangleCount(" + fname + ")") val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, - minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache() + minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() val triangles = TriangleCount.run(graph) println("Triangles: " + triangles.vertices.map { case (vid,data) => data.toLong diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index a34113b1eb..cbc3ca54c2 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -64,4 +64,8 @@ class EdgeRDD[@specialized ED: ClassManifest]( } } + def collectVids(): RDD[Vid] = { + partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } + } + } diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index a0907c319a..d31d9dead0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -85,6 +85,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { */ def cache(): Graph[VD, ED] + /** + * Repartition the edges in the graph according to partitionStrategy. + */ + def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] + /** * Compute statistics describing the graph representation. */ @@ -193,18 +198,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] /** - * This function merges multiple edges between two vertices into a - * single Edge. See - * [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more - * detail. + * This function merges multiple edges between two vertices into a single Edge. For correct + * results, the graph must have been partitioned using partitionBy. * * @tparam ED2 the type of the resulting edge data after grouping. * - * @param f the user supplied commutative associative function to merge - * edge attributes for duplicate edges. + * @param f the user supplied commutative associative function to merge edge attributes for + * duplicate edges. * - * @return Graph[VD,ED2] The resulting graph with a single Edge for - * each source, dest vertex pair. + * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair. */ def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] @@ -294,26 +296,26 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { object Graph { /** - * Construct a graph from a collection of edges encoded as vertex id - * pairs. + * Construct a graph from a collection of edges encoded as vertex id pairs. * * @param rawEdges a collection of edges in (src,dst) form. - * @param uniqueEdges if multiple identical edges are found they are - * combined and the edge attribute is set to the sum. Otherwise - * duplicate edges are treated as separate. + * @param uniqueEdges if multiple identical edges are found they are combined and the edge + * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable + * uniqueEdges, a [[PartitionStrategy]] must be provided. * - * @return a graph with edge attributes containing either the count - * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex - * attributes containing the total degree of each vertex. + * @return a graph with edge attributes containing either the count of duplicate edges or 1 + * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex. */ def fromEdgeTuples[VD: ClassManifest]( rawEdges: RDD[(Vid, Vid)], defaultValue: VD, - uniqueEdges: Boolean = false, - partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, Int] = { + uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) - val graph = GraphImpl(edges, defaultValue, partitionStrategy) - if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph + val graph = GraphImpl(edges, defaultValue) + uniqueEdges match { + case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b) + case None => graph + } } /** @@ -327,9 +329,8 @@ object Graph { */ def fromEdges[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], - defaultValue: VD, - partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = { - GraphImpl(edges, defaultValue, partitionStrategy) + defaultValue: VD): Graph[VD, ED] = { + GraphImpl(edges, defaultValue) } /** @@ -350,9 +351,8 @@ object Graph { def apply[VD: ClassManifest, ED: ClassManifest]( vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD = null.asInstanceOf[VD], - partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = { - GraphImpl(vertices, edges, defaultVertexAttr, partitionStrategy) + defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr) } /** diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index b00c7c4afe..a69bfde532 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -1,6 +1,7 @@ package org.apache.spark.graph import java.util.{Arrays => JArrays} +import org.apache.spark.graph.impl.EdgePartitionBuilder import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graph.impl.{EdgePartition, GraphImpl} import org.apache.spark.util.collection.PrimitiveVector @@ -25,8 +26,7 @@ object GraphLoader extends Logging { sc: SparkContext, path: String, edgeParser: Array[String] => ED, - minEdgePartitions: Int = 1, - partitionStrategy: PartitionStrategy = RandomVertexCut): + minEdgePartitions: Int = 1): Graph[Int, ED] = { // Parse the edge data table val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => @@ -43,7 +43,7 @@ object GraphLoader extends Logging { Edge(source, target, edata) }) val defaultVertexAttr = 1 - Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy) + Graph.fromEdges(edges, defaultVertexAttr) } /** @@ -73,19 +73,17 @@ object GraphLoader extends Logging { * @tparam ED * @return */ - def edgeListFile[ED: ClassManifest]( + def edgeListFile( sc: SparkContext, path: String, canonicalOrientation: Boolean = false, - minEdgePartitions: Int = 1, - partitionStrategy: PartitionStrategy = RandomVertexCut): + minEdgePartitions: Int = 1): Graph[Int, Int] = { val startTime = System.currentTimeMillis - // Parse the edge data table - val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (index, iter) => - val srcIds = new PrimitiveVector[Long] - val dstIds = new PrimitiveVector[Long] + // Parse the edge data table directly into edge partitions + val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[Int] iter.foreach { line => if (!line.isEmpty && line(0) != '#') { val lineArray = line.split("\\s+") @@ -95,27 +93,19 @@ object GraphLoader extends Logging { val srcId = lineArray(0).toLong val dstId = lineArray(1).toLong if (canonicalOrientation && dstId > srcId) { - srcIds += dstId - dstIds += srcId + builder.add(dstId, srcId, 1) } else { - srcIds += srcId - dstIds += dstId + builder.add(srcId, dstId, 1) } } } - val srcIdArray = srcIds.trim().array - val dstIdArray = dstIds.trim().array - val data = new Array[Int](srcIdArray.length) - JArrays.fill(data, 1) - - Iterator((index, new EdgePartition[Int](srcIdArray, dstIdArray, data))) + Iterator((pid, builder.toEdgePartition)) }.cache() edges.count() logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) - val defaultVertexAttr = 1 - GraphImpl.fromEdgePartitions(edges, defaultVertexAttr, partitionStrategy) + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1) } // end of edgeListFile } diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala index 963986d20d..b1cd3c47d0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala @@ -16,7 +16,9 @@ object TriangleCount { * triangle is counted twice. * * - * @param graph a graph with `sourceId` less than `destId` + * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned + * using Graph.partitionBy. + * * @return */ def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 1e17fd5a67..fea8cfa712 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -1,5 +1,6 @@ package org.apache.spark.graph.impl +import org.apache.spark.util.collection.PrimitiveVector import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.SparkContext._ import org.apache.spark.graph._ @@ -40,6 +41,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement)) } + def this( + vertices: VertexRDD[VD], + edges: EdgeRDD[ED]) = { + this(vertices, edges, new VertexPlacement(edges, vertices)) + } + /** Return a RDD that brings edges with its source and destination vertices together. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { val vdManifest = classManifest[VD] @@ -60,6 +67,27 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY) + override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { + val numPartitions = edges.partitions.size + val newEdges = new EdgeRDD(edges.map { e => + val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) + + // Should we be using 3-tuple or an optimized class + new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).cache()) + new GraphImpl(vertices, newEdges) + } + override def statistics: Map[String, Any] = { // Get the total number of vertices after replication, used to compute the replication ratio. def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = { @@ -175,10 +203,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( Iterator((pid, edgePartition)) }, preservesPartitioning = true)).cache() - // Construct the VertexPlacement map - val newVertexPlacement = new VertexPlacement(newETable, newVTable) - - new GraphImpl(newVTable, newETable, newVertexPlacement) + new GraphImpl(newVTable, newETable) } // end of subgraph override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { @@ -246,6 +271,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( changedVerts, edges, vertexPlacement, Some(vTableReplicated)) new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated) } + + private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { + try { + BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName) + } catch { + case _: ClassNotFoundException => true // if we don't know, be conservative + } + } } // end of class GraphImpl @@ -253,48 +286,35 @@ object GraphImpl { def apply[VD: ClassManifest, ED: ClassManifest]( edges: RDD[Edge[ED]], - defaultVertexAttr: VD, - partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = + defaultVertexAttr: VD): GraphImpl[VD, ED] = { - val etable = createETable(edges, partitionStrategy).cache() - fromEdgeRDD(etable, defaultVertexAttr) + fromEdgeRDD(createETable(edges), defaultVertexAttr) + } + + def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])], + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr) } def apply[VD: ClassManifest, ED: ClassManifest]( vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD, - partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = + defaultVertexAttr: VD): GraphImpl[VD, ED] = { - vertices.cache() - val etable = createETable(edges, partitionStrategy).cache() + val etable = createETable(edges).cache() + // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) - val vPartitioned = vertices.partitionBy(partitioner) - - val vidsFromEdges = { - etable.partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } - .map(vid => (vid, 0)) - .partitionBy(partitioner) - } - + val vidsFromEdges = collectVidsFromEdges(etable, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr) - val vertexPlacement = new VertexPlacement(etable, vtable) - new GraphImpl(vtable, etable, vertexPlacement) - } - - def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])], - defaultVertexAttr: VD, - partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = { - val etable = createETableFromEdgePartitions(edges, partitionStrategy) - fromEdgeRDD(etable, defaultVertexAttr) + new GraphImpl(vtable, etable) } /** @@ -305,63 +325,41 @@ object GraphImpl { * key-value pair: the key is the partition id, and the value is an EdgePartition object * containing all the edges in a partition. */ - protected def createETable[ED: ClassManifest]( - edges: RDD[Edge[ED]], - partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { - // Get the number of partitions - val numPartitions = edges.partitions.size - - val eTable = edges.map { e => - val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) - - // Should we be using 3-tuple or an optimized class - new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + private def createETable[ED: ClassManifest]( + edges: RDD[Edge[ED]]): EdgeRDD[ED] = { + val eTable = edges.mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { e => + builder.add(e.srcId, e.dstId, e.attr) + } + Iterator((pid, builder.toEdgePartition)) } - .partitionBy(new HashPartitioner(numPartitions)) - .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED] - iter.foreach { message => - val data = message.data - builder.add(data._1, data._2, data._3) - } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true).cache() new EdgeRDD(eTable) } - protected def createETableFromEdgePartitions[ED: ClassManifest]( - edges: RDD[(Pid, EdgePartition[ED])], - partitionStrategy: PartitionStrategy): EdgeRDD[ED] = { - // TODO(ankurdave): provide option to repartition edges using partitionStrategy + private def createETableFromEdgePartitions[ED: ClassManifest]( + edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = { new EdgeRDD(edges) } private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( edges: EdgeRDD[ED], defaultVertexAttr: VD): GraphImpl[VD, ED] = { + edges.cache() // Get the set of all vids - val vids = edges.flatMap { e => - Iterator((e.srcId, 0), (e.dstId, 0)) - } - - // Shuffle the vids and create the VertexRDD. - // TODO: Consider doing map side distinct before shuffle. - val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)]( - vids, new HashPartitioner(edges.partitions.size)) - shuffled.setSerializer(classOf[VidMsgSerializer].getName) - val vtable = VertexRDD(shuffled.mapValues(x => defaultVertexAttr)) - - val vertexPlacement = new VertexPlacement(edges, vtable) - new GraphImpl(vtable, edges, vertexPlacement) + val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size)) + // Create the VertexRDD. + val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr)) + new GraphImpl(vtable, edges) } - private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { - try { - BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName) - } catch { - case _: ClassNotFoundException => true // if we don't know, be conservative - } + /** Collects all vids mentioned in edges and partitions them by partitioner. */ + private def collectVidsFromEdges( + edges: EdgeRDD[_], + partitioner: Partitioner): RDD[(Vid, Int)] = { + // TODO: Consider doing map side distinct before shuffle. + new ShuffledRDD[Vid, Int, (Vid, Int)]( + edges.collectVids.map(vid => (vid, 0)), partitioner) + .setSerializer(classOf[VidMsgSerializer].getName) } - } // end of object GraphImpl diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala index 2e6b57a8ec..61cafcd7c3 100644 --- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala @@ -251,7 +251,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) - val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val graph = Graph.fromEdgeTuples(rawEdges, Some(RandomVertexCut)).cache() val triangleCount = TriangleCount.run(graph) val verts = triangleCount.vertices verts.collect.foreach { case (vid, count) => assert(count === 1) } -- cgit v1.2.3 From d00cc8092bf9c09fffedafbe6d040e2f7bc1da5a Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Thu, 12 Dec 2013 17:40:00 -0800 Subject: Fix argument bug and closure capture --- graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 3 ++- graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index fea8cfa712..ba2ebe6497 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -69,6 +69,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { val numPartitions = edges.partitions.size + val edManifest = classManifest[ED] val newEdges = new EdgeRDD(edges.map { e => val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) @@ -77,7 +78,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } .partitionBy(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED] + val builder = new EdgePartitionBuilder[ED]()(edManifest) iter.foreach { message => val data = message.data builder.add(data._1, data._2, data._3) diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala index 61cafcd7c3..b413b4587e 100644 --- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala @@ -251,7 +251,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext { withSpark(new SparkContext("local", "test")) { sc => val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) - val graph = Graph.fromEdgeTuples(rawEdges, Some(RandomVertexCut)).cache() + val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache() val triangleCount = TriangleCount.run(graph) val verts = triangleCount.vertices verts.collect.foreach { case (vid, count) => assert(count === 1) } -- cgit v1.2.3 From 016cabceca624611b2be33c29649a78aee9c0f39 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Thu, 12 Dec 2013 17:48:24 -0800 Subject: Clean up imports in EdgeRDD --- graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index cbc3ca54c2..9a1ebbcdcc 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -1,8 +1,6 @@ package org.apache.spark.graph - -import org.apache.spark.Partitioner -import org.apache.spark.{TaskContext, Partition, OneToOneDependency} +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -- cgit v1.2.3 From 62bdc44a1e6a28d313e693474071da04caf41c02 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 6 Dec 2013 17:36:09 -0800 Subject: Unrolled while loop in readVarLong. (cherry picked from commit 45ffb1ae3c0527aae50502741a3585c411875b9a) Signed-off-by: Ankur Dave --- .../org/apache/spark/graph/impl/Serializers.scala | 44 +++++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) (limited to 'graph/src/main') diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala index 68b38de2b8..e4fa4a4421 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala @@ -295,7 +295,7 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat var i: Int = 0 def readOrThrow(): Int = { val in = s.read() - if (in < 0) throw new java.io.EOFException + if (in < 0) throw new EOFException in & 0xFF } var b: Int = readOrThrow() @@ -309,21 +309,45 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat } def readVarLong(optimizePositive: Boolean): Long = { - // TODO: unroll the while loop. - var value: Long = 0L def readOrThrow(): Int = { val in = s.read() - if (in < 0) throw new java.io.EOFException + if (in < 0) throw new EOFException in & 0xFF } - var i: Int = 0 - var b: Int = readOrThrow() - while (i < 56 && (b & 0x80) != 0) { - value |= (b & 0x7F).toLong << i - i += 7 + var b = readOrThrow() + var ret: Long = b & 0x7F + if ((b & 0x80) != 0) { b = readOrThrow() + ret |= (b & 0x7F) << 7 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F) << 14 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F) << 21 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 28 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 35 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 42 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 49 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= b.toLong << 56 + } + } + } + } + } + } + } } - val ret = value | (b.toLong << i) if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret } -- cgit v1.2.3