diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-09 14:31:33 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-09 14:31:33 -0800 |
commit | 731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1 (patch) | |
tree | 51fa693c046869f0706337046a1581c09e56e4b5 /graphx | |
parent | 100718bcd3f6ade1a93256458ec1528bb9142b5e (diff) | |
download | spark-731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1.tar.gz spark-731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1.tar.bz2 spark-731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1.zip |
graph -> graphx
Diffstat (limited to 'graphx')
49 files changed, 6708 insertions, 0 deletions
diff --git a/graphx/pom.xml b/graphx/pom.xml new file mode 100644 index 0000000000..fd3dcaad7c --- /dev/null +++ b/graphx/pom.xml @@ -0,0 +1,129 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent</artifactId> + <version>0.9.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-graph_2.9.3</artifactId> + <packaging>jar</packaging> + <name>Spark Graph</name> + <url>http://spark-project.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.9.3</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop1</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop1</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop1</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>hadoop2</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop2</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala new file mode 100644 index 0000000000..0cafc3fdf9 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala @@ -0,0 +1,593 @@ +package org.apache.spark.graphx + +import org.apache.spark._ +import org.apache.spark.graphx.algorithms._ + + +/** + * The Analytics object contains a collection of basic graph analytics + * algorithms that operate largely on the graph structure. + * + * In addition the Analytics object contains a driver `main` which can + * be used to apply the various functions to graphs in standard + * formats. + */ +object Analytics extends Logging { + + def main(args: Array[String]) = { + val host = args(0) + val taskType = args(1) + val fname = args(2) + val options = args.drop(3).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + + def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = { + loggers.map{ + loggerName => + val logger = org.apache.log4j.Logger.getLogger(loggerName) + val prevLevel = logger.getLevel() + logger.setLevel(level) + loggerName -> prevLevel + }.toMap + } + + def pickPartitioner(v: String): PartitionStrategy = { + v match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v) + } + } +// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark")) + + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + + taskType match { + case "pagerank" => { + + var tol:Float = 0.001F + var outFname = "" + var numVPart = 4 + var numEPart = 4 + var partitionStrategy: Option[PartitionStrategy] = None + + options.foreach{ + case ("tol", v) => tol = v.toFloat + case ("output", v) => outFname = v + case ("numVPart", v) => numVPart = v.toInt + case ("numEPart", v) => numEPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| PageRank |") + println("======================================") + + val sc = new SparkContext(host, "PageRank(" + fname + ")") + + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + println("GRAPHX: Number of vertices " + graph.vertices.count) + println("GRAPHX: Number of edges " + graph.edges.count) + + //val pr = Analytics.pagerank(graph, numIter) + val pr = PageRank.runStandalone(graph, tol) + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_)) + + if (!outFname.isEmpty) { + logWarning("Saving pageranks of pages to " + outFname) + pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + + sc.stop() + } + + case "cc" => { + + var numIter = Int.MaxValue + var numVPart = 4 + var numEPart = 4 + var isDynamic = false + var partitionStrategy: Option[PartitionStrategy] = None + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + sc.stop() + } + + case "triangles" => { + var numVPart = 4 + var numEPart = 4 + // TriangleCount requires the graph to be partitioned + var partitionStrategy: PartitionStrategy = RandomVertexCut + + options.foreach{ + case ("numEPart", v) => numEPart = v.toInt + case ("numVPart", v) => numVPart = v.toInt + case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + println("======================================") + println("| Triangle Count |") + println("--------------------------------------") + val sc = new SparkContext(host, "TriangleCount(" + fname + ")") + val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, + minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() + val triangles = TriangleCount.run(graph) + println("Triangles: " + triangles.vertices.map { + case (vid,data) => data.toLong + }.reduce(_+_) / 3) + sc.stop() + } + +// +// case "shortestpath" => { +// +// var numIter = Int.MaxValue +// var isDynamic = true +// var sources: List[Int] = List.empty +// +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("source", v) => sources ++= List(v.toInt) +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } +// +// +// if(!isDynamic && numIter == Int.MaxValue) { +// println("Set number of iterations!") +// sys.exit(1) +// } +// +// if(sources.isEmpty) { +// println("No sources provided!") +// sys.exit(1) +// } +// +// println("======================================") +// println("| Shortest Path |") +// println("--------------------------------------") +// println(" Using parameters:") +// println(" \tDynamic: " + isDynamic) +// println(" \tNumIter: " + numIter) +// println(" \tSources: [" + sources.mkString(", ") + "]") +// println("======================================") +// +// val sc = new SparkContext(host, "ShortestPath(" + fname + ")") +// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) ) +// //val sp = Analytics.shortestPath(graph, sources, numIter) +// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) +// // else Analytics.shortestPath(graph, sources, numIter) +// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) +// +// sc.stop() +// } + + + // case "als" => { + + // var numIter = 5 + // var lambda = 0.01 + // var latentK = 10 + // var usersFname = "usersFactors.tsv" + // var moviesFname = "moviesFname.tsv" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("lambda", v) => lambda = v.toDouble + // case ("latentK", v) => latentK = v.toInt + // case ("usersFname", v) => usersFname = v + // case ("moviesFname", v) => moviesFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // println("======================================") + // println("| Alternating Least Squares |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tNumIter: " + numIter) + // println(" \tLambda: " + lambda) + // println(" \tLatentK: " + latentK) + // println(" \tusersFname: " + usersFname) + // println(" \tmoviesFname: " + moviesFname) + // println("======================================") + + // val sc = new SparkContext(host, "ALS(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) + // graph.numVPart = numVPart + // graph.numEPart = numEPart + + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) + // assert(maxUser < minMovie) + + // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache + // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(usersFname) + // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(moviesFname) + + // sc.stop() + // } + + + case _ => { + println("Invalid task type.") + } + } + } + + // /** + // * Compute the shortest path to a set of markers + // */ + // def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = { + // val sourceSet = sources.toSet + // val spGraph = graph.mapVertices { + // case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0 else Double.MaxValue)) + // } + // GraphLab.iterateGA[Double, Double, Double](spGraph)( + // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather + // (a: Double, b: Double) => math.min(a, b), // merge + // (v, a: Option[Double]) => math.min(v.data, a.getOrElse(Double.MaxValue)), // apply + // numIter, + // gatherDirection = EdgeDirection.In) + // } + + // /** + // * Compute the connected component membership of each vertex + // * and return an RDD with the vertex value containing the + // * lowest vertex id in the connected component containing + // * that vertex. + // */ + // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], + // numIter: Int = Int.MaxValue) = { + + // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) }) + // val edges = graph.edges // .mapValues(v => None) + // val ccGraph = new Graph(vertices, edges) + + // ccGraph.iterateDynamic( + // (me_id, edge) => edge.otherVertex(me_id).data, // gather + // (a: Int, b: Int) => math.min(a, b), // merge + // Integer.MAX_VALUE, + // (v, a: Int) => math.min(v.data, a), // apply + // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter + // numIter, + // gatherEdges = EdgeDirection.Both, + // scatterEdges = EdgeDirection.Both).vertices + // // + // // graph_ret.vertices.collect.foreach(println) + // // graph_ret.edges.take(10).foreach(println) + // } + + + // /** + // * Compute the shortest path to a set of markers + // */ + // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Double], + // sources: List[Int], numIter: Int) = { + // val sourceSet = sources.toSet + // val vertices = graph.vertices.mapPartitions( + // iter => iter.map { + // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Double.MaxValue) ) + // }); + + // val edges = graph.edges // .mapValues(v => None) + // val spGraph = new Graph(vertices, edges) + + // val niterations = Int.MaxValue + // spGraph.iterateDynamic( + // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather + // (a: Double, b: Double) => math.min(a, b), // merge + // Double.MaxValue, + // (v, a: Double) => math.min(v.data, a), // apply + // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter + // numIter, + // gatherEdges = EdgeDirection.In, + // scatterEdges = EdgeDirection.Out).vertices + // } + + + // /** + // * + // */ + // def alternatingLeastSquares[VD: ClassTag, ED: ClassTag](graph: Graph[VD, Double], + // latentK: Int, lambda: Double, numIter: Int) = { + // val vertices = graph.vertices.mapPartitions( _.map { + // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) + // }).cache + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val edges = graph.edges // .mapValues(v => None) + // val alsGraph = new Graph(vertices, edges) + // alsGraph.numVPart = graph.numVPart + // alsGraph.numEPart = graph.numEPart + + // val niterations = Int.MaxValue + // alsGraph.iterateDynamic[(Array[Double], Array[Double])]( + // (me_id, edge) => { // gather + // val X = edge.otherVertex(me_id).data + // val y = edge.data + // val Xy = X.map(_ * y) + // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray + // (Xy, XtX) + // }, + // (a, b) => { + // // The difference between the while loop and the zip is a FACTOR OF TWO in overall + // // runtime + // var i = 0 + // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 } + // i = 0 + // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 } + // a + // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r }) + // }, + // (Array.empty[Double], Array.empty[Double]), // default value is empty + // (vertex, accum) => { // apply + // val XyArray = accum._1 + // val XtXArray = accum._2 + // if(XyArray.isEmpty) vertex.data // no neighbors + // else { + // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) => + // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) + + // (if(i == j) lambda else 1.0F) //regularization + // } + // val Xy = DenseMatrix.create(latentK,1,XyArray) + // val w = XtX \ Xy + // w.data + // } + // }, + // (me_id, edge) => true, + // numIter, + // gatherEdges = EdgeDirection.Both, + // scatterEdges = EdgeDirection.Both, + // vertex => vertex.id < maxUser).vertices + // } + + // def main(args: Array[String]) = { + // val host = args(0) + // val taskType = args(1) + // val fname = args(2) + // val options = args.drop(3).map { arg => + // arg.dropWhile(_ == '-').split('=') match { + // case Array(opt, v) => (opt -> v) + // case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + // } + // } + + // System.setProperty("spark.serializer", "spark.KryoSerializer") + // //System.setProperty("spark.shuffle.compress", "false") + // System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator") + + // taskType match { + // case "pagerank" => { + + // var numIter = Int.MaxValue + // var isDynamic = false + // var tol:Double = 0.001 + // var outFname = "" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case ("tol", v) => tol = v.toDouble + // case ("output", v) => outFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + // println("======================================") + // println("| PageRank |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // if(isDynamic) println(" \t |-> Tolerance: " + tol) + // println(" \tNumIter: " + numIter) + // println("======================================") + + // val sc = new SparkContext(host, "PageRank(" + fname + ")") + + // val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache() + + // val startTime = System.currentTimeMillis + // logInfo("GRAPHX: starting tasks") + // logInfo("GRAPHX: Number of vertices " + graph.vertices.count) + // logInfo("GRAPHX: Number of edges " + graph.edges.count) + + // val pr = Analytics.pagerank(graph, numIter) + // // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter) + // // else Analytics.pagerank(graph, numIter) + // logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) ) + // if (!outFname.isEmpty) { + // println("Saving pageranks of pages to " + outFname) + // pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname) + // } + // logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + // sc.stop() + // } + + // case "cc" => { + + // var numIter = Int.MaxValue + // var isDynamic = false + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + // println("======================================") + // println("| Connected Components |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // println(" \tNumIter: " + numIter) + // println("======================================") + + // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => 1.0) + // val cc = Analytics.connectedComponents(graph, numIter) + // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) + // // else Analytics.connectedComponents(graph, numIter) + // println("Components: " + cc.vertices.map(_.data).distinct()) + + // sc.stop() + // } + + // case "shortestpath" => { + + // var numIter = Int.MaxValue + // var isDynamic = true + // var sources: List[Int] = List.empty + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("dynamic", v) => isDynamic = v.toBoolean + // case ("source", v) => sources ++= List(v.toInt) + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + + // if(!isDynamic && numIter == Int.MaxValue) { + // println("Set number of iterations!") + // sys.exit(1) + // } + + // if(sources.isEmpty) { + // println("No sources provided!") + // sys.exit(1) + // } + + // println("======================================") + // println("| Shortest Path |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tDynamic: " + isDynamic) + // println(" \tNumIter: " + numIter) + // println(" \tSources: [" + sources.mkString(", ") + "]") + // println("======================================") + + // val sc = new SparkContext(host, "ShortestPath(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) ) + // val sp = Analytics.shortestPath(graph, sources, numIter) + // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter) + // // else Analytics.shortestPath(graph, sources, numIter) + // println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_))) + + // sc.stop() + // } + + + // case "als" => { + + // var numIter = 5 + // var lambda = 0.01 + // var latentK = 10 + // var usersFname = "usersFactors.tsv" + // var moviesFname = "moviesFname.tsv" + // var numVPart = 4 + // var numEPart = 4 + + // options.foreach{ + // case ("numIter", v) => numIter = v.toInt + // case ("lambda", v) => lambda = v.toDouble + // case ("latentK", v) => latentK = v.toInt + // case ("usersFname", v) => usersFname = v + // case ("moviesFname", v) => moviesFname = v + // case ("numVPart", v) => numVPart = v.toInt + // case ("numEPart", v) => numEPart = v.toInt + // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + // } + + // println("======================================") + // println("| Alternating Least Squares |") + // println("--------------------------------------") + // println(" Using parameters:") + // println(" \tNumIter: " + numIter) + // println(" \tLambda: " + lambda) + // println(" \tLatentK: " + latentK) + // println(" \tusersFname: " + usersFname) + // println(" \tmoviesFname: " + moviesFname) + // println("======================================") + + // val sc = new SparkContext(host, "ALS(" + fname + ")") + // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble ) + // graph.numVPart = numVPart + // graph.numEPart = numEPart + + // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_)) + // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_)) + // assert(maxUser < minMovie) + + // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache + // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(usersFname) + // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t")) + // .saveAsTextFile(moviesFname) + + // sc.stop() + // } + + + // case _ => { + // println("Invalid task type.") + // } + // } + // } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala new file mode 100644 index 0000000000..29b46674f1 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -0,0 +1,50 @@ +package org.apache.spark.graphx + + +/** + * A single directed edge consisting of a source id, target id, + * and the data associated with the Edgee. + * + * @tparam ED type of the edge attribute + */ +case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( + /** + * The vertex id of the source vertex + */ + var srcId: VertexID = 0, + /** + * The vertex id of the target vertex. + */ + var dstId: VertexID = 0, + /** + * The attribute associated with the edge. + */ + var attr: ED = nullValue[ED]) extends Serializable { + + /** + * Given one vertex in the edge return the other vertex. + * + * @param vid the id one of the two vertices on the edge. + * @return the id of the other vertex on the edge. + */ + def otherVertexId(vid: VertexID): VertexID = + if (srcId == vid) dstId else { assert(dstId == vid); srcId } + + /** + * Return the relative direction of the edge to the corresponding + * vertex. + * + * @param vid the id of one of the two vertices in the edge. + * @return the relative direction of the edge to the corresponding + * vertex. + */ + def relativeDirection(vid: VertexID): EdgeDirection = + if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } +} + +object Edge { + def lexicographicOrdering[ED] = new Ordering[Edge[ED]] { + override def compare(a: Edge[ED], b: Edge[ED]): Int = + Ordering[(VertexID, VertexID)].compare((a.srcId, a.dstId), (b.srcId, b.dstId)) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala new file mode 100644 index 0000000000..785f941650 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala @@ -0,0 +1,36 @@ +package org.apache.spark.graphx + + +/** + * The direction of directed edge relative to a vertex used to select + * the set of adjacent neighbors when running a neighborhood query. + */ +sealed abstract class EdgeDirection { + /** + * Reverse the direction of an edge. An in becomes out, + * out becomes in and both remains both. + */ + def reverse: EdgeDirection = this match { + case EdgeDirection.In => EdgeDirection.Out + case EdgeDirection.Out => EdgeDirection.In + case EdgeDirection.Both => EdgeDirection.Both + } +} + + +object EdgeDirection { + /** + * Edges arriving at a vertex. + */ + case object In extends EdgeDirection + + /** + * Edges originating from a vertex + */ + case object Out extends EdgeDirection + + /** + * All edges adjacent to a vertex + */ + case object Both extends EdgeDirection +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala new file mode 100644 index 0000000000..e4ef460e6f --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -0,0 +1,73 @@ +package org.apache.spark.graphx + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark.graphx.impl.EdgePartition +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + + +class EdgeRDD[@specialized ED: ClassTag]( + val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])]) + extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + + partitionsRDD.setName("EdgeRDD") + + override protected def getPartitions: Array[Partition] = partitionsRDD.partitions + + /** + * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in + * partitionsRDD correspond to the actual partitions and create a new partitioner that allows + * co-partitioning with partitionsRDD. + */ + override val partitioner = + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + + override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { + firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator + } + + override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() + + /** + * Caching a VertexRDD causes the index and values to be cached separately. + */ + override def persist(newLevel: StorageLevel): EdgeRDD[ED] = { + partitionsRDD.persist(newLevel) + this + } + + /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ + override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY) + + /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ + override def cache(): EdgeRDD[ED] = persist() + + def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]) + : EdgeRDD[ED2] = { +// iter => iter.map { case (pid, ep) => (pid, f(ep)) } + new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => + val (pid, ep) = iter.next() + Iterator(Tuple2(pid, f(pid, ep))) + }, preservesPartitioning = true)) + } + + def innerJoin[ED2: ClassTag, ED3: ClassTag] + (other: EdgeRDD[ED2]) + (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = { + val ed2Tag = classTag[ED2] + val ed3Tag = classTag[ED3] + new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { + (thisIter, otherIter) => + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + }) + } + + def collectVertexIDs(): RDD[VertexID] = { + partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } + } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala new file mode 100644 index 0000000000..b0565b7e0e --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -0,0 +1,63 @@ +package org.apache.spark.graphx + +import org.apache.spark.graphx.impl.VertexPartition + +/** + * An edge triplet represents two vertices and edge along with their + * attributes. + * + * @tparam VD the type of the vertex attribute. + * @tparam ED the type of the edge attribute + * + * @todo specialize edge triplet for basic types, though when I last + * tried specializing I got a warning about inherenting from a type + * that is not a trait. + */ +class EdgeTriplet[VD, ED] extends Edge[ED] { +// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag, +// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag] extends Edge[ED] { + + + /** + * The source vertex attribute + */ + var srcAttr: VD = _ //nullValue[VD] + + /** + * The destination vertex attribute + */ + var dstAttr: VD = _ //nullValue[VD] + + var srcStale: Boolean = false + var dstStale: Boolean = false + + /** + * Set the edge properties of this triplet. + */ + protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = { + srcId = other.srcId + dstId = other.dstId + attr = other.attr + this + } + + /** + * Given one vertex in the edge return the other vertex. + * + * @param vid the id one of the two vertices on the edge. + * @return the attribute for the other vertex on the edge. + */ + def otherVertexAttr(vid: VertexID): VD = + if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr } + + /** + * Get the vertex object for the given vertex in the edge. + * + * @param vid the id of one of the two vertices on the edge + * @return the attr for the vertex with that id. + */ + def vertexAttr(vid: VertexID): VD = + if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } + + override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala new file mode 100644 index 0000000000..2b7c0a2583 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -0,0 +1,437 @@ +package org.apache.spark.graphx + +import scala.reflect.ClassTag + +import org.apache.spark.graphx.impl._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + + +/** + * The Graph abstractly represents a graph with arbitrary objects + * associated with vertices and edges. The graph provides basic + * operations to access and manipulate the data associated with + * vertices and edges as well as the underlying structure. Like Spark + * RDDs, the graph is a functional data-structure in which mutating + * operations return new graphs. + * + * @see GraphOps for additional graph member functions. + * + * @note The majority of the graph operations are implemented in + * `GraphOps`. All the convenience operations are defined in the + * `GraphOps` class which may be shared across multiple graph + * implementations. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + */ +abstract class Graph[VD: ClassTag, ED: ClassTag] { + + /** + * Get the vertices and their data. + * + * @note vertex ids are unique. + * @return An RDD containing the vertices in this graph + * + * @see Vertex for the vertex type. + * + */ + val vertices: VertexRDD[VD] + + /** + * Get the Edges and their data as an RDD. The entries in the RDD + * contain just the source id and target id along with the edge + * data. + * + * @return An RDD containing the edges in this graph + * + * @see Edge for the edge type. + * @see edgesWithVertices to get an RDD which contains all the edges + * along with their vertex data. + * + */ + val edges: EdgeRDD[ED] + + /** + * Get the edges with the vertex data associated with the adjacent + * pair of vertices. + * + * @return An RDD containing edge triplets. + * + * @example This operation might be used to evaluate a graph + * coloring where we would like to check that both vertices are a + * different color. + * {{{ + * type Color = Int + * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv") + * val numInvalid = graph.edgesWithVertices() + * .map(e => if (e.src.data == e.dst.data) 1 else 0).sum + * }}} + * + * @see edges() If only the edge data and adjacent vertex ids are + * required. + * + */ + val triplets: RDD[EdgeTriplet[VD, ED]] + + /** + * Cache the vertices and edges associated with this graph. + * + * @param newLevel the level at which to cache the graph. + + * @return A reference to this graph for convenience. + * + */ + def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] + + /** + * Return a graph that is cached when first created. This is used to + * pin a graph in memory enabling multiple queries to reuse the same + * construction process. + * + * @see RDD.cache() for a more detailed explanation of caching. + */ + def cache(): Graph[VD, ED] + + /** + * Repartition the edges in the graph according to partitionStrategy. + */ + def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] + + /** + * Compute statistics describing the graph representation. + */ + def statistics: Map[String, Any] + + /** + * Construct a new graph where each vertex value has been + * transformed by the map function. + * + * @note This graph is not changed and that the new graph has the + * same structure. As a consequence the underlying index structures + * can be reused. + * + * @param map the function from a vertex object to a new vertex value. + * + * @tparam VD2 the new vertex data type + * + * @example We might use this operation to change the vertex values + * from one type to another to initialize an algorithm. + * {{{ + * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") + * val root = 42 + * var bfsGraph = rawGraph + * .mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue) + * }}} + * + */ + def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED] + + /** + * Construct a new graph where the value of each edge is + * transformed by the map operation. This function is not passed + * the vertex value for the vertices adjacent to the edge. If + * vertex values are desired use the mapTriplets function. + * + * @note This graph is not changed and that the new graph has the + * same structure. As a consequence the underlying index structures + * can be reused. + * + * @param map the function from an edge object to a new edge value. + * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge + * attributes. + * + */ + def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = { + mapEdges((pid, iter) => iter.map(map)) + } + + /** + * Construct a new graph transforming the value of each edge using + * the user defined iterator transform. The iterator transform is + * given an iterator over edge triplets within a logical partition + * and should yield a new iterator over the new values of each edge + * in the order in which they are provided to the iterator transform + * If adjacent vertex values are not required, consider using the + * mapEdges function instead. + * + * @note This that this does not change the structure of the + * graph or modify the values of this graph. As a consequence + * the underlying index structures can be reused. + * + * @param map the function which takes a partition id and an iterator + * over all the edges in the partition and must return an iterator over + * the new values for each edge in the order of the input iterator. + * + * @tparam ED2 the new edge data type + * + */ + def mapEdges[ED2: ClassTag]( + map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] + + /** + * Construct a new graph where the value of each edge is + * transformed by the map operation. This function passes vertex + * values for the adjacent vertices to the map function. If + * adjacent vertex values are not required, consider using the + * mapEdges function instead. + * + * @note This that this does not change the structure of the + * graph or modify the values of this graph. As a consequence + * the underlying index structures can be reused. + * + * @param map the function from an edge object to a new edge value. + * + * @tparam ED2 the new edge data type + * + * @example This function might be used to initialize edge + * attributes based on the attributes associated with each vertex. + * {{{ + * val rawGraph: Graph[Int, Int] = someLoadFunction() + * val graph = rawGraph.mapTriplets[Int]( edge => + * edge.src.data - edge.dst.data) + * }}} + * + */ + def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { + mapTriplets((pid, iter) => iter.map(map)) + } + + /** + * Construct a new graph transforming the value of each edge using + * the user defined iterator transform. The iterator transform is + * given an iterator over edge triplets within a logical partition + * and should yield a new iterator over the new values of each edge + * in the order in which they are provided to the iterator transform + * If adjacent vertex values are not required, consider using the + * mapEdges function instead. + * + * @note This that this does not change the structure of the + * graph or modify the values of this graph. As a consequence + * the underlying index structures can be reused. + * + * @param map the function which takes a partition id and an iterator + * over all the edges in the partition and must return an iterator over + * the new values for each edge in the order of the input iterator. + * + * @tparam ED2 the new edge data type + * + */ + def mapTriplets[ED2: ClassTag]( + map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): + Graph[VD, ED2] + + /** + * Construct a new graph with all the edges reversed. If this graph + * contains an edge from a to b then the returned graph contains an + * edge from b to a. + */ + def reverse: Graph[VD, ED] + + /** + * This function takes a vertex and edge predicate and constructs + * the subgraph that consists of vertices and edges that satisfy the + * predict. The resulting graph contains the vertices and edges + * that satisfy: + * + * {{{ + * V' = {v : for all v in V where vpred(v)} + * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)} + * }}} + * + * @param epred the edge predicate which takes a triplet and + * evaluates to true if the edge is to remain in the subgraph. Note + * that only edges in which both vertices satisfy the vertex + * predicate are considered. + * + * @param vpred the vertex predicate which takes a vertex object and + * evaluates to true if the vertex is to be included in the subgraph + * + * @return the subgraph containing only the vertices and edges that + * satisfy the predicates. + */ + def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] + + /** + * Subgraph of this graph with only vertices and edges from the other graph. + * @param other the graph to project this graph onto + * @return a graph with vertices and edges that exists in both the current graph and other, + * with vertex and edge data from the current graph. + */ + def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED] + + /** + * This function merges multiple edges between two vertices into a single Edge. For correct + * results, the graph must have been partitioned using partitionBy. + * + * @tparam ED2 the type of the resulting edge data after grouping. + * + * @param f the user supplied commutative associative function to merge edge attributes for + * duplicate edges. + * + * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair. + */ + def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] + + /** + * The mapReduceTriplets function is used to compute statistics + * about the neighboring edges and vertices of each vertex. The + * user supplied `mapFunc` function is invoked on each edge of the + * graph generating 0 or more "messages" to be "sent" to either + * vertex in the edge. The `reduceFunc` is then used to combine the + * output of the map phase destined to each vertex. + * + * @tparam A the type of "message" to be sent to each vertex + * + * @param mapFunc the user defined map function which returns 0 or + * more messages to neighboring vertices. + * + * @param reduceFunc the user defined reduce function which should + * be commutative and assosciative and is used to combine the output + * of the map phase. + * + * @param activeSet optionally, a set of "active" vertices and a direction of edges to consider + * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on + * edges originating from vertices in the active set. `activeSet` must have the same index as the + * graph's vertices. + * + * @example We can use this function to compute the inDegree of each + * vertex + * {{{ + * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") + * val inDeg: RDD[(VertexID, Int)] = + * mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _) + * }}} + * + * @note By expressing computation at the edge level we achieve + * maximum parallelism. This is one of the core functions in the + * Graph API in that enables neighborhood level computation. For + * example this function can be used to count neighbors satisfying a + * predicate or implement PageRank. + * + */ + def mapReduceTriplets[A: ClassTag]( + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) + : VertexRDD[A] + + /** + * Join the vertices with an RDD and then apply a function from the + * the vertex and RDD entry to a new vertex value and type. The + * input table should contain at most one entry for each vertex. If + * no entry is provided the map function is invoked passing none. + * + * @tparam U the type of entry in the table of updates + * @tparam VD2 the new vertex value type + * + * @param table the table to join with the vertices in the graph. + * The table should contain at most one entry for each vertex. + * + * @param mapFunc the function used to compute the new vertex + * values. The map function is invoked for all vertices, even those + * that do not have a corresponding entry in the table. + * + * @example This function is used to update the vertices with new + * values based on external data. For example we could add the out + * degree to each vertex record + * + * {{{ + * val rawGraph: Graph[(),()] = Graph.textFile("webgraph") + * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.outerJoinVertices(outDeg) { + * (vid, data, optDeg) => optDeg.getOrElse(0) + * } + * }}} + * + */ + def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)]) + (mapFunc: (VertexID, VD, Option[U]) => VD2) + : Graph[VD2, ED] + + // Save a copy of the GraphOps object so there is always one unique GraphOps object + // for a given Graph object, and thus the lazy vals in GraphOps would work as intended. + val ops = new GraphOps(this) +} // end of Graph + + + + +/** + * The Graph object contains a collection of routines used to construct graphs from RDDs. + */ +object Graph { + + /** + * Construct a graph from a collection of edges encoded as vertex id pairs. + * + * @param rawEdges a collection of edges in (src,dst) form. + * @param uniqueEdges if multiple identical edges are found they are combined and the edge + * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable + * uniqueEdges, a [[PartitionStrategy]] must be provided. + * + * @return a graph with edge attributes containing either the count of duplicate edges or 1 + * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex. + */ + def fromEdgeTuples[VD: ClassTag]( + rawEdges: RDD[(VertexID, VertexID)], + defaultValue: VD, + uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { + val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) + val graph = GraphImpl(edges, defaultValue) + uniqueEdges match { + case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b) + case None => graph + } + } + + /** + * Construct a graph from a collection of edges. + * + * @param edges the RDD containing the set of edges in the graph + * @param defaultValue the default vertex attribute to use for each vertex + * + * @return a graph with edge attributes described by `edges` and vertices + * given by all vertices in `edges` with value `defaultValue` + */ + def fromEdges[VD: ClassTag, ED: ClassTag]( + edges: RDD[Edge[ED]], + defaultValue: VD): Graph[VD, ED] = { + GraphImpl(edges, defaultValue) + } + + /** + * Construct a graph from a collection attributed vertices and + * edges. Duplicate vertices are picked arbitrarily and + * vertices found in the edge collection but not in the input + * vertices are the default attribute. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * @param vertices the "set" of vertices and their attributes + * @param edges the collection of edges in the graph + * @param defaultVertexAttr the default vertex attribute to use for + * vertices that are mentioned in edges but not in vertices + * @param partitionStrategy the partition strategy to use when + * partitioning the edges. + */ + def apply[VD: ClassTag, ED: ClassTag]( + vertices: RDD[(VertexID, VD)], + edges: RDD[Edge[ED]], + defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr) + } + + /** + * The implicit graphToGraphOPs function extracts the GraphOps member from a graph. + * + * To improve modularity the Graph type only contains a small set of basic operations. All the + * convenience operations are defined in the GraphOps class which may be shared across multiple + * graph implementations. + */ + implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops +} // end of Graph object diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala new file mode 100644 index 0000000000..f8aab951f0 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -0,0 +1,28 @@ +package org.apache.spark.graphx + +import com.esotericsoftware.kryo.Kryo + +import org.apache.spark.graphx.impl._ +import org.apache.spark.serializer.KryoRegistrator +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.BoundedPriorityQueue + + +class GraphKryoRegistrator extends KryoRegistrator { + + def registerClasses(kryo: Kryo) { + kryo.register(classOf[Edge[Object]]) + kryo.register(classOf[MessageToPartition[Object]]) + kryo.register(classOf[VertexBroadcastMsg[Object]]) + kryo.register(classOf[(VertexID, Object)]) + kryo.register(classOf[EdgePartition[Object]]) + kryo.register(classOf[BitSet]) + kryo.register(classOf[VertexIdToIndexMap]) + kryo.register(classOf[VertexAttributeBlock[Object]]) + kryo.register(classOf[PartitionStrategy]) + kryo.register(classOf[BoundedPriorityQueue[Object]]) + + // This avoids a large number of hash table lookups. + kryo.setReferences(false) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala new file mode 100644 index 0000000000..437288405f --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala @@ -0,0 +1,134 @@ +package org.apache.spark.graphx + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import scala.collection.JavaConversions._ +import org.apache.spark.rdd.RDD + +/** + * This object implements the GraphLab gather-apply-scatter api. + */ +object GraphLab extends Logging { + + /** + * Execute the GraphLab Gather-Apply-Scatter API + * + * @todo finish documenting GraphLab Gather-Apply-Scatter API + * + * @param graph The graph on which to execute the GraphLab API + * @param gatherFunc The gather function is executed on each edge triplet + * adjacent to a vertex and returns an accumulator which + * is then merged using the merge function. + * @param mergeFunc An accumulative associative operation on the result of + * the gather type. + * @param applyFunc Takes a vertex and the final result of the merge operations + * on the adjacent edges and returns a new vertex value. + * @param scatterFunc Executed after the apply function the scatter function takes + * a triplet and signals whether the neighboring vertex program + * must be recomputed. + * @param startVertices predicate to determine which vertices to start the computation on. + * these will be the active vertices in the first iteration. + * @param numIter The maximum number of iterations to run. + * @param gatherDirection The direction of edges to consider during the gather phase + * @param scatterDirection The direction of edges to consider during the scatter phase + * + * @tparam VD The graph vertex attribute type + * @tparam ED The graph edge attribute type + * @tparam A The type accumulated during the gather phase + * @return the resulting graph after the algorithm converges + */ + def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] + (graph: Graph[VD, ED], numIter: Int, + gatherDirection: EdgeDirection = EdgeDirection.In, + scatterDirection: EdgeDirection = EdgeDirection.Out) + (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A, + mergeFunc: (A, A) => A, + applyFunc: (VertexID, VD, Option[A]) => VD, + scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean, + startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true) + : Graph[VD, ED] = { + + + // Add an active attribute to all vertices to track convergence. + var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices { + case (id, data) => (startVertices(id, data), data) + }.cache() + + // The gather function wrapper strips the active attribute and + // only invokes the gather function on active vertices + def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = { + if (e.vertexAttr(vid)._1) { + val edgeTriplet = new EdgeTriplet[VD,ED] + edgeTriplet.set(e) + edgeTriplet.srcAttr = e.srcAttr._2 + edgeTriplet.dstAttr = e.dstAttr._2 + Some(gatherFunc(vid, edgeTriplet)) + } else { + None + } + } + + // The apply function wrapper strips the vertex of the active attribute + // and only invokes the apply function on active vertices + def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = { + val (active, vData) = data + if (active) (true, applyFunc(vid, vData, accum)) + else (false, vData) + } + + // The scatter function wrapper strips the vertex of the active attribute + // and only invokes the scatter function on active vertices + def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = { + val vid = e.otherVertexId(rawVertexID) + if (e.vertexAttr(vid)._1) { + val edgeTriplet = new EdgeTriplet[VD,ED] + edgeTriplet.set(e) + edgeTriplet.srcAttr = e.srcAttr._2 + edgeTriplet.dstAttr = e.dstAttr._2 + Some(scatterFunc(vid, edgeTriplet)) + } else { + None + } + } + + // Used to set the active status of vertices for the next round + def applyActive( + vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = { + val (prevActive, vData) = data + (newActiveOpt.getOrElse(false), vData) + } + + // Main Loop --------------------------------------------------------------------- + var i = 0 + var numActive = activeGraph.numVertices + while (i < numIter && numActive > 0) { + + // Gather + val gathered: RDD[(VertexID, A)] = + activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) + + // Apply + activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache() + + + + // Scatter is basically a gather in the opposite direction so we reverse the edge direction + // activeGraph: Graph[(Boolean, VD), ED] + val scattered: RDD[(VertexID, Boolean)] = + activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) + + activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() + + // Calculate the number of active vertices + numActive = activeGraph.vertices.map{ + case (vid, data) => if (data._1) 1 else 0 + }.reduce(_ + _) + logInfo("Number active vertices: " + numActive) + i += 1 + } + + // Remove the active attribute from the vertex data before returning the graph + activeGraph.mapVertices{case (vid, data) => data._2 } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala new file mode 100644 index 0000000000..473cfb18cf --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -0,0 +1,113 @@ +package org.apache.spark.graphx + +import java.util.{Arrays => JArrays} +import scala.reflect.ClassTag + +import org.apache.spark.graphx.impl.EdgePartitionBuilder +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl} +import org.apache.spark.util.collection.PrimitiveVector + + +object GraphLoader extends Logging { + + /** + * Load an edge list from file initializing the Graph + * + * @tparam ED the type of the edge data of the resulting Graph + * + * @param sc the SparkContext used to construct RDDs + * @param path the path to the text file containing the edge list + * @param edgeParser a function that takes an array of strings and + * returns an ED object + * @param minEdgePartitions the number of partitions for the + * the Edge RDD + * + */ + def textFile[ED: ClassTag]( + sc: SparkContext, + path: String, + edgeParser: Array[String] => ED, + minEdgePartitions: Int = 1): + Graph[Int, ED] = { + // Parse the edge data table + val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter => + iter.filter(line => !line.isEmpty && line(0) != '#').map { line => + val lineArray = line.split("\\s+") + if(lineArray.length < 2) { + println("Invalid line: " + line) + assert(false) + } + val source = lineArray(0).trim.toLong + val target = lineArray(1).trim.toLong + val tail = lineArray.drop(2) + val edata = edgeParser(tail) + Edge(source, target, edata) + }) + val defaultVertexAttr = 1 + Graph.fromEdges(edges, defaultVertexAttr) + } + + /** + * Load a graph from an edge list formatted file with each line containing + * two integers: a source Id and a target Id. + * + * @example A file in the following format: + * {{{ + * # Comment Line + * # Source Id <\t> Target Id + * 1 -5 + * 1 2 + * 2 7 + * 1 8 + * }}} + * + * If desired the edges can be automatically oriented in the positive + * direction (source Id < target Id) by setting `canonicalOrientation` to + * true + * + * @param sc + * @param path the path to the file (e.g., /Home/data/file or hdfs://file) + * @param canonicalOrientation whether to orient edges in the positive + * direction. + * @param minEdgePartitions the number of partitions for the + * the Edge RDD + * @tparam ED + * @return + */ + def edgeListFile( + sc: SparkContext, + path: String, + canonicalOrientation: Boolean = false, + minEdgePartitions: Int = 1): + Graph[Int, Int] = { + val startTime = System.currentTimeMillis + + // Parse the edge data table directly into edge partitions + val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[Int] + iter.foreach { line => + if (!line.isEmpty && line(0) != '#') { + val lineArray = line.split("\\s+") + if (lineArray.length < 2) { + logWarning("Invalid line: " + line) + } + val srcId = lineArray(0).toLong + val dstId = lineArray(1).toLong + if (canonicalOrientation && dstId > srcId) { + builder.add(dstId, srcId, 1) + } else { + builder.add(srcId, dstId, 1) + } + } + } + Iterator((pid, builder.toEdgePartition)) + }.cache() + edges.count() + + logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) + + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1) + } // end of edgeListFile + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala new file mode 100644 index 0000000000..cacfcb1c90 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -0,0 +1,277 @@ +package org.apache.spark.graphx + +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.SparkException + + +/** + * `GraphOps` contains additional functionality (syntatic sugar) for + * the graph type and is implicitly constructed for each Graph object. + * All operations in `GraphOps` are expressed in terms of the + * efficient GraphX API. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * + */ +class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { + + /** + * Compute the number of edges in the graph. + */ + lazy val numEdges: Long = graph.edges.count() + + + /** + * Compute the number of vertices in the graph. + */ + lazy val numVertices: Long = graph.vertices.count() + + + /** + * Compute the in-degree of each vertex in the Graph returning an + * RDD. + * @note Vertices with no in edges are not returned in the resulting RDD. + */ + lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) + + + /** + * Compute the out-degree of each vertex in the Graph returning an RDD. + * @note Vertices with no out edges are not returned in the resulting RDD. + */ + lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) + + + /** + * Compute the degrees of each vertex in the Graph returning an RDD. + * @note Vertices with no edges are not returned in the resulting + * RDD. + */ + lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both) + + + /** + * Compute the neighboring vertex degrees. + * + * @param edgeDirection the direction along which to collect + * neighboring vertex attributes. + */ + private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = { + if (edgeDirection == EdgeDirection.In) { + graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _) + } else if (edgeDirection == EdgeDirection.Out) { + graph.mapReduceTriplets(et => Iterator((et.srcId,1)), _ + _) + } else { // EdgeDirection.both + graph.mapReduceTriplets(et => Iterator((et.srcId,1), (et.dstId,1)), _ + _) + } + } + + + /** + * This function is used to compute a statistic for the neighborhood + * of each vertex and returns a value for all vertices (including + * those without neighbors). + * + * @note Because the a default value is provided all vertices will + * have a corresponding entry in the returned RDD. + * + * @param mapFunc the function applied to each edge adjacent to each + * vertex. The mapFunc can optionally return None in which case it + * does not contribute to the final sum. + * @param reduceFunc the function used to merge the results of each + * map operation. + * @param default the default value to use for each vertex if it has + * no neighbors or the map function repeatedly evaluates to none + * @param direction the direction of edges to consider (e.g., In, + * Out, Both). + * @tparam VD2 The returned type of the aggregation operation. + * + * @return A Spark.RDD containing tuples of vertex identifiers and + * their resulting value. There will be exactly one entry for ever + * vertex in the original graph. + * + * @example We can use this function to compute the average follower + * age for each user + * + * {{{ + * val graph: Graph[Int,Int] = loadGraph() + * val averageFollowerAge: RDD[(Int, Int)] = + * graph.aggregateNeighbors[(Int,Double)]( + * (vid, edge) => (edge.otherVertex(vid).data, 1), + * (a, b) => (a._1 + b._1, a._2 + b._2), + * -1, + * EdgeDirection.In) + * .mapValues{ case (sum,followers) => sum.toDouble / followers} + * }}} + * + * @todo Should this return a graph with the new vertex values? + * + */ + def aggregateNeighbors[A: ClassTag]( + mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], + reduceFunc: (A, A) => A, + dir: EdgeDirection) + : VertexRDD[A] = { + + // Define a new map function over edge triplets + val mf = (et: EdgeTriplet[VD,ED]) => { + // Compute the message to the dst vertex + val dst = + if (dir == EdgeDirection.In || dir == EdgeDirection.Both) { + mapFunc(et.dstId, et) + } else { Option.empty[A] } + // Compute the message to the source vertex + val src = + if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) { + mapFunc(et.srcId, et) + } else { Option.empty[A] } + // construct the return array + (src, dst) match { + case (None, None) => Iterator.empty + case (Some(srcA),None) => Iterator((et.srcId, srcA)) + case (None, Some(dstA)) => Iterator((et.dstId, dstA)) + case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA)) + } + } + + graph.mapReduceTriplets(mf, reduceFunc) + } // end of aggregateNeighbors + + + /** + * Return the Ids of the neighboring vertices. + * + * @param edgeDirection the direction along which to collect + * neighboring vertices + * + * @return the vertex set of neighboring ids for each vertex. + */ + def collectNeighborIds(edgeDirection: EdgeDirection) : + VertexRDD[Array[VertexID]] = { + val nbrs = + if (edgeDirection == EdgeDirection.Both) { + graph.mapReduceTriplets[Array[VertexID]]( + mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), + reduceFunc = _ ++ _ + ) + } else if (edgeDirection == EdgeDirection.Out) { + graph.mapReduceTriplets[Array[VertexID]]( + mapFunc = et => Iterator((et.srcId, Array(et.dstId))), + reduceFunc = _ ++ _) + } else if (edgeDirection == EdgeDirection.In) { + graph.mapReduceTriplets[Array[VertexID]]( + mapFunc = et => Iterator((et.dstId, Array(et.srcId))), + reduceFunc = _ ++ _) + } else { + throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.") + } + graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => + nbrsOpt.getOrElse(Array.empty[VertexID]) + } + } // end of collectNeighborIds + + + /** + * Collect the neighbor vertex attributes for each vertex. + * + * @note This function could be highly inefficient on power-law + * graphs where high degree vertices may force a large ammount of + * information to be collected to a single location. + * + * @param edgeDirection the direction along which to collect + * neighboring vertices + * + * @return the vertex set of neighboring vertex attributes for each + * vertex. + */ + def collectNeighbors(edgeDirection: EdgeDirection) : + VertexRDD[ Array[(VertexID, VD)] ] = { + val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]]( + (vid, edge) => + Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), + (a, b) => a ++ b, + edgeDirection) + + graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => + nbrsOpt.getOrElse(Array.empty[(VertexID, VD)]) + } + } // end of collectNeighbor + + + /** + * Join the vertices with an RDD and then apply a function from the + * the vertex and RDD entry to a new vertex value. The input table + * should contain at most one entry for each vertex. If no entry is + * provided the map function is skipped and the old value is used. + * + * @tparam U the type of entry in the table of updates + * @param table the table to join with the vertices in the graph. + * The table should contain at most one entry for each vertex. + * @param mapFunc the function used to compute the new vertex + * values. The map function is invoked only for vertices with a + * corresponding entry in the table otherwise the old vertex value + * is used. + * + * @note for small tables this function can be much more efficient + * than leftJoinVertices + * + * @example This function is used to update the vertices with new + * values based on external data. For example we could add the out + * degree to each vertex record + * + * {{{ + * val rawGraph: Graph[Int,()] = Graph.textFile("webgraph") + * .mapVertices(v => 0) + * val outDeg: RDD[(Int, Int)] = rawGraph.outDegrees() + * val graph = rawGraph.leftJoinVertices[Int,Int](outDeg, + * (v, deg) => deg ) + * }}} + * + */ + def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD) + : Graph[VD, ED] = { + val uf = (id: VertexID, data: VD, o: Option[U]) => { + o match { + case Some(u) => mapFunc(id, data, u) + case None => data + } + } + graph.outerJoinVertices(table)(uf) + } + + /** + * Filter the graph by computing some values to filter on, and applying the predicates. + * + * @param preprocess a function to compute new vertex and edge data before filtering + * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph + * @param vpred vertex pred to filter on after prerocess, see more details under Graph#subgraph + * @tparam VD2 vertex type the vpred operates on + * @tparam ED2 edge type the epred operates on + * @return a subgraph of the orginal graph, with its data unchanged + * + * @example This function can be used to filter the graph based on some property, without + * changing the vertex and edge values in your program. For example, we could remove the vertices + * in a graph with 0 outdegree + * + * {{{ + * graph.filter( + * graph => { + * val degrees: VertexSetRDD[Int] = graph.outDegrees + * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} + * }, + * vpred = (vid: VertexID, deg:Int) => deg > 0 + * ) + * }}} + * + */ + def filter[VD2: ClassTag, ED2: ClassTag]( + preprocess: Graph[VD, ED] => Graph[VD2, ED2], + epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, + vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = { + graph.mask(preprocess(graph).subgraph(epred, vpred)) + } +} // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala new file mode 100644 index 0000000000..5e80a535f1 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -0,0 +1,94 @@ +package org.apache.spark.graphx + + +sealed trait PartitionStrategy extends Serializable { + def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID +} + + +/** + * This function implements a classic 2D-Partitioning of a sparse matrix. + * Suppose we have a graph with 11 vertices that we want to partition + * over 9 machines. We can use the following sparse matrix representation: + * + * __________________________________ + * v0 | P0 * | P1 | P2 * | + * v1 | **** | * | | + * v2 | ******* | ** | **** | + * v3 | ***** | * * | * | + * ---------------------------------- + * v4 | P3 * | P4 *** | P5 ** * | + * v5 | * * | * | | + * v6 | * | ** | **** | + * v7 | * * * | * * | * | + * ---------------------------------- + * v8 | P6 * | P7 * | P8 * *| + * v9 | * | * * | | + * v10 | * | ** | * * | + * v11 | * <-E | *** | ** | + * ---------------------------------- + * + * The edge denoted by E connects v11 with v1 and is assigned to + * processor P6. To get the processor number we divide the matrix + * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges + * adjacent to v11 can only be in the first colum of + * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8). + * As a consequence we can guarantee that v11 will need to be + * replicated to at most 2 * sqrt(numProc) machines. + * + * Notice that P0 has many edges and as a consequence this + * partitioning would lead to poor work balance. To improve + * balance we first multiply each vertex id by a large prime + * to effectively shuffle the vertex locations. + * + * One of the limitations of this approach is that the number of + * machines must either be a perfect square. We partially address + * this limitation by computing the machine assignment to the next + * largest perfect square and then mapping back down to the actual + * number of machines. Unfortunately, this can also lead to work + * imbalance and so it is suggested that a perfect square is used. + * + * + */ +case object EdgePartition2D extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt + val mixingPrime: VertexID = 1125899906842597L + val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt + val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt + (col * ceilSqrtNumParts + row) % numParts + } +} + + +case object EdgePartition1D extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val mixingPrime: VertexID = 1125899906842597L + (math.abs(src) * mixingPrime).toInt % numParts + } +} + + +/** + * Assign edges to an aribtrary machine corresponding to a + * random vertex cut. + */ +case object RandomVertexCut extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + math.abs((src, dst).hashCode()) % numParts + } +} + + +/** + * Assign edges to an arbitrary machine corresponding to a random vertex cut. This + * function ensures that edges of opposite direction between the same two vertices + * will end up on the same partition. + */ +case object CanonicalRandomVertexCut extends PartitionStrategy { + override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + val lower = math.min(src, dst) + val higher = math.max(src, dst) + math.abs((lower, higher).hashCode()) % numParts + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala new file mode 100644 index 0000000000..8ddb788135 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -0,0 +1,122 @@ +package org.apache.spark.graphx + +import scala.reflect.ClassTag + + +/** + * This object implements a Pregel-like bulk-synchronous + * message-passing API. However, unlike the original Pregel API the + * GraphX pregel API factors the sendMessage computation over edges, + * enables the message sending computation to read both vertex + * attributes, and finally constrains messages to the graph structure. + * These changes allow for substantially more efficient distributed + * execution while also exposing greater flexibility for graph based + * computation. + * + * @example We can use the Pregel abstraction to implement PageRank + * {{{ + * val pagerankGraph: Graph[Double, Double] = graph + * // Associate the degree with each vertex + * .outerJoinVertices(graph.outDegrees){ + * (vid, vdata, deg) => deg.getOrElse(0) + * } + * // Set the weight on the edges based on the degree + * .mapTriplets( e => 1.0 / e.srcAttr ) + * // Set the vertex attributes to the initial pagerank values + * .mapVertices( (id, attr) => 1.0 ) + * + * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + * resetProb + (1.0 - resetProb) * msgSum + * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] = + * Some(edge.srcAttr * edge.attr) + * def messageCombiner(a: Double, b: Double): Double = a + b + * val initialMessage = 0.0 + * // Execute pregel for a fixed number of iterations. + * Pregel(pagerankGraph, initialMessage, numIter)( + * vertexProgram, sendMessage, messageCombiner) + * }}} + * + */ +object Pregel { + + /** + * Execute a Pregel-like iterative vertex-parallel abstraction. The + * user-defined vertex-program `vprog` is executed in parallel on + * each vertex receiving any inbound messages and computing a new + * value for the vertex. The `sendMsg` function is then invoked on + * all out-edges and is used to compute an optional message to the + * destination vertex. The `mergeMsg` function is a commutative + * associative function used to combine messages destined to the + * same vertex. + * + * On the first iteration all vertices receive the `initialMsg` and + * on subsequent iterations if a vertex does not receive a message + * then the vertex-program is not invoked. + * + * This function iterates until there are no remaining messages, or + * for maxIterations iterations. + * + * @tparam VD the vertex data type + * @tparam ED the edge data type + * @tparam A the Pregel message type + * + * @param graph the input graph. + * + * @param initialMsg the message each vertex will receive at the on + * the first iteration. + * + * @param maxIterations the maximum number of iterations to run for. + * + * @param vprog the user-defined vertex program which runs on each + * vertex and receives the inbound message and computes a new vertex + * value. On the first iteration the vertex program is invoked on + * all vertices and is passed the default message. On subsequent + * iterations the vertex program is only invoked on those vertices + * that receive messages. + * + * @param sendMsg a user supplied function that is applied to out + * edges of vertices that received messages in the current + * iteration. + * + * @param mergeMsg a user supplied function that takes two incoming + * messages of type A and merges them into a single message of type + * A. ''This function must be commutative and associative and + * ideally the size of A should not increase.'' + * + * @return the resulting graph at the end of the computation + * + */ + def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] + (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)( + vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { + + var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ) + // compute the messages + var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache() + var activeMessages = messages.count() + // Loop + var i = 0 + while (activeMessages > 0 && i < maxIterations) { + // Receive the messages. Vertices that didn't get any messages do not appear in newVerts. + val newVerts = g.vertices.innerJoin(messages)(vprog).cache() + // Update the graph with the new vertices. + g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) } + + val oldMessages = messages + // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't + // get to send messages. + messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache() + activeMessages = messages.count() + // after counting we can unpersist the old messages + oldMessages.unpersist(blocking=false) + // count the iteration + i += 1 + } + + g + } // end of apply + +} // end of class Pregel diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala new file mode 100644 index 0000000000..cfee9b089f --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx + +import scala.reflect.ClassTag + +import org.apache.spark._ +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ +import org.apache.spark.storage.StorageLevel + +import org.apache.spark.graphx.impl.MsgRDDFunctions +import org.apache.spark.graphx.impl.VertexPartition + + +/** + * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is + * only one entry for each vertex and by pre-indexing the entries for fast, + * efficient joins. + * + * @tparam VD the vertex attribute associated with each vertex in the set. + * + * To construct a `VertexRDD` use the singleton object: + * + * @example Construct a `VertexRDD` from a plain RDD + * {{{ + * // Construct an intial vertex set + * val someData: RDD[(VertexID, SomeType)] = loadData(someFile) + * val vset = VertexRDD(someData) + * // If there were redundant values in someData we would use a reduceFunc + * val vset2 = VertexRDD(someData, reduceFunc) + * // Finally we can use the VertexRDD to index another dataset + * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile) + * val vset3 = VertexRDD(otherData, vset.index) + * // Now we can construct very fast joins between the two sets + * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) + * }}} + * + */ +class VertexRDD[@specialized VD: ClassTag]( + val partitionsRDD: RDD[VertexPartition[VD]]) + extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + + require(partitionsRDD.partitioner.isDefined) + + partitionsRDD.setName("VertexRDD") + + /** + * Construct a new VertexRDD that is indexed by only the keys in the RDD. + * The resulting VertexRDD will be based on a different index and can + * no longer be quickly joined with this RDD. + */ + def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) + + /** + * The partitioner is defined by the index. + */ + override val partitioner = partitionsRDD.partitioner + + /** + * The actual partitions are defined by the tuples. + */ + override protected def getPartitions: Array[Partition] = partitionsRDD.partitions + + /** + * The preferred locations are computed based on the preferred + * locations of the tuples. + */ + override protected def getPreferredLocations(s: Partition): Seq[String] = + partitionsRDD.preferredLocations(s) + + /** + * Caching a VertexRDD causes the index and values to be cached separately. + */ + override def persist(newLevel: StorageLevel): VertexRDD[VD] = { + partitionsRDD.persist(newLevel) + this + } + + /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ + override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY) + + /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ + override def cache(): VertexRDD[VD] = persist() + + /** Return the number of vertices in this set. */ + override def count(): Long = { + partitionsRDD.map(_.size).reduce(_ + _) + } + + /** + * Provide the `RDD[(VertexID, VD)]` equivalent output. + */ + override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = { + firstParent[VertexPartition[VD]].iterator(part, context).next.iterator + } + + /** + * Return a new VertexRDD by applying a function to each VertexPartition of this RDD. + */ + def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2]) + : VertexRDD[VD2] = { + val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) + new VertexRDD(newPartitionsRDD) + } + + + /** + * Restrict the vertex set to the set of vertices satisfying the + * given predicate. + * + * @param pred the user defined predicate, which takes a tuple to conform to + * the RDD[(VertexID, VD)] interface + * + * @note The vertex set preserves the original index structure + * which means that the returned RDD can be easily joined with + * the original vertex-set. Furthermore, the filter only + * modifies the bitmap index and so no new values are allocated. + */ + override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = + this.mapVertexPartitions(_.filter(Function.untupled(pred))) + + /** + * Pass each vertex attribute through a map function and retain the + * original RDD's partitioning and index. + * + * @tparam VD2 the type returned by the map function + * + * @param f the function applied to each value in the RDD + * @return a new VertexRDD with values obtained by applying `f` to + * each of the entries in the original VertexRDD. The resulting + * VertexRDD retains the same index. + */ + def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = + this.mapVertexPartitions(_.map((vid, attr) => f(attr))) + + /** + * Pass each vertex attribute through a map function and retain the + * original RDD's partitioning and index. + * + * @tparam VD2 the type returned by the map function + * + * @param f the function applied to each value in the RDD + * @return a new VertexRDD with values obtained by applying `f` to + * each of the entries in the original VertexRDD. The resulting + * VertexRDD retains the same index. + */ + def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = + this.mapVertexPartitions(_.map(f)) + + /** + * Hides vertices that are the same between this and other. For vertices that are different, keeps + * the values from `other`. + */ + def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.diff(otherPart)) + } + new VertexRDD(newPartitionsRDD) + } + + /** + * Left join this VertexSet with another VertexSet which has the + * same Index. This function will fail if both VertexSets do not + * share the same index. The resulting vertex set contains an entry + * for each vertex in this set. If the other VertexSet is missing + * any vertex in this VertexSet then a `None` attribute is generated + * + * @tparam VD2 the attribute type of the other VertexSet + * @tparam VD3 the attribute type of the resulting VertexSet + * + * @param other the other VertexSet with which to join. + * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to a new vertex attribute. + * @return a VertexRDD containing all the vertices in this + * VertexSet with `None` attributes used for Vertices missing in the + * other VertexSet. + * + */ + def leftZipJoin[VD2: ClassTag, VD3: ClassTag] + (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.leftJoin(otherPart)(f)) + } + new VertexRDD(newPartitionsRDD) + } + + /** + * Left join this VertexRDD with an RDD containing vertex attribute + * pairs. If the other RDD is backed by a VertexRDD with the same + * index than the efficient leftZipJoin implementation is used. The + * resulting vertex set contains an entry for each vertex in this + * set. If the other VertexRDD is missing any vertex in this + * VertexRDD then a `None` attribute is generated. + * + * If there are duplicates, the vertex is picked at random. + * + * @tparam VD2 the attribute type of the other VertexRDD + * @tparam VD3 the attribute type of the resulting VertexRDD + * + * @param other the other VertexRDD with which to join. + * @param f the function mapping a vertex id and its attributes in + * this and the other vertex set to a new vertex attribute. + * @return a VertexRDD containing all the vertices in this + * VertexRDD with the attribute emitted by f. + */ + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: RDD[(VertexID, VD2)]) + (f: (VertexID, VD, Option[VD2]) => VD3) + : VertexRDD[VD3] = + { + // Test if the other vertex is a VertexRDD to choose the optimal join strategy. + // If the other set is a VertexRDD then we use the much more efficient leftZipJoin + other match { + case other: VertexRDD[_] => + leftZipJoin(other)(f) + case _ => + new VertexRDD[VD3]( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) + { (part, msgs) => + val vertexPartition: VertexPartition[VD] = part.next() + Iterator(vertexPartition.leftJoin(msgs)(f)) + } + ) + } + } + + /** + * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other` + * must have the same index. + */ + def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) + (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.innerJoin(otherPart)(f)) + } + new VertexRDD(newPartitionsRDD) + } + + /** + * Replace vertices with corresponding vertices in `other`, and drop vertices without a + * corresponding vertex in `other`. + */ + def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) + (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + // Test if the other vertex is a VertexRDD to choose the optimal join strategy. + // If the other set is a VertexRDD then we use the much more efficient innerZipJoin + other match { + case other: VertexRDD[_] => + innerZipJoin(other)(f) + case _ => + new VertexRDD( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) + { (part, msgs) => + val vertexPartition: VertexPartition[VD] = part.next() + Iterator(vertexPartition.innerJoin(msgs)(f)) + } + ) + } + } + + /** + * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is + * co-indexed with this one. + */ + def aggregateUsingIndex[VD2: ClassTag]( + messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = + { + val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) + val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => + val vertexPartition: VertexPartition[VD] = thisIter.next() + Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc)) + } + new VertexRDD[VD2](parts) + } + +} // end of VertexRDD + + +/** + * The VertexRDD singleton is used to construct VertexRDDs + */ +object VertexRDD { + + /** + * Construct a vertex set from an RDD of vertex-attribute pairs. + * Duplicate entries are removed arbitrarily. + * + * @tparam VD the vertex attribute type + * + * @param rdd the collection of vertex-attribute pairs + */ + def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = { + val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + case Some(p) => rdd + case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) + } + val vertexPartitions = partitioned.mapPartitions( + iter => Iterator(VertexPartition(iter)), + preservesPartitioning = true) + new VertexRDD(vertexPartitions) + } + + /** + * Construct a vertex set from an RDD of vertex-attribute pairs. + * Duplicate entries are merged using mergeFunc. + * + * @tparam VD the vertex attribute type + * + * @param rdd the collection of vertex-attribute pairs + * @param mergeFunc the associative, commutative merge function. + */ + def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = + { + val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + case Some(p) => rdd + case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) + } + val vertexPartitions = partitioned.mapPartitions( + iter => Iterator(VertexPartition(iter)), + preservesPartitioning = true) + new VertexRDD(vertexPartitions) + } + + def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD) + : VertexRDD[VD] = + { + VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => + value.getOrElse(default) + } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala new file mode 100644 index 0000000000..a0dd36da60 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala @@ -0,0 +1,37 @@ +package org.apache.spark.graphx.algorithms + +import org.apache.spark.graphx._ + + +object ConnectedComponents { + /** + * Compute the connected component membership of each vertex and return an RDD with the vertex + * value containing the lowest vertex id in the connected component containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * + * @param graph the graph for which to compute the connected components + * + * @return a graph with vertex attributes containing the smallest vertex in each + * connected component + */ + def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + val ccGraph = graph.mapVertices { case (vid, _) => vid } + + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else if (edge.srcAttr > edge.dstAttr) { + Iterator((edge.srcId, edge.dstAttr)) + } else { + Iterator.empty + } + } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) + } // end of connectedComponents +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala new file mode 100644 index 0000000000..0292b7316d --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala @@ -0,0 +1,205 @@ +package org.apache.spark.graphx.algorithms + +import org.apache.spark.Logging +import org.apache.spark.graphx._ + + +object PageRank extends Logging { + + /** + * Run PageRank for a fixed number of iterations returning a graph + * with vertex attributes containing the PageRank and edge + * attributes the normalized edge weight. + * + * The following PageRank fixed point is computed for each vertex. + * + * {{{ + * var PR = Array.fill(n)( 1.0 ) + * val oldPR = Array.fill(n)( 1.0 ) + * for( iter <- 0 until numIter ) { + * swap(oldPR, PR) + * for( i <- 0 until n ) { + * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum + * } + * } + * }}} + * + * where `alpha` is the random reset probability (typically 0.15), + * `inNbrs[i]` is the set of neighbors whick link to `i` and + * `outDeg[j]` is the out degree of vertex `j`. + * + * Note that this is not the "normalized" PageRank and as a consequence pages that have no + * inlinks will have a PageRank of alpha. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param numIter the number of iterations of PageRank to run + * @param resetProb the random reset probability (alpha) + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. + * + */ + def run[VD: Manifest, ED: Manifest]( + graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = + { + + /** + * Initialize the pagerankGraph with each edge attribute having + * weight 1/outDegree and each vertex with attribute 1.0. + */ + val pagerankGraph: Graph[Double, Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees){ + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to the initial pagerank values + .mapVertices( (id, attr) => 1.0 ) + + // Display statistics about pagerank + logInfo(pagerankGraph.statistics.toString) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + resetProb + (1.0 - resetProb) * msgSum + def sendMessage(edge: EdgeTriplet[Double, Double]) = + Iterator((edge.dstId, edge.srcAttr * edge.attr)) + def messageCombiner(a: Double, b: Double): Double = a + b + // The initial message received by all vertices in PageRank + val initialMessage = 0.0 + + // Execute pregel for a fixed number of iterations. + Pregel(pagerankGraph, initialMessage, numIter)( + vertexProgram, sendMessage, messageCombiner) + } + + /** + * Run a dynamic version of PageRank returning a graph with vertex attributes containing the + * PageRank and edge attributes containing the normalized edge weight. + * + * {{{ + * var PR = Array.fill(n)( 1.0 ) + * val oldPR = Array.fill(n)( 0.0 ) + * while( max(abs(PR - oldPr)) > tol ) { + * swap(oldPR, PR) + * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) { + * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum + * } + * } + * }}} + * + * where `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of + * neighbors whick link to `i` and `outDeg[j]` is the out degree of vertex `j`. + * + * Note that this is not the "normalized" PageRank and as a consequence pages that have no + * inlinks will have a PageRank of alpha. + * + * @tparam VD the original vertex attribute (not used) + * @tparam ED the original edge attribute (not used) + * + * @param graph the graph on which to compute PageRank + * @param tol the tolerance allowed at convergence (smaller => more * accurate). + * @param resetProb the random reset probability (alpha) + * + * @return the graph containing with each vertex containing the PageRank and each edge + * containing the normalized weight. + */ + def runUntillConvergence[VD: Manifest, ED: Manifest]( + graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = + { + // Initialize the pagerankGraph with each edge attribute + // having weight 1/outDegree and each vertex with attribute 1.0. + val pagerankGraph: Graph[(Double, Double), Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets( e => 1.0 / e.srcAttr ) + // Set the vertex attributes to (initalPR, delta = 0) + .mapVertices( (id, attr) => (0.0, 0.0) ) + + // Display statistics about pagerank + logInfo(pagerankGraph.statistics.toString) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { + val (oldPR, lastDelta) = attr + val newPR = oldPR + (1.0 - resetProb) * msgSum + (newPR, newPR - oldPR) + } + + def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { + if (edge.srcAttr._2 > tol) { + Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) + } else { + Iterator.empty + } + } + + def messageCombiner(a: Double, b: Double): Double = a + b + + // The initial message received by all vertices in PageRank + val initialMessage = resetProb / (1.0 - resetProb) + + // Execute a dynamic version of Pregel. + Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner) + .mapVertices((vid, attr) => attr._1) + } // end of deltaPageRank + + def runStandalone[VD: Manifest, ED: Manifest]( + graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): VertexRDD[Double] = { + + // Initialize the ranks + var ranks: VertexRDD[Double] = graph.vertices.mapValues((vid, attr) => resetProb).cache() + + // Initialize the delta graph where each vertex stores its delta and each edge knows its weight + var deltaGraph: Graph[Double, Double] = + graph.outerJoinVertices(graph.outDegrees)((vid, vdata, deg) => deg.getOrElse(0)) + .mapTriplets(e => 1.0 / e.srcAttr) + .mapVertices((vid, degree) => resetProb).cache() + var numDeltas: Long = ranks.count() + + var prevDeltas: Option[VertexRDD[Double]] = None + + var i = 0 + val weight = (1.0 - resetProb) + while (numDeltas > 0) { + // Compute new deltas. Only deltas that existed in the last round (i.e., were greater than + // `tol`) get to send messages; those that were less than `tol` would send messages less than + // `tol` as well. + val deltas = deltaGraph + .mapReduceTriplets[Double]( + et => Iterator((et.dstId, et.srcAttr * et.attr * weight)), + _ + _, + prevDeltas.map((_, EdgeDirection.Out))) + .filter { case (vid, delta) => delta > tol } + .cache() + prevDeltas = Some(deltas) + numDeltas = deltas.count() + logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas)) + + // Update deltaGraph with the deltas + deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) => + newOpt.getOrElse(old) + }.cache() + + // Update ranks + ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) => + oldRank + deltaOpt.getOrElse(0.0) + } + ranks.foreach(x => {}) // force the iteration for ease of debugging + + i += 1 + } + + ranks + } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala new file mode 100644 index 0000000000..8fdfa3d907 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala @@ -0,0 +1,103 @@ +package org.apache.spark.graphx.algorithms + +import org.apache.spark.rdd._ +import org.apache.spark.graphx._ +import scala.util.Random +import org.apache.commons.math.linear._ + +class SVDPlusPlusConf( // SVDPlusPlus parameters + var rank: Int, + var maxIters: Int, + var minVal: Double, + var maxVal: Double, + var gamma1: Double, + var gamma2: Double, + var gamma6: Double, + var gamma7: Double) extends Serializable + +object SVDPlusPlus { + /** + * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model", + * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]]. + * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6. + * + * @param edges edges for constructing the graph + * + * @param conf SVDPlusPlus parameters + * + * @return a graph with vertex attributes containing the trained model + */ + + def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = { + + // generate default vertex attribute + def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = { + val v1 = new ArrayRealVector(rank) + val v2 = new ArrayRealVector(rank) + for (i <- 0 until rank) { + v1.setEntry(i, Random.nextDouble) + v2.setEntry(i, Random.nextDouble) + } + (v1, v2, 0.0, 0.0) + } + + // calculate global rating mean + val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) + val u = rs / rc + + // construct graph + var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache() + + // calculate initial bias and norm + var t0 = g.mapReduceTriplets(et => + Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) + g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => + (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) + } + + def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) + : Iterator[(VertexID, (RealVector, RealVector, Double))] = { + val (usr, itm) = (et.srcAttr, et.dstAttr) + val (p, q) = (usr._1, itm._1) + var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) + pred = math.max(pred, conf.minVal) + pred = math.min(pred, conf.maxVal) + val err = et.attr - pred + val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2) + Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)), + (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))) + } + + for (i <- 0 until conf.maxIters) { + // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes + var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) + g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => + if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd + } + // phase 2, update p for user nodes and q, y for item nodes + val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => + (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) + g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => + (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) + } + } + + // calculate error on training set + def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = { + val (usr, itm) = (et.srcAttr, et.dstAttr) + val (p, q) = (usr._1, itm._1) + var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) + pred = math.max(pred, conf.minVal) + pred = math.min(pred, conf.maxVal) + val err = (et.attr - pred) * (et.attr - pred) + Iterator((et.dstId, err)) + } + val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) + g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => + if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd + } + (g, u) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala new file mode 100644 index 0000000000..f64fc3ef0f --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala @@ -0,0 +1,87 @@ +package org.apache.spark.graphx.algorithms + +import org.apache.spark.graphx._ + +object StronglyConnectedComponents { + + /** + * Compute the strongly connected component (SCC) of each vertex and return an RDD with the vertex + * value containing the lowest vertex id in the SCC containing that vertex. + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (preserved in the computation) + * + * @param graph the graph for which to compute the SCC + * + * @return a graph with vertex attributes containing the smallest vertex id in each SCC + */ + def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { + + // the graph we update with final SCC ids, and the graph we return at the end + var sccGraph = graph.mapVertices { case (vid, _) => vid } + // graph we are going to work with in our iterations + var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) } + + var numVertices = sccWorkGraph.numVertices + var iter = 0 + while (sccWorkGraph.numVertices > 0 && iter < numIter) { + iter += 1 + do { + numVertices = sccWorkGraph.numVertices + sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + } + sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.inDegrees) { + (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true) + } + + // get all vertices to be removed + val finalVertices = sccWorkGraph.vertices + .filter { case (vid, (scc, isFinal)) => isFinal} + .mapValues { (vid, data) => data._1} + + // write values to sccGraph + sccGraph = sccGraph.outerJoinVertices(finalVertices) { + (vid, scc, opt) => opt.getOrElse(scc) + } + // only keep vertices that are not final + sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2) + } while (sccWorkGraph.numVertices < numVertices) + + sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) } + + // collect min of all my neighbor's scc values, update if it's smaller than mine + // then notify any neighbors with scc values larger than mine + sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)( + (vid, e) => e.otherVertexAttr(vid)._1, + (vid1, vid2) => math.min(vid1, vid2), + (vid, scc, optScc) => + (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2), + (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1 + ) + + // start at root of SCCs. Traverse values in reverse, notify all my neighbors + // do not propagate if colors do not match! + sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean]( + sccWorkGraph, + Integer.MAX_VALUE, + EdgeDirection.Out, + EdgeDirection.In + )( + // vertex is final if it is the root of a color + // or it has the same color as a neighbor that is final + (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + (final1, final2) => final1 || final2, + (vid, scc, optFinal) => + (scc._1, scc._2 || optFinal.getOrElse(false)), + // activate neighbor if they are not final, you are, and you have the same color + (vid, e) => e.vertexAttr(vid)._2 && + !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1), + // start at root of colors + (vid, data) => vid == data._1 + ) + } + sccGraph + } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala new file mode 100644 index 0000000000..b5a93c1bd1 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala @@ -0,0 +1,78 @@ +package org.apache.spark.graphx.algorithms + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ + + +object TriangleCount { + /** + * Compute the number of triangles passing through each vertex. + * + * The algorithm is relatively straightforward and can be computed in three steps: + * + * 1) Compute the set of neighbors for each vertex + * 2) For each edge compute the intersection of the sets and send the + * count to both vertices. + * 3) Compute the sum at each vertex and divide by two since each + * triangle is counted twice. + * + * + * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned + * using Graph.partitionBy. + * + * @return + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { + // Remove redundant edges + val g = graph.groupEdges((a, b) => a).cache + + // Construct set representations of the neighborhoods + val nbrSets: VertexRDD[VertexSet] = + g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) => + val set = new VertexSet(4) + var i = 0 + while (i < nbrs.size) { + // prevent self cycle + if(nbrs(i) != vid) { + set.add(nbrs(i)) + } + i += 1 + } + set + } + // join the sets with the graph + val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) { + (vid, _, optSet) => optSet.getOrElse(null) + } + // Edge function computes intersection of smaller vertex with larger vertex + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { + assert(et.srcAttr != null) + assert(et.dstAttr != null) + val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { + (et.srcAttr, et.dstAttr) + } else { + (et.dstAttr, et.srcAttr) + } + val iter = smallSet.iterator + var counter: Int = 0 + while (iter.hasNext) { + val vid = iter.next + if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 } + } + Iterator((et.srcId, counter), (et.dstId, counter)) + } + // compute the intersection along edges + val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _) + // Merge counters with the graph and divide by two since each triangle is counted twice + g.outerJoinVertices(counters) { + (vid, _, optCounter: Option[Int]) => + val dblCount = optCounter.getOrElse(0) + // double count should be even (divisible by two) + assert((dblCount & 1) == 0) + dblCount / 2 + } + + } // end of TriangleCount + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala new file mode 100644 index 0000000000..4176563d22 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -0,0 +1,220 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ +import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap + +/** + * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are + * clustered by src. + * + * @param srcIds the source vertex id of each edge + * @param dstIds the destination vertex id of each edge + * @param data the attribute associated with each edge + * @param index a clustered index on source vertex id + * @tparam ED the edge attribute type. + */ +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( + val srcIds: Array[VertexID], + val dstIds: Array[VertexID], + val data: Array[ED], + val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable { + + /** + * Reverse all the edges in this partition. + * + * @return a new edge partition with all edges reversed. + */ + def reverse: EdgePartition[ED] = { + val builder = new EdgePartitionBuilder(size) + for (e <- iterator) { + builder.add(e.dstId, e.srcId, e.attr) + } + builder.toEdgePartition + } + + /** + * Construct a new edge partition by applying the function f to all + * edges in this partition. + * + * @param f a function from an edge to a new attribute + * @tparam ED2 the type of the new attribute + * @return a new edge partition with the result of the function `f` + * applied to each edge + */ + def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = { + val newData = new Array[ED2](data.size) + val edge = new Edge[ED]() + val size = data.size + var i = 0 + while (i < size) { + edge.srcId = srcIds(i) + edge.dstId = dstIds(i) + edge.attr = data(i) + newData(i) = f(edge) + i += 1 + } + new EdgePartition(srcIds, dstIds, newData, index) + } + + /** + * Construct a new edge partition by using the edge attributes + * contained in the iterator. + * + * @note The input iterator should return edge attributes in the + * order of the edges returned by `EdgePartition.iterator` and + * should return attributes equal to the number of edges. + * + * @param f a function from an edge to a new attribute + * @tparam ED2 the type of the new attribute + * @return a new edge partition with the result of the function `f` + * applied to each edge + */ + def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { + val newData = new Array[ED2](data.size) + var i = 0 + while (iter.hasNext) { + newData(i) = iter.next() + i += 1 + } + assert(newData.size == i) + new EdgePartition(srcIds, dstIds, newData, index) + } + + /** + * Apply the function f to all edges in this partition. + * + * @param f an external state mutating user defined function. + */ + def foreach(f: Edge[ED] => Unit) { + iterator.foreach(f) + } + + /** + * Merge all the edges with the same src and dest id into a single + * edge using the `merge` function + * + * @param merge a commutative associative merge operation + * @return a new edge partition without duplicate edges + */ + def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { + val builder = new EdgePartitionBuilder[ED] + var firstIter: Boolean = true + var currSrcId: VertexID = nullValue[VertexID] + var currDstId: VertexID = nullValue[VertexID] + var currAttr: ED = nullValue[ED] + var i = 0 + while (i < size) { + if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) { + currAttr = merge(currAttr, data(i)) + } else { + if (i > 0) { + builder.add(currSrcId, currDstId, currAttr) + } + currSrcId = srcIds(i) + currDstId = dstIds(i) + currAttr = data(i) + } + i += 1 + } + if (size > 0) { + builder.add(currSrcId, currDstId, currAttr) + } + builder.toEdgePartition + } + + /** + * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition + * containing the resulting edges. + * + * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for + * each edge, but each time it may be invoked on any corresponding edge in `other`. + * + * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked + * once. + */ + def innerJoin[ED2: ClassTag, ED3: ClassTag] + (other: EdgePartition[ED2]) + (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = { + val builder = new EdgePartitionBuilder[ED3] + var i = 0 + var j = 0 + // For i = index of each edge in `this`... + while (i < size && j < other.size) { + val srcId = this.srcIds(i) + val dstId = this.dstIds(i) + // ... forward j to the index of the corresponding edge in `other`, and... + while (j < other.size && other.srcIds(j) < srcId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId) { + while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 } + if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) { + // ... run `f` on the matching edge + builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j))) + } + } + i += 1 + } + builder.toEdgePartition + } + + /** + * The number of edges in this partition + * + * @return size of the partition + */ + def size: Int = srcIds.size + + /** The number of unique source vertices in the partition. */ + def indexSize: Int = index.size + + /** + * Get an iterator over the edges in this partition. + * + * @return an iterator over edges in the partition + */ + def iterator = new Iterator[Edge[ED]] { + private[this] val edge = new Edge[ED] + private[this] var pos = 0 + + override def hasNext: Boolean = pos < EdgePartition.this.size + + override def next(): Edge[ED] = { + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) + pos += 1 + edge + } + } + + /** + * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The + * iterator is generated using an index scan, so it is efficient at skipping edges that don't + * match srcIdPred. + */ + def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] = + index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator)) + + /** + * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The + * cluster must start at position `index`. + */ + private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] { + private[this] val edge = new Edge[ED] + private[this] var pos = index + + override def hasNext: Boolean = { + pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId + } + + override def next(): Edge[ED] = { + assert(srcIds(pos) == srcId) + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) + pos += 1 + edge + } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala new file mode 100644 index 0000000000..d4f08497a2 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -0,0 +1,46 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag +import scala.util.Sorting + +import org.apache.spark.graphx._ +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} + + +//private[graph] +class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) { + + var edges = new PrimitiveVector[Edge[ED]](size) + + /** Add a new edge to the partition. */ + def add(src: VertexID, dst: VertexID, d: ED) { + edges += Edge(src, dst, d) + } + + def toEdgePartition: EdgePartition[ED] = { + val edgeArray = edges.trim().array + Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) + val srcIds = new Array[VertexID](edgeArray.size) + val dstIds = new Array[VertexID](edgeArray.size) + val data = new Array[ED](edgeArray.size) + val index = new PrimitiveKeyOpenHashMap[VertexID, Int] + // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and + // adding them to the index + if (edgeArray.length > 0) { + index.update(srcIds(0), 0) + var currSrcId: VertexID = srcIds(0) + var i = 0 + while (i < edgeArray.size) { + srcIds(i) = edgeArray(i).srcId + dstIds(i) = edgeArray(i).dstId + data(i) = edgeArray(i).attr + if (edgeArray(i).srcId != currSrcId) { + currSrcId = edgeArray(i).srcId + index.update(currSrcId, i) + } + i += 1 + } + } + new EdgePartition(srcIds, dstIds, data, index) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala new file mode 100644 index 0000000000..79fd962ffd --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -0,0 +1,43 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.graphx._ +import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap + + +/** + * The Iterator type returned when constructing edge triplets. This class technically could be + * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to + * debug / profile. + */ +private[impl] +class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( + val vidToIndex: VertexIdToIndexMap, + val vertexArray: Array[VD], + val edgePartition: EdgePartition[ED]) + extends Iterator[EdgeTriplet[VD, ED]] { + + // Current position in the array. + private var pos = 0 + + // A triplet object that this iterator.next() call returns. We reuse this object to avoid + // allocating too many temporary Java objects. + private val triplet = new EdgeTriplet[VD, ED] + + private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray) + + override def hasNext: Boolean = pos < edgePartition.size + + override def next() = { + triplet.srcId = edgePartition.srcIds(pos) + // assert(vmap.containsKey(e.src.id)) + triplet.srcAttr = vmap(triplet.srcId) + triplet.dstId = edgePartition.dstIds(pos) + // assert(vmap.containsKey(e.dst.id)) + triplet.dstAttr = vmap(triplet.dstId) + triplet.attr = edgePartition.data(pos) + pos += 1 + triplet + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala new file mode 100644 index 0000000000..be9f188150 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -0,0 +1,422 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.util.collection.PrimitiveVector +import org.apache.spark.{HashPartitioner, Partitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.impl.GraphImpl._ +import org.apache.spark.graphx.impl.MsgRDDFunctions._ +import org.apache.spark.graphx.util.BytecodeUtils +import org.apache.spark.rdd.{ShuffledRDD, RDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.ClosureCleaner + + +/** + * A Graph RDD that supports computation on graphs. + * + * Graphs are represented using two classes of data: vertex-partitioned and + * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges` + * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods, + * vertex attributes are replicated to the edge partitions where they appear as sources or + * destinations. `routingTable` stores the routing information for shipping vertex attributes to + * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created + * using the routing table. + */ +class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( + @transient val vertices: VertexRDD[VD], + @transient val edges: EdgeRDD[ED], + @transient val routingTable: RoutingTable, + @transient val replicatedVertexView: ReplicatedVertexView[VD]) + extends Graph[VD, ED] with Serializable { + + def this( + vertices: VertexRDD[VD], + edges: EdgeRDD[ED], + routingTable: RoutingTable) = { + this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable)) + } + + def this( + vertices: VertexRDD[VD], + edges: EdgeRDD[ED]) = { + this(vertices, edges, new RoutingTable(edges, vertices)) + } + + /** Return a RDD that brings edges together with their source and destination vertices. */ + @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { + val vdTag = classTag[VD] + val edTag = classTag[ED] + edges.partitionsRDD.zipPartitions( + replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) => + val (pid, ePart) = ePartIter.next() + val (_, vPart) = vPartIter.next() + new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag) + } + } + + override def persist(newLevel: StorageLevel): Graph[VD, ED] = { + vertices.persist(newLevel) + edges.persist(newLevel) + this + } + + override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY) + + override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { + val numPartitions = edges.partitions.size + val edTag = classTag[ED] + val newEdges = new EdgeRDD(edges.map { e => + val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) + + // Should we be using 3-tuple or an optimized class + new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) + } + .partitionBy(new HashPartitioner(numPartitions)) + .mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED]()(edTag) + iter.foreach { message => + val data = message.data + builder.add(data._1, data._2, data._3) + } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true).cache()) + new GraphImpl(vertices, newEdges) + } + + override def statistics: Map[String, Any] = { + // Get the total number of vertices after replication, used to compute the replication ratio. + def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = { + vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble + } + + val numVertices = this.ops.numVertices + val numEdges = this.ops.numEdges + val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices + val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices + val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices + // One entry for each partition, indicate the total number of edges on that partition. + val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges) + val minLoad = loadArray.min + val maxLoad = loadArray.max + Map( + "Num Vertices" -> numVertices, + "Num Edges" -> numEdges, + "Replication (both)" -> replicationRatioBoth, + "Replication (src only)" -> replicationRatioSrcOnly, + "Replication (dest only)" -> replicationRatioDstOnly, + "Load Array" -> loadArray, + "Min Load" -> minLoad, + "Max Load" -> maxLoad) + } + + /** + * Display the lineage information for this graph. + */ + def printLineage() = { + def traverseLineage( + rdd: RDD[_], + indent: String = "", + visited: Map[Int, String] = Map.empty[Int, String]) { + if (visited.contains(rdd.id)) { + println(indent + visited(rdd.id)) + println(indent) + } else { + val locs = rdd.partitions.map( p => rdd.preferredLocations(p) ) + val cacheLevel = rdd.getStorageLevel + val name = rdd.id + val deps = rdd.dependencies + val partitioner = rdd.partitioner + val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0} + println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner + + ", " + numparts +")") + println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString) + println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", ")) + deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited)) + } + } + println("edges ------------------------------------------") + traverseLineage(edges, " ") + var visited = Map(edges.id -> "edges") + println("\n\nvertices ------------------------------------------") + traverseLineage(vertices, " ", visited) + visited += (vertices.id -> "vertices") + println("\n\nroutingTable.bothAttrs -------------------------------") + traverseLineage(routingTable.bothAttrs, " ", visited) + visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs") + println("\n\ntriplets ----------------------------------------") + traverseLineage(triplets, " ", visited) + println(visited) + } // end of printLineage + + override def reverse: Graph[VD, ED] = { + val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) + new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + } + + override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = { + if (classTag[VD] equals classTag[VD2]) { + // The map preserves type, so we can use incremental replication + val newVerts = vertices.mapVertexPartitions(_.map(f)) + val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) + } else { + // The map does not preserve type, so we must re-replicate all vertices + new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable) + } + } + + override def mapEdges[ED2: ClassTag]( + f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = { + val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator))) + new GraphImpl(vertices, newETable , routingTable, replicatedVertexView) + } + + override def mapTriplets[ED2: ClassTag]( + f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = { + // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit + // manifest from GraphImpl (which would require serializing GraphImpl). + val vdTag = classTag[VD] + val newEdgePartitions = + edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) { + (ePartIter, vTableReplicatedIter) => + val (ePid, edgePartition) = ePartIter.next() + val (vPid, vPart) = vTableReplicatedIter.next() + assert(!vTableReplicatedIter.hasNext) + assert(ePid == vPid) + val et = new EdgeTriplet[VD, ED] + val inputIterator = edgePartition.iterator.map { e => + et.set(e) + et.srcAttr = vPart(e.srcId) + et.dstAttr = vPart(e.dstId) + et + } + // Apply the user function to the vertex partition + val outputIter = f(ePid, inputIterator) + // Consume the iterator to update the edge attributes + val newEdgePartition = edgePartition.map(outputIter) + Iterator((ePid, newEdgePartition)) + } + new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView) + } + + override def subgraph( + epred: EdgeTriplet[VD, ED] => Boolean = x => true, + vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { + // Filter the vertices, reusing the partitioner and the index from this graph + val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) + + // Filter the edges + val edTag = classTag[ED] + val newEdges = new EdgeRDD[ED](triplets.filter { et => + vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et) + }.mapPartitionsWithIndex( { (pid, iter) => + val builder = new EdgePartitionBuilder[ED]()(edTag) + iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) } + val edgePartition = builder.toEdgePartition + Iterator((pid, edgePartition)) + }, preservesPartitioning = true)).cache() + + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView) + } // end of subgraph + + override def mask[VD2: ClassTag, ED2: ClassTag] ( + other: Graph[VD2, ED2]): Graph[VD, ED] = { + val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } + val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } + // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been + // removed will be ignored, since we only refer to replicated vertices when they are adjacent to + // an edge. + new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView) + } + + override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { + ClosureCleaner.clean(merge) + val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge)) + new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // Lower level transformation methods + ////////////////////////////////////////////////////////////////////////////////////////////////// + + override def mapReduceTriplets[A: ClassTag]( + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { + + ClosureCleaner.clean(mapFunc) + ClosureCleaner.clean(reduceFunc) + + // For each vertex, replicate its attribute only to partitions where it is + // in the relevant position in an edge. + val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr") + val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr") + val vs = activeSetOpt match { + case Some((activeSet, _)) => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) + case None => + replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr) + } + val activeDirectionOpt = activeSetOpt.map(_._2) + + // Map and combine. + val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) => + val (ePid, edgePartition) = ePartIter.next() + val (vPid, vPart) = vPartIter.next() + assert(!vPartIter.hasNext) + assert(ePid == vPid) + // Choose scan method + val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat + val edgeIter = activeDirectionOpt match { + case Some(EdgeDirection.Both) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + .filter(e => vPart.isActive(e.dstId)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) + } + case Some(EdgeDirection.Out) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + } else { + edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) + } + case Some(EdgeDirection.In) => + edgePartition.iterator.filter(e => vPart.isActive(e.dstId)) + case None => + edgePartition.iterator + } + + // Scan edges and run the map function + val et = new EdgeTriplet[VD, ED] + val mapOutputs = edgeIter.flatMap { e => + et.set(e) + if (mapUsesSrcAttr) { + et.srcAttr = vPart(e.srcId) + } + if (mapUsesDstAttr) { + et.dstAttr = vPart(e.dstId) + } + mapFunc(et) + } + // Note: This doesn't allow users to send messages to arbitrary vertices. + vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator + } + + // do the final reduction reusing the index map + vertices.aggregateUsingIndex(preAgg, reduceFunc) + } // end of mapReduceTriplets + + override def outerJoinVertices[U: ClassTag, VD2: ClassTag] + (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = { + if (classTag[VD] equals classTag[VD2]) { + // updateF preserves type, so we can use incremental replication + val newVerts = vertices.leftJoin(updates)(updateF) + val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) + val newReplicatedVertexView = new ReplicatedVertexView[VD2]( + changedVerts, edges, routingTable, + Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) + new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) + } else { + // updateF does not preserve type, so we must re-replicate all vertices + val newVerts = vertices.leftJoin(updates)(updateF) + new GraphImpl(newVerts, edges, routingTable) + } + } + + private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = { + try { + BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName) + } catch { + case _: ClassNotFoundException => true // if we don't know, be conservative + } + } +} // end of class GraphImpl + + +object GraphImpl { + + def apply[VD: ClassTag, ED: ClassTag]( + edges: RDD[Edge[ED]], + defaultVertexAttr: VD): GraphImpl[VD, ED] = + { + fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) + } + + def fromEdgePartitions[VD: ClassTag, ED: ClassTag]( + edgePartitions: RDD[(PartitionID, EdgePartition[ED])], + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) + } + + def apply[VD: ClassTag, ED: ClassTag]( + vertices: RDD[(VertexID, VD)], + edges: RDD[Edge[ED]], + defaultVertexAttr: VD): GraphImpl[VD, ED] = + { + val edgeRDD = createEdgeRDD(edges).cache() + + // Get the set of all vids + val partitioner = Partitioner.defaultPartitioner(vertices) + val vPartitioned = vertices.partitionBy(partitioner) + val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner) + val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => + vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) + } + + val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr) + + new GraphImpl(vertexRDD, edgeRDD) + } + + /** + * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges + * data structure (RDD[(VertexID, VertexID, ED)]). + * + * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value + * pair: the key is the partition id, and the value is an EdgePartition object containing all the + * edges in a partition. + */ + private def createEdgeRDD[ED: ClassTag]( + edges: RDD[Edge[ED]]): EdgeRDD[ED] = { + val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[ED] + iter.foreach { e => + builder.add(e.srcId, e.dstId, e.attr) + } + Iterator((pid, builder.toEdgePartition)) + } + new EdgeRDD(edgePartitions) + } + + private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( + edges: EdgeRDD[ED], + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + edges.cache() + // Get the set of all vids + val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size)) + // Create the VertexRDD. + val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) + new GraphImpl(vertices, edges) + } + + /** Collects all vids mentioned in edges and partitions them by partitioner. */ + private def collectVertexIDsFromEdges( + edges: EdgeRDD[_], + partitioner: Partitioner): RDD[(VertexID, Int)] = { + // TODO: Consider doing map side distinct before shuffle. + new ShuffledRDD[VertexID, Int, (VertexID, Int)]( + edges.collectVertexIDs.map(vid => (vid, 0)), partitioner) + .setSerializer(classOf[VertexIDMsgSerializer].getName) + } +} // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala new file mode 100644 index 0000000000..ad5daf8f6a --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -0,0 +1,93 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.Partitioner +import org.apache.spark.graphx.{PartitionID, VertexID} +import org.apache.spark.rdd.{ShuffledRDD, RDD} + + +class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( + @transient var partition: PartitionID, + var vid: VertexID, + var data: T) + extends Product2[PartitionID, (VertexID, T)] with Serializable { + + override def _1 = partition + + override def _2 = (vid, data) + + override def canEqual(that: Any): Boolean = that.isInstanceOf[VertexBroadcastMsg[_]] +} + + +/** + * A message used to send a specific value to a partition. + * @param partition index of the target partition. + * @param data value to send + */ +class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T]( + @transient var partition: PartitionID, + var data: T) + extends Product2[PartitionID, T] with Serializable { + + override def _1 = partition + + override def _2 = data + + override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]] +} + + +class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { + def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { + val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) + + // Set a custom serializer if the data is of int or double type. + if (classTag[T] == ClassTag.Int) { + rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName) + } else if (classTag[T] == ClassTag.Long) { + rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName) + } else if (classTag[T] == ClassTag.Double) { + rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName) + } + rdd + } +} + + +class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) { + + /** + * Return a copy of the RDD partitioned using the specified partitioner. + */ + def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = { + new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner) + } + +} + + +object MsgRDDFunctions { + implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = { + new MsgRDDFunctions(rdd) + } + + implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = { + new VertexBroadcastMsgRDDFunctions(rdd) + } + + def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = { + val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner) + + // Set a custom serializer if the data is of int or double type. + if (classTag[T] == ClassTag.Int) { + rdd.setSerializer(classOf[IntAggMsgSerializer].getName) + } else if (classTag[T] == ClassTag.Long) { + rdd.setSerializer(classOf[LongAggMsgSerializer].getName) + } else if (classTag[T] == ClassTag.Double) { + rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName) + } + rdd + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala new file mode 100644 index 0000000000..63180bc3af --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -0,0 +1,182 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} + +import org.apache.spark.graphx._ + +/** + * A view of the vertices after they are shipped to the join sites specified in + * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is + * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a + * fresh view is created. + * + * The view is always cached (i.e., once it is created, it remains materialized). This avoids + * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for + * example. + */ +private[impl] +class ReplicatedVertexView[VD: ClassTag]( + updatedVerts: VertexRDD[VD], + edges: EdgeRDD[_], + routingTable: RoutingTable, + prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { + + /** + * Within each edge partition, create a local map from vid to an index into the attribute + * array. Each map contains a superset of the vertices that it will receive, because it stores + * vids from both the source and destination of edges. It must always include both source and + * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. + */ + private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + case Some(prevView) => + prevView.localVertexIDMap + case None => + edges.partitionsRDD.mapPartitions(_.map { + case (pid, epart) => + val vidToIndex = new VertexIdToIndexMap + epart.foreach { e => + vidToIndex.add(e.srcId) + vidToIndex.add(e.dstId) + } + (pid, vidToIndex) + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap") + } + + private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true) + private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false) + private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true) + private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false) + + def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = { + (includeSrc, includeDst) match { + case (true, true) => bothAttrs + case (true, false) => srcAttrOnly + case (false, true) => dstAttrOnly + case (false, false) => noAttrs + } + } + + def get( + includeSrc: Boolean, + includeDst: Boolean, + actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = { + // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and + // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be + // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is + // also shipped there. + val shippedActives = routingTable.get(true, true) + .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _)) + .partitionBy(edges.partitioner.get) + // Update the view with shippedActives, setting activeness flags in the resulting + // VertexPartitions + get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => + val (pid, vPart) = viewIter.next() + val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator)) + Iterator((pid, newPart)) + } + } + + private def create(includeSrc: Boolean, includeDst: Boolean) + : RDD[(PartitionID, VertexPartition[VD])] = { + val vdTag = classTag[VD] + + // Ship vertex attributes to edge partitions according to vertexPlacement + val verts = updatedVerts.partitionsRDD + val shippedVerts = routingTable.get(includeSrc, includeDst) + .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag)) + .partitionBy(edges.partitioner.get) + // TODO: Consider using a specialized shuffler. + + prevViewOpt match { + case Some(prevView) => + // Update prevView with shippedVerts, setting staleness flags in the resulting + // VertexPartitions + prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) { + (prevViewIter, shippedVertsIter) => + val (pid, prevVPart) = prevViewIter.next() + val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) + Iterator((pid, newVPart)) + }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst)) + + case None => + // Within each edge partition, place the shipped vertex attributes into the correct + // locations specified in localVertexIDMap + localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => + val (pid, vidToIndex) = mapIter.next() + assert(!mapIter.hasNext) + // Populate the vertex array using the vidToIndex map + val vertexArray = vdTag.newArray(vidToIndex.capacity) + for ((_, block) <- shippedVertsIter) { + for (i <- 0 until block.vids.size) { + val vid = block.vids(i) + val attr = block.attrs(i) + val ind = vidToIndex.getPos(vid) + vertexArray(ind) = attr + } + } + val newVPart = new VertexPartition( + vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag) + Iterator((pid, newVPart)) + }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) + } + } +} + +object ReplicatedVertexView { + protected def buildBuffer[VD: ClassTag]( + pid2vidIter: Iterator[Array[Array[VertexID]]], + vertexPartIter: Iterator[VertexPartition[VD]]) = { + val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + val vertexPart: VertexPartition[VD] = vertexPartIter.next() + + Iterator.tabulate(pid2vid.size) { pid => + val vidsCandidate = pid2vid(pid) + val size = vidsCandidate.length + val vids = new PrimitiveVector[VertexID](pid2vid(pid).size) + val attrs = new PrimitiveVector[VD](pid2vid(pid).size) + var i = 0 + while (i < size) { + val vid = vidsCandidate(i) + if (vertexPart.isDefined(vid)) { + vids += vid + attrs += vertexPart(vid) + } + i += 1 + } + (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) + } + } + + protected def buildActiveBuffer( + pid2vidIter: Iterator[Array[Array[VertexID]]], + activePartIter: Iterator[VertexPartition[_]]) + : Iterator[(Int, Array[VertexID])] = { + val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + val activePart: VertexPartition[_] = activePartIter.next() + + Iterator.tabulate(pid2vid.size) { pid => + val vidsCandidate = pid2vid(pid) + val size = vidsCandidate.length + val actives = new PrimitiveVector[VertexID](vidsCandidate.size) + var i = 0 + while (i < size) { + val vid = vidsCandidate(i) + if (activePart.isDefined(vid)) { + actives += vid + } + i += 1 + } + (pid, actives.trim().array) + } + } +} + +class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) + extends Serializable { + def iterator: Iterator[(VertexID, VD)] = + (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala new file mode 100644 index 0000000000..3bd8b24133 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -0,0 +1,64 @@ +package org.apache.spark.graphx.impl + +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.collection.PrimitiveVector + +/** + * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing + * information for shipping vertex attributes to edge partitions. This is always cached because it + * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and + * (possibly) once to ship the active-set information. + */ +class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { + + val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) + val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false) + val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true) + val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false) + + def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = + (includeSrcAttr, includeDstAttr) match { + case (true, true) => bothAttrs + case (true, false) => srcAttrOnly + case (false, true) => dstAttrOnly + case (false, false) => noAttrs + } + + private def createPid2Vid( + includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = { + // Determine which vertices each edge partition needs by creating a mapping from vid to pid. + val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => + val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() + val numEdges = edgePartition.size + val vSet = new VertexSet + if (includeSrcAttr) { // Add src vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.srcIds(i)) + i += 1 + } + } + if (includeDstAttr) { // Add dst vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.dstIds(i)) + i += 1 + } + } + vSet.iterator.map { vid => (vid, pid) } + } + + val numPartitions = vertices.partitions.size + vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID]) + for ((vid, pid) <- iter) { + pid2vid(pid) += vid + } + + Iterator(pid2vid.map(_.trim().array)) + }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr)) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala new file mode 100644 index 0000000000..1c3c87f08d --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -0,0 +1,386 @@ +package org.apache.spark.graphx.impl + +import java.io.{EOFException, InputStream, OutputStream} +import java.nio.ByteBuffer + +import org.apache.spark.SparkConf +import org.apache.spark.graphx._ +import org.apache.spark.serializer._ + +class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[(VertexID, _)] + writeVarLong(msg._1, optimizePositive = false) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + (readVarLong(optimizePositive = false), null).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for VertexBroadcastMessage[Int]. */ +class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[VertexBroadcastMsg[Int]] + writeVarLong(msg.vid, optimizePositive = false) + writeInt(msg.data) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readInt() + new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for VertexBroadcastMessage[Long]. */ +class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[VertexBroadcastMsg[Long]] + writeVarLong(msg.vid, optimizePositive = false) + writeLong(msg.data) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readLong() + new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for VertexBroadcastMessage[Double]. */ +class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[VertexBroadcastMsg[Double]] + writeVarLong(msg.vid, optimizePositive = false) + writeDouble(msg.data) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readDouble() + new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for AggregationMessage[Int]. */ +class IntAggMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[(VertexID, Int)] + writeVarLong(msg._1, optimizePositive = false) + writeUnsignedVarInt(msg._2) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readUnsignedVarInt() + (a, b).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for AggregationMessage[Long]. */ +class LongAggMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[(VertexID, Long)] + writeVarLong(msg._1, optimizePositive = false) + writeVarLong(msg._2, optimizePositive = true) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readVarLong(optimizePositive = true) + (a, b).asInstanceOf[T] + } + } + } +} + +/** A special shuffle serializer for AggregationMessage[Double]. */ +class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[(VertexID, Double)] + writeVarLong(msg._1, optimizePositive = false) + writeDouble(msg._2) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readDouble() + (a, b).asInstanceOf[T] + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Helper classes to shorten the implementation of those special serializers. +//////////////////////////////////////////////////////////////////////////////// + +abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream { + // The implementation should override this one. + def writeObject[T](t: T): SerializationStream + + def writeInt(v: Int) { + s.write(v >> 24) + s.write(v >> 16) + s.write(v >> 8) + s.write(v) + } + + def writeUnsignedVarInt(value: Int) { + if ((value >>> 7) == 0) { + s.write(value.toInt) + } else if ((value >>> 14) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7) + } else if ((value >>> 21) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14) + } else if ((value >>> 28) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14 | 0x80) + s.write(value >>> 21) + } else { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14 | 0x80) + s.write(value >>> 21 | 0x80) + s.write(value >>> 28) + } + } + + def writeVarLong(value: Long, optimizePositive: Boolean) { + val v = if (!optimizePositive) (value << 1) ^ (value >> 63) else value + if ((v >>> 7) == 0) { + s.write(v.toInt) + } else if ((v >>> 14) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7).toInt) + } else if ((v >>> 21) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14).toInt) + } else if ((v >>> 28) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21).toInt) + } else if ((v >>> 35) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28).toInt) + } else if ((v >>> 42) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35).toInt) + } else if ((v >>> 49) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42).toInt) + } else if ((v >>> 56) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42 | 0x80).toInt) + s.write((v >>> 49).toInt) + } else { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42 | 0x80).toInt) + s.write((v >>> 49 | 0x80).toInt) + s.write((v >>> 56).toInt) + } + } + + def writeLong(v: Long) { + s.write((v >>> 56).toInt) + s.write((v >>> 48).toInt) + s.write((v >>> 40).toInt) + s.write((v >>> 32).toInt) + s.write((v >>> 24).toInt) + s.write((v >>> 16).toInt) + s.write((v >>> 8).toInt) + s.write(v.toInt) + } + + //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v)) + def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v)) + + override def flush(): Unit = s.flush() + + override def close(): Unit = s.close() +} + +abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream { + // The implementation should override this one. + def readObject[T](): T + + def readInt(): Int = { + val first = s.read() + if (first < 0) throw new EOFException + (first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF) + } + + def readUnsignedVarInt(): Int = { + var value: Int = 0 + var i: Int = 0 + def readOrThrow(): Int = { + val in = s.read() + if (in < 0) throw new EOFException + in & 0xFF + } + var b: Int = readOrThrow() + while ((b & 0x80) != 0) { + value |= (b & 0x7F) << i + i += 7 + if (i > 35) throw new IllegalArgumentException("Variable length quantity is too long") + b = readOrThrow() + } + value | (b << i) + } + + def readVarLong(optimizePositive: Boolean): Long = { + def readOrThrow(): Int = { + val in = s.read() + if (in < 0) throw new EOFException + in & 0xFF + } + var b = readOrThrow() + var ret: Long = b & 0x7F + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F) << 7 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F) << 14 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F) << 21 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 28 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 35 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 42 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= (b & 0x7F).toLong << 49 + if ((b & 0x80) != 0) { + b = readOrThrow() + ret |= b.toLong << 56 + } + } + } + } + } + } + } + } + if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret + } + + def readLong(): Long = { + val first = s.read() + if (first < 0) throw new EOFException() + (first.toLong << 56) | + (s.read() & 0xFF).toLong << 48 | + (s.read() & 0xFF).toLong << 40 | + (s.read() & 0xFF).toLong << 32 | + (s.read() & 0xFF).toLong << 24 | + (s.read() & 0xFF) << 16 | + (s.read() & 0xFF) << 8 | + (s.read() & 0xFF) + } + + //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong()) + def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong()) + + override def close(): Unit = s.close() +} + +sealed trait ShuffleSerializerInstance extends SerializerInstance { + + override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException + + override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException + + override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = + throw new UnsupportedOperationException + + // The implementation should override the following two. + override def serializeStream(s: OutputStream): SerializationStream + override def deserializeStream(s: InputStream): DeserializationStream +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala new file mode 100644 index 0000000000..7c83497ca9 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala @@ -0,0 +1,262 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap} + +import org.apache.spark.Logging +import org.apache.spark.graphx._ + + +private[graphx] object VertexPartition { + + def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = { + val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + iter.foreach { case (k, v) => + map(k) = v + } + new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) + } + + def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD) + : VertexPartition[VD] = + { + val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + iter.foreach { case (k, v) => + map.setMerge(k, v, mergeFunc) + } + new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) + } +} + + +private[graphx] +class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( + val index: VertexIdToIndexMap, + val values: Array[VD], + val mask: BitSet, + /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */ + private val activeSet: Option[VertexSet] = None) + extends Logging { + + val capacity: Int = index.capacity + + def size: Int = mask.cardinality() + + /** Return the vertex attribute for the given vertex ID. */ + def apply(vid: VertexID): VD = values(index.getPos(vid)) + + def isDefined(vid: VertexID): Boolean = { + val pos = index.getPos(vid) + pos >= 0 && mask.get(pos) + } + + /** Look up vid in activeSet, throwing an exception if it is None. */ + def isActive(vid: VertexID): Boolean = { + activeSet.get.contains(vid) + } + + /** The number of active vertices, if any exist. */ + def numActives: Option[Int] = activeSet.map(_.size) + + /** + * Pass each vertex attribute along with the vertex id through a map + * function and retain the original RDD's partitioning and index. + * + * @tparam VD2 the type returned by the map function + * + * @param f the function applied to each vertex id and vertex + * attribute in the RDD + * + * @return a new VertexPartition with values obtained by applying `f` to + * each of the entries in the original VertexRDD. The resulting + * VertexPartition retains the same index. + */ + def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = { + // Construct a view of the map transformation + val newValues = new Array[VD2](capacity) + var i = mask.nextSetBit(0) + while (i >= 0) { + newValues(i) = f(index.getValue(i), values(i)) + i = mask.nextSetBit(i + 1) + } + new VertexPartition[VD2](index, newValues, mask) + } + + /** + * Restrict the vertex set to the set of vertices satisfying the given predicate. + * + * @param pred the user defined predicate + * + * @note The vertex set preserves the original index structure which means that the returned + * RDD can be easily joined with the original vertex-set. Furthermore, the filter only + * modifies the bitmap index and so no new values are allocated. + */ + def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = { + // Allocate the array to store the results into + val newMask = new BitSet(capacity) + // Iterate over the active bits in the old mask and evaluate the predicate + var i = mask.nextSetBit(0) + while (i >= 0) { + if (pred(index.getValue(i), values(i))) { + newMask.set(i) + } + i = mask.nextSetBit(i + 1) + } + new VertexPartition(index, values, newMask) + } + + /** + * Hides vertices that are the same between this and other. For vertices that are different, keeps + * the values from `other`. The indices of `this` and `other` must be the same. + */ + def diff(other: VertexPartition[VD]): VertexPartition[VD] = { + if (index != other.index) { + logWarning("Diffing two VertexPartitions with different indexes is slow.") + diff(createUsingIndex(other.iterator)) + } else { + val newMask = mask & other.mask + var i = newMask.nextSetBit(0) + while (i >= 0) { + if (values(i) == other.values(i)) { + newMask.unset(i) + } + i = newMask.nextSetBit(i + 1) + } + new VertexPartition(index, other.values, newMask) + } + } + + /** Left outer join another VertexPartition. */ + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: VertexPartition[VD2]) + (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + if (index != other.index) { + logWarning("Joining two VertexPartitions with different indexes is slow.") + leftJoin(createUsingIndex(other.iterator))(f) + } else { + val newValues = new Array[VD3](capacity) + + var i = mask.nextSetBit(0) + while (i >= 0) { + val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None + newValues(i) = f(index.getValue(i), values(i), otherV) + i = mask.nextSetBit(i + 1) + } + new VertexPartition(index, newValues, mask) + } + } + + /** Left outer join another iterator of messages. */ + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: Iterator[(VertexID, VD2)]) + (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + leftJoin(createUsingIndex(other))(f) + } + + /** Inner join another VertexPartition. */ + def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) + (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + if (index != other.index) { + logWarning("Joining two VertexPartitions with different indexes is slow.") + innerJoin(createUsingIndex(other.iterator))(f) + } else { + val newMask = mask & other.mask + val newValues = new Array[VD2](capacity) + var i = newMask.nextSetBit(0) + while (i >= 0) { + newValues(i) = f(index.getValue(i), values(i), other.values(i)) + i = newMask.nextSetBit(i + 1) + } + new VertexPartition(index, newValues, newMask) + } + } + + /** + * Inner join an iterator of messages. + */ + def innerJoin[U: ClassTag, VD2: ClassTag] + (iter: Iterator[Product2[VertexID, U]]) + (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + innerJoin(createUsingIndex(iter))(f) + } + + /** + * Similar effect as aggregateUsingIndex((a, b) => a) + */ + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]]) + : VertexPartition[VD2] = { + val newMask = new BitSet(capacity) + val newValues = new Array[VD2](capacity) + iter.foreach { case (vid, vdata) => + val pos = index.getPos(vid) + if (pos >= 0) { + newMask.set(pos) + newValues(pos) = vdata + } + } + new VertexPartition[VD2](index, newValues, newMask) + } + + /** + * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in + * the partition, hidden by the bitmask. + */ + def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = { + val newMask = new BitSet(capacity) + val newValues = new Array[VD](capacity) + System.arraycopy(values, 0, newValues, 0, newValues.length) + iter.foreach { case (vid, vdata) => + val pos = index.getPos(vid) + if (pos >= 0) { + newMask.set(pos) + newValues(pos) = vdata + } + } + new VertexPartition(index, newValues, newMask) + } + + def aggregateUsingIndex[VD2: ClassTag]( + iter: Iterator[Product2[VertexID, VD2]], + reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { + val newMask = new BitSet(capacity) + val newValues = new Array[VD2](capacity) + iter.foreach { product => + val vid = product._1 + val vdata = product._2 + val pos = index.getPos(vid) + if (pos >= 0) { + if (newMask.get(pos)) { + newValues(pos) = reduceFunc(newValues(pos), vdata) + } else { // otherwise just store the new value + newMask.set(pos) + newValues(pos) = vdata + } + } + } + new VertexPartition[VD2](index, newValues, newMask) + } + + def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = { + val newActiveSet = new VertexSet + iter.foreach(newActiveSet.add(_)) + new VertexPartition(index, values, mask, Some(newActiveSet)) + } + + /** + * Construct a new VertexPartition whose index contains only the vertices in the mask. + */ + def reindex(): VertexPartition[VD] = { + val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD] + val arbitraryMerge = (a: VD, b: VD) => a + for ((k, v) <- this.iterator) { + hashMap.setMerge(k, v, arbitraryMerge) + } + new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) + } + + def iterator: Iterator[(VertexID, VD)] = + mask.iterator.map(ind => (index.getValue(ind), values(ind))) + + def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind)) +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala new file mode 100644 index 0000000000..96f0d91c9b --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala @@ -0,0 +1,22 @@ +package org.apache.spark + +import org.apache.spark.util.collection.OpenHashSet + + +package object graphx { + + type VertexID = Long + + // TODO: Consider using Char. + type PartitionID = Int + + type VertexSet = OpenHashSet[VertexID] + + // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap + type VertexIdToIndexMap = OpenHashSet[VertexID] + + /** + * Return the default null-like value for a data type T. + */ + def nullValue[T] = null.asInstanceOf[T] +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala new file mode 100644 index 0000000000..81332e0800 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala @@ -0,0 +1,76 @@ +///// This file creates circular dependencies between examples bagle and graph + +// package org.apache.spark.graphx.perf + +// import org.apache.spark._ +// import org.apache.spark.SparkContext._ +// import org.apache.spark.bagel.Bagel + +// import org.apache.spark.examples.bagel +// //import org.apache.spark.bagel.examples._ +// import org.apache.spark.graphx._ + + +// object BagelTest { + +// def main(args: Array[String]) { +// val host = args(0) +// val taskType = args(1) +// val fname = args(2) +// val options = args.drop(3).map { arg => +// arg.dropWhile(_ == '-').split('=') match { +// case Array(opt, v) => (opt -> v) +// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +// } +// } + +// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +// //System.setProperty("spark.shuffle.compress", "false") +// System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator") + +// var numIter = Int.MaxValue +// var isDynamic = false +// var tol:Float = 0.001F +// var outFname = "" +// var numVPart = 4 +// var numEPart = 4 + +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("tol", v) => tol = v.toFloat +// case ("output", v) => outFname = v +// case ("numVPart", v) => numVPart = v.toInt +// case ("numEPart", v) => numEPart = v.toInt +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } + +// val sc = new SparkContext(host, "PageRank(" + fname + ")") +// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() +// val startTime = System.currentTimeMillis + +// val numVertices = g.vertices.count() + +// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => +// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) +// } + +// // Do the computation +// val epsilon = 0.01 / numVertices +// val messages = sc.parallelize(Array[(String, PRMessage)]()) +// val utils = new PageRankUtils +// val result = +// Bagel.run( +// sc, vertices, messages, combiner = new PRCombiner(), +// numPartitions = numVPart)( +// utils.computeWithCombiner(numVertices, epsilon, numIter)) + +// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) +// if (!outFname.isEmpty) { +// println("Saving pageranks of pages to " + outFname) +// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) +// } +// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// sc.stop() +// } +// } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala new file mode 100644 index 0000000000..24262640ab --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala @@ -0,0 +1,75 @@ +///// This file creates circular dependencies between examples bagle and graph + + +// package org.apache.spark.graphx.perf + +// import org.apache.spark._ +// import org.apache.spark.SparkContext._ +// import org.apache.spark.bagel.Bagel +// import org.apache.spark.bagel.examples._ +// import org.apache.spark.graphx._ + + +// object SparkTest { + +// def main(args: Array[String]) { +// val host = args(0) +// val taskType = args(1) +// val fname = args(2) +// val options = args.drop(3).map { arg => +// arg.dropWhile(_ == '-').split('=') match { +// case Array(opt, v) => (opt -> v) +// case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +// } +// } + +// System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer") +// //System.setProperty("spark.shuffle.compress", "false") +// System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator") + +// var numIter = Int.MaxValue +// var isDynamic = false +// var tol:Float = 0.001F +// var outFname = "" +// var numVPart = 4 +// var numEPart = 4 + +// options.foreach{ +// case ("numIter", v) => numIter = v.toInt +// case ("dynamic", v) => isDynamic = v.toBoolean +// case ("tol", v) => tol = v.toFloat +// case ("output", v) => outFname = v +// case ("numVPart", v) => numVPart = v.toInt +// case ("numEPart", v) => numEPart = v.toInt +// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) +// } + +// val sc = new SparkContext(host, "PageRank(" + fname + ")") +// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache() +// val startTime = System.currentTimeMillis + +// val numVertices = g.vertices.count() + +// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) => +// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString))) +// } + +// // Do the computation +// val epsilon = 0.01 / numVertices +// val messages = sc.parallelize(Array[(String, PRMessage)]()) +// val utils = new PageRankUtils +// val result = +// Bagel.run( +// sc, vertices, messages, combiner = new PRCombiner(), +// numPartitions = numVPart)( +// utils.computeWithCombiner(numVertices, epsilon, numIter)) + +// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) ) +// if (!outFname.isEmpty) { +// println("Saving pageranks of pages to " + outFname) +// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname) +// } +// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// sc.stop() +// } +// } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala new file mode 100644 index 0000000000..ec8d534333 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala @@ -0,0 +1,114 @@ +package org.apache.spark.graphx.util + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import scala.collection.mutable.HashSet + +import org.apache.spark.util.Utils + +import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor} +import org.objectweb.asm.Opcodes._ + + + +private[spark] object BytecodeUtils { + + /** + * Test whether the given closure invokes the specified method in the specified class. + */ + def invokedMethod(closure: AnyRef, targetClass: Class[_], targetMethod: String): Boolean = { + if (_invokedMethod(closure.getClass, "apply", targetClass, targetMethod)) { + true + } else { + // look at closures enclosed in this closure + for (f <- closure.getClass.getDeclaredFields + if f.getType.getName.startsWith("scala.Function")) { + f.setAccessible(true) + if (invokedMethod(f.get(closure), targetClass, targetMethod)) { + return true + } + } + return false + } + } + + private def _invokedMethod(cls: Class[_], method: String, + targetClass: Class[_], targetMethod: String): Boolean = { + + val seen = new HashSet[(Class[_], String)] + var stack = List[(Class[_], String)]((cls, method)) + + while (stack.nonEmpty) { + val (c, m) = stack.head + stack = stack.tail + seen.add((c, m)) + val finder = new MethodInvocationFinder(c.getName, m) + getClassReader(c).accept(finder, 0) + for (classMethod <- finder.methodsInvoked) { + //println(classMethod) + if (classMethod._1 == targetClass && classMethod._2 == targetMethod) { + return true + } else if (!seen.contains(classMethod)) { + stack = classMethod :: stack + } + } + } + return false + } + + /** + * Get an ASM class reader for a given class from the JAR that loaded it. + */ + private def getClassReader(cls: Class[_]): ClassReader = { + // Copy data over, before delegating to ClassReader - else we can run out of open file handles. + val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" + val resourceStream = cls.getResourceAsStream(className) + // todo: Fixme - continuing with earlier behavior ... + if (resourceStream == null) return new ClassReader(resourceStream) + + val baos = new ByteArrayOutputStream(128) + Utils.copyStream(resourceStream, baos, true) + new ClassReader(new ByteArrayInputStream(baos.toByteArray)) + } + + /** + * Given the class name, return whether we should look into the class or not. This is used to + * skip examing a large quantity of Java or Scala classes that we know for sure wouldn't access + * the closures. Note that the class name is expected in ASM style (i.e. use "/" instead of "."). + */ + private def skipClass(className: String): Boolean = { + val c = className + c.startsWith("java/") || c.startsWith("scala/") || c.startsWith("javax/") + } + + /** + * Find the set of methods invoked by the specified method in the specified class. + * For example, after running the visitor, + * MethodInvocationFinder("spark/graph/Foo", "test") + * its methodsInvoked variable will contain the set of methods invoked directly by + * Foo.test(). Interface invocations are not returned as part of the result set because we cannot + * determine the actual metod invoked by inspecting the bytecode. + */ + private class MethodInvocationFinder(className: String, methodName: String) + extends ClassVisitor(ASM4) { + + val methodsInvoked = new HashSet[(Class[_], String)] + + override def visitMethod(access: Int, name: String, desc: String, + sig: String, exceptions: Array[String]): MethodVisitor = { + if (name == methodName) { + new MethodVisitor(ASM4) { + override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { + if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) { + if (!skipClass(owner)) { + methodsInvoked.add((Class.forName(owner.replace("/", ".")), name)) + } + } + } + } + } else { + null + } + } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala new file mode 100644 index 0000000000..57117241ad --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -0,0 +1,282 @@ +package org.apache.spark.graphx.util + +import scala.annotation.tailrec +import scala.math._ +import scala.reflect.ClassTag +import scala.util._ + +import org.apache.spark._ +import org.apache.spark.serializer._ +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.Graph +import org.apache.spark.graphx.Edge +import org.apache.spark.graphx.impl.GraphImpl + +/** + * @todo cleanup and modularize code + */ +object GraphGenerators { + + val RMATa = 0.45 + val RMATb = 0.15 + val RMATc = 0.15 + val RMATd = 0.25 + + def main(args: Array[String]) { + + + val serializer = "org.apache.spark.serializer.KryoSerializer" + System.setProperty("spark.serializer", serializer) + //System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator") + val host = "local[4]" + val sc = new SparkContext(host, "Lognormal graph generator") + + val lnGraph = logNormalGraph(sc, 10000) + + val rmat = rmatGraph(sc, 1000, 3000) + + //for (v <- lnGraph.vertices) { + // println(v.id + ":\t" + v.data) + //} + + val times = 100000 + //val nums = (1 to times).flatMap { n => List(sampleLogNormal(4.0, 1.3, times)) }.toList + //val avg = nums.sum / nums.length + //val sumSquares = nums.foldLeft(0.0) {(total, next) => + // (total + math.pow((next - avg), 2)) } + //val stdev = math.sqrt(sumSquares/(nums.length - 1)) + + //println("avg: " + avg + "+-" + stdev) + + + //for (i <- 1 to 1000) { + // println(sampleLogNormal(4.0, 1.3, 1000)) + //} + + sc.stop() + + } + + + // Right now it just generates a bunch of edges where + // the edge data is the weight (default 1) + def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = { + // based on Pregel settings + val mu = 4 + val sigma = 1.3 + //val vertsAndEdges = (0 until numVertices).flatMap { src => { + + val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{ + src => (src, sampleLogNormal(mu, sigma, numVertices)) + } + + val edges = vertices.flatMap{ + v => generateRandomEdges(v._1.toInt, v._2, numVertices) + } + + Graph(vertices, edges, 0) + //println("Vertices:") + //for (v <- vertices) { + // println(v.id) + //} + + //println("Edges") + //for (e <- edges) { + // println(e.src, e.dst, e.data) + //} + + } + + + def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = { + val rand = new Random() + var dsts: Set[Int] = Set() + while (dsts.size < numEdges) { + val nextDst = rand.nextInt(maxVertexID) + if (nextDst != src) { + dsts += nextDst + } + } + dsts.map {dst => Edge[Int](src, dst, 1) }.toArray + } + + + /** + * Randomly samples from a log normal distribution + * whose corresponding normal distribution has the + * the given mean and standard deviation. It uses + * the formula X = exp(m+s*Z) where m, s are the + * mean, standard deviation of the lognormal distribution + * and Z~N(0, 1). In this function, + * m = e^(mu+sigma^2/2) and + * s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]. + * + * @param mu the mean of the normal distribution + * @param sigma the standard deviation of the normal distribution + * @param macVal exclusive upper bound on the value of the sample + */ + def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = { + val rand = new Random() + val m = math.exp(mu+(sigma*sigma)/2.0) + val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma)) + // Z ~ N(0, 1) + var X: Double = maxVal + + while (X >= maxVal) { + val Z = rand.nextGaussian() + //X = math.exp((m + s*Z)) + X = math.exp((mu + sigma*Z)) + } + math.round(X.toFloat) + } + + + + def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int] = { + // let N = requestedNumVertices + // the number of vertices is 2^n where n=ceil(log2[N]) + // This ensures that the 4 quadrants are the same size at all recursion levels + val numVertices = math.round(math.pow(2.0, math.ceil(math.log(requestedNumVertices)/math.log(2.0)))).toInt + var edges: Set[Edge[Int]] = Set() + while (edges.size < numEdges) { + if (edges.size % 100 == 0) { + println(edges.size + " edges") + } + edges += addEdge(numVertices) + + } + val graph = outDegreeFromEdges(sc.parallelize(edges.toList)) + graph + + } + + def outDegreeFromEdges[ED: ClassTag](edges: RDD[Edge[ED]]): Graph[Int, ED] = { + + val vertices = edges.flatMap { edge => List((edge.srcId, 1)) } + .reduceByKey(_ + _) + .map{ case (vid, degree) => (vid, degree) } + Graph(vertices, edges, 0) + } + + /** + * @param numVertices Specifies the total number of vertices in the graph (used to get + * the dimensions of the adjacency matrix + */ + def addEdge(numVertices: Int): Edge[Int] = { + //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0) + val v = math.round(numVertices.toFloat/2.0).toInt + + val (src, dst) = chooseCell(v, v, v) + Edge[Int](src, dst, 1) + } + + + /** + * This method recursively subdivides the the adjacency matrix into quadrants + * until it picks a single cell. The naming conventions in this paper match + * those of the R-MAT paper. There are a power of 2 number of nodes in the graph. + * The adjacency matrix looks like: + * + * dst -> + * (x,y) *************** _ + * | | | | + * | a | b | | + * src | | | | + * | *************** | T + * \|/ | | | | + * | c | d | | + * | | | | + * *************** - + * + * where this represents the subquadrant of the adj matrix currently being + * subdivided. (x,y) represent the upper left hand corner of the subquadrant, + * and T represents the side length (guaranteed to be a power of 2). + * + * After choosing the next level subquadrant, we get the resulting sets + * of parameters: + * quad = a, x'=x, y'=y, T'=T/2 + * quad = b, x'=x+T/2, y'=y, T'=T/2 + * quad = c, x'=x, y'=y+T/2, T'=T/2 + * quad = d, x'=x+T/2, y'=y+T/2, T'=T/2 + * + * @param src is the + */ + @tailrec + def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = { + if (t <= 1) + (x,y) + else { + val newT = math.round(t.toFloat/2.0).toInt + pickQuadrant(RMATa, RMATb, RMATc, RMATd) match { + case 0 => chooseCell(x, y, newT) + case 1 => chooseCell(x+newT, y, newT) + case 2 => chooseCell(x, y+newT, newT) + case 3 => chooseCell(x+newT, y+newT, newT) + } + } + } + + // TODO(crankshaw) turn result into an enum (or case class for pattern matching} + def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = { + if (a+b+c+d != 1.0) { + throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0") + } + val rand = new Random() + val result = rand.nextDouble() + result match { + case x if x < a => 0 // 0 corresponds to quadrant a + case x if (x >= a && x < a+b) => 1 // 1 corresponds to b + case x if (x >= a+b && x < a+b+c) => 2 // 2 corresponds to c + case _ => 3 // 3 corresponds to d + } + } + + + + /** + * Create `rows` by `cols` grid graph with each vertex connected to its + * row+1 and col+1 neighbors. Vertex ids are assigned in row major + * order. + * + * @param sc the spark context in which to construct the graph + * @param rows the number of rows + * @param cols the number of columns + * + * @return A graph containing vertices with the row and column ids + * as their attributes and edge values as 1.0. + */ + def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = { + // Convert row column address into vertex ids (row major order) + def sub2ind(r: Int, c: Int): VertexID = r * cols + c + + val vertices: RDD[(VertexID, (Int,Int))] = + sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) + val edges: RDD[Edge[Double]] = + vertices.flatMap{ case (vid, (r,c)) => + (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++ + (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty }) + }.map{ case (src, dst) => Edge(src, dst, 1.0) } + Graph(vertices, edges) + } // end of gridGraph + + /** + * Create a star graph with vertex 0 being the center. + * + * @param sc the spark context in which to construct the graph + * @param nverts the number of vertices in the star + * + * @return A star graph containing `nverts` vertices with vertex 0 + * being the center vertex. + */ + def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { + val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) + Graph.fromEdgeTuples(edges, 1) + } // end of starGraph + + + +} // end of Graph Generators diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala new file mode 100644 index 0000000000..7a79d33350 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala @@ -0,0 +1,21 @@ +package org.apache.spark.graphx.util + + +object HashUtils { + + /** + * Compute a 64-bit hash value for the given string. + * See http://stackoverflow.com/questions/1660501/what-is-a-good-64bit-hash-function-in-java-for-textual-strings + */ + def hash(str: String): Long = { + var h = 1125899906842597L + val len = str.length + var i = 0 + + while (i < len) { + h = 31 * h + str(i) + i += 1 + } + h + } +} diff --git a/graphx/src/test/resources/log4j.properties b/graphx/src/test/resources/log4j.properties new file mode 100644 index 0000000000..896936d8c4 --- /dev/null +++ b/graphx/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file core/target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=graph/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN +org.eclipse.jetty.LEVEL=WARN diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala new file mode 100644 index 0000000000..cc281fce99 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -0,0 +1,92 @@ +package org.apache.spark.graphx + +import org.apache.spark.SparkContext +import org.apache.spark.graphx.Graph._ +import org.apache.spark.graphx.impl.EdgePartition +import org.apache.spark.rdd._ +import org.scalatest.FunSuite + +class GraphOpsSuite extends FunSuite with LocalSparkContext { + + test("aggregateNeighbors") { + withSpark { sc => + val n = 3 + val star = + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1) + + val indegrees = star.aggregateNeighbors( + (vid, edge) => Some(1), + (a: Int, b: Int) => a + b, + EdgeDirection.In) + assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet) + + val outdegrees = star.aggregateNeighbors( + (vid, edge) => Some(1), + (a: Int, b: Int) => a + b, + EdgeDirection.Out) + assert(outdegrees.collect().toSet === Set((0, n))) + + val noVertexValues = star.aggregateNeighbors[Int]( + (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None, + (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), + EdgeDirection.In) + assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)]) + } + } + + test("joinVertices") { + withSpark { sc => + val vertices = + sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2) + val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) + val g: Graph[String, String] = Graph(vertices, edges) + + val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20))) + val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u } + + val v = g1.vertices.collect().toSet + assert(v === Set((1, "one10"), (2, "two20"), (3, "three"))) + } + } + + test("collectNeighborIds") { + withSpark { sc => + val chain = (0 until 100).map(x => (x, (x+1)%100) ) + val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) } + val graph = Graph.fromEdgeTuples(rawEdges, 1.0) + val nbrs = graph.collectNeighborIds(EdgeDirection.Both) + assert(nbrs.count === chain.size) + assert(graph.numVertices === nbrs.count) + nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) } + nbrs.collect.foreach { case (vid, nbrs) => + val s = nbrs.toSet + assert(s.contains((vid + 1) % 100)) + assert(s.contains(if (vid > 0) vid - 1 else 99 )) + } + } + } + + test ("filter") { + withSpark { sc => + val n = 5 + val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) + val graph: Graph[Int, Int] = Graph(vertices, edges) + val filteredGraph = graph.filter( + graph => { + val degrees: VertexRDD[Int] = graph.outDegrees + graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} + }, + vpred = (vid: VertexID, deg:Int) => deg > 0 + ) + + val v = filteredGraph.vertices.collect().toSet + assert(v === Set((0,0))) + + // the map is necessary because of object-reuse in the edge iterator + val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet + assert(e.isEmpty) + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala new file mode 100644 index 0000000000..094fa722a0 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -0,0 +1,272 @@ +package org.apache.spark.graphx + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.graphx.Graph._ +import org.apache.spark.rdd._ + +class GraphSuite extends FunSuite with LocalSparkContext { + + def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = { + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") + } + + test("Graph.fromEdgeTuples") { + withSpark { sc => + val ring = (0L to 100L).zip((1L to 99L) :+ 0L) + val doubleRing = ring ++ ring + val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1) + assert(graph.edges.count() === doubleRing.size) + assert(graph.edges.collect.forall(e => e.attr == 1)) + + // uniqueEdges option should uniquify edges and store duplicate count in edge attributes + val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut)) + assert(uniqueGraph.edges.count() === ring.size) + assert(uniqueGraph.edges.collect.forall(e => e.attr == 2)) + } + } + + test("Graph.fromEdges") { + withSpark { sc => + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) } + val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F) + assert(graph.edges.count() === ring.size) + } + } + + test("Graph.apply") { + withSpark { sc => + val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) + val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } + val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) + val graph = Graph(vertices, edges, false) + assert( graph.edges.count() === rawEdges.size ) + // Vertices not explicitly provided but referenced by edges should be created automatically + assert( graph.vertices.count() === 100) + graph.triplets.map { et => + assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr)) + assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr)) + } + } + } + + test("triplets") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n) + assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === + (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet) + } + } + + test("partitionBy") { + withSpark { sc => + def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0) + def nonemptyParts(graph: Graph[Int, Int]) = { + graph.edges.partitionsRDD.mapPartitions { iter => + Iterator(iter.next()._2.iterator.toList) + }.filter(_.nonEmpty) + } + val identicalEdges = List((0L, 1L), (0L, 1L)) + val canonicalEdges = List((0L, 1L), (1L, 0L)) + val sameSrcEdges = List((0L, 1L), (0L, 2L)) + + // The two edges start out in different partitions + for (edges <- List(identicalEdges, canonicalEdges, sameSrcEdges)) { + assert(nonemptyParts(mkGraph(edges)).count === 2) + } + // partitionBy(RandomVertexCut) puts identical edges in the same partition + assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1) + // partitionBy(EdgePartition1D) puts same-source edges in the same partition + assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1) + // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into + // the same partition + assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1) + // partitionBy(EdgePartition2D) puts identical edges in the same partition + assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1) + + // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p) + // partitions + val n = 100 + val p = 100 + val verts = 1 to n + val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => + verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0) + assert(graph.edges.partitions.length === p) + val partitionedGraph = graph.partitionBy(EdgePartition2D) + assert(graph.edges.partitions.length === p) + val bound = 2 * math.sqrt(p) + // Each vertex should be replicated to at most 2 * sqrt(p) partitions + val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter => + val part = iter.next()._2 + Iterator((part.srcIds ++ part.dstIds).toSet) + }.collect + assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) + // This should not be true for the default hash partitioning + val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter => + val part = iter.next()._2 + Iterator((part.srcIds ++ part.dstIds).toSet) + }.collect + assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound)) + } + } + + test("mapVertices") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n) + // mapVertices preserving type + val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2") + assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet) + // mapVertices changing type + val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length) + assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet) + } + } + + test("mapEdges") { + withSpark { sc => + val n = 3 + val star = starGraph(sc, n) + val starWithEdgeAttrs = star.mapEdges(e => e.dstId) + + val edges = starWithEdgeAttrs.edges.collect() + assert(edges.size === n) + assert(edges.toSet === (1 to n).map(x => Edge(0, x, x)).toSet) + } + } + + test("mapTriplets") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n) + assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet === + (1L to n).map(x => Edge(0, x, "vv")).toSet) + } + } + + test("reverse") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n) + assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet) + } + } + + test("subgraph") { + withSpark { sc => + // Create a star graph of 10 veritces. + val n = 10 + val star = starGraph(sc, n) + // Take only vertices whose vids are even + val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0) + + // We should have 5 vertices. + assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet) + + // And 4 edges. + assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet) + } + } + + test("mask") { + withSpark { sc => + val n = 5 + val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) + val graph: Graph[Int, Int] = Graph(vertices, edges) + + val subgraph = graph.subgraph( + e => e.dstId != 4L, + (vid, vdata) => vid != 3L + ).mapVertices((vid, vdata) => -1).mapEdges(e => -1) + + val projectedGraph = graph.mask(subgraph) + + val v = projectedGraph.vertices.collect().toSet + assert(v === Set((0,0), (1,1), (2,2), (4,4), (5,5))) + + // the map is necessary because of object-reuse in the edge iterator + val e = projectedGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet + assert(e === Set(Edge(0,1,1), Edge(0,2,2), Edge(0,5,5))) + + } + } + + test("groupEdges") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n) + val doubleStar = Graph.fromEdgeTuples( + sc.parallelize((1 to n).flatMap(x => + List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v") + val star2 = doubleStar.groupEdges { (a, b) => a} + assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === + star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) + assert(star2.vertices.collect.toSet === star.vertices.collect.toSet) + } + } + + test("mapReduceTriplets") { + withSpark { sc => + val n = 5 + val star = starGraph(sc, n).mapVertices { (_, _) => 0 } + val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg } + val neighborDegreeSums = starDeg.mapReduceTriplets( + edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)), + (a: Int, b: Int) => a + b) + assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) + + // activeSetOpt + val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID) + val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) + val vids = complete.mapVertices((vid, attr) => vid).cache() + val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } + val numEvenNeighbors = vids.mapReduceTriplets(et => { + // Map function should only run on edges with destination in the active set + if (et.dstId % 2 != 0) { + throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId)) + } + Iterator((et.srcId, 1)) + }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet + assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet) + + // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) + val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3) + val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() + val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_) + val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } + val numOddNeighbors = changedGraph.mapReduceTriplets(et => { + // Map function should only run on edges with source in the active set + if (et.srcId % 2 != 1) { + throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId)) + } + Iterator((et.dstId, 1)) + }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet + assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet) + + } + } + + test("outerJoinVertices") { + withSpark { sc => + val n = 5 + val reverseStar = starGraph(sc, n).reverse + // outerJoinVertices changing type + val reverseStarDegrees = + reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) } + val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( + et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), + (a: Int, b: Int) => a + b).collect.toSet + assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0))) + // outerJoinVertices preserving type + val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString } + val newReverseStar = + reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") } + assert(newReverseStar.vertices.map(_._2).collect.toSet === + (0 to n).map(x => "v%d".format(x)).toSet) + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala new file mode 100644 index 0000000000..6aec2ea8a9 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -0,0 +1,28 @@ +package org.apache.spark.graphx + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkContext + + +/** + * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped + * after each test. +*/ +trait LocalSparkContext { + System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + + /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ + def withSpark[T](f: SparkContext => T) = { + val sc = new SparkContext("local", "test") + try { + f(sc) + } finally { + sc.stop() + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala new file mode 100644 index 0000000000..429622357f --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -0,0 +1,41 @@ +package org.apache.spark.graphx + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.rdd._ + +class PregelSuite extends FunSuite with LocalSparkContext { + + test("1 iteration") { + withSpark { sc => + val n = 5 + val star = + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") + val result = Pregel(star, 0)( + (vid, attr, msg) => attr, + et => Iterator.empty, + (a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly")) + assert(result.vertices.collect.toSet === star.vertices.collect.toSet) + } + } + + test("chain propagation") { + withSpark { sc => + val n = 5 + val chain = Graph.fromEdgeTuples( + sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), + 0).cache() + assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet) + val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 } + assert(chainWithSeed.vertices.collect.toSet === + Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) + val result = Pregel(chainWithSeed, 0)( + (vid, attr, msg) => math.max(msg, attr), + et => Iterator((et.dstId, et.srcAttr)), + (a: Int, b: Int) => math.max(a, b)) + assert(result.vertices.collect.toSet === + chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet) + } + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala new file mode 100644 index 0000000000..3ba412c1f8 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala @@ -0,0 +1,183 @@ +package org.apache.spark.graphx + +import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream} + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark._ +import org.apache.spark.graphx.impl._ +import org.apache.spark.graphx.impl.MsgRDDFunctions._ +import org.apache.spark.serializer.SerializationStream + + +class SerializerSuite extends FunSuite with LocalSparkContext { + + test("IntVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = new VertexBroadcastMsg[Int](3, 4, 5) + val bout = new ByteArrayOutputStream + val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject() + val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject() + assert(outMsg.vid === inMsg1.vid) + assert(outMsg.vid === inMsg2.vid) + assert(outMsg.data === inMsg1.data) + assert(outMsg.data === inMsg2.data) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("LongVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = new VertexBroadcastMsg[Long](3, 4, 5) + val bout = new ByteArrayOutputStream + val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject() + val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject() + assert(outMsg.vid === inMsg1.vid) + assert(outMsg.vid === inMsg2.vid) + assert(outMsg.data === inMsg1.data) + assert(outMsg.data === inMsg2.data) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("DoubleVertexBroadcastMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0) + val bout = new ByteArrayOutputStream + val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject() + val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject() + assert(outMsg.vid === inMsg1.vid) + assert(outMsg.vid === inMsg2.vid) + assert(outMsg.data === inMsg1.data) + assert(outMsg.data === inMsg2.data) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("IntAggMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = (4: VertexID, 5) + val bout = new ByteArrayOutputStream + val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: (VertexID, Int) = inStrm.readObject() + val inMsg2: (VertexID, Int) = inStrm.readObject() + assert(outMsg === inMsg1) + assert(outMsg === inMsg2) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("LongAggMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = (4: VertexID, 1L << 32) + val bout = new ByteArrayOutputStream + val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: (VertexID, Long) = inStrm.readObject() + val inMsg2: (VertexID, Long) = inStrm.readObject() + assert(outMsg === inMsg1) + assert(outMsg === inMsg2) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("DoubleAggMsgSerializer") { + val conf = new SparkConf(false) + val outMsg = (4: VertexID, 5.0) + val bout = new ByteArrayOutputStream + val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) + outStrm.writeObject(outMsg) + outStrm.writeObject(outMsg) + bout.flush() + val bin = new ByteArrayInputStream(bout.toByteArray) + val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) + val inMsg1: (VertexID, Double) = inStrm.readObject() + val inMsg2: (VertexID, Double) = inStrm.readObject() + assert(outMsg === inMsg1) + assert(outMsg === inMsg2) + + intercept[EOFException] { + inStrm.readObject() + } + } + + test("TestShuffleVertexBroadcastMsg") { + withSpark { sc => + val bmsgs = sc.parallelize(0 until 100, 10).map { pid => + new VertexBroadcastMsg[Int](pid, pid, pid) + } + bmsgs.partitionBy(new HashPartitioner(3)).collect() + } + } + + test("variable long encoding") { + def testVarLongEncoding(v: Long, optimizePositive: Boolean) { + val bout = new ByteArrayOutputStream + val stream = new ShuffleSerializationStream(bout) { + def writeObject[T](t: T): SerializationStream = { + writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive) + this + } + } + stream.writeObject(v) + + val bin = new ByteArrayInputStream(bout.toByteArray) + val dstream = new ShuffleDeserializationStream(bin) { + def readObject[T](): T = { + readVarLong(optimizePositive).asInstanceOf[T] + } + } + val read = dstream.readObject[Long]() + assert(read === v) + } + + // Test all variable encoding code path (each branch uses 7 bits, i.e. 1L << 7 difference) + val d = Random.nextLong() % 128 + Seq[Long](0, 1L << 0 + d, 1L << 7 + d, 1L << 14 + d, 1L << 21 + d, 1L << 28 + d, 1L << 35 + d, + 1L << 42 + d, 1L << 49 + d, 1L << 56 + d, 1L << 63 + d).foreach { number => + testVarLongEncoding(number, optimizePositive = false) + testVarLongEncoding(number, optimizePositive = true) + testVarLongEncoding(-number, optimizePositive = false) + testVarLongEncoding(-number, optimizePositive = true) + } + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala new file mode 100644 index 0000000000..573b708e89 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -0,0 +1,85 @@ +package org.apache.spark.graphx + +import org.apache.spark.SparkContext +import org.apache.spark.graphx.Graph._ +import org.apache.spark.graphx.impl.EdgePartition +import org.apache.spark.rdd._ +import org.scalatest.FunSuite + +class VertexRDDSuite extends FunSuite with LocalSparkContext { + + def vertices(sc: SparkContext, n: Int) = { + VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5)) + } + + test("filter") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val evens = verts.filter(q => ((q._2 % 2) == 0)) + assert(evens.count === (0 to n).filter(_ % 2 == 0).size) + } + } + + test("mapValues") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val negatives = verts.mapValues(x => -x).cache() // Allow joining b with a derived RDD of b + assert(negatives.count === n + 1) + } + } + + test("diff") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val flipEvens = verts.mapValues(x => if (x % 2 == 0) -x else x) + // diff should keep only the changed vertices + assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet) + // diff should keep the vertex values from `other` + assert(flipEvens.diff(verts).map(_._2).collect().toSet === (2 to n by 2).toSet) + } + } + + test("leftJoin") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val evens = verts.filter(q => ((q._2 % 2) == 0)) + // leftJoin with another VertexRDD + assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet === + (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet) + // leftJoin with an RDD + val evensRDD = evens.map(identity) + assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet === + (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet) + } + } + + test("innerJoin") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val evens = verts.filter(q => ((q._2 % 2) == 0)) + // innerJoin with another VertexRDD + assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet === + (0 to n by 2).map(x => (x.toLong, 0)).toSet) + // innerJoin with an RDD + val evensRDD = evens.map(identity) + assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet === + (0 to n by 2).map(x => (x.toLong, 0)).toSet) } + } + + test("aggregateUsingIndex") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n) + val messageTargets = (0 to n) ++ (0 to n by 2) + val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1))) + assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet === + (0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet) + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala new file mode 100644 index 0000000000..5e2ecfcde9 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala @@ -0,0 +1,83 @@ +package org.apache.spark.graphx.algorithms + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { + + test("Grid Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache() + val ccGraph = ConnectedComponents.run(gridGraph).cache() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Reverse Grid Connected Components") { + withSpark { sc => + val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache() + val ccGraph = ConnectedComponents.run(gridGraph).cache() + val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum + assert(maxCCid === 0) + } + } // end of Grid connected components + + + test("Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val chain2 = (10 until 20).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val ccGraph = ConnectedComponents.run(twoChains).cache() + val vertices = ccGraph.vertices.collect() + for ( (id, cc) <- vertices ) { + if(id < 10) { assert(cc === 0) } + else { assert(cc === 10) } + } + val ccMap = vertices.toMap + for (id <- 0 until 20) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of chain connected components + + test("Reverse Chain Connected Components") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val chain2 = (10 until 20).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) } + val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache() + val ccGraph = ConnectedComponents.run(twoChains).cache() + val vertices = ccGraph.vertices.collect + for ( (id, cc) <- vertices ) { + if (id < 10) { + assert(cc === 0) + } else { + assert(cc === 10) + } + } + val ccMap = vertices.toMap + for ( id <- 0 until 20 ) { + if (id < 10) { + assert(ccMap(id) === 0) + } else { + assert(ccMap(id) === 10) + } + } + } + } // end of reverse chain connected components + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala new file mode 100644 index 0000000000..e365b1e230 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala @@ -0,0 +1,126 @@ +package org.apache.spark.graphx.algorithms + +import org.scalatest.FunSuite + +import org.apache.spark.graphx._ +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ + +import org.apache.spark.graphx.util.GraphGenerators + + +object GridPageRank { + def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = { + val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int]) + val outDegree = Array.fill(nRows * nCols)(0) + // Convert row column address into vertex ids (row major order) + def sub2ind(r: Int, c: Int): Int = r * nCols + c + // Make the grid graph + for (r <- 0 until nRows; c <- 0 until nCols) { + val ind = sub2ind(r,c) + if (r+1 < nRows) { + outDegree(ind) += 1 + inNbrs(sub2ind(r+1,c)) += ind + } + if (c+1 < nCols) { + outDegree(ind) += 1 + inNbrs(sub2ind(r,c+1)) += ind + } + } + // compute the pagerank + var pr = Array.fill(nRows * nCols)(resetProb) + for (iter <- 0 until nIter) { + val oldPr = pr + pr = new Array[Double](nRows * nCols) + for (ind <- 0 until (nRows * nCols)) { + pr(ind) = resetProb + (1.0 - resetProb) * + inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum + } + } + (0L until (nRows * nCols)).zip(pr) + } + +} + + +class PageRankSuite extends FunSuite with LocalSparkContext { + + def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = { + a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) } + .map { case (id, error) => error }.sum + } + + test("Star PageRank") { + withSpark { sc => + val nVertices = 100 + val starGraph = GraphGenerators.starGraph(sc, nVertices).cache() + val resetProb = 0.15 + val errorTol = 1.0e-5 + + val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache() + val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache() + + // Static PageRank should only take 2 iterations to converge + val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) => + if (pr1 != pr2) 1 else 0 + }.map { case (vid, test) => test }.sum + assert(notMatching === 0) + + val staticErrors = staticRanks2.map { case (vid, pr) => + val correct = (vid > 0 && pr == resetProb) || + (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5) + if (!correct) 1 else 0 + } + assert(staticErrors.sum === 0) + + val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache() + val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache() + assert(compareRanks(staticRanks2, dynamicRanks) < errorTol) + assert(compareRanks(staticRanks2, standaloneRanks) < errorTol) + } + } // end of test Star PageRank + + + + test("Grid PageRank") { + withSpark { sc => + val rows = 10 + val cols = 10 + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 50 + val errorTol = 1.0e-5 + val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache() + + val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache() + val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache() + val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache() + val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))) + + assert(compareRanks(staticRanks, referenceRanks) < errorTol) + assert(compareRanks(dynamicRanks, referenceRanks) < errorTol) + assert(compareRanks(standaloneRanks, referenceRanks) < errorTol) + } + } // end of Grid PageRank + + + test("Chain PageRank") { + withSpark { sc => + val chain1 = (0 until 9).map(x => (x, x+1) ) + val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) } + val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val resetProb = 0.15 + val tol = 0.0001 + val numIter = 10 + val errorTol = 1.0e-5 + + val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache() + val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache() + val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache() + + assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol) + } + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala new file mode 100644 index 0000000000..06604198d7 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala @@ -0,0 +1,30 @@ +package org.apache.spark.graphx.algorithms + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class SVDPlusPlusSuite extends FunSuite with LocalSparkContext { + + test("Test SVD++ with mean square error on training set") { + withSpark { sc => + val svdppErr = 8.0 + val edges = sc.textFile("mllib/data/als/test.data").map { line => + val fields = line.split(",") + Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble) + } + val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations + var (graph, u) = SVDPlusPlus.run(edges, conf) + val err = graph.vertices.collect.map{ case (vid, vd) => + if (vid % 2 == 1) vd._4 else 0.0 + }.reduce(_ + _) / graph.triplets.collect.size + assert(err <= svdppErr) + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala new file mode 100644 index 0000000000..696b80944e --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala @@ -0,0 +1,57 @@ +package org.apache.spark.graphx.algorithms + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext { + + test("Island Strongly Connected Components") { + withSpark { sc => + val vertices = sc.parallelize((1L to 5L).map(x => (x, -1))) + val edges = sc.parallelize(Seq.empty[Edge[Int]]) + val graph = Graph(vertices, edges) + val sccGraph = StronglyConnectedComponents.run(graph, 5) + for ((id, scc) <- sccGraph.vertices.collect) { + assert(id == scc) + } + } + } + + test("Cycle Strongly Connected Components") { + withSpark { sc => + val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7))) + val graph = Graph.fromEdgeTuples(rawEdges, -1) + val sccGraph = StronglyConnectedComponents.run(graph, 20) + for ((id, scc) <- sccGraph.vertices.collect) { + assert(0L == scc) + } + } + } + + test("2 Cycle Strongly Connected Components") { + withSpark { sc => + val edges = + Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++ + Array(6L -> 0L, 5L -> 7L) + val rawEdges = sc.parallelize(edges) + val graph = Graph.fromEdgeTuples(rawEdges, -1) + val sccGraph = StronglyConnectedComponents.run(graph, 20) + for ((id, scc) <- sccGraph.vertices.collect) { + if (id < 3) + assert(0L == scc) + else if (id < 6) + assert(3L == scc) + else + assert(id == scc) + } + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala new file mode 100644 index 0000000000..0e59912754 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala @@ -0,0 +1,73 @@ +package org.apache.spark.graphx.algorithms + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.GraphGenerators +import org.apache.spark.rdd._ + + +class TriangleCountSuite extends FunSuite with LocalSparkContext { + + test("Count a single triangle") { + withSpark { sc => + val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = TriangleCount.run(graph) + val verts = triangleCount.vertices + verts.collect.foreach { case (vid, count) => assert(count === 1) } + } + } + + test("Count two triangles") { + withSpark { sc => + val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> -1L, -1L -> -2L, -2L -> 0L) + val rawEdges = sc.parallelize(triangles, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = TriangleCount.run(graph) + val verts = triangleCount.vertices + verts.collect().foreach { case (vid, count) => + if (vid == 0) { + assert(count === 2) + } else { + assert(count === 1) + } + } + } + } + + test("Count two triangles with bi-directed edges") { + withSpark { sc => + val triangles = + Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> -1L, -1L -> -2L, -2L -> 0L) + val revTriangles = triangles.map { case (a,b) => (b,a) } + val rawEdges = sc.parallelize(triangles ++ revTriangles, 2) + val graph = Graph.fromEdgeTuples(rawEdges, true).cache() + val triangleCount = TriangleCount.run(graph) + val verts = triangleCount.vertices + verts.collect().foreach { case (vid, count) => + if (vid == 0) { + assert(count === 4) + } else { + assert(count === 2) + } + } + } + } + + test("Count a single triangle with duplicate edges") { + withSpark { sc => + val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++ + Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2) + val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache() + val triangleCount = TriangleCount.run(graph) + val verts = triangleCount.vertices + verts.collect.foreach { case (vid, count) => assert(count === 1) } + } + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala new file mode 100644 index 0000000000..eb82436f09 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -0,0 +1,76 @@ +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.graphx._ + +class EdgePartitionSuite extends FunSuite { + + test("reverse") { + val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0)) + val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0)) + val builder = new EdgePartitionBuilder[Int] + for (e <- edges) { + builder.add(e.srcId, e.dstId, e.attr) + } + val edgePartition = builder.toEdgePartition + assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges) + assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges) + } + + test("map") { + val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0)) + val builder = new EdgePartitionBuilder[Int] + for (e <- edges) { + builder.add(e.srcId, e.dstId, e.attr) + } + val edgePartition = builder.toEdgePartition + assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList === + edges.map(e => e.copy(attr = e.srcId + e.dstId))) + } + + test("groupEdges") { + val edges = List( + Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32)) + val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36)) + val builder = new EdgePartitionBuilder[Int] + for (e <- edges) { + builder.add(e.srcId, e.dstId, e.attr) + } + val edgePartition = builder.toEdgePartition + assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges) + } + + test("indexIterator") { + val edgesFrom0 = List(Edge(0, 1, 0)) + val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0)) + val sortedEdges = edgesFrom0 ++ edgesFrom1 + val builder = new EdgePartitionBuilder[Int] + for (e <- Random.shuffle(sortedEdges)) { + builder.add(e.srcId, e.dstId, e.attr) + } + + val edgePartition = builder.toEdgePartition + assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges) + assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0) + assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) + } + + test("innerJoin") { + def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { + val builder = new EdgePartitionBuilder[A] + for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) } + builder.toEdgePartition + } + val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) + val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0)) + val a = makeEdgePartition(aList) + val b = makeEdgePartition(bList) + + assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === + List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala new file mode 100644 index 0000000000..d37d64e8c8 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala @@ -0,0 +1,113 @@ +package org.apache.spark.graphx.impl + +import org.apache.spark.graphx._ +import org.scalatest.FunSuite + +class VertexPartitionSuite extends FunSuite { + + test("isDefined, filter") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).filter { (vid, attr) => vid == 0 } + assert(vp.isDefined(0)) + assert(!vp.isDefined(1)) + assert(!vp.isDefined(2)) + assert(!vp.isDefined(-1)) + } + + test("isActive, numActives, replaceActives") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1))) + .filter { (vid, attr) => vid == 0 } + .replaceActives(Iterator(0, 2, 0)) + assert(vp.isActive(0)) + assert(!vp.isActive(1)) + assert(vp.isActive(2)) + assert(!vp.isActive(-1)) + assert(vp.numActives == Some(2)) + } + + test("map") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 } + assert(vp(0) === 2) + } + + test("diff") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val vp2 = vp.filter { (vid, attr) => vid <= 1 } + val vp3a = vp.map { (vid, attr) => 2 } + val vp3b = VertexPartition(vp3a.iterator) + // diff with same index + val diff1 = vp2.diff(vp3a) + assert(diff1(0) === 2) + assert(diff1(1) === 2) + assert(diff1(2) === 2) + assert(!diff1.isDefined(2)) + // diff with different indexes + val diff2 = vp2.diff(vp3b) + assert(diff2(0) === 2) + assert(diff2(1) === 2) + assert(diff2(2) === 2) + assert(!diff2.isDefined(2)) + } + + test("leftJoin") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 } + val vp2b = VertexPartition(vp2a.iterator) + // leftJoin with same index + val join1 = vp.leftJoin(vp2a) { (vid, a, bOpt) => bOpt.getOrElse(a) } + assert(join1.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1))) + // leftJoin with different indexes + val join2 = vp.leftJoin(vp2b) { (vid, a, bOpt) => bOpt.getOrElse(a) } + assert(join2.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1))) + // leftJoin an iterator + val join3 = vp.leftJoin(vp2a.iterator) { (vid, a, bOpt) => bOpt.getOrElse(a) } + assert(join3.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1))) + } + + test("innerJoin") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 } + val vp2b = VertexPartition(vp2a.iterator) + // innerJoin with same index + val join1 = vp.innerJoin(vp2a) { (vid, a, b) => b } + assert(join1.iterator.toSet === Set((0L, 2), (1L, 2))) + // innerJoin with different indexes + val join2 = vp.innerJoin(vp2b) { (vid, a, b) => b } + assert(join2.iterator.toSet === Set((0L, 2), (1L, 2))) + // innerJoin an iterator + val join3 = vp.innerJoin(vp2a.iterator) { (vid, a, b) => b } + assert(join3.iterator.toSet === Set((0L, 2), (1L, 2))) + } + + test("createUsingIndex") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val elems = List((0L, 2), (2L, 2), (3L, 2)) + val vp2 = vp.createUsingIndex(elems.iterator) + assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2))) + assert(vp.index === vp2.index) + } + + test("innerJoinKeepLeft") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val elems = List((0L, 2), (2L, 2), (3L, 2)) + val vp2 = vp.innerJoinKeepLeft(elems.iterator) + assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2))) + assert(vp2(1) === 1) + } + + test("aggregateUsingIndex") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val messages = List((0L, "a"), (2L, "b"), (0L, "c"), (3L, "d")) + val vp2 = vp.aggregateUsingIndex[String](messages.iterator, _ + _) + assert(vp2.iterator.toSet === Set((0L, "ac"), (2L, "b"))) + } + + test("reindex") { + val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1))) + val vp2 = vp.filter { (vid, attr) => vid <= 1 } + val vp3 = vp2.reindex() + assert(vp2.iterator.toSet === vp3.iterator.toSet) + assert(vp2(2) === 1) + assert(vp3.index.getPos(2) === -1) + } + +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala new file mode 100644 index 0000000000..11db339750 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala @@ -0,0 +1,93 @@ +package org.apache.spark.graphx.util + +import org.scalatest.FunSuite + + +class BytecodeUtilsSuite extends FunSuite { + + import BytecodeUtilsSuite.TestClass + + test("closure invokes a method") { + val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + + val c2 = {e: TestClass => println(e.foo); println(e.bar); } + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) + assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) + + val c3 = {e: TestClass => println(e.foo); } + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) + } + + test("closure inside a closure invokes a method") { + val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } + val c2 = {e: TestClass => c1(e); println(e.foo); } + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) + } + + test("closure inside a closure inside a closure invokes a method") { + val c1 = {e: TestClass => println(e.baz); } + val c2 = {e: TestClass => c1(e); println(e.foo); } + val c3 = {e: TestClass => c2(e) } + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) + } + + test("closure calling a function that invokes a method") { + def zoo(e: TestClass) { + println(e.baz) + } + val c1 = {e: TestClass => zoo(e)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + test("closure calling a function that invokes a method which uses another closure") { + val c2 = {e: TestClass => println(e.baz)} + def zoo(e: TestClass) { + c2(e) + } + val c1 = {e: TestClass => zoo(e)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + test("nested closure") { + val c2 = {e: TestClass => println(e.baz)} + def zoo(e: TestClass, c: TestClass => Unit) { + c(e) + } + val c1 = {e: TestClass => zoo(e, c2)} + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) + assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) + assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) + } + + // The following doesn't work yet, because the byte code doesn't contain any information + // about what exactly "c" is. +// test("invoke interface") { +// val c1 = {e: TestClass => c(e)} +// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) +// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) +// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) +// } + + private val c = {e: TestClass => println(e.baz)} +} + + +object BytecodeUtilsSuite { + class TestClass(val foo: Int, val bar: Long) { + def baz: Boolean = false + } +} |