From feaa07802203b79f454454445c0a12a2784ccfeb Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Sat, 11 Jan 2014 00:28:18 -0800
Subject: algorithms -> lib

---
 .../main/scala/org/apache/spark/graphx/Graph.scala |   4 +-
 .../spark/graphx/algorithms/Algorithms.scala       |  66 ---------
 .../apache/spark/graphx/algorithms/Analytics.scala | 157 --------------------
 .../graphx/algorithms/ConnectedComponents.scala    |  39 -----
 .../apache/spark/graphx/algorithms/PageRank.scala  | 160 ---------------------
 .../spark/graphx/algorithms/SVDPlusPlus.scala      | 107 --------------
 .../algorithms/StronglyConnectedComponents.scala   |  88 ------------
 .../spark/graphx/algorithms/TriangleCount.scala    |  78 ----------
 .../apache/spark/graphx/algorithms/package.scala   |   8 --
 .../org/apache/spark/graphx/lib/Algorithms.scala   |  66 +++++++++
 .../org/apache/spark/graphx/lib/Analytics.scala    | 157 ++++++++++++++++++++
 .../spark/graphx/lib/ConnectedComponents.scala     |  39 +++++
 .../org/apache/spark/graphx/lib/PageRank.scala     | 160 +++++++++++++++++++++
 .../org/apache/spark/graphx/lib/SVDPlusPlus.scala  | 107 ++++++++++++++
 .../graphx/lib/StronglyConnectedComponents.scala   |  88 ++++++++++++
 .../apache/spark/graphx/lib/TriangleCount.scala    |  78 ++++++++++
 .../org/apache/spark/graphx/lib/package.scala      |   8 ++
 .../algorithms/ConnectedComponentsSuite.scala      |  83 -----------
 .../spark/graphx/algorithms/PageRankSuite.scala    | 119 ---------------
 .../spark/graphx/algorithms/SVDPlusPlusSuite.scala |  31 ----
 .../StronglyConnectedComponentsSuite.scala         |  57 --------
 .../graphx/algorithms/TriangleCountSuite.scala     |  73 ----------
 .../graphx/lib/ConnectedComponentsSuite.scala      |  83 +++++++++++
 .../apache/spark/graphx/lib/PageRankSuite.scala    | 119 +++++++++++++++
 .../apache/spark/graphx/lib/SVDPlusPlusSuite.scala |  31 ++++
 .../lib/StronglyConnectedComponentsSuite.scala     |  57 ++++++++
 .../spark/graphx/lib/TriangleCountSuite.scala      |  73 ++++++++++
 27 files changed, 1068 insertions(+), 1068 deletions(-)
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/Analytics.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/Algorithms.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/package.scala
 delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
 delete
mode 100644 graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 64d44c3e06..56513cac20 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -16,8 +16,8 @@ import org.apache.spark.storage.StorageLevel * operations return new graphs. * * @note [[GraphOps]] contains additional convenience operations. - * [[algorithms.Algorithms]] contains graph algorithms; to access these, - * import `org.apache.spark.graphx.algorithms._`. + * [[lib.Algorithms]] contains graph algorithms; to access these, + * import `org.apache.spark.graphx.lib._`. * * @tparam VD the vertex attribute type * @tparam ED the edge attribute type diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala deleted file mode 100644 index 558d8da310..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import scala.reflect.ClassTag - -import org.apache.spark.graphx._ - -/** - * Provides graph algorithms directly on [[org.apache.spark.graphx.Graph]] via an implicit - * conversion. - * @example - * {{{ - * import org.apache.spark.graph.algorithms._ - * val graph: Graph[_, _] = loadGraph() - * graph.connectedComponents() - * }}} - */ -class Algorithms[VD: ClassTag, ED: ClassTag](self: Graph[VD, ED]) { - /** - * Run a dynamic version of PageRank returning a graph with vertex attributes containing the - * PageRank and edge attributes containing the normalized edge weight. - * - * @see [[org.apache.spark.graphx.algorithms.PageRank]], method `runUntilConvergence`. - */ - def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = { - PageRank.runUntilConvergence(self, tol, resetProb) - } - - /** - * Run PageRank for a fixed number of iterations returning a graph with vertex attributes - * containing the PageRank and edge attributes the normalized edge weight. - * - * @see [[org.apache.spark.graphx.algorithms.PageRank]], method `run`. - */ - def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = { - PageRank.run(self, numIter, resetProb) - } - - /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the lowest vertex id in the connected component containing that vertex. 
- * - * @see [[org.apache.spark.graphx.algorithms.ConnectedComponents]] - */ - def connectedComponents(): Graph[VertexID, ED] = { - ConnectedComponents.run(self) - } - - /** - * Compute the number of triangles passing through each vertex. - * - * @see [[org.apache.spark.graphx.algorithms.TriangleCount]] - */ - def triangleCount(): Graph[Int, ED] = { - TriangleCount.run(self) - } - - /** - * Compute the strongly connected component (SCC) of each vertex and return a graph with the - * vertex value containing the lowest vertex id in the SCC containing that vertex. - * - * @see [[org.apache.spark.graphx.algorithms.StronglyConnectedComponents]] - */ - def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = { - StronglyConnectedComponents.run(self, numIter) - } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Analytics.scala deleted file mode 100644 index f09685402a..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Analytics.scala +++ /dev/null @@ -1,157 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.apache.spark._ -import org.apache.spark.graphx._ - -/** - * Driver program for running graph algorithms. - */ -object Analytics extends Logging { - - def main(args: Array[String]) = { - val host = args(0) - val taskType = args(1) - val fname = args(2) - val options = args.drop(3).map { arg => - arg.dropWhile(_ == '-').split('=') match { - case Array(opt, v) => (opt -> v) - case _ => throw new IllegalArgumentException("Invalid argument: " + arg) - } - } - - def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { - loggers.map{ - loggerName => - val logger = org.apache.log4j.Logger.getLogger(loggerName) - val prevLevel = logger.getLevel() - logger.setLevel(level) - loggerName -> prevLevel - }.toMap - } - - def pickPartitioner(v: String): PartitionStrategy = { - v match { - case "RandomVertexCut" => RandomVertexCut - case "EdgePartition1D" => EdgePartition1D - case "EdgePartition2D" => EdgePartition2D - case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut - case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v) - } - } - val serializer = "org.apache.spark.serializer.KryoSerializer" - System.setProperty("spark.serializer", serializer) - System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") - - taskType match { - case "pagerank" => { - - var tol:Float = 0.001F - var outFname = "" - var numVPart = 4 - var numEPart = 4 - var partitionStrategy: Option[PartitionStrategy] = None - - options.foreach{ - case ("tol", v) => tol = v.toFloat - case ("output", v) => outFname = v - case ("numVPart", v) => numVPart = v.toInt - case ("numEPart", v) => numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - println("======================================") - println("| PageRank |") - println("======================================") - - val sc = new SparkContext(host, "PageRank(" + fname + ")") - - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() - val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) - - println("GRAPHX: Number of vertices " + graph.vertices.count) - println("GRAPHX: Number of edges " + graph.edges.count) - - val pr = 
graph.pageRank(tol).vertices.cache() - - println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_)) - - if (!outFname.isEmpty) { - logWarning("Saving pageranks of pages to " + outFname) - pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) - } - - sc.stop() - } - - case "cc" => { - - var numIter = Int.MaxValue - var numVPart = 4 - var numEPart = 4 - var isDynamic = false - var partitionStrategy: Option[PartitionStrategy] = None - - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("numEPart", v) => numEPart = v.toInt - case ("numVPart", v) => numVPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - - if(!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } - println("======================================") - println("| Connected Components |") - println("--------------------------------------") - println(" Using parameters:") - println(" \tDynamic: " + isDynamic) - println(" \tNumIter: " + numIter) - println("======================================") - - val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() - val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) - - val cc = ConnectedComponents.run(graph) - println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) - sc.stop() - } - - case "triangles" => { - var numVPart = 4 - var numEPart = 4 - // TriangleCount requires the graph to be partitioned - var partitionStrategy: PartitionStrategy = RandomVertexCut - - options.foreach{ - case ("numEPart", v) => numEPart = v.toInt - case ("numVPart", v) => numVPart = v.toInt - case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) - case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) - } - println("======================================") - println("| Triangle Count |") - println("--------------------------------------") - val sc = new SparkContext(host, "TriangleCount(" + fname + ")") - val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, - minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() - val triangles = TriangleCount.run(graph) - println("Triangles: " + triangles.vertices.map { - case (vid,data) => data.toLong - }.reduce(_+_) / 3) - sc.stop() - } - - case _ => { - println("Invalid task type.") - } - } - } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala deleted file mode 100644 index 137a81f4d5..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import scala.reflect.ClassTag - -import org.apache.spark.graphx._ - - -object ConnectedComponents { - /** - * Compute the connected component membership of each vertex and return a graph with the vertex - * value containing the lowest vertex id in the connected component containing that vertex. 
- * - * @tparam VD the vertex attribute type (discarded in the computation) - * @tparam ED the edge attribute type (preserved in the computation) - * - * @param graph the graph for which to compute the connected components - * - * @return a graph with vertex attributes containing the smallest vertex in each - * connected component - */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { - val ccGraph = graph.mapVertices { case (vid, _) => vid } - - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { - if (edge.srcAttr < edge.dstAttr) { - Iterator((edge.dstId, edge.srcAttr)) - } else if (edge.srcAttr > edge.dstAttr) { - Iterator((edge.srcId, edge.dstAttr)) - } else { - Iterator.empty - } - } - val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage)( - vprog = (id, attr, msg) => math.min(attr, msg), - sendMsg = sendMessage, - mergeMsg = (a, b) => math.min(a, b)) - } // end of connectedComponents -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala deleted file mode 100644 index ab447d5422..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala +++ /dev/null @@ -1,160 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import scala.reflect.ClassTag - -import org.apache.spark.Logging -import org.apache.spark.graphx._ - - -object PageRank extends Logging { - - /** - * Run PageRank for a fixed number of iterations returning a graph - * with vertex attributes containing the PageRank and edge - * attributes the normalized edge weight. - * - * The following PageRank fixed point is computed for each vertex. - * - * {{{ - * var PR = Array.fill(n)( 1.0 ) - * val oldPR = Array.fill(n)( 1.0 ) - * for( iter <- 0 until numIter ) { - * swap(oldPR, PR) - * for( i <- 0 until n ) { - * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum - * } - * } - * }}} - * - * where `alpha` is the random reset probability (typically 0.15), - * `inNbrs[i]` is the set of neighbors whick link to `i` and - * `outDeg[j]` is the out degree of vertex `j`. - * - * Note that this is not the "normalized" PageRank and as a consequence pages that have no - * inlinks will have a PageRank of alpha. - * - * @tparam VD the original vertex attribute (not used) - * @tparam ED the original edge attribute (not used) - * - * @param graph the graph on which to compute PageRank - * @param numIter the number of iterations of PageRank to run - * @param resetProb the random reset probability (alpha) - * - * @return the graph containing with each vertex containing the PageRank and each edge - * containing the normalized weight. - * - */ - def run[VD: ClassTag, ED: ClassTag]( - graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = - { - - /** - * Initialize the pagerankGraph with each edge attribute having - * weight 1/outDegree and each vertex with attribute 1.0. 
- */ - val pagerankGraph: Graph[Double, Double] = graph - // Associate the degree with each vertex - .outerJoinVertices(graph.outDegrees){ - (vid, vdata, deg) => deg.getOrElse(0) - } - // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr ) - // Set the vertex attributes to the initial pagerank values - .mapVertices( (id, attr) => 1.0 ) - .cache() - - // Display statistics about pagerank - logInfo(pagerankGraph.statistics.toString) - - // Define the three functions needed to implement PageRank in the GraphX - // version of Pregel - def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = - resetProb + (1.0 - resetProb) * msgSum - def sendMessage(edge: EdgeTriplet[Double, Double]) = - Iterator((edge.dstId, edge.srcAttr * edge.attr)) - def messageCombiner(a: Double, b: Double): Double = a + b - // The initial message received by all vertices in PageRank - val initialMessage = 0.0 - - // Execute pregel for a fixed number of iterations. - Pregel(pagerankGraph, initialMessage, numIter)( - vertexProgram, sendMessage, messageCombiner) - } - - /** - * Run a dynamic version of PageRank returning a graph with vertex attributes containing the - * PageRank and edge attributes containing the normalized edge weight. - * - * {{{ - * var PR = Array.fill(n)( 1.0 ) - * val oldPR = Array.fill(n)( 0.0 ) - * while( max(abs(PR - oldPr)) > tol ) { - * swap(oldPR, PR) - * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) { - * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum - * } - * } - * }}} - * - * where `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of - * neighbors whick link to `i` and `outDeg[j]` is the out degree of vertex `j`. - * - * Note that this is not the "normalized" PageRank and as a consequence pages that have no - * inlinks will have a PageRank of alpha. - * - * @tparam VD the original vertex attribute (not used) - * @tparam ED the original edge attribute (not used) - * - * @param graph the graph on which to compute PageRank - * @param tol the tolerance allowed at convergence (smaller => more * accurate). - * @param resetProb the random reset probability (alpha) - * - * @return the graph containing with each vertex containing the PageRank and each edge - * containing the normalized weight. - */ - def runUntilConvergence[VD: ClassTag, ED: ClassTag]( - graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = - { - // Initialize the pagerankGraph with each edge attribute - // having weight 1/outDegree and each vertex with attribute 1.0. 
- val pagerankGraph: Graph[(Double, Double), Double] = graph - // Associate the degree with each vertex - .outerJoinVertices(graph.outDegrees) { - (vid, vdata, deg) => deg.getOrElse(0) - } - // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr ) - // Set the vertex attributes to (initalPR, delta = 0) - .mapVertices( (id, attr) => (0.0, 0.0) ) - .cache() - - // Display statistics about pagerank - logInfo(pagerankGraph.statistics.toString) - - // Define the three functions needed to implement PageRank in the GraphX - // version of Pregel - def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { - val (oldPR, lastDelta) = attr - val newPR = oldPR + (1.0 - resetProb) * msgSum - (newPR, newPR - oldPR) - } - - def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { - if (edge.srcAttr._2 > tol) { - Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) - } else { - Iterator.empty - } - } - - def messageCombiner(a: Double, b: Double): Double = a + b - - // The initial message received by all vertices in PageRank - val initialMessage = resetProb / (1.0 - resetProb) - - // Execute a dynamic version of Pregel. - Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner) - .mapVertices((vid, attr) => attr._1) - } // end of deltaPageRank - -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala deleted file mode 100644 index 2a13553d79..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala +++ /dev/null @@ -1,107 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.apache.spark.rdd._ -import org.apache.spark.graphx._ -import scala.util.Random -import org.apache.commons.math.linear._ - -class SVDPlusPlusConf( // SVDPlusPlus parameters - var rank: Int, - var maxIters: Int, - var minVal: Double, - var maxVal: Double, - var gamma1: Double, - var gamma2: Double, - var gamma6: Double, - var gamma7: Double) extends Serializable - -object SVDPlusPlus { - /** - * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model", - * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]]. - * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6. 
- * - * @param edges edges for constructing the graph - * - * @param conf SVDPlusPlus parameters - * - * @return a graph with vertex attributes containing the trained model - */ - - def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = { - - // generate default vertex attribute - def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = { - val v1 = new ArrayRealVector(rank) - val v2 = new ArrayRealVector(rank) - for (i <- 0 until rank) { - v1.setEntry(i, Random.nextDouble) - v2.setEntry(i, Random.nextDouble) - } - (v1, v2, 0.0, 0.0) - } - - // calculate global rating mean - edges.cache() - val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) - val u = rs / rc - - // construct graph - var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache() - - // calculate initial bias and norm - var t0 = g.mapReduceTriplets(et => - Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) - g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => - (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) - } - - def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(VertexID, (RealVector, RealVector, Double))] = { - val (usr, itm) = (et.srcAttr, et.dstAttr) - val (p, q) = (usr._1, itm._1) - var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) - pred = math.max(pred, conf.minVal) - pred = math.min(pred, conf.maxVal) - val err = et.attr - pred - val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) - val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) - val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) - Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)), - (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))) - } - - for (i <- 0 until conf.maxIters) { - // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes - g.cache() - var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) - g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => - if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd - } - // phase 2, update p for user nodes and q, y for item nodes - g.cache() - val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => - (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) - g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => - (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) - } - } - - // calculate error on training set - def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = { - val (usr, itm) = (et.srcAttr, et.dstAttr) - val (p, q) = (usr._1, itm._1) - var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) - pred = math.max(pred, conf.minVal) - pred = math.min(pred, conf.maxVal) - val err = 
(et.attr - pred) * (et.attr - pred) - Iterator((et.dstId, err)) - } - g.cache() - val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) - g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => - if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd - } - (g, u) - } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala deleted file mode 100644 index 864f0ec57c..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala +++ /dev/null @@ -1,88 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import scala.reflect.ClassTag - -import org.apache.spark.graphx._ - -object StronglyConnectedComponents { - - /** - * Compute the strongly connected component (SCC) of each vertex and return a graph with the - * vertex value containing the lowest vertex id in the SCC containing that vertex. - * - * @tparam VD the vertex attribute type (discarded in the computation) - * @tparam ED the edge attribute type (preserved in the computation) - * - * @param graph the graph for which to compute the SCC - * - * @return a graph with vertex attributes containing the smallest vertex id in each SCC - */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { - - // the graph we update with final SCC ids, and the graph we return at the end - var sccGraph = graph.mapVertices { case (vid, _) => vid } - // graph we are going to work with in our iterations - var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache() - - var numVertices = sccWorkGraph.numVertices - var iter = 0 - while (sccWorkGraph.numVertices > 0 && iter < numIter) { - iter += 1 - do { - numVertices = sccWorkGraph.numVertices - sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) { - (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) - }.outerJoinVertices(sccWorkGraph.inDegrees) { - (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) - }.cache() - - // get all vertices to be removed - val finalVertices = sccWorkGraph.vertices - .filter { case (vid, (scc, isFinal)) => isFinal} - .mapValues { (vid, data) => data._1} - - // write values to sccGraph - sccGraph = sccGraph.outerJoinVertices(finalVertices) { - (vid, scc, opt) => opt.getOrElse(scc) - } - // only keep vertices that are not final - sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache() - } while (sccWorkGraph.numVertices < numVertices) - - sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) } - - // collect min of all my neighbor's scc values, update if it's smaller than mine - // then notify any neighbors with scc values larger than mine - sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)( - (vid, e) => e.otherVertexAttr(vid)._1, - (vid1, vid2) => math.min(vid1, vid2), - (vid, scc, optScc) => - (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2), - (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1 - ) - - // start at root of SCCs. Traverse values in reverse, notify all my neighbors - // do not propagate if colors do not match! 
- sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean]( - sccWorkGraph, - Integer.MAX_VALUE, - EdgeDirection.Out, - EdgeDirection.In - )( - // vertex is final if it is the root of a color - // or it has the same color as a neighbor that is final - (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), - (final1, final2) => final1 || final2, - (vid, scc, optFinal) => - (scc._1, scc._2 || optFinal.getOrElse(false)), - // activate neighbor if they are not final, you are, and you have the same color - (vid, e) => e.vertexAttr(vid)._2 && - !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), - // start at root of colors - (vid, data) => vid == data._1 - ) - } - sccGraph - } - -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala deleted file mode 100644 index b5a93c1bd1..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala +++ /dev/null @@ -1,78 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import scala.reflect.ClassTag - -import org.apache.spark.graphx._ - - -object TriangleCount { - /** - * Compute the number of triangles passing through each vertex. - * - * The algorithm is relatively straightforward and can be computed in three steps: - * - * 1) Compute the set of neighbors for each vertex - * 2) For each edge compute the intersection of the sets and send the - * count to both vertices. - * 3) Compute the sum at each vertex and divide by two since each - * triangle is counted twice. - * - * - * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned - * using Graph.partitionBy. 
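As a usage sketch (a SparkContext `sc` is assumed, and the input path and partition strategy below are illustrative), a graph can be prepared for triangle counting the same way the Analytics driver does: load with canonical orientation, partition, then run.
{{{
// Load edges so that srcId < dstId, as TriangleCount requires, then partition the graph.
val graph = GraphLoader.edgeListFile(sc, "/tmp/edges.txt", canonicalOrientation = true)
  .partitionBy(RandomVertexCut)
  .cache()
// Per-vertex triangle counts.
val triangles = TriangleCount.run(graph).vertices
}}}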
- * - * @return - */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { - // Remove redundant edges - val g = graph.groupEdges((a, b) => a).cache - - // Construct set representations of the neighborhoods - val nbrSets: VertexRDD[VertexSet] = - g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) => - val set = new VertexSet(4) - var i = 0 - while (i < nbrs.size) { - // prevent self cycle - if(nbrs(i) != vid) { - set.add(nbrs(i)) - } - i += 1 - } - set - } - // join the sets with the graph - val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) { - (vid, _, optSet) => optSet.getOrElse(null) - } - // Edge function computes intersection of smaller vertex with larger vertex - def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { - assert(et.srcAttr != null) - assert(et.dstAttr != null) - val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { - (et.srcAttr, et.dstAttr) - } else { - (et.dstAttr, et.srcAttr) - } - val iter = smallSet.iterator - var counter: Int = 0 - while (iter.hasNext) { - val vid = iter.next - if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 } - } - Iterator((et.srcId, counter), (et.dstId, counter)) - } - // compute the intersection along edges - val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _) - // Merge counters with the graph and divide by two since each triangle is counted twice - g.outerJoinVertices(counters) { - (vid, _, optCounter: Option[Int]) => - val dblCount = optCounter.getOrElse(0) - // double count should be even (divisible by two) - assert((dblCount & 1) == 0) - dblCount / 2 - } - - } // end of TriangleCount - -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala deleted file mode 100644 index fbabf1257c..0000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.spark.graphx - -import scala.reflect.ClassTag - -package object algorithms { - implicit def graphToAlgorithms[VD: ClassTag, ED: ClassTag]( - graph: Graph[VD, ED]): Algorithms[VD, ED] = new Algorithms(graph) -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Algorithms.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Algorithms.scala new file mode 100644 index 0000000000..cbcd9c24a0 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Algorithms.scala @@ -0,0 +1,66 @@ +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + +/** + * Provides graph algorithms directly on [[org.apache.spark.graphx.Graph]] via an implicit + * conversion. + * @example + * {{{ + * import org.apache.spark.graph.lib._ + * val graph: Graph[_, _] = loadGraph() + * graph.connectedComponents() + * }}} + */ +class Algorithms[VD: ClassTag, ED: ClassTag](self: Graph[VD, ED]) { + /** + * Run a dynamic version of PageRank returning a graph with vertex attributes containing the + * PageRank and edge attributes containing the normalized edge weight. + * + * @see [[org.apache.spark.graphx.lib.PageRank]], method `runUntilConvergence`. 
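For illustration, a minimal sketch of the implicit-conversion API (assuming a SparkContext `sc`; the input path is hypothetical):
{{{
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._  // brings graphToAlgorithms into scope

val graph = GraphLoader.edgeListFile(sc, "/tmp/edges.txt").cache()
val dynamicRanks = graph.pageRank(tol = 0.001).vertices         // run until convergence
val staticRanks  = graph.staticPageRank(numIter = 20).vertices  // fixed number of iterations
val components   = graph.connectedComponents().vertices
}}}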
+ */ + def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = { + PageRank.runUntilConvergence(self, tol, resetProb) + } + + /** + * Run PageRank for a fixed number of iterations returning a graph with vertex attributes + * containing the PageRank and edge attributes the normalized edge weight. + * + * @see [[org.apache.spark.graphx.lib.PageRank]], method `run`. + */ + def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = { + PageRank.run(self, numIter, resetProb) + } + + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @see [[org.apache.spark.graphx.lib.ConnectedComponents]] + */ + def connectedComponents(): Graph[VertexID, ED] = { + ConnectedComponents.run(self) + } + + /** + * Compute the number of triangles passing through each vertex. + * + * @see [[org.apache.spark.graphx.lib.TriangleCount]] + */ + def triangleCount(): Graph[Int, ED] = { + TriangleCount.run(self) + } + + /** + * Compute the strongly connected component (SCC) of each vertex and return a graph with the + * vertex value containing the lowest vertex id in the SCC containing that vertex. + * + * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents]] + */ + def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = { + StronglyConnectedComponents.run(self, numIter) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala new file mode 100644 index 0000000000..191e2aa051 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -0,0 +1,157 @@ +package org.apache.spark.graphx.lib + +import org.apache.spark._ +import org.apache.spark.graphx._ + +/** + * Driver program for running graph algorithms. 
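The driver takes a master URL, a task type, and an edge-list file, followed by `--key=value` options. A sketch of how those options are parsed by `main` (the argument values here are illustrative):
{{{
val args = Array("local[4]", "pagerank", "/tmp/edges.txt",  // host, taskType, fname
  "--tol=0.001", "--numEPart=8", "--partStrategy=EdgePartition2D")
val options = args.drop(3).map { arg =>
  arg.dropWhile(_ == '-').split('=') match {
    case Array(opt, v) => opt -> v
    case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
  }
}
// options: Array((tol,0.001), (numEPart,8), (partStrategy,EdgePartition2D))
}}}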
+ */ +object Analytics extends Logging { + + def main(args: Array[String]) = { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { + loggers.map{ + loggerName => + val logger = org.apache.log4j.Logger.getLogger(loggerName) + val prevLevel = logger.getLevel() + logger.setLevel(level) + loggerName -> prevLevel + }.toMap + } + + def pickPartitioner(v: String): PartitionStrategy = { + v match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v) + } + } + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + + taskType match { + case "pagerank" => { + + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 + var partitionStrategy: Option[PartitionStrategy] = None + + options.foreach{ + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| PageRank |") + println("======================================") + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + println("GRAPHX: Number of vertices " + graph.vertices.count) + println("GRAPHX: Number of edges " + graph.edges.count) + + val pr = graph.pageRank(tol).vertices.cache() + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_)) + + if (!outFname.isEmpty) { + logWarning("Saving pageranks of pages to " + outFname) + pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + + sc.stop() + } + + case "cc" => { + + var numIter = Int.MaxValue + var numVPart = 4 + var numEPart = 4 + var isDynamic = false + var partitionStrategy: Option[PartitionStrategy] = None + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + val 
unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + sc.stop() + } + + case "triangles" => { + var numVPart = 4 + var numEPart = 4 + // TriangleCount requires the graph to be partitioned + var partitionStrategy: PartitionStrategy = RandomVertexCut + + options.foreach{ + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + println("======================================") + println("| Triangle Count |") + println("--------------------------------------") + val sc = new SparkContext(host, "TriangleCount(" + fname + ")") + val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, + minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() + val triangles = TriangleCount.run(graph) + println("Triangles: " + triangles.vertices.map { + case (vid,data) => data.toLong + }.reduce(_+_) / 3) + sc.stop() + } + + case _ => { + println("Invalid task type.") + } + } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala new file mode 100644 index 0000000000..4a83e2dbb8 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -0,0 +1,39 @@ +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + + +object ConnectedComponents { + /** + * Compute the connected component membership of each vertex and return a graph with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. 
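A minimal sketch, assuming a SparkContext `sc`: two disjoint chains yield two components, and each vertex is labelled with the lowest vertex id in its component (0 or 10), as the ConnectedComponentsSuite below verifies.
{{{
val chain1 = (0 until 9).map(x => (x, x + 1))
val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2).map { case (s, d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
val cc = ConnectedComponents.run(twoChains)
cc.vertices.collect.foreach { case (id, component) => println(id + " -> " + component) }
}}}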
+ * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * + * @param graph the graph for which to compute the connected components + * + * @return a graph with vertex attributes containing the smallest vertex in each + * connected component + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + val ccGraph = graph.mapVertices { case (vid, _) => vid } + + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else if (edge.srcAttr > edge.dstAttr) { + Iterator((edge.srcId, edge.dstAttr)) + } else { + Iterator.empty + } + } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) + } // end of connectedComponents +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala new file mode 100644 index 0000000000..809b6d0855 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -0,0 +1,160 @@ +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.graphx._ + + +object PageRank extends Logging { + + /** + * Run PageRank for a fixed number of iterations returning a graph + * with vertex attributes containing the PageRank and edge + * attributes the normalized edge weight. + * + * The following PageRank fixed point is computed for each vertex. + * + * {{{ + * var PR = Array.fill(n)( 1.0 ) + * val oldPR = Array.fill(n)( 1.0 ) + * for( iter <- 0 until numIter ) { + * swap(oldPR, PR) + * for( i <- 0 until n ) { + * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum + * } + * } + * }}} + * + * where `alpha` is the random reset probability (typically 0.15), + * `inNbrs[i]` is the set of neighbors whick link to `i` and + * `outDeg[j]` is the out degree of vertex `j`. + * + * Note that this is not the "normalized" PageRank and as a consequence pages that have no + * inlinks will have a PageRank of alpha. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param numIter the number of iterations of PageRank to run + * @param resetProb the random reset probability (alpha) + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. + * + */ + def run[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = + { + + /** + * Initialize the pagerankGraph with each edge attribute having + * weight 1/outDegree and each vertex with attribute 1.0. 
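The fixed-point loop in the scaladoc above can be written as plain Scala for a tiny hand-built graph (a hypothetical three-vertex example; `inNbrs` holds the in-neighbour lists and `outDeg` the out-degrees):
{{{
val alpha = 0.15                               // resetProb
val inNbrs = Array(Seq(2), Seq(0), Seq(0, 1))  // edges: 0->1, 0->2, 1->2, 2->0
val outDeg = Array(2, 1, 1)
var pr = Array.fill(3)(1.0)
for (iter <- 0 until 20) {
  val oldPR = pr
  pr = Array.tabulate(3) { i =>
    alpha + (1.0 - alpha) * inNbrs(i).map(j => oldPR(j) / outDeg(j)).sum
  }
}
}}}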
+ */ + val pagerankGraph: Graph[Double, Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to the initial pagerank values + .mapVertices( (id, attr) => 1.0 ) + .cache() + + // Display statistics about pagerank + logInfo(pagerankGraph.statistics.toString) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + resetProb + (1.0 - resetProb) * msgSum + def sendMessage(edge: EdgeTriplet[Double, Double]) = + Iterator((edge.dstId, edge.srcAttr * edge.attr)) + def messageCombiner(a: Double, b: Double): Double = a + b + // The initial message received by all vertices in PageRank + val initialMessage = 0.0 + + // Execute pregel for a fixed number of iterations. + Pregel(pagerankGraph, initialMessage, numIter)( + vertexProgram, sendMessage, messageCombiner) + } + + /** + * Run a dynamic version of PageRank returning a graph with vertex attributes containing the + * PageRank and edge attributes containing the normalized edge weight. + * + * {{{ + * var PR = Array.fill(n)( 1.0 ) + * val oldPR = Array.fill(n)( 0.0 ) + * while( max(abs(PR - oldPr)) > tol ) { + * swap(oldPR, PR) + * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) { + * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum + * } + * } + * }}} + * + * where `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of + * neighbors whick link to `i` and `outDeg[j]` is the out degree of vertex `j`. + * + * Note that this is not the "normalized" PageRank and as a consequence pages that have no + * inlinks will have a PageRank of alpha. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param tol the tolerance allowed at convergence (smaller => more * accurate). + * @param resetProb the random reset probability (alpha) + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. + */ + def runUntilConvergence[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = + { + // Initialize the pagerankGraph with each edge attribute + // having weight 1/outDegree and each vertex with attribute 1.0. 
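Both entry points can also be called directly; a sketch assuming a SparkContext `sc` and an illustrative input path (the PageRankSuite below checks that the two variants agree on the same graph):
{{{
val graph = GraphLoader.edgeListFile(sc, "/tmp/edges.txt").cache()
val staticRanks  = PageRank.run(graph, numIter = 50).vertices
val dynamicRanks = PageRank.runUntilConvergence(graph, tol = 0.0001).vertices
}}}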
+ val pagerankGraph: Graph[(Double, Double), Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to (initalPR, delta = 0) + .mapVertices( (id, attr) => (0.0, 0.0) ) + .cache() + + // Display statistics about pagerank + logInfo(pagerankGraph.statistics.toString) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { + val (oldPR, lastDelta) = attr + val newPR = oldPR + (1.0 - resetProb) * msgSum + (newPR, newPR - oldPR) + } + + def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { + if (edge.srcAttr._2 > tol) { + Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) + } else { + Iterator.empty + } + } + + def messageCombiner(a: Double, b: Double): Double = a + b + + // The initial message received by all vertices in PageRank + val initialMessage = resetProb / (1.0 - resetProb) + + // Execute a dynamic version of Pregel. + Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner) + .mapVertices((vid, attr) => attr._1) + } // end of deltaPageRank + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala new file mode 100644 index 0000000000..f5570daec1 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -0,0 +1,107 @@ +package org.apache.spark.graphx.lib + +import org.apache.spark.rdd._ +import org.apache.spark.graphx._ +import scala.util.Random +import org.apache.commons.math.linear._ + +class SVDPlusPlusConf( // SVDPlusPlus parameters + var rank: Int, + var maxIters: Int, + var minVal: Double, + var maxVal: Double, + var gamma1: Double, + var gamma2: Double, + var gamma6: Double, + var gamma7: Double) extends Serializable + +object SVDPlusPlus { + /** + * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model", + * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]]. + * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6. 
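A usage sketch, assuming a SparkContext `sc` and tab-separated ratings of the form `userId itemId rating` (the path and hyper-parameter values are illustrative, not tuned):
{{{
// User and item ids are assumed to come from disjoint ranges, since both become graph vertices.
val edges = sc.textFile("/tmp/ratings.tsv").map { line =>
  val fields = line.split("\t")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}
val conf = new SVDPlusPlusConf(rank = 10, maxIters = 5, minVal = 0.0, maxVal = 5.0,
  gamma1 = 0.007, gamma2 = 0.007, gamma6 = 0.005, gamma7 = 0.015)
val (model, ratingMean) = SVDPlusPlus.run(edges, conf)  // model vertices hold (p, y, bias, norm)
}}}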
+ * + * @param edges edges for constructing the graph + * + * @param conf SVDPlusPlus parameters + * + * @return a graph with vertex attributes containing the trained model + */ + + def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = { + + // generate default vertex attribute + def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = { + val v1 = new ArrayRealVector(rank) + val v2 = new ArrayRealVector(rank) + for (i <- 0 until rank) { + v1.setEntry(i, Random.nextDouble) + v2.setEntry(i, Random.nextDouble) + } + (v1, v2, 0.0, 0.0) + } + + // calculate global rating mean + edges.cache() + val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) + val u = rs / rc + + // construct graph + var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache() + + // calculate initial bias and norm + var t0 = g.mapReduceTriplets(et => + Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) + g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => + (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) + } + + def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) + : Iterator[(VertexID, (RealVector, RealVector, Double))] = { + val (usr, itm) = (et.srcAttr, et.dstAttr) + val (p, q) = (usr._1, itm._1) + var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) + pred = math.max(pred, conf.minVal) + pred = math.min(pred, conf.maxVal) + val err = et.attr - pred + val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)), + (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))) + } + + for (i <- 0 until conf.maxIters) { + // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes + g.cache() + var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) + g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => + if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd + } + // phase 2, update p for user nodes and q, y for item nodes + g.cache() + val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => + (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) + g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => + (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) + } + } + + // calculate error on training set + def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = { + val (usr, itm) = (et.srcAttr, et.dstAttr) + val (p, q) = (usr._1, itm._1) + var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) + pred = math.max(pred, conf.minVal) + pred = math.min(pred, conf.maxVal) + val err = 
(et.attr - pred) * (et.attr - pred) + Iterator((et.dstId, err)) + } + g.cache() + val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) + g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => + if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd + } + (g, u) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala new file mode 100644 index 0000000000..9bd227309a --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala @@ -0,0 +1,88 @@ +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + +object StronglyConnectedComponents { + + /** + * Compute the strongly connected component (SCC) of each vertex and return a graph with the + * vertex value containing the lowest vertex id in the SCC containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * + * @param graph the graph for which to compute the SCC + * + * @return a graph with vertex attributes containing the smallest vertex id in each SCC + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { + + // the graph we update with final SCC ids, and the graph we return at the end + var sccGraph = graph.mapVertices { case (vid, _) => vid } + // graph we are going to work with in our iterations + var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache() + + var numVertices = sccWorkGraph.numVertices + var iter = 0 + while (sccWorkGraph.numVertices > 0 && iter < numIter) { + iter += 1 + do { + numVertices = sccWorkGraph.numVertices + sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + }.outerJoinVertices(sccWorkGraph.inDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + }.cache() + + // get all vertices to be removed + val finalVertices = sccWorkGraph.vertices + .filter { case (vid, (scc, isFinal)) => isFinal} + .mapValues { (vid, data) => data._1} + + // write values to sccGraph + sccGraph = sccGraph.outerJoinVertices(finalVertices) { + (vid, scc, opt) => opt.getOrElse(scc) + } + // only keep vertices that are not final + sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache() + } while (sccWorkGraph.numVertices < numVertices) + + sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) } + + // collect min of all my neighbor's scc values, update if it's smaller than mine + // then notify any neighbors with scc values larger than mine + sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)( + (vid, e) => e.otherVertexAttr(vid)._1, + (vid1, vid2) => math.min(vid1, vid2), + (vid, scc, optScc) => + (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2), + (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1 + ) + + // start at root of SCCs. Traverse values in reverse, notify all my neighbors + // do not propagate if colors do not match! 
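A minimal sketch, assuming a SparkContext `sc`: a directed 3-cycle plus one extra edge gives two SCCs, so vertices 0, 1 and 2 are labelled 0 and vertex 3 is labelled 3.
{{{
val rawEdges = sc.parallelize(Seq((0L, 1L), (1L, 2L), (2L, 0L), (2L, 3L)))
val graph = Graph.fromEdgeTuples(rawEdges, -1)
val scc = StronglyConnectedComponents.run(graph, numIter = 5)
scc.vertices.collect.foreach(println)
}}}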
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean]( + sccWorkGraph, + Integer.MAX_VALUE, + EdgeDirection.Out, + EdgeDirection.In + )( + // vertex is final if it is the root of a color + // or it has the same color as a neighbor that is final + (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + (final1, final2) => final1 || final2, + (vid, scc, optFinal) => + (scc._1, scc._2 || optFinal.getOrElse(false)), + // activate neighbor if they are not final, you are, and you have the same color + (vid, e) => e.vertexAttr(vid)._2 && + !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + // start at root of colors + (vid, data) => vid == data._1 + ) + } + sccGraph + } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala new file mode 100644 index 0000000000..c6b1c736dd --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -0,0 +1,78 @@ +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + + +object TriangleCount { + /** + * Compute the number of triangles passing through each vertex. + * + * The algorithm is relatively straightforward and can be computed in three steps: + * + * 1) Compute the set of neighbors for each vertex + * 2) For each edge compute the intersection of the sets and send the + * count to both vertices. + * 3) Compute the sum at each vertex and divide by two since each + * triangle is counted twice. + * + * + * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned + * using Graph.partitionBy. + * + * @return + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache + + // Construct set representations of the neighborhoods + val nbrSets: VertexRDD[VertexSet] = + g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) => + val set = new VertexSet(4) + var i = 0 + while (i < nbrs.size) { + // prevent self cycle + if(nbrs(i) != vid) { + set.add(nbrs(i)) + } + i += 1 + } + set + } + // join the sets with the graph + val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) { + (vid, _, optSet) => optSet.getOrElse(null) + } + // Edge function computes intersection of smaller vertex with larger vertex + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { + assert(et.srcAttr != null) + assert(et.dstAttr != null) + val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { + (et.srcAttr, et.dstAttr) + } else { + (et.dstAttr, et.srcAttr) + } + val iter = smallSet.iterator + var counter: Int = 0 + while (iter.hasNext) { + val vid = iter.next + if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 } + } + Iterator((et.srcId, counter), (et.dstId, counter)) + } + // compute the intersection along edges + val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _) + // Merge counters with the graph and divide by two since each triangle is counted twice + g.outerJoinVertices(counters) { + (vid, _, optCounter: Option[Int]) => + val dblCount = optCounter.getOrElse(0) + // double count should be even (divisible by two) + assert((dblCount & 1) == 0) + dblCount / 2 + } + + } // end of TriangleCount + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/package.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/package.scala new file mode 100644 index 0000000000..f6f2626c9d --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/package.scala @@ -0,0 +1,8 @@ +package org.apache.spark.graphx + +import scala.reflect.ClassTag + +package object lib { + implicit def graphToAlgorithms[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED]): Algorithms[VD, ED] = new Algorithms(graph) +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala deleted file mode 100644 index 16fc3fe5a2..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala +++ /dev/null @@ -1,83 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - - -class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { - - test("Grid Connected Components") { - withSpark { sc => - val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum - assert(maxCCid === 0) - } - } // end of Grid connected components - - - test("Reverse Grid Connected Components") { - withSpark { sc => - val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse - val ccGraph = gridGraph.connectedComponents() - val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum - assert(maxCCid === 0) - } - } // end of Grid connected components - - - test("Chain Connected Components") { - withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x+1) ) - val chain2 = (10 until 20).map(x => (x, x+1) ) - val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } - val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) - val ccGraph = twoChains.connectedComponents() - val vertices = ccGraph.vertices.collect() - for ( (id, cc) <- vertices ) { - if(id < 10) { assert(cc === 0) } - else { assert(cc === 10) } - } - val ccMap = vertices.toMap - for (id <- 0 until 20) { - if (id < 10) { - assert(ccMap(id) === 0) - } else { - assert(ccMap(id) === 10) - } - } - } - } // end of chain connected components - - test("Reverse Chain Connected Components") { - withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x+1) ) - val chain2 = (10 until 20).map(x => (x, x+1) ) - val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } - val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse - val ccGraph = twoChains.connectedComponents() - val vertices = ccGraph.vertices.collect - for ( (id, cc) <- vertices ) { - if (id < 10) { - assert(cc === 0) - } else { - assert(cc === 10) - } - } - val ccMap = vertices.toMap - for ( id <- 0 until 20 ) { - if (id < 10) { - assert(ccMap(id) === 0) - } else { - assert(ccMap(id) === 10) - } - } - } - } // end of reverse chain connected components - -} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala deleted file mode 100644 index de2c2d1107..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala +++ /dev/null @@ -1,119 +0,0 @@ -package 
org.apache.spark.graphx.algorithms - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.algorithms._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - -object GridPageRank { - def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { - val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int]) - val outDegree = Array.fill(nRows * nCols)(0) - // Convert row column address into vertex ids (row major order) - def sub2ind(r: Int, c: Int): Int = r * nCols + c - // Make the grid graph - for (r <- 0 until nRows; c <- 0 until nCols) { - val ind = sub2ind(r,c) - if (r+1 < nRows) { - outDegree(ind) += 1 - inNbrs(sub2ind(r+1,c)) += ind - } - if (c+1 < nCols) { - outDegree(ind) += 1 - inNbrs(sub2ind(r,c+1)) += ind - } - } - // compute the pagerank - var pr = Array.fill(nRows * nCols)(resetProb) - for (iter <- 0 until nIter) { - val oldPr = pr - pr = new Array[Double](nRows * nCols) - for (ind <- 0 until (nRows * nCols)) { - pr(ind) = resetProb + (1.0 - resetProb) * - inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum - } - } - (0L until (nRows * nCols)).zip(pr) - } - -} - - -class PageRankSuite extends FunSuite with LocalSparkContext { - - def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = { - a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } - .map { case (id, error) => error }.sum - } - - test("Star PageRank") { - withSpark { sc => - val nVertices = 100 - val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() - val resetProb = 0.15 - val errorTol = 1.0e-5 - - val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices - val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache() - - // Static PageRank should only take 2 iterations to converge - val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => - if (pr1 != pr2) 1 else 0 - }.map { case (vid, test) => test }.sum - assert(notMatching === 0) - - val staticErrors = staticRanks2.map { case (vid, pr) => - val correct = (vid > 0 && pr == resetProb) || - (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) - if (!correct) 1 else 0 - } - assert(staticErrors.sum === 0) - - val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache() - assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) - } - } // end of test Star PageRank - - - - test("Grid PageRank") { - withSpark { sc => - val rows = 10 - val cols = 10 - val resetProb = 0.15 - val tol = 0.0001 - val numIter = 50 - val errorTol = 1.0e-5 - val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() - - val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() - val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache() - val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache() - - assert(compareRanks(staticRanks, referenceRanks) < errorTol) - assert(compareRanks(dynamicRanks, referenceRanks) < errorTol) - } - } // end of Grid PageRank - - - test("Chain PageRank") { - withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x+1) ) - val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } - val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() - val resetProb = 0.15 - val tol = 0.0001 - val 
numIter = 10 - val errorTol = 1.0e-5 - - val staticRanks = chain.staticPageRank(numIter, resetProb).vertices - val dynamicRanks = chain.pageRank(tol, resetProb).vertices - - assert(compareRanks(staticRanks, dynamicRanks) < errorTol) - } - } -} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala deleted file mode 100644 index 7bd93e0e6c..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - - -class SVDPlusPlusSuite extends FunSuite with LocalSparkContext { - - test("Test SVD++ with mean square error on training set") { - withSpark { sc => - val svdppErr = 8.0 - val edges = sc.textFile("mllib/data/als/test.data").map { line => - val fields = line.split(",") - Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble) - } - val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations - var (graph, u) = SVDPlusPlus.run(edges, conf) - graph.cache() - val err = graph.vertices.collect.map{ case (vid, vd) => - if (vid % 2 == 1) vd._4 else 0.0 - }.reduce(_ + _) / graph.triplets.collect.size - assert(err <= svdppErr) - } - } - -} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala deleted file mode 100644 index fee7d20161..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - - -class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext { - - test("Island Strongly Connected Components") { - withSpark { sc => - val vertices = sc.parallelize((1L to 5L).map(x => (x, -1))) - val edges = sc.parallelize(Seq.empty[Edge[Int]]) - val graph = Graph(vertices, edges) - val sccGraph = graph.stronglyConnectedComponents(5) - for ((id, scc) <- sccGraph.vertices.collect) { - assert(id == scc) - } - } - } - - test("Cycle Strongly Connected Components") { - withSpark { sc => - val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7))) - val graph = Graph.fromEdgeTuples(rawEdges, -1) - val sccGraph = graph.stronglyConnectedComponents(20) - for ((id, scc) <- sccGraph.vertices.collect) { - assert(0L == scc) - } - } - } - - test("2 Cycle Strongly Connected Components") { - withSpark { sc => - val edges = - Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ - Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++ - Array(6L -> 0L, 5L -> 7L) - val rawEdges = sc.parallelize(edges) - val graph = Graph.fromEdgeTuples(rawEdges, -1) - val sccGraph = graph.stronglyConnectedComponents(20) - for ((id, scc) <- sccGraph.vertices.collect) { - if (id < 3) - assert(0L == scc) - else if (id < 6) - assert(3L == scc) - else - assert(id == scc) - } - } - } - -} diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala deleted file mode 100644 index b85b289da6..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.spark.graphx.algorithms - -import org.scalatest.FunSuite - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.graphx.util.GraphGenerators -import org.apache.spark.rdd._ - - -class TriangleCountSuite extends FunSuite with LocalSparkContext { - - test("Count a single triangle") { - withSpark { sc => - val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2) - val graph = Graph.fromEdgeTuples(rawEdges, true).cache() - val triangleCount = graph.triangleCount() - val verts = triangleCount.vertices - verts.collect.foreach { case (vid, count) => assert(count === 1) } - } - } - - test("Count two triangles") { - withSpark { sc => - val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ - Array(0L -> -1L, -1L -> -2L, -2L -> 0L) - val rawEdges = sc.parallelize(triangles, 2) - val graph = Graph.fromEdgeTuples(rawEdges, true).cache() - val triangleCount = graph.triangleCount() - val verts = triangleCount.vertices - verts.collect().foreach { case (vid, count) => - if (vid == 0) { - assert(count === 2) - } else { - assert(count === 1) - } - } - } - } - - test("Count two triangles with bi-directed edges") { - withSpark { sc => - val triangles = - Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ - Array(0L -> -1L, -1L -> -2L, -2L -> 0L) - val revTriangles = triangles.map { case (a,b) => (b,a) } - val rawEdges = sc.parallelize(triangles ++ revTriangles, 2) - val graph = Graph.fromEdgeTuples(rawEdges, true).cache() - val triangleCount = graph.triangleCount() - val verts = triangleCount.vertices - verts.collect().foreach { case (vid, count) => - if (vid == 0) { - assert(count === 4) - } else { - assert(count === 2) - } - } - } - } - - test("Count a single triangle with duplicate edges") { - withSpark { sc => - val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ - Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) - val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache() - val triangleCount = graph.triangleCount() - val verts = triangleCount.vertices - verts.collect.foreach { case (vid, count) => assert(count === 1) } - } - } - -} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala new file mode 100644 index 0000000000..66612b381f --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -0,0 +1,83 @@ +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { + + test("Grid Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10) + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Reverse Grid 
Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse + val ccGraph = gridGraph.connectedComponents() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val chain2 = (10 until 20).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0) + val ccGraph = twoChains.connectedComponents() + val vertices = ccGraph.vertices.collect() + for ( (id, cc) <- vertices ) { + if(id < 10) { assert(cc === 0) } + else { assert(cc === 10) } + } + val ccMap = vertices.toMap + for (id <- 0 until 20) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of chain connected components + + test("Reverse Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val chain2 = (10 until 20).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse + val ccGraph = twoChains.connectedComponents() + val vertices = ccGraph.vertices.collect + for ( (id, cc) <- vertices ) { + if (id < 10) { + assert(cc === 0) + } else { + assert(cc === 10) + } + } + val ccMap = vertices.toMap + for ( id <- 0 until 20 ) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of reverse chain connected components + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala new file mode 100644 index 0000000000..fe7e4261f8 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -0,0 +1,119 @@ +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + +object GridPageRank { + def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { + val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int]) + val outDegree = Array.fill(nRows * nCols)(0) + // Convert row column address into vertex ids (row major order) + def sub2ind(r: Int, c: Int): Int = r * nCols + c + // Make the grid graph + for (r <- 0 until nRows; c <- 0 until nCols) { + val ind = sub2ind(r,c) + if (r+1 < nRows) { + outDegree(ind) += 1 + inNbrs(sub2ind(r+1,c)) += ind + } + if (c+1 < nCols) { + outDegree(ind) += 1 + inNbrs(sub2ind(r,c+1)) += ind + } + } + // compute the pagerank + var pr = Array.fill(nRows * nCols)(resetProb) + for (iter <- 0 until nIter) { + val oldPr = pr + pr = new Array[Double](nRows * nCols) + for (ind <- 0 until (nRows * nCols)) { + pr(ind) = resetProb + (1.0 - resetProb) * + inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum + } + } + (0L until (nRows * nCols)).zip(pr) + } + +} + + +class PageRankSuite extends FunSuite with LocalSparkContext { + + def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = { + a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } + .map { case 
(id, error) => error }.sum + } + + test("Star PageRank") { + withSpark { sc => + val nVertices = 100 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val errorTol = 1.0e-5 + + val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices + val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache() + + // Static PageRank should only take 2 iterations to converge + val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => + if (pr1 != pr2) 1 else 0 + }.map { case (vid, test) => test }.sum + assert(notMatching === 0) + + val staticErrors = staticRanks2.map { case (vid, pr) => + val correct = (vid > 0 && pr == resetProb) || + (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) + if (!correct) 1 else 0 + } + assert(staticErrors.sum === 0) + + val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache() + assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + } + } // end of test Star PageRank + + + + test("Grid PageRank") { + withSpark { sc => + val rows = 10 + val cols = 10 + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 50 + val errorTol = 1.0e-5 + val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() + + val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache() + val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache() + val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache() + + assert(compareRanks(staticRanks, referenceRanks) < errorTol) + assert(compareRanks(dynamicRanks, referenceRanks) < errorTol) + } + } // end of Grid PageRank + + + test("Chain PageRank") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val errorTol = 1.0e-5 + + val staticRanks = chain.staticPageRank(numIter, resetProb).vertices + val dynamicRanks = chain.pageRank(tol, resetProb).vertices + + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + } + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala new file mode 100644 index 0000000000..a4a1cdab18 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala @@ -0,0 +1,31 @@ +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class SVDPlusPlusSuite extends FunSuite with LocalSparkContext { + + test("Test SVD++ with mean square error on training set") { + withSpark { sc => + val svdppErr = 8.0 + val edges = sc.textFile("mllib/data/als/test.data").map { line => + val fields = line.split(",") + Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble) + } + val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations + var (graph, u) = SVDPlusPlus.run(edges, conf) + graph.cache() + val err = graph.vertices.collect.map{ case (vid, vd) => + if (vid % 2 == 1) vd._4 else 0.0 + }.reduce(_ + _) / graph.triplets.collect.size + assert(err <= svdppErr) + } + } + +} diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala new file mode 100644 index 0000000000..0458311661 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala @@ -0,0 +1,57 @@ +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext { + + test("Island Strongly Connected Components") { + withSpark { sc => + val vertices = sc.parallelize((1L to 5L).map(x => (x, -1))) + val edges = sc.parallelize(Seq.empty[Edge[Int]]) + val graph = Graph(vertices, edges) + val sccGraph = graph.stronglyConnectedComponents(5) + for ((id, scc) <- sccGraph.vertices.collect) { + assert(id == scc) + } + } + } + + test("Cycle Strongly Connected Components") { + withSpark { sc => + val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7))) + val graph = Graph.fromEdgeTuples(rawEdges, -1) + val sccGraph = graph.stronglyConnectedComponents(20) + for ((id, scc) <- sccGraph.vertices.collect) { + assert(0L == scc) + } + } + } + + test("2 Cycle Strongly Connected Components") { + withSpark { sc => + val edges = + Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++ + Array(6L -> 0L, 5L -> 7L) + val rawEdges = sc.parallelize(edges) + val graph = Graph.fromEdgeTuples(rawEdges, -1) + val sccGraph = graph.stronglyConnectedComponents(20) + for ((id, scc) <- sccGraph.vertices.collect) { + if (id < 3) + assert(0L == scc) + else if (id < 6) + assert(3L == scc) + else + assert(id == scc) + } + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala new file mode 100644 index 0000000000..a286b7d03b --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala @@ -0,0 +1,73 @@ +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class TriangleCountSuite extends FunSuite with LocalSparkContext { + + test("Count a single triangle") { + withSpark { sc => + val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = graph.triangleCount() + val verts = triangleCount.vertices + verts.collect.foreach { case (vid, count) => assert(count === 1) } + } + } + + test("Count two triangles") { + withSpark { sc => + val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> -1L, -1L -> -2L, -2L -> 0L) + val rawEdges = sc.parallelize(triangles, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = graph.triangleCount() + val verts = triangleCount.vertices + verts.collect().foreach { case (vid, count) => + if (vid == 0) { + assert(count === 2) + } else { + assert(count === 1) + } + } + } + } + + test("Count two triangles with bi-directed edges") { + withSpark { sc => + val triangles = + Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> -1L, -1L -> -2L, 
-2L -> 0L) + val revTriangles = triangles.map { case (a,b) => (b,a) } + val rawEdges = sc.parallelize(triangles ++ revTriangles, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = graph.triangleCount() + val verts = triangleCount.vertices + verts.collect().foreach { case (vid, count) => + if (vid == 0) { + assert(count === 4) + } else { + assert(count === 2) + } + } + } + } + + test("Count a single triangle with duplicate edges") { + withSpark { sc => + val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache() + val triangleCount = graph.triangleCount() + val verts = triangleCount.vertices + verts.collect.foreach { case (vid, count) => assert(count === 1) } + } + } + +} -- cgit v1.2.3
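
As a quick orientation for downstream users of this rename, a minimal usage sketch of the relocated package follows. It assumes only an existing SparkContext `sc`; every generator and algorithm call mirrors one exercised in the sources and suites above, and all of them resolve through the implicit `graphToAlgorithms` conversion defined in the new `lib` package object.

  import org.apache.spark.graphx._
  import org.apache.spark.graphx.lib._  // formerly org.apache.spark.graphx.algorithms._
  import org.apache.spark.graphx.util.GraphGenerators

  // a 10x10 grid graph, as used in the test suites above (sc is an existing SparkContext)
  val graph = GraphGenerators.gridGraph(sc, 10, 10).cache()

  // each call below is provided by the Algorithms implicit from the lib package object
  val dynamicRanks = graph.pageRank(0.0001, 0.15).vertices          // run until convergence
  val staticRanks  = graph.staticPageRank(50, 0.15).vertices        // fixed number of iterations
  val components   = graph.connectedComponents().vertices           // lowest vertex id per component
  val sccs         = graph.stronglyConnectedComponents(20).vertices // lowest vertex id per SCC
  val triangles    = graph.triangleCount().vertices                 // a grid has no triangles, so all 0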