author     Ankur Dave <ankurdave@gmail.com>    2014-01-09 14:31:33 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2014-01-09 14:31:33 -0800
commit     731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1 (patch)
tree       51fa693c046869f0706337046a1581c09e56e4b5 /graphx
parent     100718bcd3f6ade1a93256458ec1528bb9142b5e (diff)
graph -> graphx
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/pom.xml  129
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala  593
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Edge.scala  50
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala  36
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala  73
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala  63
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala  437
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala  28
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala  134
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala  113
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala  277
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala  94
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala  122
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala  361
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala  37
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala  205
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala  103
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala  87
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala  78
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala  220
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  46
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala  43
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala  422
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala  93
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala  182
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala  64
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala  386
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala  262
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/package.scala  22
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala  76
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala  75
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala  114
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala  282
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala  21
-rw-r--r--  graphx/src/test/resources/log4j.properties  28
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala  92
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala  272
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala  28
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala  41
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala  183
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala  85
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala  83
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala  126
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala  30
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala  57
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala  73
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala  76
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala  113
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala  93
49 files changed, 6708 insertions, 0 deletions
diff --git a/graphx/pom.xml b/graphx/pom.xml
new file mode 100644
index 0000000000..fd3dcaad7c
--- /dev/null
+++ b/graphx/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-graph_2.9.3</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Graph</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.9.3</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala
new file mode 100644
index 0000000000..0cafc3fdf9
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala
@@ -0,0 +1,593 @@
+package org.apache.spark.graphx
+
+import org.apache.spark._
+import org.apache.spark.graphx.algorithms._
+
+
+/**
+ * The Analytics object contains a collection of basic graph analytics
+ * algorithms that operate largely on the graph structure.
+ *
+ * In addition, the Analytics object contains a driver `main` which can
+ * be used to apply the various functions to graphs in standard
+ * formats.
+ */
+object Analytics extends Logging {
+
+ def main(args: Array[String]) = {
+ val host = args(0)
+ val taskType = args(1)
+ val fname = args(2)
+ val options = args.drop(3).map { arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+
+ def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
+ loggers.map{
+ loggerName =>
+ val logger = org.apache.log4j.Logger.getLogger(loggerName)
+ val prevLevel = logger.getLevel()
+ logger.setLevel(level)
+ loggerName -> prevLevel
+ }.toMap
+ }
+
+ def pickPartitioner(v: String): PartitionStrategy = {
+ v match {
+ case "RandomVertexCut" => RandomVertexCut
+ case "EdgePartition1D" => EdgePartition1D
+ case "EdgePartition2D" => EdgePartition2D
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+ case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
+ }
+ }
+// setLogLevels(org.apache.log4j.Level.DEBUG, Seq("org.apache.spark"))
+
+ val serializer = "org.apache.spark.serializer.KryoSerializer"
+ System.setProperty("spark.serializer", serializer)
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ taskType match {
+ case "pagerank" => {
+
+ var tol:Float = 0.001F
+ var outFname = ""
+ var numVPart = 4
+ var numEPart = 4
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("tol", v) => tol = v.toFloat
+ case ("output", v) => outFname = v
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| PageRank |")
+ println("======================================")
+
+ val sc = new SparkContext(host, "PageRank(" + fname + ")")
+
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ println("GRAPHX: Number of vertices " + graph.vertices.count)
+ println("GRAPHX: Number of edges " + graph.edges.count)
+
+ //val pr = Analytics.pagerank(graph, numIter)
+ val pr = PageRank.runStandalone(graph, tol)
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+
+ if (!outFname.isEmpty) {
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ }
+
+ sc.stop()
+ }
+
+ case "cc" => {
+
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+ }
+
+ case "triangles" => {
+ var numVPart = 4
+ var numEPart = 4
+ // TriangleCount requires the graph to be partitioned
+ var partitionStrategy: PartitionStrategy = RandomVertexCut
+
+ options.foreach{
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+ println("======================================")
+ println("| Triangle Count |")
+ println("--------------------------------------")
+ val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
+ val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+ minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val triangles = TriangleCount.run(graph)
+ println("Triangles: " + triangles.vertices.map {
+ case (vid,data) => data.toLong
+ }.reduce(_+_) / 3)
+ sc.stop()
+ }
+
+//
+// case "shortestpath" => {
+//
+// var numIter = Int.MaxValue
+// var isDynamic = true
+// var sources: List[Int] = List.empty
+//
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("source", v) => sources ++= List(v.toInt)
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+//
+//
+// if(!isDynamic && numIter == Int.MaxValue) {
+// println("Set number of iterations!")
+// sys.exit(1)
+// }
+//
+// if(sources.isEmpty) {
+// println("No sources provided!")
+// sys.exit(1)
+// }
+//
+// println("======================================")
+// println("| Shortest Path |")
+// println("--------------------------------------")
+// println(" Using parameters:")
+// println(" \tDynamic: " + isDynamic)
+// println(" \tNumIter: " + numIter)
+// println(" \tSources: [" + sources.mkString(", ") + "]")
+// println("======================================")
+//
+// val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
+// val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0F else a(0).toFloat ) )
+// //val sp = Analytics.shortestPath(graph, sources, numIter)
+// // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
+// // else Analytics.shortestPath(graph, sources, numIter)
+// println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
+//
+// sc.stop()
+// }
+
+
+ // case "als" => {
+
+ // var numIter = 5
+ // var lambda = 0.01
+ // var latentK = 10
+ // var usersFname = "usersFactors.tsv"
+ // var moviesFname = "moviesFname.tsv"
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("lambda", v) => lambda = v.toDouble
+ // case ("latentK", v) => latentK = v.toInt
+ // case ("usersFname", v) => usersFname = v
+ // case ("moviesFname", v) => moviesFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // println("======================================")
+ // println("| Alternating Least Squares |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tNumIter: " + numIter)
+ // println(" \tLambda: " + lambda)
+ // println(" \tLatentK: " + latentK)
+ // println(" \tusersFname: " + usersFname)
+ // println(" \tmoviesFname: " + moviesFname)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ALS(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
+ // graph.numVPart = numVPart
+ // graph.numEPart = numEPart
+
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
+ // assert(maxUser < minMovie)
+
+ // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
+ // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(usersFname)
+ // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(moviesFname)
+
+ // sc.stop()
+ // }
+
+
+ case _ => {
+ println("Invalid task type.")
+ }
+ }
+ }
+
+ // /**
+ // * Compute the shortest path to a set of markers
+ // */
+ // def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = {
+ // val sourceSet = sources.toSet
+ // val spGraph = graph.mapVertices {
+ // case Vertex(vid, _) => Vertex(vid, (if(sourceSet.contains(vid)) 0.0 else Double.MaxValue))
+ // }
+ // GraphLab.iterateGA[Double, Double, Double](spGraph)(
+ // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
+ // (a: Double, b: Double) => math.min(a, b), // merge
+ // (v, a: Option[Double]) => math.min(v.data, a.getOrElse(Double.MaxValue)), // apply
+ // numIter,
+ // gatherDirection = EdgeDirection.In)
+ // }
+
+ // /**
+ // * Compute the connected component membership of each vertex
+ // * and return an RDD with the vertex value containing the
+ // * lowest vertex id in the connected component containing
+ // * that vertex.
+ // */
+ // def dynamicConnectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
+ // numIter: Int = Int.MaxValue) = {
+
+ // val vertices = graph.vertices.mapPartitions(iter => iter.map { case (vid, _) => (vid, vid) })
+ // val edges = graph.edges // .mapValues(v => None)
+ // val ccGraph = new Graph(vertices, edges)
+
+ // ccGraph.iterateDynamic(
+ // (me_id, edge) => edge.otherVertex(me_id).data, // gather
+ // (a: Int, b: Int) => math.min(a, b), // merge
+ // Integer.MAX_VALUE,
+ // (v, a: Int) => math.min(v.data, a), // apply
+ // (me_id, edge) => edge.otherVertex(me_id).data > edge.vertex(me_id).data, // scatter
+ // numIter,
+ // gatherEdges = EdgeDirection.Both,
+ // scatterEdges = EdgeDirection.Both).vertices
+ // //
+ // // graph_ret.vertices.collect.foreach(println)
+ // // graph_ret.edges.take(10).foreach(println)
+ // }
+
+
+ // /**
+ // * Compute the shortest path to a set of markers
+ // */
+ // def dynamicShortestPath[VD: Manifest, ED: Manifest](graph: Graph[VD, Double],
+ // sources: List[Int], numIter: Int) = {
+ // val sourceSet = sources.toSet
+ // val vertices = graph.vertices.mapPartitions(
+ // iter => iter.map {
+ // case (vid, _) => (vid, (if(sourceSet.contains(vid)) 0.0F else Double.MaxValue) )
+ // });
+
+ // val edges = graph.edges // .mapValues(v => None)
+ // val spGraph = new Graph(vertices, edges)
+
+ // val niterations = Int.MaxValue
+ // spGraph.iterateDynamic(
+ // (me_id, edge) => edge.otherVertex(me_id).data + edge.data, // gather
+ // (a: Double, b: Double) => math.min(a, b), // merge
+ // Double.MaxValue,
+ // (v, a: Double) => math.min(v.data, a), // apply
+ // (me_id, edge) => edge.vertex(me_id).data + edge.data < edge.otherVertex(me_id).data, // scatter
+ // numIter,
+ // gatherEdges = EdgeDirection.In,
+ // scatterEdges = EdgeDirection.Out).vertices
+ // }
+
+
+ // /**
+ // *
+ // */
+ // def alternatingLeastSquares[VD: ClassTag, ED: ClassTag](graph: Graph[VD, Double],
+ // latentK: Int, lambda: Double, numIter: Int) = {
+ // val vertices = graph.vertices.mapPartitions( _.map {
+ // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
+ // }).cache
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val edges = graph.edges // .mapValues(v => None)
+ // val alsGraph = new Graph(vertices, edges)
+ // alsGraph.numVPart = graph.numVPart
+ // alsGraph.numEPart = graph.numEPart
+
+ // val niterations = Int.MaxValue
+ // alsGraph.iterateDynamic[(Array[Double], Array[Double])](
+ // (me_id, edge) => { // gather
+ // val X = edge.otherVertex(me_id).data
+ // val y = edge.data
+ // val Xy = X.map(_ * y)
+ // val XtX = (for(i <- 0 until latentK; j <- i until latentK) yield(X(i) * X(j))).toArray
+ // (Xy, XtX)
+ // },
+ // (a, b) => {
+ // // The difference between the while loop and the zip is a FACTOR OF TWO in overall
+ // // runtime
+ // var i = 0
+ // while(i < a._1.length) { a._1(i) += b._1(i); i += 1 }
+ // i = 0
+ // while(i < a._2.length) { a._2(i) += b._2(i); i += 1 }
+ // a
+ // // (a._1.zip(b._1).map{ case (q,r) => q+r }, a._2.zip(b._2).map{ case (q,r) => q+r })
+ // },
+ // (Array.empty[Double], Array.empty[Double]), // default value is empty
+ // (vertex, accum) => { // apply
+ // val XyArray = accum._1
+ // val XtXArray = accum._2
+ // if(XyArray.isEmpty) vertex.data // no neighbors
+ // else {
+ // val XtX = DenseMatrix.tabulate(latentK,latentK){ (i,j) =>
+ // (if(i < j) XtXArray(i + (j+1)*j/2) else XtXArray(i + (j+1)*j/2)) +
+ // (if(i == j) lambda else 1.0F) //regularization
+ // }
+ // val Xy = DenseMatrix.create(latentK,1,XyArray)
+ // val w = XtX \ Xy
+ // w.data
+ // }
+ // },
+ // (me_id, edge) => true,
+ // numIter,
+ // gatherEdges = EdgeDirection.Both,
+ // scatterEdges = EdgeDirection.Both,
+ // vertex => vertex.id < maxUser).vertices
+ // }
+
+ // def main(args: Array[String]) = {
+ // val host = args(0)
+ // val taskType = args(1)
+ // val fname = args(2)
+ // val options = args.drop(3).map { arg =>
+ // arg.dropWhile(_ == '-').split('=') match {
+ // case Array(opt, v) => (opt -> v)
+ // case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ // }
+ // }
+
+ // System.setProperty("spark.serializer", "spark.KryoSerializer")
+ // //System.setProperty("spark.shuffle.compress", "false")
+ // System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator")
+
+ // taskType match {
+ // case "pagerank" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = false
+ // var tol:Double = 0.001
+ // var outFname = ""
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case ("tol", v) => tol = v.toDouble
+ // case ("output", v) => outFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+ // println("======================================")
+ // println("| PageRank |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // if(isDynamic) println(" \t |-> Tolerance: " + tol)
+ // println(" \tNumIter: " + numIter)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "PageRank(" + fname + ")")
+
+ // val graph = GraphLoader.textFile(sc, fname, a => 1.0).withPartitioner(numVPart, numEPart).cache()
+
+ // val startTime = System.currentTimeMillis
+ // logInfo("GRAPHX: starting tasks")
+ // logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
+ // logInfo("GRAPHX: Number of edges " + graph.edges.count)
+
+ // val pr = Analytics.pagerank(graph, numIter)
+ // // val pr = if(isDynamic) Analytics.dynamicPagerank(graph, tol, numIter)
+ // // else Analytics.pagerank(graph, numIter)
+ // logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case Vertex(id,r) => r }.reduce(_+_) )
+ // if (!outFname.isEmpty) {
+ // println("Saving pageranks of pages to " + outFname)
+ // pr.vertices.map{case Vertex(id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ // }
+ // logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+ // sc.stop()
+ // }
+
+ // case "cc" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = false
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+ // println("======================================")
+ // println("| Connected Components |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // println(" \tNumIter: " + numIter)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => 1.0)
+ // val cc = Analytics.connectedComponents(graph, numIter)
+ // // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
+ // // else Analytics.connectedComponents(graph, numIter)
+ // println("Components: " + cc.vertices.map(_.data).distinct())
+
+ // sc.stop()
+ // }
+
+ // case "shortestpath" => {
+
+ // var numIter = Int.MaxValue
+ // var isDynamic = true
+ // var sources: List[Int] = List.empty
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("dynamic", v) => isDynamic = v.toBoolean
+ // case ("source", v) => sources ++= List(v.toInt)
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+
+ // if(!isDynamic && numIter == Int.MaxValue) {
+ // println("Set number of iterations!")
+ // sys.exit(1)
+ // }
+
+ // if(sources.isEmpty) {
+ // println("No sources provided!")
+ // sys.exit(1)
+ // }
+
+ // println("======================================")
+ // println("| Shortest Path |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tDynamic: " + isDynamic)
+ // println(" \tNumIter: " + numIter)
+ // println(" \tSources: [" + sources.mkString(", ") + "]")
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ShortestPath(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => (if(a.isEmpty) 1.0 else a(0).toDouble ) )
+ // val sp = Analytics.shortestPath(graph, sources, numIter)
+ // // val cc = if(isDynamic) Analytics.dynamicShortestPath(graph, sources, numIter)
+ // // else Analytics.shortestPath(graph, sources, numIter)
+ // println("Longest Path: " + sp.vertices.map(_.data).reduce(math.max(_,_)))
+
+ // sc.stop()
+ // }
+
+
+ // case "als" => {
+
+ // var numIter = 5
+ // var lambda = 0.01
+ // var latentK = 10
+ // var usersFname = "usersFactors.tsv"
+ // var moviesFname = "moviesFname.tsv"
+ // var numVPart = 4
+ // var numEPart = 4
+
+ // options.foreach{
+ // case ("numIter", v) => numIter = v.toInt
+ // case ("lambda", v) => lambda = v.toDouble
+ // case ("latentK", v) => latentK = v.toInt
+ // case ("usersFname", v) => usersFname = v
+ // case ("moviesFname", v) => moviesFname = v
+ // case ("numVPart", v) => numVPart = v.toInt
+ // case ("numEPart", v) => numEPart = v.toInt
+ // case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ // }
+
+ // println("======================================")
+ // println("| Alternating Least Squares |")
+ // println("--------------------------------------")
+ // println(" Using parameters:")
+ // println(" \tNumIter: " + numIter)
+ // println(" \tLambda: " + lambda)
+ // println(" \tLatentK: " + latentK)
+ // println(" \tusersFname: " + usersFname)
+ // println(" \tmoviesFname: " + moviesFname)
+ // println("======================================")
+
+ // val sc = new SparkContext(host, "ALS(" + fname + ")")
+ // val graph = GraphLoader.textFile(sc, fname, a => a(0).toDouble )
+ // graph.numVPart = numVPart
+ // graph.numEPart = numEPart
+
+ // val maxUser = graph.edges.map(_._1).reduce(math.max(_,_))
+ // val minMovie = graph.edges.map(_._2).reduce(math.min(_,_))
+ // assert(maxUser < minMovie)
+
+ // val factors = Analytics.alternatingLeastSquares(graph, latentK, lambda, numIter).cache
+ // factors.filter(_._1 <= maxUser).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(usersFname)
+ // factors.filter(_._1 >= minMovie).map(r => r._1 + "\t" + r._2.mkString("\t"))
+ // .saveAsTextFile(moviesFname)
+
+ // sc.stop()
+ // }
+
+
+ // case _ => {
+ // println("Invalid task type.")
+ // }
+ // }
+ // }
+
+}
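The driver above wires the `pagerank` task up from command-line options. For reference, the same flow can be sketched programmatically; this is a minimal sketch rather than part of the commit, and the master URL, application name, input path, and tolerance are placeholders.

{{{
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.algorithms.PageRank

val sc = new SparkContext("local[4]", "PageRankSketch")          // placeholder master/app name
val graph = GraphLoader.edgeListFile(sc, "hdfs:///edges.tsv",    // placeholder input path
  minEdgePartitions = 4).cache()
val ranks = PageRank.runStandalone(graph, 0.001F)                // same call the driver makes above
println("Total rank: " + ranks.map(_._2).reduce(_ + _))
sc.stop()
}}}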
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
new file mode 100644
index 0000000000..29b46674f1
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -0,0 +1,50 @@
+package org.apache.spark.graphx
+
+
+/**
+ * A single directed edge consisting of a source id, target id,
+ * and the data associated with the edge.
+ *
+ * @tparam ED type of the edge attribute
+ */
+case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
+ /**
+ * The vertex id of the source vertex
+ */
+ var srcId: VertexID = 0,
+ /**
+ * The vertex id of the target vertex.
+ */
+ var dstId: VertexID = 0,
+ /**
+ * The attribute associated with the edge.
+ */
+ var attr: ED = nullValue[ED]) extends Serializable {
+
+ /**
+ * Given one vertex in the edge return the other vertex.
+ *
+ * @param vid the id of one of the two vertices on the edge.
+ * @return the id of the other vertex on the edge.
+ */
+ def otherVertexId(vid: VertexID): VertexID =
+ if (srcId == vid) dstId else { assert(dstId == vid); srcId }
+
+ /**
+ * Return the relative direction of the edge to the corresponding
+ * vertex.
+ *
+ * @param vid the id of one of the two vertices in the edge.
+ * @return the relative direction of the edge to the corresponding
+ * vertex.
+ */
+ def relativeDirection(vid: VertexID): EdgeDirection =
+ if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
+}
+
+object Edge {
+ def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
+ override def compare(a: Edge[ED], b: Edge[ED]): Int =
+ Ordering[(VertexID, VertexID)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
+ }
+}
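A minimal sketch of the Edge API above, assuming VertexID aliases Long (defined in package.scala in this commit); the ids and string attributes are made up.

{{{
import org.apache.spark.graphx._

val e = Edge(1L, 2L, "follows")
assert(e.otherVertexId(1L) == 2L)                    // the opposite endpoint
assert(e.relativeDirection(2L) == EdgeDirection.In)  // the edge arrives at vertex 2

// Sort edges by (srcId, dstId) with the ordering from the companion object.
val sorted = Seq(Edge(2L, 1L, "a"), Edge(1L, 2L, "b")).sorted(Edge.lexicographicOrdering[String])
}}}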
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
new file mode 100644
index 0000000000..785f941650
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
@@ -0,0 +1,36 @@
+package org.apache.spark.graphx
+
+
+/**
+ * The direction of a directed edge relative to a vertex, used to select
+ * the set of adjacent neighbors when running a neighborhood query.
+ */
+sealed abstract class EdgeDirection {
+ /**
+ * Reverse the direction of an edge. In becomes Out,
+ * Out becomes In, and Both remains Both.
+ */
+ def reverse: EdgeDirection = this match {
+ case EdgeDirection.In => EdgeDirection.Out
+ case EdgeDirection.Out => EdgeDirection.In
+ case EdgeDirection.Both => EdgeDirection.Both
+ }
+}
+
+
+object EdgeDirection {
+ /**
+ * Edges arriving at a vertex.
+ */
+ case object In extends EdgeDirection
+
+ /**
+ * Edges originating from a vertex
+ */
+ case object Out extends EdgeDirection
+
+ /**
+ * All edges adjacent to a vertex
+ */
+ case object Both extends EdgeDirection
+}
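A small sketch restating the reverse semantics defined above:

{{{
assert(EdgeDirection.In.reverse == EdgeDirection.Out)
assert(EdgeDirection.Out.reverse == EdgeDirection.In)
assert(EdgeDirection.Both.reverse == EdgeDirection.Both)
}}}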
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
new file mode 100644
index 0000000000..e4ef460e6f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -0,0 +1,73 @@
+package org.apache.spark.graphx
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+
+class EdgeRDD[@specialized ED: ClassTag](
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+ extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ partitionsRDD.setName("EdgeRDD")
+
+ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+ /**
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in
+ * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+ * co-partitioning with partitionsRDD.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
+ override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
+ firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
+ }
+
+ override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+
+ /**
+ * Persist the underlying edge partitions at the specified storage level.
+ */
+ override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def cache(): EdgeRDD[ED] = persist()
+
+ def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
+ : EdgeRDD[ED2] = {
+// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
+ new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
+ val (pid, ep) = iter.next()
+ Iterator(Tuple2(pid, f(pid, ep)))
+ }, preservesPartitioning = true))
+ }
+
+ def innerJoin[ED2: ClassTag, ED3: ClassTag]
+ (other: EdgeRDD[ED2])
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ val ed2Tag = classTag[ED2]
+ val ed3Tag = classTag[ED3]
+ new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ (thisIter, otherIter) =>
+ val (pid, thisEPart) = thisIter.next()
+ val (_, otherEPart) = otherIter.next()
+ Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+ })
+ }
+
+ def collectVertexIDs(): RDD[VertexID] = {
+ partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
+ }
+
+}
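A minimal sketch of the EdgeRDD operations above. `g1` and `g2` are assumed to be two graphs with Int edge attributes and identically partitioned edge sets; they are not part of the commit.

{{{
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// innerJoin keeps edges present in both EdgeRDDs and combines their attributes.
val combined: EdgeRDD[Int] = g1.edges.innerJoin(g2.edges) { (src, dst, a, b) => a + b }

// collectVertexIDs gathers every source and destination id (duplicates included).
val ids: RDD[VertexID] = combined.collectVertexIDs()
}}}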
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
new file mode 100644
index 0000000000..b0565b7e0e
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -0,0 +1,63 @@
+package org.apache.spark.graphx
+
+import org.apache.spark.graphx.impl.VertexPartition
+
+/**
+ * An edge triplet represents two vertices and an edge, along with their
+ * attributes.
+ *
+ * @tparam VD the type of the vertex attribute.
+ * @tparam ED the type of the edge attribute
+ *
+ * @todo specialize edge triplet for basic types, though when I last
+ * tried specializing I got a warning about inheriting from a type
+ * that is not a trait.
+ */
+class EdgeTriplet[VD, ED] extends Edge[ED] {
+// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag,
+// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag] extends Edge[ED] {
+
+
+ /**
+ * The source vertex attribute
+ */
+ var srcAttr: VD = _ //nullValue[VD]
+
+ /**
+ * The destination vertex attribute
+ */
+ var dstAttr: VD = _ //nullValue[VD]
+
+ var srcStale: Boolean = false
+ var dstStale: Boolean = false
+
+ /**
+ * Set the edge properties of this triplet.
+ */
+ protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
+ srcId = other.srcId
+ dstId = other.dstId
+ attr = other.attr
+ this
+ }
+
+ /**
+ * Given one vertex in the edge return the other vertex.
+ *
+ * @param vid the id of one of the two vertices on the edge.
+ * @return the attribute for the other vertex on the edge.
+ */
+ def otherVertexAttr(vid: VertexID): VD =
+ if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
+
+ /**
+ * Get the vertex object for the given vertex in the edge.
+ *
+ * @param vid the id of one of the two vertices on the edge
+ * @return the attr for the vertex with that id.
+ */
+ def vertexAttr(vid: VertexID): VD =
+ if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
+
+ override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+}
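A minimal sketch of how triplets are typically consumed through Graph.triplets (defined later in this diff), assuming a Graph[String, Int] named `graph` whose vertex attributes are names:

{{{
import org.apache.spark.rdd.RDD

val relations: RDD[String] = graph.triplets.map { t =>
  t.srcAttr + " -[" + t.attr + "]-> " + t.dstAttr    // e.g. "alice -[3]-> bob"
}
}}}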
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
new file mode 100644
index 0000000000..2b7c0a2583
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -0,0 +1,437 @@
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx.impl._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+
+/**
+ * The Graph abstractly represents a graph with arbitrary objects
+ * associated with vertices and edges. The graph provides basic
+ * operations to access and manipulate the data associated with
+ * vertices and edges as well as the underlying structure. Like Spark
+ * RDDs, the graph is a functional data-structure in which mutating
+ * operations return new graphs.
+ *
+ * @see GraphOps for additional graph member functions.
+ *
+ * @note The majority of the graph operations are implemented in
+ * `GraphOps`. All the convenience operations are defined in the
+ * `GraphOps` class which may be shared across multiple graph
+ * implementations.
+ *
+ * @tparam VD the vertex attribute type
+ * @tparam ED the edge attribute type
+ */
+abstract class Graph[VD: ClassTag, ED: ClassTag] {
+
+ /**
+ * Get the vertices and their data.
+ *
+ * @note vertex ids are unique.
+ * @return An RDD containing the vertices in this graph
+ *
+ * @see Vertex for the vertex type.
+ *
+ */
+ val vertices: VertexRDD[VD]
+
+ /**
+ * Get the Edges and their data as an RDD. The entries in the RDD
+ * contain just the source id and target id along with the edge
+ * data.
+ *
+ * @return An RDD containing the edges in this graph
+ *
+ * @see Edge for the edge type.
+ * @see triplets to get an RDD which contains all the edges
+ * along with their vertex data.
+ *
+ */
+ val edges: EdgeRDD[ED]
+
+ /**
+ * Get the edges with the vertex data associated with the adjacent
+ * pair of vertices.
+ *
+ * @return An RDD containing edge triplets.
+ *
+ * @example This operation might be used to validate a graph
+ * coloring by checking that the two vertices on each edge have
+ * different colors.
+ * {{{
+ * type Color = Int
+ * val graph: Graph[Color, Int] = GraphLoader.edgeListFile(sc, "hdfs://file.tsv")
+ * val numInvalid = graph.triplets
+ * .map(e => if (e.srcAttr == e.dstAttr) 1 else 0).sum
+ * }}}
+ *
+ * @see edges() If only the edge data and adjacent vertex ids are
+ * required.
+ *
+ */
+ val triplets: RDD[EdgeTriplet[VD, ED]]
+
+ /**
+ * Cache the vertices and edges associated with this graph.
+ *
+ * @param newLevel the level at which to cache the graph.
+ *
+ * @return A reference to this graph for convenience.
+ *
+ */
+ def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
+
+ /**
+ * Return a graph that is cached when first created. This is used to
+ * pin a graph in memory enabling multiple queries to reuse the same
+ * construction process.
+ *
+ * @see RDD.cache() for a more detailed explanation of caching.
+ */
+ def cache(): Graph[VD, ED]
+
+ /**
+ * Repartition the edges in the graph according to partitionStrategy.
+ */
+ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
+
+ /**
+ * Compute statistics describing the graph representation.
+ */
+ def statistics: Map[String, Any]
+
+ /**
+ * Construct a new graph where each vertex value has been
+ * transformed by the map function.
+ *
+ * @note This graph is not changed, and the new graph has the
+ * same structure. As a consequence the underlying index structures
+ * can be reused.
+ *
+ * @param map the function from a vertex object to a new vertex value.
+ *
+ * @tparam VD2 the new vertex data type
+ *
+ * @example We might use this operation to change the vertex values
+ * from one type to another to initialize an algorithm.
+ * {{{
+ * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
+ * val root = 42
+ * var bfsGraph = rawGraph
+ * .mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
+ * }}}
+ *
+ */
+ def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+
+ /**
+ * Construct a new graph where the value of each edge is
+ * transformed by the map operation. This function is not passed
+ * the vertex value for the vertices adjacent to the edge. If
+ * vertex values are desired, use the mapTriplets function.
+ *
+ * @note This graph is not changed, and the new graph has the
+ * same structure. As a consequence the underlying index structures
+ * can be reused.
+ *
+ * @param map the function from an edge object to a new edge value.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ * @example This function might be used to initialize edge
+ * attributes.
+ *
+ */
+ def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
+ mapEdges((pid, iter) => iter.map(map))
+ }
+
+ /**
+ * Construct a new graph transforming the value of each edge using
+ * the user defined iterator transform. The iterator transform is
+ * given an iterator over the edges within a logical partition
+ * and should yield a new iterator over the new values of each edge
+ * in the order in which they are provided to the iterator transform.
+ * If the adjacent vertex values are required, consider using the
+ * mapTriplets function instead.
+ *
+ * @note This does not change the structure of the
+ * graph or modify the values of this graph. As a consequence
+ * the underlying index structures can be reused.
+ *
+ * @param map the function which takes a partition id and an iterator
+ * over all the edges in the partition and must return an iterator over
+ * the new values for each edge in the order of the input iterator.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ */
+ def mapEdges[ED2: ClassTag](
+ map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+
+ /**
+ * Construct a new graph where the value of each edge is
+ * transformed by the map operation. This function passes vertex
+ * values for the adjacent vertices to the map function. If
+ * adjacent vertex values are not required, consider using the
+ * mapEdges function instead.
+ *
+ * @note This does not change the structure of the
+ * graph or modify the values of this graph. As a consequence
+ * the underlying index structures can be reused.
+ *
+ * @param map the function from an edge object to a new edge value.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ * @example This function might be used to initialize edge
+ * attributes based on the attributes associated with each vertex.
+ * {{{
+ * val rawGraph: Graph[Int, Int] = someLoadFunction()
+ * val graph = rawGraph.mapTriplets[Int]( edge =>
+ * edge.srcAttr - edge.dstAttr)
+ * }}}
+ *
+ */
+ def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ mapTriplets((pid, iter) => iter.map(map))
+ }
+
+ /**
+ * Construct a new graph transforming the value of each edge using
+ * the user defined iterator transform. The iterator transform is
+ * given an iterator over edge triplets within a logical partition
+ * and should yield a new iterator over the new values of each edge
+ * in the order in which they are provided to the iterator transform.
+ * If adjacent vertex values are not required, consider using the
+ * mapEdges function instead.
+ *
+ * @note This does not change the structure of the
+ * graph or modify the values of this graph. As a consequence
+ * the underlying index structures can be reused.
+ *
+ * @param map the function which takes a partition id and an iterator
+ * over all the edges in the partition and must return an iterator over
+ * the new values for each edge in the order of the input iterator.
+ *
+ * @tparam ED2 the new edge data type
+ *
+ */
+ def mapTriplets[ED2: ClassTag](
+ map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
+ Graph[VD, ED2]
+
+ /**
+ * Construct a new graph with all the edges reversed. If this graph
+ * contains an edge from a to b then the returned graph contains an
+ * edge from b to a.
+ */
+ def reverse: Graph[VD, ED]
+
+ /**
+ * This function takes a vertex and edge predicate and constructs
+ * the subgraph that consists of vertices and edges that satisfy the
+ * predicates. The resulting graph contains the vertices and edges
+ * that satisfy:
+ *
+ * {{{
+ * V' = {v : for all v in V where vpred(v)}
+ * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
+ * }}}
+ *
+ * @param epred the edge predicate which takes a triplet and
+ * evaluates to true if the edge is to remain in the subgraph. Note
+ * that only edges in which both vertices satisfy the vertex
+ * predicate are considered.
+ *
+ * @param vpred the vertex predicate which takes a vertex object and
+ * evaluates to true if the vertex is to be included in the subgraph
+ *
+ * @return the subgraph containing only the vertices and edges that
+ * satisfy the predicates.
+ */
+ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+
+ /**
+ * Subgraph of this graph with only vertices and edges from the other graph.
+ * @param other the graph to project this graph onto
+ * @return a graph with vertices and edges that exist in both the current graph and other,
+ * with vertex and edge data from the current graph.
+ */
+ def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
+
+ /**
+ * This function merges multiple edges between two vertices into a single Edge. For correct
+ * results, the graph must have been partitioned using partitionBy.
+ *
+ * @param merge the user-supplied commutative, associative function used to merge the
+ * attributes of duplicate edges.
+ *
+ * @return the resulting graph with a single edge for each (source, dest) vertex pair.
+ */
+ def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+
+ /**
+ * The mapReduceTriplets function is used to compute statistics
+ * about the neighboring edges and vertices of each vertex. The
+ * user supplied `mapFunc` function is invoked on each edge of the
+ * graph generating 0 or more "messages" to be "sent" to either
+ * vertex in the edge. The `reduceFunc` is then used to combine the
+ * output of the map phase destined to each vertex.
+ *
+ * @tparam A the type of "message" to be sent to each vertex
+ *
+ * @param mapFunc the user defined map function which returns 0 or
+ * more messages to neighboring vertices.
+ *
+ * @param reduceFunc the user defined reduce function which should
+ * be commutative and associative and is used to combine the output
+ * of the map phase.
+ *
+ * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
+ * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on
+ * edges originating from vertices in the active set. `activeSet` must have the same index as the
+ * graph's vertices.
+ *
+ * @example We can use this function to compute the inDegree of each
+ * vertex
+ * {{{
+ * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
+ * val inDeg: VertexRDD[Int] =
+ * rawGraph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
+ * }}}
+ *
+ * @note By expressing computation at the edge level we achieve
+ * maximum parallelism. This is one of the core functions in the
+ * Graph API that enables neighborhood-level computation. For
+ * example this function can be used to count neighbors satisfying a
+ * predicate or implement PageRank.
+ *
+ */
+ def mapReduceTriplets[A: ClassTag](
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
+ : VertexRDD[A]
+
+ /**
+ * Join the vertices with an RDD and then apply a function from the
+ * vertex and RDD entry to a new vertex value and type. The
+ * input table should contain at most one entry for each vertex. If
+ * no entry is provided the map function is invoked passing none.
+ *
+ * @tparam U the type of entry in the table of updates
+ * @tparam VD2 the new vertex value type
+ *
+ * @param table the table to join with the vertices in the graph.
+ * The table should contain at most one entry for each vertex.
+ *
+ * @param mapFunc the function used to compute the new vertex
+ * values. The map function is invoked for all vertices, even those
+ * that do not have a corresponding entry in the table.
+ *
+ * @example This function is used to update the vertices with new
+ * values based on external data. For example we could add the out
+ * degree to each vertex record
+ *
+ * {{{
+ * val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
+ * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees
+ * val graph = rawGraph.outerJoinVertices(outDeg) {
+ * (vid, data, optDeg) => optDeg.getOrElse(0)
+ * }
+ * }}}
+ *
+ */
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+ (mapFunc: (VertexID, VD, Option[U]) => VD2)
+ : Graph[VD2, ED]
+
+ // Save a copy of the GraphOps object so there is always one unique GraphOps object
+ // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
+ val ops = new GraphOps(this)
+} // end of Graph
+
+
+
+
+/**
+ * The Graph object contains a collection of routines used to construct graphs from RDDs.
+ */
+object Graph {
+
+ /**
+ * Construct a graph from a collection of edges encoded as vertex id pairs.
+ *
+ * @param rawEdges a collection of edges in (src, dst) form.
+ * @param defaultValue the vertex attribute to use for each vertex.
+ * @param uniqueEdges if multiple identical edges are found they are combined and the edge
+ * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
+ * uniqueEdges, a [[PartitionStrategy]] must be provided.
+ *
+ * @return a graph with edge attributes containing either the count of duplicate edges or 1
+ * (if `uniqueEdges=None`) and vertex attributes set to `defaultValue`.
+ */
+ def fromEdgeTuples[VD: ClassTag](
+ rawEdges: RDD[(VertexID, VertexID)],
+ defaultValue: VD,
+ uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
+ val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
+ val graph = GraphImpl(edges, defaultValue)
+ uniqueEdges match {
+ case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
+ case None => graph
+ }
+ }
+
+ /**
+ * Construct a graph from a collection of edges.
+ *
+ * @param edges the RDD containing the set of edges in the graph
+ * @param defaultValue the default vertex attribute to use for each vertex
+ *
+ * @return a graph with edge attributes described by `edges` and vertices
+ * given by all vertices in `edges` with value `defaultValue`
+ */
+ def fromEdges[VD: ClassTag, ED: ClassTag](
+ edges: RDD[Edge[ED]],
+ defaultValue: VD): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue)
+ }
+
+ /**
+ * Construct a graph from a collection of attributed vertices and
+ * edges. For duplicate vertices, the attribute is picked arbitrarily, and
+ * vertices found in the edge collection but not in the input
+ * vertices are assigned the default attribute.
+ *
+ * @tparam VD the vertex attribute type
+ * @tparam ED the edge attribute type
+ * @param vertices the "set" of vertices and their attributes
+ * @param edges the collection of edges in the graph
+ * @param defaultVertexAttr the default vertex attribute to use for
+ * vertices that are mentioned in edges but not in vertices
+ */
+ def apply[VD: ClassTag, ED: ClassTag](
+ vertices: RDD[(VertexID, VD)],
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr)
+ }
+
+ /**
+ * The implicit graphToGraphOps function extracts the GraphOps member from a graph.
+ *
+ * To improve modularity the Graph type only contains a small set of basic operations. All the
+ * convenience operations are defined in the GraphOps class which may be shared across multiple
+ * graph implementations.
+ */
+ implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
+} // end of Graph object
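A minimal end-to-end sketch tying the constructors and mapReduceTriplets together: build a graph from raw id pairs, count in-degrees, and join the counts back onto the vertices. The edge list and the SparkContext `sc` are assumptions for illustration.

{{{
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val rawEdges: RDD[(VertexID, VertexID)] =
  sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)))

// Every vertex and every edge starts with the attribute 1.
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(rawEdges, 1)

// One message per edge to its destination, summed per vertex: the in-degree.
val inDegrees: VertexRDD[Int] =
  graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)

// Vertices with no incoming edges are absent from inDegrees and fall back to 0.
val degreeGraph: Graph[Int, Int] =
  graph.outerJoinVertices(inDegrees) { (vid, attr, deg) => deg.getOrElse(0) }
}}}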
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
new file mode 100644
index 0000000000..f8aab951f0
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.graphx
+
+import com.esotericsoftware.kryo.Kryo
+
+import org.apache.spark.graphx.impl._
+import org.apache.spark.serializer.KryoRegistrator
+import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.BoundedPriorityQueue
+
+
+class GraphKryoRegistrator extends KryoRegistrator {
+
+ def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Edge[Object]])
+ kryo.register(classOf[MessageToPartition[Object]])
+ kryo.register(classOf[VertexBroadcastMsg[Object]])
+ kryo.register(classOf[(VertexID, Object)])
+ kryo.register(classOf[EdgePartition[Object]])
+ kryo.register(classOf[BitSet])
+ kryo.register(classOf[VertexIdToIndexMap])
+ kryo.register(classOf[VertexAttributeBlock[Object]])
+ kryo.register(classOf[PartitionStrategy])
+ kryo.register(classOf[BoundedPriorityQueue[Object]])
+
+ // This avoids a large number of hash table lookups.
+ kryo.setReferences(false)
+ }
+}
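A minimal sketch of enabling the registrator, mirroring the system properties set in Analytics.main earlier in this diff; the master URL and application name are placeholders.

{{{
import org.apache.spark.SparkContext

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
val sc = new SparkContext("local[4]", "GraphXWithKryo")
}}}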
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
new file mode 100644
index 0000000000..437288405f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
@@ -0,0 +1,134 @@
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import scala.collection.JavaConversions._
+import org.apache.spark.rdd.RDD
+
+/**
+ * This object implements the GraphLab gather-apply-scatter API.
+ */
+object GraphLab extends Logging {
+
+ /**
+ * Execute the GraphLab Gather-Apply-Scatter API
+ *
+ * @todo finish documenting GraphLab Gather-Apply-Scatter API
+ *
+ * @param graph The graph on which to execute the GraphLab API
+ * @param gatherFunc The gather function is executed on each edge triplet
+ * adjacent to a vertex and returns an accumulator which
+ * is then merged using the merge function.
+ * @param mergeFunc An associative operation used to combine the results of
+ * the gather phase.
+ * @param applyFunc Takes a vertex and the final result of the merge operations
+ * on the adjacent edges and returns a new vertex value.
+ * @param scatterFunc Executed after the apply function, the scatter function takes
+ * a triplet and signals whether the neighboring vertex program
+ * must be recomputed.
+ * @param startVertices a predicate to determine which vertices to start the computation on.
+ * These will be the active vertices in the first iteration.
+ * @param numIter The maximum number of iterations to run.
+ * @param gatherDirection The direction of edges to consider during the gather phase
+ * @param scatterDirection The direction of edges to consider during the scatter phase
+ *
+ * @tparam VD The graph vertex attribute type
+ * @tparam ED The graph edge attribute type
+ * @tparam A The type accumulated during the gather phase
+ * @return the resulting graph after the algorithm converges
+ */
+ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
+ (graph: Graph[VD, ED], numIter: Int,
+ gatherDirection: EdgeDirection = EdgeDirection.In,
+ scatterDirection: EdgeDirection = EdgeDirection.Out)
+ (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
+ mergeFunc: (A, A) => A,
+ applyFunc: (VertexID, VD, Option[A]) => VD,
+ scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
+ : Graph[VD, ED] = {
+
+
+ // Add an active attribute to all vertices to track convergence.
+ var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
+ case (id, data) => (startVertices(id, data), data)
+ }.cache()
+
+ // The gather function wrapper strips the active attribute and
+ // only invokes the gather function on active vertices
+ def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
+ if (e.vertexAttr(vid)._1) {
+ val edgeTriplet = new EdgeTriplet[VD,ED]
+ edgeTriplet.set(e)
+ edgeTriplet.srcAttr = e.srcAttr._2
+ edgeTriplet.dstAttr = e.dstAttr._2
+ Some(gatherFunc(vid, edgeTriplet))
+ } else {
+ None
+ }
+ }
+
+ // The apply function wrapper strips the vertex of the active attribute
+ // and only invokes the apply function on active vertices
+ def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
+ val (active, vData) = data
+ if (active) (true, applyFunc(vid, vData, accum))
+ else (false, vData)
+ }
+
+ // The scatter function wrapper strips the vertex of the active attribute
+ // and only invokes the scatter function on active vertices
+ def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
+ val vid = e.otherVertexId(rawVertexID)
+ if (e.vertexAttr(vid)._1) {
+ val edgeTriplet = new EdgeTriplet[VD,ED]
+ edgeTriplet.set(e)
+ edgeTriplet.srcAttr = e.srcAttr._2
+ edgeTriplet.dstAttr = e.dstAttr._2
+ Some(scatterFunc(vid, edgeTriplet))
+ } else {
+ None
+ }
+ }
+
+ // Used to set the active status of vertices for the next round
+ def applyActive(
+ vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
+ val (prevActive, vData) = data
+ (newActiveOpt.getOrElse(false), vData)
+ }
+
+ // Main Loop ---------------------------------------------------------------------
+ var i = 0
+ var numActive = activeGraph.numVertices
+ while (i < numIter && numActive > 0) {
+
+ // Gather
+ val gathered: RDD[(VertexID, A)] =
+ activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
+
+ // Apply
+ activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
+
+ // Scatter is basically a gather in the opposite direction so we reverse the edge direction
+ // activeGraph: Graph[(Boolean, VD), ED]
+ val scattered: RDD[(VertexID, Boolean)] =
+ activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+
+ activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
+
+ // Calculate the number of active vertices
+ numActive = activeGraph.vertices.map{
+ case (vid, data) => if (data._1) 1 else 0
+ }.reduce(_ + _)
+ logInfo("Number active vertices: " + numActive)
+ i += 1
+ }
+
+ // Remove the active attribute from the vertex data before returning the graph
+ activeGraph.mapVertices{case (vid, data) => data._2 }
+ }
+}
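
A minimal sketch (not part of the diff) of the gather-apply-scatter loop above, propagating the minimum vertex id through an assumed existing `graph: Graph[Int, Double]` (a connected-components style computation):

import org.apache.spark.graphx._

val labeled: Graph[VertexID, Double] = graph.mapVertices((vid, _) => vid)
val components: Graph[VertexID, Double] =
  GraphLab[VertexID, Double, VertexID](labeled, numIter = 50,
      gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both)(
    gatherFunc = (vid, et) => et.otherVertexAttr(vid),                   // read the neighbor's label
    mergeFunc = (a, b) => math.min(a, b),                                // keep the smallest label seen
    applyFunc = (vid, attr, acc) => math.min(attr, acc.getOrElse(attr)), // adopt it if smaller
    scatterFunc = (vid, et) => et.vertexAttr(vid) < et.otherVertexAttr(vid)) // wake larger neighbors
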
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
new file mode 100644
index 0000000000..473cfb18cf
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -0,0 +1,113 @@
+package org.apache.spark.graphx
+
+import java.util.{Arrays => JArrays}
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
+
+
+object GraphLoader extends Logging {
+
+ /**
+ * Load an edge list from a file, initializing the Graph
+ *
+ * @tparam ED the type of the edge data of the resulting Graph
+ *
+ * @param sc the SparkContext used to construct RDDs
+ * @param path the path to the text file containing the edge list
+ * @param edgeParser a function that takes an array of strings and
+ * returns an ED object
+ * @param minEdgePartitions the minimum number of partitions for the
+ * edge RDD
+ *
+ */
+ def textFile[ED: ClassTag](
+ sc: SparkContext,
+ path: String,
+ edgeParser: Array[String] => ED,
+ minEdgePartitions: Int = 1):
+ Graph[Int, ED] = {
+ // Parse the edge data table
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
+ iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
+ val lineArray = line.split("\\s+")
+ if(lineArray.length < 2) {
+ println("Invalid line: " + line)
+ assert(false)
+ }
+ val source = lineArray(0).trim.toLong
+ val target = lineArray(1).trim.toLong
+ val tail = lineArray.drop(2)
+ val edata = edgeParser(tail)
+ Edge(source, target, edata)
+ })
+ val defaultVertexAttr = 1
+ Graph.fromEdges(edges, defaultVertexAttr)
+ }
+
+ /**
+ * Load a graph from an edge list formatted file with each line containing
+ * two integers: a source Id and a target Id.
+ *
+ * @example A file in the following format:
+ * {{{
+ * # Comment Line
+ * # Source Id <\t> Target Id
+ * 1 -5
+ * 1 2
+ * 2 7
+ * 1 8
+ * }}}
+ *
+ * If desired, the edges can be automatically oriented in the positive
+ * direction (source Id < target Id) by setting `canonicalOrientation` to
+ * true.
+ *
+ * @param sc the SparkContext used to construct RDDs
+ * @param path the path to the file (e.g., /Home/data/file or hdfs://file)
+ * @param canonicalOrientation whether to orient edges in the positive
+ * direction.
+ * @param minEdgePartitions the minimum number of partitions for the
+ * edge RDD
+ * @return a graph whose vertex and edge attributes are all initialized to 1
+ */
+ def edgeListFile(
+ sc: SparkContext,
+ path: String,
+ canonicalOrientation: Boolean = false,
+ minEdgePartitions: Int = 1):
+ Graph[Int, Int] = {
+ val startTime = System.currentTimeMillis
+
+ // Parse the edge data table directly into edge partitions
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[Int]
+ iter.foreach { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if (lineArray.length < 2) {
+ logWarning("Invalid line: " + line)
+ } else {
+ val srcId = lineArray(0).toLong
+ val dstId = lineArray(1).toLong
+ // Flip the edge so that srcId < dstId when canonical orientation is requested,
+ // matching the documented contract above.
+ if (canonicalOrientation && srcId > dstId) {
+ builder.add(dstId, srcId, 1)
+ } else {
+ builder.add(srcId, dstId, 1)
+ }
+ }
+ }
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }.cache()
+ edges.count()
+
+ logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+ } // end of edgeListFile
+
+}
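
A minimal usage sketch for the loader above (the path and partition count are hypothetical; `sc` is an existing SparkContext):

import org.apache.spark.graphx._

val followerGraph: Graph[Int, Int] =
  GraphLoader.edgeListFile(sc, "hdfs:///data/followers.txt",
    canonicalOrientation = true, minEdgePartitions = 4)
println("vertices: " + followerGraph.numVertices + ", edges: " + followerGraph.numEdges)
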
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
new file mode 100644
index 0000000000..cacfcb1c90
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -0,0 +1,277 @@
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkException
+
+
+/**
+ * `GraphOps` contains additional functionality (syntactic sugar) for
+ * the graph type and is implicitly constructed for each Graph object.
+ * All operations in `GraphOps` are expressed in terms of the
+ * efficient GraphX API.
+ *
+ * @tparam VD the vertex attribute type
+ * @tparam ED the edge attribute type
+ *
+ */
+class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
+
+ /**
+ * Compute the number of edges in the graph.
+ */
+ lazy val numEdges: Long = graph.edges.count()
+
+
+ /**
+ * Compute the number of vertices in the graph.
+ */
+ lazy val numVertices: Long = graph.vertices.count()
+
+
+ /**
+ * Compute the in-degree of each vertex in the Graph returning an
+ * RDD.
+ * @note Vertices with no in edges are not returned in the resulting RDD.
+ */
+ lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+
+
+ /**
+ * Compute the out-degree of each vertex in the Graph returning an RDD.
+ * @note Vertices with no out edges are not returned in the resulting RDD.
+ */
+ lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+
+
+ /**
+ * Compute the degrees of each vertex in the Graph returning an RDD.
+ * @note Vertices with no edges are not returned in the resulting
+ * RDD.
+ */
+ lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both)
+
+
+ /**
+ * Compute the neighboring vertex degrees.
+ *
+ * @param edgeDirection the direction along which to collect
+ * neighboring vertex attributes.
+ */
+ private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
+ if (edgeDirection == EdgeDirection.In) {
+ graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _)
+ } else if (edgeDirection == EdgeDirection.Out) {
+ graph.mapReduceTriplets(et => Iterator((et.srcId,1)), _ + _)
+ } else { // EdgeDirection.Both
+ graph.mapReduceTriplets(et => Iterator((et.srcId,1), (et.dstId,1)), _ + _)
+ }
+ }
+
+
+ /**
+ * Compute a statistic of the neighborhood of each vertex by applying a map
+ * function to each adjacent edge triplet and merging the results with an
+ * associative reduce function.
+ *
+ * @note Only vertices that receive at least one message appear in the
+ * returned RDD; vertices with no edges in the requested direction (or for
+ * which the map function always returns `None`) are omitted.
+ *
+ * @param mapFunc the function applied to each edge adjacent to each
+ * vertex. The mapFunc can optionally return None in which case it
+ * does not contribute to the final sum.
+ * @param reduceFunc the function used to merge the results of each
+ * map operation.
+ * @param dir the direction of edges to consider (e.g., In, Out, Both).
+ * @tparam A the type accumulated for each vertex.
+ *
+ * @return a VertexRDD containing the aggregated value for each vertex that
+ * received at least one message.
+ *
+ * @example We can use this function to compute the average follower
+ * age for each user
+ *
+ * {{{
+ * val graph: Graph[Int, Int] = loadGraph()
+ * val averageFollowerAge: VertexRDD[Double] =
+ *   graph.aggregateNeighbors[(Int, Int)](
+ *     (vid, edge) => Some((edge.otherVertexAttr(vid), 1)),
+ *     (a, b) => (a._1 + b._1, a._2 + b._2),
+ *     EdgeDirection.In)
+ *   .mapValues{ case (sum, followers) => sum.toDouble / followers }
+ * }}}
+ *
+ * @todo Should this return a graph with the new vertex values?
+ *
+ */
+ def aggregateNeighbors[A: ClassTag](
+ mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
+ reduceFunc: (A, A) => A,
+ dir: EdgeDirection)
+ : VertexRDD[A] = {
+
+ // Define a new map function over edge triplets
+ val mf = (et: EdgeTriplet[VD,ED]) => {
+ // Compute the message to the dst vertex
+ val dst =
+ if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
+ mapFunc(et.dstId, et)
+ } else { Option.empty[A] }
+ // Compute the message to the source vertex
+ val src =
+ if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
+ mapFunc(et.srcId, et)
+ } else { Option.empty[A] }
+ // construct the return array
+ (src, dst) match {
+ case (None, None) => Iterator.empty
+ case (Some(srcA),None) => Iterator((et.srcId, srcA))
+ case (None, Some(dstA)) => Iterator((et.dstId, dstA))
+ case (Some(srcA), Some(dstA)) => Iterator((et.srcId, srcA), (et.dstId, dstA))
+ }
+ }
+
+ graph.mapReduceTriplets(mf, reduceFunc)
+ } // end of aggregateNeighbors
+
+
+ /**
+ * Return the Ids of the neighboring vertices.
+ *
+ * @param edgeDirection the direction along which to collect
+ * neighboring vertices
+ *
+ * @return the vertex set of neighboring ids for each vertex.
+ */
+ def collectNeighborIds(edgeDirection: EdgeDirection) :
+ VertexRDD[Array[VertexID]] = {
+ val nbrs =
+ if (edgeDirection == EdgeDirection.Both) {
+ graph.mapReduceTriplets[Array[VertexID]](
+ mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
+ reduceFunc = _ ++ _
+ )
+ } else if (edgeDirection == EdgeDirection.Out) {
+ graph.mapReduceTriplets[Array[VertexID]](
+ mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
+ reduceFunc = _ ++ _)
+ } else if (edgeDirection == EdgeDirection.In) {
+ graph.mapReduceTriplets[Array[VertexID]](
+ mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
+ reduceFunc = _ ++ _)
+ } else {
+ throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
+ }
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[VertexID])
+ }
+ } // end of collectNeighborIds
+
+
+ /**
+ * Collect the neighbor vertex attributes for each vertex.
+ *
+ * @note This function could be highly inefficient on power-law
+ * graphs where high-degree vertices may force a large amount of
+ * information to be collected to a single location.
+ *
+ * @param edgeDirection the direction along which to collect
+ * neighboring vertices
+ *
+ * @return the vertex set of neighboring vertex attributes for each
+ * vertex.
+ */
+ def collectNeighbors(edgeDirection: EdgeDirection) :
+ VertexRDD[ Array[(VertexID, VD)] ] = {
+ val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
+ (vid, edge) =>
+ Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
+ (a, b) => a ++ b,
+ edgeDirection)
+
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+ }
+ } // end of collectNeighbor
+
+
+ /**
+ * Join the vertices with an RDD and then apply a function from the
+ * vertex and RDD entry to a new vertex value. The input table
+ * should contain at most one entry for each vertex. If no entry is
+ * provided the map function is skipped and the old value is used.
+ *
+ * @tparam U the type of entry in the table of updates
+ * @param table the table to join with the vertices in the graph.
+ * The table should contain at most one entry for each vertex.
+ * @param mapFunc the function used to compute the new vertex
+ * values. The map function is invoked only for vertices with a
+ * corresponding entry in the table otherwise the old vertex value
+ * is used.
+ *
+ * @note for small tables this function can be much more efficient
+ * than leftJoinVertices
+ *
+ * @example This function is used to update the vertices with new
+ * values based on external data. For example we could add the out
+ * degree to each vertex record
+ *
+ * {{{
+ * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
+ *   .mapVertices((vid, attr) => 0)
+ * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees
+ * val graph = rawGraph.joinVertices(outDeg)((vid, attr, deg) => deg)
+ * }}}
+ *
+ */
+ def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
+ : Graph[VD, ED] = {
+ val uf = (id: VertexID, data: VD, o: Option[U]) => {
+ o match {
+ case Some(u) => mapFunc(id, data, u)
+ case None => data
+ }
+ }
+ graph.outerJoinVertices(table)(uf)
+ }
+
+ /**
+ * Filter the graph by computing some values to filter on, and applying the predicates.
+ *
+ * @param preprocess a function to compute new vertex and edge data before filtering
+ * @param epred edge pred to filter on after preprocess, see more details under Graph#subgraph
+ * @param vpred vertex pred to filter on after preprocess, see more details under Graph#subgraph
+ * @tparam VD2 vertex type the vpred operates on
+ * @tparam ED2 edge type the epred operates on
+ * @return a subgraph of the original graph, with its data unchanged
+ *
+ * @example This function can be used to filter the graph based on some property, without
+ * changing the vertex and edge values in your program. For example, we could remove the vertices
+ * in a graph with 0 outdegree
+ *
+ * {{{
+ * graph.filter(
+ * graph => {
+ * val degrees: VertexRDD[Int] = graph.outDegrees
+ * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
+ * },
+ * vpred = (vid: VertexID, deg:Int) => deg > 0
+ * )
+ * }}}
+ *
+ */
+ def filter[VD2: ClassTag, ED2: ClassTag](
+ preprocess: Graph[VD, ED] => Graph[VD2, ED2],
+ epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
+ vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
+ graph.mask(preprocess(graph).subgraph(epred, vpred))
+ }
+} // end of GraphOps
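
A minimal sketch (not part of the diff) exercising a few of the operations above on an assumed existing `graph: Graph[String, Int]`:

import org.apache.spark.graphx._

// Fold the degree of every vertex back into the vertex attribute.
val degGraph: Graph[Int, Int] = graph.outerJoinVertices(graph.degrees) {
  (vid, name, degOpt) => degOpt.getOrElse(0)
}

// Ids of out-neighbors, one array per vertex.
val nbrIds: VertexRDD[Array[VertexID]] = graph.collectNeighborIds(EdgeDirection.Out)

// Keep only vertices with at least one out-edge, without changing attributes.
val active: Graph[String, Int] = graph.filter(
  g => g.outerJoinVertices(g.outDegrees)((vid, name, deg) => deg.getOrElse(0)),
  vpred = (vid: VertexID, deg: Int) => deg > 0)
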
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
new file mode 100644
index 0000000000..5e80a535f1
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.graphx
+
+
+sealed trait PartitionStrategy extends Serializable {
+ def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
+}
+
+
+/**
+ * This strategy implements a classic 2D partitioning of a sparse matrix.
+ * Suppose we have a graph with 12 vertices (v0 through v11) that we want to partition
+ * over 9 machines. We can use the following sparse matrix representation:
+ *
+ * __________________________________
+ * v0 | P0 * | P1 | P2 * |
+ * v1 | **** | * | |
+ * v2 | ******* | ** | **** |
+ * v3 | ***** | * * | * |
+ * ----------------------------------
+ * v4 | P3 * | P4 *** | P5 ** * |
+ * v5 | * * | * | |
+ * v6 | * | ** | **** |
+ * v7 | * * * | * * | * |
+ * ----------------------------------
+ * v8 | P6 * | P7 * | P8 * *|
+ * v9 | * | * * | |
+ * v10 | * | ** | * * |
+ * v11 | * <-E | *** | ** |
+ * ----------------------------------
+ *
+ * The edge denoted by E connects v11 with v1 and is assigned to
+ * processor P6. To get the processor number we divide the matrix
+ * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
+ * adjacent to v11 can only be in the first column of
+ * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
+ * As a consequence we can guarantee that v11 will need to be
+ * replicated to at most 2 * sqrt(numProc) machines.
+ *
+ * Notice that P0 has many edges and as a consequence this
+ * partitioning would lead to poor work balance. To improve
+ * balance we first multiply each vertex id by a large prime
+ * to effectively shuffle the vertex locations.
+ *
+ * One of the limitations of this approach is that the number of
+ * machines must be a perfect square. We partially address
+ * this limitation by computing the machine assignment to the next
+ * largest perfect square and then mapping back down to the actual
+ * number of machines. Unfortunately, this can also lead to work
+ * imbalance and so it is suggested that a perfect square is used.
+ *
+ *
+ */
+case object EdgePartition2D extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
+ val mixingPrime: VertexID = 1125899906842597L
+ val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+ }
+}
+
+
+case object EdgePartition1D extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val mixingPrime: VertexID = 1125899906842597L
+ (math.abs(src) * mixingPrime).toInt % numParts
+ }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a
+ * random vertex cut.
+ */
+case object RandomVertexCut extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ math.abs((src, dst).hashCode()) % numParts
+ }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a random vertex cut. This
+ * function ensures that edges of opposite direction between the same two vertices
+ * will end up on the same partition.
+ */
+case object CanonicalRandomVertexCut extends PartitionStrategy {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val lower = math.min(src, dst)
+ val higher = math.max(src, dst)
+ math.abs((lower, higher).hashCode()) % numParts
+ }
+}
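
A minimal sketch (not part of the diff): computing a partition id directly, and repartitioning an assumed existing `graph` with one of the strategies above (Graph.partitionBy is assumed to accept a PartitionStrategy, as referenced elsewhere in this patch):

import org.apache.spark.graphx._

// Which of 9 partitions would the edge (1L -> 2L) land in under 2D partitioning?
val pid: PartitionID = EdgePartition2D.getPartition(1L, 2L, numParts = 9)
// Repartition the edges of an existing graph with a random vertex cut.
val repartitioned = graph.partitionBy(RandomVertexCut)
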
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
new file mode 100644
index 0000000000..8ddb788135
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -0,0 +1,122 @@
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+
+/**
+ * This object implements a Pregel-like bulk-synchronous
+ * message-passing API. However, unlike the original Pregel API the
+ * GraphX pregel API factors the sendMessage computation over edges,
+ * enables the message sending computation to read both vertex
+ * attributes, and finally constrains messages to the graph structure.
+ * These changes allow for substantially more efficient distributed
+ * execution while also exposing greater flexibility for graph based
+ * computation.
+ *
+ * @example We can use the Pregel abstraction to implement PageRank
+ * {{{
+ * val pagerankGraph: Graph[Double, Double] = graph
+ * // Associate the degree with each vertex
+ * .outerJoinVertices(graph.outDegrees){
+ * (vid, vdata, deg) => deg.getOrElse(0)
+ * }
+ * // Set the weight on the edges based on the degree
+ * .mapTriplets( e => 1.0 / e.srcAttr )
+ * // Set the vertex attributes to the initial pagerank values
+ * .mapVertices( (id, attr) => 1.0 )
+ *
+ * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ * resetProb + (1.0 - resetProb) * msgSum
+ * def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexID, Double)] =
+ *   Iterator((edge.dstId, edge.srcAttr * edge.attr))
+ * def messageCombiner(a: Double, b: Double): Double = a + b
+ * val initialMessage = 0.0
+ * // Execute pregel for a fixed number of iterations.
+ * Pregel(pagerankGraph, initialMessage, numIter)(
+ * vertexProgram, sendMessage, messageCombiner)
+ * }}}
+ *
+ */
+object Pregel {
+
+ /**
+ * Execute a Pregel-like iterative vertex-parallel abstraction. The
+ * user-defined vertex-program `vprog` is executed in parallel on
+ * each vertex receiving any inbound messages and computing a new
+ * value for the vertex. The `sendMsg` function is then invoked on
+ * all out-edges and is used to compute an optional message to the
+ * destination vertex. The `mergeMsg` function is a commutative
+ * associative function used to combine messages destined to the
+ * same vertex.
+ *
+ * On the first iteration all vertices receive the `initialMsg` and
+ * on subsequent iterations if a vertex does not receive a message
+ * then the vertex-program is not invoked.
+ *
+ * This function iterates until there are no remaining messages, or
+ * for maxIterations iterations.
+ *
+ * @tparam VD the vertex data type
+ * @tparam ED the edge data type
+ * @tparam A the Pregel message type
+ *
+ * @param graph the input graph.
+ *
+ * @param initialMsg the message each vertex will receive on
+ * the first iteration.
+ *
+ * @param maxIterations the maximum number of iterations to run for.
+ *
+ * @param vprog the user-defined vertex program which runs on each
+ * vertex and receives the inbound message and computes a new vertex
+ * value. On the first iteration the vertex program is invoked on
+ * all vertices and is passed the default message. On subsequent
+ * iterations the vertex program is only invoked on those vertices
+ * that receive messages.
+ *
+ * @param sendMsg a user supplied function that is applied to out
+ * edges of vertices that received messages in the current
+ * iteration.
+ *
+ * @param mergeMsg a user supplied function that takes two incoming
+ * messages of type A and merges them into a single message of type
+ * A. ''This function must be commutative and associative and
+ * ideally the size of A should not increase.''
+ *
+ * @return the resulting graph at the end of the computation
+ *
+ */
+ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
+ (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
+ vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ mergeMsg: (A, A) => A)
+ : Graph[VD, ED] = {
+
+ var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
+ // compute the messages
+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
+ var activeMessages = messages.count()
+ // Loop
+ var i = 0
+ while (activeMessages > 0 && i < maxIterations) {
+ // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Update the graph with the new vertices.
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+
+ val oldMessages = messages
+ // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
+ // get to send messages.
+ messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+ activeMessages = messages.count()
+ // after counting we can unpersist the old messages
+ oldMessages.unpersist(blocking=false)
+ // count the iteration
+ i += 1
+ }
+
+ g
+ } // end of apply
+
+} // end of object Pregel
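
A minimal sketch (not part of the diff): single-source shortest paths written against the Pregel operator above, for an assumed existing `graph: Graph[Int, Double]` with edge weights and a chosen source vertex `sourceId`:

import org.apache.spark.graphx._

// Initialize distances: 0 at the source, infinity everywhere else.
val initial: Graph[Double, Double] =
  graph.mapVertices((vid, _) => if (vid == sourceId) 0.0 else Double.PositiveInfinity)

val sssp: Graph[Double, Double] = Pregel(initial, Double.PositiveInfinity)(
  vprog = (id, dist, newDist) => math.min(dist, newDist),
  sendMsg = et =>
    if (et.srcAttr + et.attr < et.dstAttr) Iterator((et.dstId, et.srcAttr + et.attr))
    else Iterator.empty,
  mergeMsg = (a, b) => math.min(a, b))
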
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
new file mode 100644
index 0000000000..cfee9b089f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graphx.impl.MsgRDDFunctions
+import org.apache.spark.graphx.impl.VertexPartition
+
+
+/**
+ * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
+ * only one entry for each vertex and by pre-indexing the entries for fast,
+ * efficient joins.
+ *
+ * @tparam VD the vertex attribute associated with each vertex in the set.
+ *
+ * To construct a `VertexRDD` use the singleton object:
+ *
+ * @example Construct a `VertexRDD` from a plain RDD
+ * {{{
+ * // Construct an initial vertex set
+ * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
+ * val vset = VertexRDD(someData)
+ * // If there were redundant values in someData we would use a reduceFunc
+ * val vset2 = VertexRDD(someData, reduceFunc)
+ * // Finally we can use the VertexRDD to index another dataset
+ * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
+ * // Now we can construct very fast joins between the two sets
+ * val vset3: VertexRDD[(SomeType, OtherType)] =
+ *   vset.innerJoin(otherData) { (vid, a, b) => (a, b) }
+ * }}}
+ *
+ */
+class VertexRDD[@specialized VD: ClassTag](
+ val partitionsRDD: RDD[VertexPartition[VD]])
+ extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ require(partitionsRDD.partitioner.isDefined)
+
+ partitionsRDD.setName("VertexRDD")
+
+ /**
+ * Construct a new VertexRDD that is indexed by only the keys in the RDD.
+ * The resulting VertexRDD will be based on a different index and can
+ * no longer be quickly joined with this RDD.
+ */
+ def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+
+ /**
+ * The partitioner is defined by the index.
+ */
+ override val partitioner = partitionsRDD.partitioner
+
+ /**
+ * The actual partitions are defined by the tuples.
+ */
+ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+ /**
+ * The preferred locations are computed based on the preferred
+ * locations of the tuples.
+ */
+ override protected def getPreferredLocations(s: Partition): Seq[String] =
+ partitionsRDD.preferredLocations(s)
+
+ /**
+ * Caching a VertexRDD causes the index and values to be cached separately.
+ */
+ override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def cache(): VertexRDD[VD] = persist()
+
+ /** Return the number of vertices in this set. */
+ override def count(): Long = {
+ partitionsRDD.map(_.size).reduce(_ + _)
+ }
+
+ /**
+ * Provide the `RDD[(VertexID, VD)]` equivalent output.
+ */
+ override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
+ firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
+ }
+
+ /**
+ * Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
+ */
+ def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
+ : VertexRDD[VD2] = {
+ val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
+ new VertexRDD(newPartitionsRDD)
+ }
+
+
+ /**
+ * Restrict the vertex set to the set of vertices satisfying the
+ * given predicate.
+ *
+ * @param pred the user defined predicate, which takes a tuple to conform to
+ * the RDD[(VertexID, VD)] interface
+ *
+ * @note The vertex set preserves the original index structure
+ * which means that the returned RDD can be easily joined with
+ * the original vertex-set. Furthermore, the filter only
+ * modifies the bitmap index and so no new values are allocated.
+ */
+ override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
+ this.mapVertexPartitions(_.filter(Function.untupled(pred)))
+
+ /**
+ * Pass each vertex attribute through a map function and retain the
+ * original RDD's partitioning and index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each value in the RDD
+ * @return a new VertexRDD with values obtained by applying `f` to
+ * each of the entries in the original VertexRDD. The resulting
+ * VertexRDD retains the same index.
+ */
+ def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+
+ /**
+ * Pass each vertex attribute through a map function and retain the
+ * original RDD's partitioning and index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each value in the RDD
+ * @return a new VertexRDD with values obtained by applying `f` to
+ * each of the entries in the original VertexRDD. The resulting
+ * VertexRDD retains the same index.
+ */
+ def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map(f))
+
+ /**
+ * Hides vertices that are the same between this and other. For vertices that are different, keeps
+ * the values from `other`.
+ */
+ def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.diff(otherPart))
+ }
+ new VertexRDD(newPartitionsRDD)
+ }
+
+ /**
+ * Left join this VertexSet with another VertexSet which has the
+ * same Index. This function will fail if both VertexSets do not
+ * share the same index. The resulting vertex set contains an entry
+ * for each vertex in this set. If the other VertexSet is missing
+ * any vertex in this VertexSet then a `None` attribute is generated
+ *
+ * @tparam VD2 the attribute type of the other VertexSet
+ * @tparam VD3 the attribute type of the resulting VertexSet
+ *
+ * @param other the other VertexSet with which to join.
+ * @param f the function mapping a vertex id and its attributes in
+ * this and the other vertex set to a new vertex attribute.
+ * @return a VertexRDD containing all the vertices in this
+ * VertexSet with `None` attributes used for Vertices missing in the
+ * other VertexSet.
+ *
+ */
+ def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.leftJoin(otherPart)(f))
+ }
+ new VertexRDD(newPartitionsRDD)
+ }
+
+ /**
+ * Left join this VertexRDD with an RDD containing vertex attribute
+ * pairs. If the other RDD is backed by a VertexRDD with the same
+ * index, then the efficient leftZipJoin implementation is used. The
+ * resulting vertex set contains an entry for each vertex in this
+ * set. If the other VertexRDD is missing any vertex in this
+ * VertexRDD then a `None` attribute is generated.
+ *
+ * If there are duplicates, the vertex is picked at random.
+ *
+ * @tparam VD2 the attribute type of the other VertexRDD
+ * @tparam VD3 the attribute type of the resulting VertexRDD
+ *
+ * @param other the other VertexRDD with which to join.
+ * @param f the function mapping a vertex id and its attributes in
+ * this and the other vertex set to a new vertex attribute.
+ * @return a VertexRDD containing all the vertices in this
+ * VertexRDD with the attribute emitted by f.
+ */
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: RDD[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3)
+ : VertexRDD[VD3] =
+ {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ leftZipJoin(other)(f)
+ case _ =>
+ new VertexRDD[VD3](
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true)
+ { (part, msgs) =>
+ val vertexPartition: VertexPartition[VD] = part.next()
+ Iterator(vertexPartition.leftJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ /**
+ * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
+ * must have the same index.
+ */
+ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.innerJoin(otherPart)(f))
+ }
+ new VertexRDD(newPartitionsRDD)
+ }
+
+ /**
+ * Replace vertices with corresponding vertices in `other`, and drop vertices without a
+ * corresponding vertex in `other`.
+ */
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ innerZipJoin(other)(f)
+ case _ =>
+ new VertexRDD(
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true)
+ { (part, msgs) =>
+ val vertexPartition: VertexPartition[VD] = part.next()
+ Iterator(vertexPartition.innerJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ /**
+ * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is
+ * co-indexed with this one.
+ */
+ def aggregateUsingIndex[VD2: ClassTag](
+ messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
+ {
+ val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+ val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
+ val vertexPartition: VertexPartition[VD] = thisIter.next()
+ Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+ }
+ new VertexRDD[VD2](parts)
+ }
+
+} // end of VertexRDD
+
+
+/**
+ * The VertexRDD singleton is used to construct VertexRDDs
+ */
+object VertexRDD {
+
+ /**
+ * Construct a vertex set from an RDD of vertex-attribute pairs.
+ * Duplicate entries are removed arbitrarily.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param rdd the collection of vertex-attribute pairs
+ */
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ case Some(p) => rdd
+ case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ }
+ val vertexPartitions = partitioned.mapPartitions(
+ iter => Iterator(VertexPartition(iter)),
+ preservesPartitioning = true)
+ new VertexRDD(vertexPartitions)
+ }
+
+ /**
+ * Construct a vertex set from an RDD of vertex-attribute pairs.
+ * Duplicate entries are merged using mergeFunc.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param rdd the collection of vertex-attribute pairs
+ * @param mergeFunc the associative, commutative merge function.
+ */
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
+ {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ case Some(p) => rdd
+ case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ }
+ val vertexPartitions = partitioned.mapPartitions(
+ iter => Iterator(VertexPartition(iter)),
+ preservesPartitioning = true)
+ new VertexRDD(vertexPartitions)
+ }
+
+ def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
+ : VertexRDD[VD] =
+ {
+ VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
+ value.getOrElse(default)
+ }
+ }
+}
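
A minimal sketch (not part of the diff) of constructing a VertexRDD and joining it with a plain pair RDD using the constructors and operators above (`sc` is an existing SparkContext):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val users: VertexRDD[String] =
  VertexRDD(sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol"))))
val ages: RDD[(VertexID, Int)] = sc.parallelize(Seq((1L, 30), (3L, 65)))

// Every user is kept; missing ages show up as None.
val withAge: VertexRDD[(String, Option[Int])] =
  users.leftJoin(ages) { (vid, name, ageOpt) => (name, ageOpt) }
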
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala
new file mode 100644
index 0000000000..a0dd36da60
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala
@@ -0,0 +1,37 @@
+package org.apache.spark.graphx.algorithms
+
+import org.apache.spark.graphx._
+
+
+object ConnectedComponents {
+ /**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ *
+ * @param graph the graph for which to compute the connected components
+ *
+ * @return a graph with vertex attributes containing the smallest vertex in each
+ * connected component
+ */
+ def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+ val ccGraph = graph.mapVertices { case (vid, _) => vid }
+
+ def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+ if (edge.srcAttr < edge.dstAttr) {
+ Iterator((edge.dstId, edge.srcAttr))
+ } else if (edge.srcAttr > edge.dstAttr) {
+ Iterator((edge.srcId, edge.dstAttr))
+ } else {
+ Iterator.empty
+ }
+ }
+ val initialMessage = Long.MaxValue
+ Pregel(ccGraph, initialMessage)(
+ vprog = (id, attr, msg) => math.min(attr, msg),
+ sendMsg = sendMessage,
+ mergeMsg = (a, b) => math.min(a, b))
+ } // end of connectedComponents
+}
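
A minimal sketch (not part of the diff) of running the algorithm above on a graph loaded from an edge list (the path is hypothetical):

import org.apache.spark.graphx._
import org.apache.spark.graphx.algorithms.ConnectedComponents

val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
// Each vertex is paired with the smallest vertex id in its component.
val componentOf = ConnectedComponents.run(graph).vertices
componentOf.take(5).foreach(println)
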
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala
new file mode 100644
index 0000000000..0292b7316d
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala
@@ -0,0 +1,205 @@
+package org.apache.spark.graphx.algorithms
+
+import org.apache.spark.Logging
+import org.apache.spark.graphx._
+
+
+object PageRank extends Logging {
+
+ /**
+ * Run PageRank for a fixed number of iterations returning a graph
+ * with vertex attributes containing the PageRank and edge
+ * attributes the normalized edge weight.
+ *
+ * The following PageRank fixed point is computed for each vertex.
+ *
+ * {{{
+ * var PR = Array.fill(n)( 1.0 )
+ * val oldPR = Array.fill(n)( 1.0 )
+ * for( iter <- 0 until numIter ) {
+ * swap(oldPR, PR)
+ * for( i <- 0 until n ) {
+ * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+ * }
+ * }
+ * }}}
+ *
+ * where `alpha` is the random reset probability (typically 0.15),
+ * `inNbrs[i]` is the set of neighbors which link to `i` and
+ * `outDeg[j]` is the out degree of vertex `j`.
+ *
+ * Note that this is not the "normalized" PageRank and as a consequence pages that have no
+ * inlinks will have a PageRank of alpha.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param numIter the number of iterations of PageRank to run
+ * @param resetProb the random reset probability (alpha)
+ *
+ * @return the graph with each vertex attribute containing the PageRank and each edge
+ * attribute containing the normalized weight.
+ *
+ */
+ def run[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
+ {
+
+ /**
+ * Initialize the pagerankGraph with each edge attribute having
+ * weight 1/outDegree and each vertex with attribute 1.0.
+ */
+ val pagerankGraph: Graph[Double, Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees){
+ (vid, vdata, deg) => deg.getOrElse(0)
+ }
+ // Set the weight on the edges based on the degree
+ .mapTriplets( e => 1.0 / e.srcAttr )
+ // Set the vertex attributes to the initial pagerank values
+ .mapVertices( (id, attr) => 1.0 )
+
+ // Display statistics about pagerank
+ logInfo(pagerankGraph.statistics.toString)
+
+ // Define the three functions needed to implement PageRank in the GraphX
+ // version of Pregel
+ def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ resetProb + (1.0 - resetProb) * msgSum
+ def sendMessage(edge: EdgeTriplet[Double, Double]) =
+ Iterator((edge.dstId, edge.srcAttr * edge.attr))
+ def messageCombiner(a: Double, b: Double): Double = a + b
+ // The initial message received by all vertices in PageRank
+ val initialMessage = 0.0
+
+ // Execute pregel for a fixed number of iterations.
+ Pregel(pagerankGraph, initialMessage, numIter)(
+ vertexProgram, sendMessage, messageCombiner)
+ }
+
+ /**
+ * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+ * PageRank and edge attributes containing the normalized edge weight.
+ *
+ * {{{
+ * var PR = Array.fill(n)( 1.0 )
+ * val oldPR = Array.fill(n)( 0.0 )
+ * while( max(abs(PR - oldPr)) > tol ) {
+ * swap(oldPR, PR)
+ * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
+ * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+ * }
+ * }
+ * }}}
+ *
+ * where `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of
+ * neighbors which link to `i` and `outDeg[j]` is the out-degree of vertex `j`.
+ *
+ * Note that this is not the "normalized" PageRank and as a consequence pages that have no
+ * inlinks will have a PageRank of alpha.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param tol the tolerance allowed at convergence (smaller => more accurate).
+ * @param resetProb the random reset probability (alpha)
+ *
+ * @return the graph with each vertex attribute containing the PageRank and each edge
+ * attribute containing the normalized weight.
+ */
+ def runUntillConvergence[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+ {
+ // Initialize the pagerankGraph with each edge attribute
+ // having weight 1/outDegree and each vertex with attribute 1.0.
+ val pagerankGraph: Graph[(Double, Double), Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) {
+ (vid, vdata, deg) => deg.getOrElse(0)
+ }
+ // Set the weight on the edges based on the degree
+ .mapTriplets( e => 1.0 / e.srcAttr )
+ * // Set the vertex attributes to (initialPR, delta = 0)
+ .mapVertices( (id, attr) => (0.0, 0.0) )
+
+ // Display statistics about pagerank
+ logInfo(pagerankGraph.statistics.toString)
+
+ // Define the three functions needed to implement PageRank in the GraphX
+ // version of Pregel
+ def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ val (oldPR, lastDelta) = attr
+ val newPR = oldPR + (1.0 - resetProb) * msgSum
+ (newPR, newPR - oldPR)
+ }
+
+ def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
+ if (edge.srcAttr._2 > tol) {
+ Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
+ } else {
+ Iterator.empty
+ }
+ }
+
+ def messageCombiner(a: Double, b: Double): Double = a + b
+
+ // The initial message received by all vertices in PageRank
+ val initialMessage = resetProb / (1.0 - resetProb)
+
+ // Execute a dynamic version of Pregel.
+ Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner)
+ .mapVertices((vid, attr) => attr._1)
+ } // end of deltaPageRank
+
+ def runStandalone[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): VertexRDD[Double] = {
+
+ // Initialize the ranks
+ var ranks: VertexRDD[Double] = graph.vertices.mapValues((vid, attr) => resetProb).cache()
+
+ // Initialize the delta graph where each vertex stores its delta and each edge knows its weight
+ var deltaGraph: Graph[Double, Double] =
+ graph.outerJoinVertices(graph.outDegrees)((vid, vdata, deg) => deg.getOrElse(0))
+ .mapTriplets(e => 1.0 / e.srcAttr)
+ .mapVertices((vid, degree) => resetProb).cache()
+ var numDeltas: Long = ranks.count()
+
+ var prevDeltas: Option[VertexRDD[Double]] = None
+
+ var i = 0
+ val weight = (1.0 - resetProb)
+ while (numDeltas > 0) {
+ // Compute new deltas. Only deltas that existed in the last round (i.e., were greater than
+ // `tol`) get to send messages; those that were less than `tol` would send messages less than
+ // `tol` as well.
+ val deltas = deltaGraph
+ .mapReduceTriplets[Double](
+ et => Iterator((et.dstId, et.srcAttr * et.attr * weight)),
+ _ + _,
+ prevDeltas.map((_, EdgeDirection.Out)))
+ .filter { case (vid, delta) => delta > tol }
+ .cache()
+ prevDeltas = Some(deltas)
+ numDeltas = deltas.count()
+ logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas))
+
+ // Update deltaGraph with the deltas
+ deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) =>
+ newOpt.getOrElse(old)
+ }.cache()
+
+ // Update ranks
+ ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) =>
+ oldRank + deltaOpt.getOrElse(0.0)
+ }
+ ranks.foreach(x => {}) // force the iteration for ease of debugging
+
+ i += 1
+ }
+
+ ranks
+ }
+
+}
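
A minimal sketch (not part of the diff) of the two PageRank variants above, on an assumed existing `graph: Graph[Int, Int]`:

import org.apache.spark.graphx.algorithms.PageRank

// Fixed number of iterations.
val fixedIterRanks = PageRank.run(graph, numIter = 20).vertices
// Iterate until the per-vertex change drops below the tolerance.
val dynamicRanks = PageRank.runUntillConvergence(graph, tol = 0.001).vertices
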
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala
new file mode 100644
index 0000000000..8fdfa3d907
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala
@@ -0,0 +1,103 @@
+package org.apache.spark.graphx.algorithms
+
+import org.apache.spark.rdd._
+import org.apache.spark.graphx._
+import scala.util.Random
+import org.apache.commons.math.linear._
+
+class SVDPlusPlusConf( // SVDPlusPlus parameters
+ var rank: Int,
+ var maxIters: Int,
+ var minVal: Double,
+ var maxVal: Double,
+ var gamma1: Double,
+ var gamma2: Double,
+ var gamma6: Double,
+ var gamma7: Double) extends Serializable
+
+object SVDPlusPlus {
+ /**
+ * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model",
+ * available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
+ * The prediction rule is r_{ui} = u + b_u + b_i + q_i * (p_u + |N(u)|^(-0.5) * sum(y)); see page 6 for details.
+ *
+ * @param edges edges for constructing the graph
+ *
+ * @param conf SVDPlusPlus parameters
+ *
+ * @return a graph with vertex attributes containing the trained model
+ */
+
+ def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
+
+ // generate default vertex attribute
+ def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
+ val v1 = new ArrayRealVector(rank)
+ val v2 = new ArrayRealVector(rank)
+ for (i <- 0 until rank) {
+ v1.setEntry(i, Random.nextDouble)
+ v2.setEntry(i, Random.nextDouble)
+ }
+ (v1, v2, 0.0, 0.0)
+ }
+
+ // calculate global rating mean
+ val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+ val u = rs / rc
+
+ // construct graph
+ var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
+
+ // calculate initial bias and norm
+ var t0 = g.mapReduceTriplets(et =>
+ Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+ g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
+ }
+
+ def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
+ val (usr, itm) = (et.srcAttr, et.dstAttr)
+ val (p, q) = (usr._1, itm._1)
+ var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
+ pred = math.max(pred, conf.minVal)
+ pred = math.min(pred, conf.maxVal)
+ val err = et.attr - pred
+ val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
+ (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
+ }
+
+ for (i <- 0 until conf.maxIters) {
+ // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
+ var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
+ g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
+ }
+ // phase 2, update p for user nodes and q, y for item nodes
+ val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+ (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+ g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
+ (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
+ }
+ }
+
+ // calculate error on training set
+ def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
+ val (usr, itm) = (et.srcAttr, et.dstAttr)
+ val (p, q) = (usr._1, itm._1)
+ var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
+ pred = math.max(pred, conf.minVal)
+ pred = math.min(pred, conf.maxVal)
+ val err = (et.attr - pred) * (et.attr - pred)
+ Iterator((et.dstId, err))
+ }
+ val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
+ g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
+ }
+ (g, u)
+ }
+}
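
A minimal sketch (not part of the diff): training SVD++ on ratings parsed from a hypothetical comma-separated file of (user, item, rating) triples. The hyperparameter values below are illustrative only:

import org.apache.spark.graphx._
import org.apache.spark.graphx.algorithms.{SVDPlusPlus, SVDPlusPlusConf}

val edges = sc.textFile("hdfs:///data/ratings.csv").map { line =>
  val fields = line.split(",")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}
val conf = new SVDPlusPlusConf(rank = 10, maxIters = 20, minVal = 0.0, maxVal = 5.0,
  gamma1 = 0.007, gamma2 = 0.007, gamma6 = 0.005, gamma7 = 0.015)
// Returns the trained model graph and the global rating mean.
val (model, ratingMean) = SVDPlusPlus.run(edges, conf)
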
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala
new file mode 100644
index 0000000000..f64fc3ef0f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala
@@ -0,0 +1,87 @@
+package org.apache.spark.graphx.algorithms
+
+import org.apache.spark.graphx._
+
+object StronglyConnectedComponents {
+
+ /**
+ * Compute the strongly connected component (SCC) of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the SCC containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ *
+ * @param graph the graph for which to compute the SCC
+ *
+ * @return a graph with vertex attributes containing the smallest vertex id in each SCC
+ */
+ def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
+
+ // the graph we update with final SCC ids, and the graph we return at the end
+ var sccGraph = graph.mapVertices { case (vid, _) => vid }
+ // graph we are going to work with in our iterations
+ var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }
+
+ var numVertices = sccWorkGraph.numVertices
+ var iter = 0
+ while (sccWorkGraph.numVertices > 0 && iter < numIter) {
+ iter += 1
+ do {
+ numVertices = sccWorkGraph.numVertices
+ sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) {
+ (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
+ }
+ sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.inDegrees) {
+ (vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
+ }
+
+ // get all vertices to be removed
+ val finalVertices = sccWorkGraph.vertices
+ .filter { case (vid, (scc, isFinal)) => isFinal}
+ .mapValues { (vid, data) => data._1}
+
+ // write values to sccGraph
+ sccGraph = sccGraph.outerJoinVertices(finalVertices) {
+ (vid, scc, opt) => opt.getOrElse(scc)
+ }
+ // only keep vertices that are not final
+ sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2)
+ } while (sccWorkGraph.numVertices < numVertices)
+
+ sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
+
+ // collect min of all my neighbor's scc values, update if it's smaller than mine
+ // then notify any neighbors with scc values larger than mine
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
+ (vid, e) => e.otherVertexAttr(vid)._1,
+ (vid1, vid2) => math.min(vid1, vid2),
+ (vid, scc, optScc) =>
+ (math.min(scc._1, optScc.getOrElse(scc._1)), scc._2),
+ (vid, e) => e.vertexAttr(vid)._1 < e.otherVertexAttr(vid)._1
+ )
+
+ // start at root of SCCs. Traverse values in reverse, notify all my neighbors
+ // do not propagate if colors do not match!
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
+ sccWorkGraph,
+ Integer.MAX_VALUE,
+ EdgeDirection.Out,
+ EdgeDirection.In
+ )(
+ // vertex is final if it is the root of a color
+ // or it has the same color as a neighbor that is final
+ (vid, e) => (vid == e.vertexAttr(vid)._1) || (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
+ (final1, final2) => final1 || final2,
+ (vid, scc, optFinal) =>
+ (scc._1, scc._2 || optFinal.getOrElse(false)),
+ // activate neighbor if they are not final, you are, and you have the same color
+ (vid, e) => e.vertexAttr(vid)._2 &&
+ !e.otherVertexAttr(vid)._2 && (e.vertexAttr(vid)._1 == e.otherVertexAttr(vid)._1),
+ // start at root of colors
+ (vid, data) => vid == data._1
+ )
+ }
+ sccGraph
+ }
+
+}
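
A minimal sketch (not part of the diff) of running the SCC algorithm above with an iteration cap, on an assumed existing directed `graph: Graph[Int, Int]`:

import org.apache.spark.graphx.algorithms.StronglyConnectedComponents

// Each vertex is paired with the smallest vertex id in its strongly connected component.
val scc = StronglyConnectedComponents.run(graph, numIter = 10).vertices
scc.take(5).foreach(println)
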
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala
new file mode 100644
index 0000000000..b5a93c1bd1
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala
@@ -0,0 +1,78 @@
+package org.apache.spark.graphx.algorithms
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
+
+
+object TriangleCount {
+  /**
+   * Compute the number of triangles passing through each vertex.
+   *
+   * The algorithm is relatively straightforward and can be computed in three steps:
+   *
+   * 1) Compute the set of neighbors for each vertex
+   * 2) For each edge compute the intersection of the sets and send the
+   *    count to both vertices.
+   * 3) Compute the sum at each vertex and divide by two since each
+   *    triangle is counted twice.
+   *
+   * @param graph a graph with edges oriented so that `srcId` is less than `dstId`. The graph must
+   * have been partitioned using Graph.partitionBy.
+   *
+   * @return a graph whose vertex attribute is the number of triangles passing through that vertex
+   */
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
+ // Remove redundant edges
+ val g = graph.groupEdges((a, b) => a).cache
+
+ // Construct set representations of the neighborhoods
+ val nbrSets: VertexRDD[VertexSet] =
+ g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) =>
+ val set = new VertexSet(4)
+ var i = 0
+ while (i < nbrs.size) {
+        // exclude self-loops from the neighbor set
+ if(nbrs(i) != vid) {
+ set.add(nbrs(i))
+ }
+ i += 1
+ }
+ set
+ }
+ // join the sets with the graph
+ val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+ (vid, _, optSet) => optSet.getOrElse(null)
+ }
+ // Edge function computes intersection of smaller vertex with larger vertex
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
+ assert(et.srcAttr != null)
+ assert(et.dstAttr != null)
+ val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
+ (et.srcAttr, et.dstAttr)
+ } else {
+ (et.dstAttr, et.srcAttr)
+ }
+ val iter = smallSet.iterator
+ var counter: Int = 0
+ while (iter.hasNext) {
+ val vid = iter.next
+ if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
+ }
+ Iterator((et.srcId, counter), (et.dstId, counter))
+ }
+ // compute the intersection along edges
+ val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
+ // Merge counters with the graph and divide by two since each triangle is counted twice
+ g.outerJoinVertices(counters) {
+ (vid, _, optCounter: Option[Int]) =>
+ val dblCount = optCounter.getOrElse(0)
+ // double count should be even (divisible by two)
+ assert((dblCount & 1) == 0)
+ dblCount / 2
+ }
+
+  } // end of run
+
+}
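
A plain-Scala sketch (no Spark required) of the per-edge intersection in step 2 above, on a toy graph whose neighbor sets are written out by hand; it also shows why step 3 halves the per-vertex sums. All names and data here are illustrative.

object TriangleSketch {
  def main(args: Array[String]) {
    // Undirected toy graph: a triangle 1-2-3 plus a pendant edge 3-4.
    val nbrs = Map[Long, Set[Long]](
      1L -> Set(2L, 3L), 2L -> Set(1L, 3L), 3L -> Set(1L, 2L, 4L), 4L -> Set(3L))
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 4L))

    // Step 2: for each edge, count the common neighbors and credit both endpoints.
    val perVertex = scala.collection.mutable.Map[Long, Int]().withDefaultValue(0)
    for ((u, v) <- edges) {
      val common = ((nbrs(u) intersect nbrs(v)) - u - v).size
      perVertex(u) += common
      perVertex(v) += common
    }

    // Step 3: each triangle at a vertex is credited once per incident edge of that
    // triangle, i.e. twice, so divide by two.
    println(perVertex.map { case (vid, c) => (vid, c / 2) })
    // vertices 1, 2 and 3 are in one triangle each; vertex 4 is in none
  }
}
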
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
new file mode 100644
index 0000000000..4176563d22
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -0,0 +1,220 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
+ * clustered by src.
+ *
+ * @param srcIds the source vertex id of each edge
+ * @param dstIds the destination vertex id of each edge
+ * @param data the attribute associated with each edge
+ * @param index a clustered index on source vertex id
+ * @tparam ED the edge attribute type.
+ */
+class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+ val srcIds: Array[VertexID],
+ val dstIds: Array[VertexID],
+ val data: Array[ED],
+ val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
+
+ /**
+ * Reverse all the edges in this partition.
+ *
+ * @return a new edge partition with all edges reversed.
+ */
+ def reverse: EdgePartition[ED] = {
+ val builder = new EdgePartitionBuilder(size)
+ for (e <- iterator) {
+ builder.add(e.dstId, e.srcId, e.attr)
+ }
+ builder.toEdgePartition
+ }
+
+ /**
+ * Construct a new edge partition by applying the function f to all
+ * edges in this partition.
+ *
+ * @param f a function from an edge to a new attribute
+ * @tparam ED2 the type of the new attribute
+ * @return a new edge partition with the result of the function `f`
+ * applied to each edge
+ */
+ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+ val newData = new Array[ED2](data.size)
+ val edge = new Edge[ED]()
+ val size = data.size
+ var i = 0
+ while (i < size) {
+ edge.srcId = srcIds(i)
+ edge.dstId = dstIds(i)
+ edge.attr = data(i)
+ newData(i) = f(edge)
+ i += 1
+ }
+ new EdgePartition(srcIds, dstIds, newData, index)
+ }
+
+  /**
+   * Construct a new edge partition by using the edge attributes
+   * contained in the iterator.
+   *
+   * @note The input iterator should return edge attributes in the
+   * order of the edges returned by `EdgePartition.iterator` and
+   * should return exactly one attribute per edge.
+   *
+   * @param iter an iterator over the new attribute values, in edge order
+   * @tparam ED2 the type of the new attribute
+   * @return a new edge partition with the attribute values replaced by
+   * those drawn from `iter`
+   */
+ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+ val newData = new Array[ED2](data.size)
+ var i = 0
+ while (iter.hasNext) {
+ newData(i) = iter.next()
+ i += 1
+ }
+ assert(newData.size == i)
+ new EdgePartition(srcIds, dstIds, newData, index)
+ }
+
+ /**
+ * Apply the function f to all edges in this partition.
+ *
+   * @param f a user-defined function, applied to each edge for its side effects on external state
+ */
+ def foreach(f: Edge[ED] => Unit) {
+ iterator.foreach(f)
+ }
+
+  /**
+   * Merge all the edges with the same src and dest id into a single
+   * edge using the `merge` function.
+   *
+   * @note This assumes the edges are clustered by (src, dst), as produced by
+   * `EdgePartitionBuilder`, so that duplicates occupy adjacent positions.
+   *
+   * @param merge a commutative associative merge operation
+   * @return a new edge partition without duplicate edges
+   */
+ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
+ val builder = new EdgePartitionBuilder[ED]
+ var firstIter: Boolean = true
+ var currSrcId: VertexID = nullValue[VertexID]
+ var currDstId: VertexID = nullValue[VertexID]
+ var currAttr: ED = nullValue[ED]
+ var i = 0
+ while (i < size) {
+ if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
+ currAttr = merge(currAttr, data(i))
+ } else {
+ if (i > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ currSrcId = srcIds(i)
+ currDstId = dstIds(i)
+ currAttr = data(i)
+ }
+ i += 1
+ }
+ if (size > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ builder.toEdgePartition
+ }
+
+ /**
+ * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+ * containing the resulting edges.
+ *
+ * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
+ * each edge, but each time it may be invoked on any corresponding edge in `other`.
+ *
+ * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked
+ * once.
+ */
+ def innerJoin[ED2: ClassTag, ED3: ClassTag]
+ (other: EdgePartition[ED2])
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
+ val builder = new EdgePartitionBuilder[ED3]
+ var i = 0
+ var j = 0
+ // For i = index of each edge in `this`...
+ while (i < size && j < other.size) {
+ val srcId = this.srcIds(i)
+ val dstId = this.dstIds(i)
+ // ... forward j to the index of the corresponding edge in `other`, and...
+ while (j < other.size && other.srcIds(j) < srcId) { j += 1 }
+ if (j < other.size && other.srcIds(j) == srcId) {
+ while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
+ if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
+ // ... run `f` on the matching edge
+ builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j)))
+ }
+ }
+ i += 1
+ }
+ builder.toEdgePartition
+ }
+
+ /**
+ * The number of edges in this partition
+ *
+ * @return size of the partition
+ */
+ def size: Int = srcIds.size
+
+ /** The number of unique source vertices in the partition. */
+ def indexSize: Int = index.size
+
+ /**
+ * Get an iterator over the edges in this partition.
+ *
+ * @return an iterator over edges in the partition
+ */
+ def iterator = new Iterator[Edge[ED]] {
+ private[this] val edge = new Edge[ED]
+ private[this] var pos = 0
+
+ override def hasNext: Boolean = pos < EdgePartition.this.size
+
+ override def next(): Edge[ED] = {
+ edge.srcId = srcIds(pos)
+ edge.dstId = dstIds(pos)
+ edge.attr = data(pos)
+ pos += 1
+ edge
+ }
+ }
+
+ /**
+ * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
+ * iterator is generated using an index scan, so it is efficient at skipping edges that don't
+ * match srcIdPred.
+ */
+ def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
+ index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
+
+ /**
+ * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
+ * cluster must start at position `index`.
+ */
+ private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
+ private[this] val edge = new Edge[ED]
+ private[this] var pos = index
+
+ override def hasNext: Boolean = {
+ pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
+ }
+
+ override def next(): Edge[ED] = {
+ assert(srcIds(pos) == srcId)
+ edge.srcId = srcIds(pos)
+ edge.dstId = dstIds(pos)
+ edge.attr = data(pos)
+ pos += 1
+ edge
+ }
+ }
+}
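
A self-contained sketch of the layout the class above relies on: edges stored in parallel arrays clustered by source id, with a map from each source id to the position of its first edge, so a scan can jump straight to one source's cluster (the idea behind indexIterator and clusterIterator). The arrays, the plain Map, and the helper name are simplifications for illustration.

object ClusteredScanSketch {
  // Parallel columnar arrays, clustered (sorted) by source id.
  val srcIds = Array(1L, 1L, 2L, 2L, 2L, 5L)
  val dstIds = Array(2L, 3L, 3L, 4L, 5L, 1L)
  val attrs  = Array("a", "b", "c", "d", "e", "f")
  // Clustered index: first position of each distinct source id.
  val index: Map[Long, Int] = Map(1L -> 0, 2L -> 2, 5L -> 5)

  // Scan only the cluster belonging to srcId, skipping every other edge.
  def cluster(srcId: Long): Iterator[(Long, Long, String)] = index.get(srcId) match {
    case None => Iterator.empty
    case Some(start) =>
      Iterator.from(start)
        .takeWhile(i => i < srcIds.length && srcIds(i) == srcId)
        .map(i => (srcIds(i), dstIds(i), attrs(i)))
  }

  def main(args: Array[String]) {
    cluster(2L).foreach(println)  // (2,3,c), (2,4,d), (2,5,e)
    println(cluster(4L).isEmpty)  // true: no cluster for source 4
  }
}
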
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
new file mode 100644
index 0000000000..d4f08497a2
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -0,0 +1,46 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+import scala.util.Sorting
+
+import org.apache.spark.graphx._
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+
+
+// private[graphx]
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
+
+ var edges = new PrimitiveVector[Edge[ED]](size)
+
+ /** Add a new edge to the partition. */
+ def add(src: VertexID, dst: VertexID, d: ED) {
+ edges += Edge(src, dst, d)
+ }
+
+ def toEdgePartition: EdgePartition[ED] = {
+ val edgeArray = edges.trim().array
+ Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+ val srcIds = new Array[VertexID](edgeArray.size)
+ val dstIds = new Array[VertexID](edgeArray.size)
+ val data = new Array[ED](edgeArray.size)
+ val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
+ // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
+ // adding them to the index
+ if (edgeArray.length > 0) {
+      index.update(edgeArray(0).srcId, 0)
+      var currSrcId: VertexID = edgeArray(0).srcId
+ var i = 0
+ while (i < edgeArray.size) {
+ srcIds(i) = edgeArray(i).srcId
+ dstIds(i) = edgeArray(i).dstId
+ data(i) = edgeArray(i).attr
+ if (edgeArray(i).srcId != currSrcId) {
+ currSrcId = edgeArray(i).srcId
+ index.update(currSrcId, i)
+ }
+ i += 1
+ }
+ }
+ new EdgePartition(srcIds, dstIds, data, index)
+ }
+}
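
A plain-Scala sketch of what toEdgePartition above does: sort the buffered edges lexicographically by (src, dst), copy them into columnar arrays, and record the first position of every distinct source id. The tuple representation and the mutable Map stand in for Edge and PrimitiveKeyOpenHashMap.

import scala.util.Sorting

object BuildIndexSketch {
  def main(args: Array[String]) {
    val edges = Array((2L, 5L, "e"), (1L, 3L, "b"), (2L, 3L, "c"), (1L, 2L, "a"), (2L, 4L, "d"))
    // Sort by (src, dst), mirroring Edge.lexicographicOrdering.
    Sorting.quickSort(edges)(Ordering.by { case (src, dst, _) => (src, dst) })

    // Columnar copies of the sorted edges.
    val srcIds = edges.map(_._1)
    val dstIds = edges.map(_._2)
    val data   = edges.map(_._3)

    // Record the first position of each source id cluster.
    val index = scala.collection.mutable.Map[Long, Int]()
    var i = 0
    while (i < srcIds.length) {
      if (i == 0 || srcIds(i) != srcIds(i - 1)) index(srcIds(i)) = i
      i += 1
    }
    println(srcIds.mkString(", "))  // 1, 1, 2, 2, 2
    println(index)                  // cluster starts: 1 -> 0, 2 -> 2
  }
}
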
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
new file mode 100644
index 0000000000..79fd962ffd
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+
+
+/**
+ * The Iterator type returned when constructing edge triplets. This class technically could be
+ * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
+ * debug / profile.
+ */
+private[impl]
+class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
+ val vidToIndex: VertexIdToIndexMap,
+ val vertexArray: Array[VD],
+ val edgePartition: EdgePartition[ED])
+ extends Iterator[EdgeTriplet[VD, ED]] {
+
+ // Current position in the array.
+ private var pos = 0
+
+ // A triplet object that this iterator.next() call returns. We reuse this object to avoid
+ // allocating too many temporary Java objects.
+ private val triplet = new EdgeTriplet[VD, ED]
+
+ private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
+
+ override def hasNext: Boolean = pos < edgePartition.size
+
+ override def next() = {
+ triplet.srcId = edgePartition.srcIds(pos)
+ // assert(vmap.containsKey(e.src.id))
+ triplet.srcAttr = vmap(triplet.srcId)
+ triplet.dstId = edgePartition.dstIds(pos)
+ // assert(vmap.containsKey(e.dst.id))
+ triplet.dstAttr = vmap(triplet.dstId)
+ triplet.attr = edgePartition.data(pos)
+ pos += 1
+ triplet
+ }
+}
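
The iterator above hands back the same mutable EdgeTriplet on every next() call, which avoids per-element allocation but means callers must copy a triplet before retaining it. A minimal plain-Scala sketch of that trade-off, with illustrative names:

object ReusePitfallSketch {
  class Cell { var value: Int = 0; override def toString = "Cell(" + value + ")" }

  // Returns the same Cell instance on every next() call, mutated in place.
  def cells(values: Array[Int]): Iterator[Cell] = new Iterator[Cell] {
    private[this] val cell = new Cell
    private[this] var pos = 0
    override def hasNext: Boolean = pos < values.length
    override def next(): Cell = { cell.value = values(pos); pos += 1; cell }
  }

  def main(args: Array[String]) {
    println(cells(Array(1, 2, 3)).toList)               // List(Cell(3), Cell(3), Cell(3))
    println(cells(Array(1, 2, 3)).map(_.value).toList)  // List(1, 2, 3) -- copy before retaining
  }
}
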
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
new file mode 100644
index 0000000000..be9f188150
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -0,0 +1,422 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.impl.GraphImpl._
+import org.apache.spark.graphx.impl.MsgRDDFunctions._
+import org.apache.spark.graphx.util.BytecodeUtils
+import org.apache.spark.rdd.{ShuffledRDD, RDD}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.ClosureCleaner
+
+
+/**
+ * A Graph RDD that supports computation on graphs.
+ *
+ * Graphs are represented using two classes of data: vertex-partitioned and
+ * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
+ * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
+ * vertex attributes are replicated to the edge partitions where they appear as sources or
+ * destinations. `routingTable` stores the routing information for shipping vertex attributes to
+ * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
+ * using the routing table.
+ */
+class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
+ @transient val vertices: VertexRDD[VD],
+ @transient val edges: EdgeRDD[ED],
+ @transient val routingTable: RoutingTable,
+ @transient val replicatedVertexView: ReplicatedVertexView[VD])
+ extends Graph[VD, ED] with Serializable {
+
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED],
+ routingTable: RoutingTable) = {
+ this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+ }
+
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED]) = {
+ this(vertices, edges, new RoutingTable(edges, vertices))
+ }
+
+ /** Return a RDD that brings edges together with their source and destination vertices. */
+ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
+ val vdTag = classTag[VD]
+ val edTag = classTag[ED]
+ edges.partitionsRDD.zipPartitions(
+ replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
+ val (pid, ePart) = ePartIter.next()
+ val (_, vPart) = vPartIter.next()
+ new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
+ }
+ }
+
+ override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
+ vertices.persist(newLevel)
+ edges.persist(newLevel)
+ this
+ }
+
+ override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+
+ override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+ val numPartitions = edges.partitions.size
+ val edTag = classTag[ED]
+ val newEdges = new EdgeRDD(edges.map { e =>
+ val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+
+ // Should we be using 3-tuple or an optimized class
+ new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edTag)
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).cache())
+ new GraphImpl(vertices, newEdges)
+ }
+
+ override def statistics: Map[String, Any] = {
+ // Get the total number of vertices after replication, used to compute the replication ratio.
+ def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
+ vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
+ }
+
+ val numVertices = this.ops.numVertices
+ val numEdges = this.ops.numEdges
+ val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
+ val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
+ val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
+    // One entry per partition, giving that partition's fraction of the total edges.
+ val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
+ val minLoad = loadArray.min
+ val maxLoad = loadArray.max
+ Map(
+ "Num Vertices" -> numVertices,
+ "Num Edges" -> numEdges,
+ "Replication (both)" -> replicationRatioBoth,
+ "Replication (src only)" -> replicationRatioSrcOnly,
+ "Replication (dest only)" -> replicationRatioDstOnly,
+ "Load Array" -> loadArray,
+ "Min Load" -> minLoad,
+ "Max Load" -> maxLoad)
+ }
+
+ /**
+ * Display the lineage information for this graph.
+ */
+ def printLineage() = {
+ def traverseLineage(
+ rdd: RDD[_],
+ indent: String = "",
+ visited: Map[Int, String] = Map.empty[Int, String]) {
+ if (visited.contains(rdd.id)) {
+ println(indent + visited(rdd.id))
+ println(indent)
+ } else {
+ val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
+ val cacheLevel = rdd.getStorageLevel
+ val name = rdd.id
+ val deps = rdd.dependencies
+ val partitioner = rdd.partitioner
+ val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
+ println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner +
+ ", " + numparts +")")
+ println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
+ println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
+ deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
+ }
+ }
+ println("edges ------------------------------------------")
+ traverseLineage(edges, " ")
+ var visited = Map(edges.id -> "edges")
+ println("\n\nvertices ------------------------------------------")
+ traverseLineage(vertices, " ", visited)
+ visited += (vertices.id -> "vertices")
+ println("\n\nroutingTable.bothAttrs -------------------------------")
+ traverseLineage(routingTable.bothAttrs, " ", visited)
+ visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
+ println("\n\ntriplets ----------------------------------------")
+ traverseLineage(triplets, " ", visited)
+ println(visited)
+ } // end of printLineage
+
+ override def reverse: Graph[VD, ED] = {
+ val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
+ new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+ }
+
+ override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
+ if (classTag[VD] equals classTag[VD2]) {
+ // The map preserves type, so we can use incremental replication
+ val newVerts = vertices.mapVertexPartitions(_.map(f))
+ val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+ } else {
+ // The map does not preserve type, so we must re-replicate all vertices
+ new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+ }
+ }
+
+ override def mapEdges[ED2: ClassTag](
+ f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
+ new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
+ }
+
+ override def mapTriplets[ED2: ClassTag](
+ f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+    // Use an explicit class tag here so we don't pull in the implicit one from GraphImpl
+    // (which would require serializing GraphImpl).
+ val vdTag = classTag[VD]
+ val newEdgePartitions =
+ edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
+ (ePartIter, vTableReplicatedIter) =>
+ val (ePid, edgePartition) = ePartIter.next()
+ val (vPid, vPart) = vTableReplicatedIter.next()
+ assert(!vTableReplicatedIter.hasNext)
+ assert(ePid == vPid)
+ val et = new EdgeTriplet[VD, ED]
+ val inputIterator = edgePartition.iterator.map { e =>
+ et.set(e)
+ et.srcAttr = vPart(e.srcId)
+ et.dstAttr = vPart(e.dstId)
+ et
+ }
+ // Apply the user function to the vertex partition
+ val outputIter = f(ePid, inputIterator)
+ // Consume the iterator to update the edge attributes
+ val newEdgePartition = edgePartition.map(outputIter)
+ Iterator((ePid, newEdgePartition))
+ }
+ new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
+ }
+
+ override def subgraph(
+ epred: EdgeTriplet[VD, ED] => Boolean = x => true,
+ vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ // Filter the vertices, reusing the partitioner and the index from this graph
+ val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
+
+ // Filter the edges
+ val edTag = classTag[ED]
+ val newEdges = new EdgeRDD[ED](triplets.filter { et =>
+ vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
+ }.mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edTag)
+ iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true)).cache()
+
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
+ } // end of subgraph
+
+ override def mask[VD2: ClassTag, ED2: ClassTag] (
+ other: Graph[VD2, ED2]): Graph[VD, ED] = {
+ val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
+ val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
+ }
+
+ override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
+ ClosureCleaner.clean(merge)
+ val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge))
+ new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // Lower level transformation methods
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ override def mapReduceTriplets[A: ClassTag](
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ // For each vertex, replicate its attribute only to partitions where it is
+ // in the relevant position in an edge.
+ val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
+ val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+ val vs = activeSetOpt match {
+ case Some((activeSet, _)) =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ case None =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
+ }
+ val activeDirectionOpt = activeSetOpt.map(_._2)
+
+ // Map and combine.
+ val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
+ val (ePid, edgePartition) = ePartIter.next()
+ val (vPid, vPart) = vPartIter.next()
+ assert(!vPartIter.hasNext)
+ assert(ePid == vPid)
+ // Choose scan method
+ val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+ val edgeIter = activeDirectionOpt match {
+ case Some(EdgeDirection.Both) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ .filter(e => vPart.isActive(e.dstId))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+ }
+ case Some(EdgeDirection.Out) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+ }
+ case Some(EdgeDirection.In) =>
+ edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+ case None =>
+ edgePartition.iterator
+ }
+
+ // Scan edges and run the map function
+ val et = new EdgeTriplet[VD, ED]
+ val mapOutputs = edgeIter.flatMap { e =>
+ et.set(e)
+ if (mapUsesSrcAttr) {
+ et.srcAttr = vPart(e.srcId)
+ }
+ if (mapUsesDstAttr) {
+ et.dstAttr = vPart(e.dstId)
+ }
+ mapFunc(et)
+ }
+ // Note: This doesn't allow users to send messages to arbitrary vertices.
+ vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+ }
+
+ // do the final reduction reusing the index map
+ vertices.aggregateUsingIndex(preAgg, reduceFunc)
+ } // end of mapReduceTriplets
+
+ override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
+ (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+ if (classTag[VD] equals classTag[VD2]) {
+ // updateF preserves type, so we can use incremental replication
+ val newVerts = vertices.leftJoin(updates)(updateF)
+ val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+ } else {
+ // updateF does not preserve type, so we must re-replicate all vertices
+ val newVerts = vertices.leftJoin(updates)(updateF)
+ new GraphImpl(newVerts, edges, routingTable)
+ }
+ }
+
+ private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+ try {
+ BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+ } catch {
+ case _: ClassNotFoundException => true // if we don't know, be conservative
+ }
+ }
+} // end of class GraphImpl
+
+
+object GraphImpl {
+
+ def apply[VD: ClassTag, ED: ClassTag](
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
+ {
+ fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+ }
+
+ def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
+ edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+ }
+
+ def apply[VD: ClassTag, ED: ClassTag](
+ vertices: RDD[(VertexID, VD)],
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
+ {
+ val edgeRDD = createEdgeRDD(edges).cache()
+
+ // Get the set of all vids
+ val partitioner = Partitioner.defaultPartitioner(vertices)
+ val vPartitioned = vertices.partitionBy(partitioner)
+ val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
+ val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
+ vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
+ }
+
+ val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
+
+ new GraphImpl(vertexRDD, edgeRDD)
+ }
+
+ /**
+ * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
+ * data structure (RDD[(VertexID, VertexID, ED)]).
+ *
+ * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
+ * pair: the key is the partition id, and the value is an EdgePartition object containing all the
+ * edges in a partition.
+ */
+ private def createEdgeRDD[ED: ClassTag](
+ edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+ val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }
+ new EdgeRDD(edgePartitions)
+ }
+
+ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
+ edges: EdgeRDD[ED],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ edges.cache()
+ // Get the set of all vids
+ val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ // Create the VertexRDD.
+ val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vertices, edges)
+ }
+
+ /** Collects all vids mentioned in edges and partitions them by partitioner. */
+ private def collectVertexIDsFromEdges(
+ edges: EdgeRDD[_],
+ partitioner: Partitioner): RDD[(VertexID, Int)] = {
+ // TODO: Consider doing map side distinct before shuffle.
+ new ShuffledRDD[VertexID, Int, (VertexID, Int)](
+ edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VertexIDMsgSerializer].getName)
+ }
+} // end of object GraphImpl
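
A short sketch of driving the mapReduceTriplets primitive defined above, reusing the GraphImpl constructor from this file; computing in-degrees is an illustrative choice (GraphOps exposes the same thing directly), and the SparkContext setup is assumed.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl

object MapReduceTripletsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "mrt-sketch")
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    val graph = GraphImpl(edges, 0)
    // Send the message 1 to each edge's destination and sum the messages per vertex.
    val inDegrees: VertexRDD[Int] =
      graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
    inDegrees.collect().foreach(println)  // (2,1), (3,2); vertex 1 receives no messages
    sc.stop()
  }
}
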
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
new file mode 100644
index 0000000000..ad5daf8f6a
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -0,0 +1,93 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.Partitioner
+import org.apache.spark.graphx.{PartitionID, VertexID}
+import org.apache.spark.rdd.{ShuffledRDD, RDD}
+
+
+class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
+ @transient var partition: PartitionID,
+ var vid: VertexID,
+ var data: T)
+ extends Product2[PartitionID, (VertexID, T)] with Serializable {
+
+ override def _1 = partition
+
+ override def _2 = (vid, data)
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[VertexBroadcastMsg[_]]
+}
+
+
+/**
+ * A message used to send a specific value to a partition.
+ * @param partition index of the target partition.
+ * @param data value to send
+ */
+class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
+ @transient var partition: PartitionID,
+ var data: T)
+ extends Product2[PartitionID, T] with Serializable {
+
+ override def _1 = partition
+
+ override def _2 = data
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
+}
+
+
+class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
+ def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
+ val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+
+ // Set a custom serializer if the data is of int or double type.
+ if (classTag[T] == ClassTag.Int) {
+ rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName)
+ } else if (classTag[T] == ClassTag.Long) {
+ rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName)
+ } else if (classTag[T] == ClassTag.Double) {
+ rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName)
+ }
+ rdd
+ }
+}
+
+
+class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
+
+ /**
+ * Return a copy of the RDD partitioned using the specified partitioner.
+ */
+ def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
+ new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
+ }
+
+}
+
+
+object MsgRDDFunctions {
+ implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
+ new MsgRDDFunctions(rdd)
+ }
+
+ implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
+ new VertexBroadcastMsgRDDFunctions(rdd)
+ }
+
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
+
+ // Set a custom serializer if the data is of int or double type.
+ if (classTag[T] == ClassTag.Int) {
+ rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
+ } else if (classTag[T] == ClassTag.Long) {
+ rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
+ } else if (classTag[T] == ClassTag.Double) {
+ rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
+ }
+ rdd
+ }
+}
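
The helpers above pick a specialized serializer by comparing the message type's ClassTag at runtime. A standalone sketch of that dispatch pattern, with illustrative serializer names:

import scala.reflect.{classTag, ClassTag}

object TagDispatchSketch {
  // Choose a wire format based on the runtime class of T, falling back to a default.
  def serializerFor[T: ClassTag]: String =
    if (classTag[T] == ClassTag.Int) "int-serializer"
    else if (classTag[T] == ClassTag.Long) "long-serializer"
    else if (classTag[T] == ClassTag.Double) "double-serializer"
    else "default-serializer"

  def main(args: Array[String]) {
    println(serializerFor[Int])     // int-serializer
    println(serializerFor[Double])  // double-serializer
    println(serializerFor[String])  // default-serializer
  }
}
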
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
new file mode 100644
index 0000000000..63180bc3af
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -0,0 +1,182 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
+
+import org.apache.spark.graphx._
+
+/**
+ * A view of the vertices after they are shipped to the join sites specified in
+ * `routingTable`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
+ * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
+ * fresh view is created.
+ *
+ * The view is always cached (i.e., once it is created, it remains materialized). This avoids
+ * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
+ * example.
+ */
+private[impl]
+class ReplicatedVertexView[VD: ClassTag](
+ updatedVerts: VertexRDD[VD],
+ edges: EdgeRDD[_],
+ routingTable: RoutingTable,
+ prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+
+ /**
+ * Within each edge partition, create a local map from vid to an index into the attribute
+ * array. Each map contains a superset of the vertices that it will receive, because it stores
+ * vids from both the source and destination of edges. It must always include both source and
+ * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+ */
+ private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ case Some(prevView) =>
+ prevView.localVertexIDMap
+ case None =>
+ edges.partitionsRDD.mapPartitions(_.map {
+ case (pid, epart) =>
+ val vidToIndex = new VertexIdToIndexMap
+ epart.foreach { e =>
+ vidToIndex.add(e.srcId)
+ vidToIndex.add(e.dstId)
+ }
+ (pid, vidToIndex)
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
+ }
+
+ private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
+ private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
+ private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
+ private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
+
+ def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
+ (includeSrc, includeDst) match {
+ case (true, true) => bothAttrs
+ case (true, false) => srcAttrOnly
+ case (false, true) => dstAttrOnly
+ case (false, false) => noAttrs
+ }
+ }
+
+ def get(
+ includeSrc: Boolean,
+ includeDst: Boolean,
+ actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
+    // Ship active sets to edge partitions using the routing table, but ignoring includeSrc and
+ // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
+ // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
+ // also shipped there.
+ val shippedActives = routingTable.get(true, true)
+ .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
+ .partitionBy(edges.partitioner.get)
+ // Update the view with shippedActives, setting activeness flags in the resulting
+ // VertexPartitions
+ get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
+ val (pid, vPart) = viewIter.next()
+ val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
+ Iterator((pid, newPart))
+ }
+ }
+
+ private def create(includeSrc: Boolean, includeDst: Boolean)
+ : RDD[(PartitionID, VertexPartition[VD])] = {
+ val vdTag = classTag[VD]
+
+    // Ship vertex attributes to edge partitions according to the routing table
+ val verts = updatedVerts.partitionsRDD
+ val shippedVerts = routingTable.get(includeSrc, includeDst)
+ .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
+ .partitionBy(edges.partitioner.get)
+ // TODO: Consider using a specialized shuffler.
+
+ prevViewOpt match {
+ case Some(prevView) =>
+ // Update prevView with shippedVerts, setting staleness flags in the resulting
+ // VertexPartitions
+ prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
+ (prevViewIter, shippedVertsIter) =>
+ val (pid, prevVPart) = prevViewIter.next()
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
+ Iterator((pid, newVPart))
+ }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
+
+ case None =>
+ // Within each edge partition, place the shipped vertex attributes into the correct
+ // locations specified in localVertexIDMap
+ localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ val (pid, vidToIndex) = mapIter.next()
+ assert(!mapIter.hasNext)
+ // Populate the vertex array using the vidToIndex map
+ val vertexArray = vdTag.newArray(vidToIndex.capacity)
+ for ((_, block) <- shippedVertsIter) {
+ for (i <- 0 until block.vids.size) {
+ val vid = block.vids(i)
+ val attr = block.attrs(i)
+ val ind = vidToIndex.getPos(vid)
+ vertexArray(ind) = attr
+ }
+ }
+ val newVPart = new VertexPartition(
+ vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
+ Iterator((pid, newVPart))
+ }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
+ }
+ }
+}
+
+object ReplicatedVertexView {
+ protected def buildBuffer[VD: ClassTag](
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
+ vertexPartIter: Iterator[VertexPartition[VD]]) = {
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ val vertexPart: VertexPartition[VD] = vertexPartIter.next()
+
+ Iterator.tabulate(pid2vid.size) { pid =>
+ val vidsCandidate = pid2vid(pid)
+ val size = vidsCandidate.length
+ val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
+ val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
+ var i = 0
+ while (i < size) {
+ val vid = vidsCandidate(i)
+ if (vertexPart.isDefined(vid)) {
+ vids += vid
+ attrs += vertexPart(vid)
+ }
+ i += 1
+ }
+ (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
+ }
+ }
+
+ protected def buildActiveBuffer(
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
+ activePartIter: Iterator[VertexPartition[_]])
+ : Iterator[(Int, Array[VertexID])] = {
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ val activePart: VertexPartition[_] = activePartIter.next()
+
+ Iterator.tabulate(pid2vid.size) { pid =>
+ val vidsCandidate = pid2vid(pid)
+ val size = vidsCandidate.length
+ val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
+ var i = 0
+ while (i < size) {
+ val vid = vidsCandidate(i)
+ if (activePart.isDefined(vid)) {
+ actives += vid
+ }
+ i += 1
+ }
+ (pid, actives.trim().array)
+ }
+ }
+}
+
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
+ extends Serializable {
+ def iterator: Iterator[(VertexID, VD)] =
+ (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+}
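
A plain-Scala sketch of what buildBuffer above does per edge partition: walk the routing table's candidate vertex ids, keep only those actually present in the local vertex partition, and emit a compact block of ids and attributes to ship. The Map and the data stand in for VertexPartition and real routing output.

object ShipVerticesSketch {
  def main(args: Array[String]) {
    // pid2vid(pid) lists the vertex ids that edge partition pid needs.
    val pid2vid: Array[Array[Long]] = Array(Array(1L, 2L, 9L), Array(2L, 3L))
    // The local vertex partition holds attributes for a subset of those ids.
    val localVerts: Map[Long, String] = Map(1L -> "a", 2L -> "b", 3L -> "c")

    val blocks = pid2vid.zipWithIndex.map { case (candidates, pid) =>
      val present = candidates.filter(localVerts.contains)
      (pid, present, present.map(localVerts))
    }
    blocks.foreach { case (pid, vids, blockAttrs) =>
      println(pid + " -> " + vids.mkString(",") + " / " + blockAttrs.mkString(","))
    }
    // 0 -> 1,2 / a,b   (vertex 9 is not held locally, so it is skipped)
    // 1 -> 2,3 / b,c
  }
}
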
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
new file mode 100644
index 0000000000..3bd8b24133
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -0,0 +1,64 @@
+package org.apache.spark.graphx.impl
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.collection.PrimitiveVector
+
+/**
+ * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
+ * information for shipping vertex attributes to edge partitions. This is always cached because it
+ * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
+ * (possibly) once to ship the active-set information.
+ */
+class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
+
+ val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
+
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
+ (includeSrcAttr, includeDstAttr) match {
+ case (true, true) => bothAttrs
+ case (true, false) => srcAttrOnly
+ case (false, true) => dstAttrOnly
+ case (false, false) => noAttrs
+ }
+
+ private def createPid2Vid(
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
+ // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+ val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+ val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
+ val numEdges = edgePartition.size
+ val vSet = new VertexSet
+ if (includeSrcAttr) { // Add src vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.srcIds(i))
+ i += 1
+ }
+ }
+ if (includeDstAttr) { // Add dst vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.dstIds(i))
+ i += 1
+ }
+ }
+ vSet.iterator.map { vid => (vid, pid) }
+ }
+
+ val numPartitions = vertices.partitions.size
+ vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
+ for ((vid, pid) <- iter) {
+ pid2vid(pid) += vid
+ }
+
+ Iterator(pid2vid.map(_.trim().array))
+ }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
+ }
+}
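
A self-contained sketch of the inversion at the heart of createPid2Vid above: start from (vid, pid) pairs recording which edge partition references which vertex, and group them into one vertex-id list per edge partition. The data is made up for illustration, and ArrayBuffer stands in for PrimitiveVector.

import scala.collection.mutable.ArrayBuffer

object RoutingSketch {
  def main(args: Array[String]) {
    val numEdgePartitions = 3
    // (vertex id, edge partition that references it), already de-duplicated per partition
    // as the VertexSet in createPid2Vid guarantees.
    val vid2pid = Seq((1L, 0), (2L, 0), (2L, 1), (3L, 1), (3L, 2))

    // Invert into pid -> vids.
    val pid2vid = Array.fill(numEdgePartitions)(ArrayBuffer[Long]())
    for ((vid, pid) <- vid2pid) pid2vid(pid) += vid

    pid2vid.zipWithIndex.foreach { case (vids, pid) =>
      println("edge partition " + pid + " needs vertices " + vids.mkString(", "))
    }
    // edge partition 0 needs vertices 1, 2
    // edge partition 1 needs vertices 2, 3
    // edge partition 2 needs vertices 3
  }
}
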
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
new file mode 100644
index 0000000000..1c3c87f08d
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -0,0 +1,386 @@
+package org.apache.spark.graphx.impl
+
+import java.io.{EOFException, InputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.graphx._
+import org.apache.spark.serializer._
+
+class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[(VertexID, _)]
+ writeVarLong(msg._1, optimizePositive = false)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ (readVarLong(optimizePositive = false), null).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
+class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
+ writeVarLong(msg.vid, optimizePositive = false)
+ writeInt(msg.data)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readInt()
+ new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for VertexBroadcastMessage[Long]. */
+class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
+ writeVarLong(msg.vid, optimizePositive = false)
+ writeLong(msg.data)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readLong()
+ new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for VertexBroadcastMessage[Double]. */
+class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
+ writeVarLong(msg.vid, optimizePositive = false)
+ writeDouble(msg.data)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readDouble()
+ new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for AggregationMessage[Int]. */
+class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[(VertexID, Int)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeUnsignedVarInt(msg._2)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readUnsignedVarInt()
+ (a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for AggregationMessage[Long]. */
+class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[(VertexID, Long)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeVarLong(msg._2, optimizePositive = true)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readVarLong(optimizePositive = true)
+ (a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+/** A special shuffle serializer for AggregationMessage[Double]. */
+class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[(VertexID, Double)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeDouble(msg._2)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ def readObject[T](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readDouble()
+ (a, b).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Helper classes to shorten the implementation of those special serializers.
+////////////////////////////////////////////////////////////////////////////////
+
+abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
+ // The implementation should override this one.
+ def writeObject[T](t: T): SerializationStream
+
+ def writeInt(v: Int) {
+ s.write(v >> 24)
+ s.write(v >> 16)
+ s.write(v >> 8)
+ s.write(v)
+ }
+
+ def writeUnsignedVarInt(value: Int) {
+ if ((value >>> 7) == 0) {
+ s.write(value.toInt)
+ } else if ((value >>> 14) == 0) {
+ s.write((value & 0x7F) | 0x80)
+ s.write(value >>> 7)
+ } else if ((value >>> 21) == 0) {
+ s.write((value & 0x7F) | 0x80)
+ s.write(value >>> 7 | 0x80)
+ s.write(value >>> 14)
+ } else if ((value >>> 28) == 0) {
+ s.write((value & 0x7F) | 0x80)
+ s.write(value >>> 7 | 0x80)
+ s.write(value >>> 14 | 0x80)
+ s.write(value >>> 21)
+ } else {
+ s.write((value & 0x7F) | 0x80)
+ s.write(value >>> 7 | 0x80)
+ s.write(value >>> 14 | 0x80)
+ s.write(value >>> 21 | 0x80)
+ s.write(value >>> 28)
+ }
+ }
+
+ def writeVarLong(value: Long, optimizePositive: Boolean) {
+ val v = if (!optimizePositive) (value << 1) ^ (value >> 63) else value
+ if ((v >>> 7) == 0) {
+ s.write(v.toInt)
+ } else if ((v >>> 14) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7).toInt)
+ } else if ((v >>> 21) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14).toInt)
+ } else if ((v >>> 28) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21).toInt)
+ } else if ((v >>> 35) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21 | 0x80).toInt)
+ s.write((v >>> 28).toInt)
+ } else if ((v >>> 42) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21 | 0x80).toInt)
+ s.write((v >>> 28 | 0x80).toInt)
+ s.write((v >>> 35).toInt)
+ } else if ((v >>> 49) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21 | 0x80).toInt)
+ s.write((v >>> 28 | 0x80).toInt)
+ s.write((v >>> 35 | 0x80).toInt)
+ s.write((v >>> 42).toInt)
+ } else if ((v >>> 56) == 0) {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21 | 0x80).toInt)
+ s.write((v >>> 28 | 0x80).toInt)
+ s.write((v >>> 35 | 0x80).toInt)
+ s.write((v >>> 42 | 0x80).toInt)
+ s.write((v >>> 49).toInt)
+ } else {
+ s.write(((v & 0x7F) | 0x80).toInt)
+ s.write((v >>> 7 | 0x80).toInt)
+ s.write((v >>> 14 | 0x80).toInt)
+ s.write((v >>> 21 | 0x80).toInt)
+ s.write((v >>> 28 | 0x80).toInt)
+ s.write((v >>> 35 | 0x80).toInt)
+ s.write((v >>> 42 | 0x80).toInt)
+ s.write((v >>> 49 | 0x80).toInt)
+ s.write((v >>> 56).toInt)
+ }
+ }
+
+ def writeLong(v: Long) {
+ s.write((v >>> 56).toInt)
+ s.write((v >>> 48).toInt)
+ s.write((v >>> 40).toInt)
+ s.write((v >>> 32).toInt)
+ s.write((v >>> 24).toInt)
+ s.write((v >>> 16).toInt)
+ s.write((v >>> 8).toInt)
+ s.write(v.toInt)
+ }
+
+ //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
+ def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
+
+ override def flush(): Unit = s.flush()
+
+ override def close(): Unit = s.close()
+}
+
+abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
+ // The implementation should override this one.
+ def readObject[T](): T
+
+ def readInt(): Int = {
+ val first = s.read()
+ if (first < 0) throw new EOFException
+ (first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF)
+ }
+
+ def readUnsignedVarInt(): Int = {
+ var value: Int = 0
+ var i: Int = 0
+ def readOrThrow(): Int = {
+ val in = s.read()
+ if (in < 0) throw new EOFException
+ in & 0xFF
+ }
+ var b: Int = readOrThrow()
+ while ((b & 0x80) != 0) {
+ value |= (b & 0x7F) << i
+ i += 7
+ if (i > 35) throw new IllegalArgumentException("Variable length quantity is too long")
+ b = readOrThrow()
+ }
+ value | (b << i)
+ }
+
+ def readVarLong(optimizePositive: Boolean): Long = {
+ def readOrThrow(): Int = {
+ val in = s.read()
+ if (in < 0) throw new EOFException
+ in & 0xFF
+ }
+ var b = readOrThrow()
+ var ret: Long = b & 0x7F
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 7
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 14
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 21
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 28
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 35
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 42
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 49
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= b.toLong << 56
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret
+ }
+
+ def readLong(): Long = {
+ val first = s.read()
+ if (first < 0) throw new EOFException()
+ (first.toLong << 56) |
+ (s.read() & 0xFF).toLong << 48 |
+ (s.read() & 0xFF).toLong << 40 |
+ (s.read() & 0xFF).toLong << 32 |
+ (s.read() & 0xFF).toLong << 24 |
+ (s.read() & 0xFF) << 16 |
+ (s.read() & 0xFF) << 8 |
+ (s.read() & 0xFF)
+ }
+
+ //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
+ def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
+
+ override def close(): Unit = s.close()
+}
+
+sealed trait ShuffleSerializerInstance extends SerializerInstance {
+
+ override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException
+
+ override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException
+
+ override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T =
+ throw new UnsupportedOperationException
+
+ // The implementation should override the following two.
+ override def serializeStream(s: OutputStream): SerializationStream
+ override def deserializeStream(s: InputStream): DeserializationStream
+}
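
A self-contained sketch of the zigzag, variable-length long encoding used by writeVarLong and readVarLong above. This simplified variant always emits 7 payload bits per byte (so the largest values may take ten bytes rather than the nine the streams above cap at), but it round-trips with its own reader and shows the essential idea.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object VarLongSketch {
  // Zigzag-encode so small negative values stay small, then emit 7 bits per byte,
  // setting the high bit on every byte except the last.
  def write(out: ByteArrayOutputStream, value: Long) {
    var v = (value << 1) ^ (value >> 63)
    while ((v >>> 7) != 0) {
      out.write(((v & 0x7F) | 0x80).toInt)
      v >>>= 7
    }
    out.write(v.toInt)
  }

  // Accumulate 7 bits at a time until a byte without the high bit, then undo the zigzag.
  def read(in: ByteArrayInputStream): Long = {
    var shift = 0
    var ret = 0L
    var b = in.read()
    while ((b & 0x80) != 0) {
      ret |= (b & 0x7F).toLong << shift
      shift += 7
      b = in.read()
    }
    ret |= b.toLong << shift
    (ret >>> 1) ^ -(ret & 1)
  }

  def main(args: Array[String]) {
    val out = new ByteArrayOutputStream()
    Seq(0L, 1L, -1L, 300L, Long.MaxValue).foreach(write(out, _))
    val in = new ByteArrayInputStream(out.toByteArray)
    println(Seq.fill(5)(read(in)))  // List(0, 1, -1, 300, 9223372036854775807)
  }
}
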
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
new file mode 100644
index 0000000000..7c83497ca9
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -0,0 +1,262 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
+
+import org.apache.spark.Logging
+import org.apache.spark.graphx._
+
+
+private[graphx] object VertexPartition {
+
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ iter.foreach { case (k, v) =>
+ map(k) = v
+ }
+ new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
+ }
+
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
+ : VertexPartition[VD] =
+ {
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ iter.foreach { case (k, v) =>
+ map.setMerge(k, v, mergeFunc)
+ }
+ new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
+ }
+}
+
+
+private[graphx]
+class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
+ val index: VertexIdToIndexMap,
+ val values: Array[VD],
+ val mask: BitSet,
+ /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
+ private val activeSet: Option[VertexSet] = None)
+ extends Logging {
+
+ val capacity: Int = index.capacity
+
+ def size: Int = mask.cardinality()
+
+ /** Return the vertex attribute for the given vertex ID. */
+ def apply(vid: VertexID): VD = values(index.getPos(vid))
+
+ def isDefined(vid: VertexID): Boolean = {
+ val pos = index.getPos(vid)
+ pos >= 0 && mask.get(pos)
+ }
+
+ /** Look up vid in activeSet, throwing an exception if it is None. */
+ def isActive(vid: VertexID): Boolean = {
+ activeSet.get.contains(vid)
+ }
+
+ /** The number of active vertices, if any exist. */
+ def numActives: Option[Int] = activeSet.map(_.size)
+
+ /**
+ * Pass each vertex attribute along with the vertex id through a map
+ * function and retain the original partition's index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each vertex id and vertex
+ * attribute in the RDD
+ *
+ * @return a new VertexPartition with values obtained by applying `f` to
+ * each of the entries in the original VertexPartition. The resulting
+ * VertexPartition retains the same index.
+ */
+ def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
+ // Construct a view of the map transformation
+ val newValues = new Array[VD2](capacity)
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i))
+ i = mask.nextSetBit(i + 1)
+ }
+ new VertexPartition[VD2](index, newValues, mask)
+ }
+
+ /**
+ * Restrict the vertex set to the set of vertices satisfying the given predicate.
+ *
+ * @param pred the user defined predicate
+ *
+ * @note The vertex set preserves the original index structure, which means that the returned
+ * partition can be easily joined with the original vertex set. Furthermore, the filter only
+ * modifies the bitmap index and so no new values are allocated.
+ */
+ def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
+ // Allocate the array to store the results into
+ val newMask = new BitSet(capacity)
+ // Iterate over the active bits in the old mask and evaluate the predicate
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ if (pred(index.getValue(i), values(i))) {
+ newMask.set(i)
+ }
+ i = mask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, values, newMask)
+ }
+
+ /**
+ * Hides vertices that are the same between `this` and `other`. For vertices that are different,
+ * keeps the values from `other`. The indices of `this` and `other` must be the same.
+ */
+ def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
+ if (index != other.index) {
+ logWarning("Diffing two VertexPartitions with different indexes is slow.")
+ diff(createUsingIndex(other.iterator))
+ } else {
+ val newMask = mask & other.mask
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ if (values(i) == other.values(i)) {
+ newMask.unset(i)
+ }
+ i = newMask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, other.values, newMask)
+ }
+ }
+
+ /** Left outer join another VertexPartition. */
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: VertexPartition[VD2])
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ if (index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ leftJoin(createUsingIndex(other.iterator))(f)
+ } else {
+ val newValues = new Array[VD3](capacity)
+
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
+ newValues(i) = f(index.getValue(i), values(i), otherV)
+ i = mask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, mask)
+ }
+ }
+
+ /** Left outer join another iterator of messages. */
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: Iterator[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ leftJoin(createUsingIndex(other))(f)
+ }
+
+ /** Inner join another VertexPartition. */
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ if (index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ innerJoin(createUsingIndex(other.iterator))(f)
+ } else {
+ val newMask = mask & other.mask
+ val newValues = new Array[VD2](capacity)
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i), other.values(i))
+ i = newMask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+ }
+
+ /**
+ * Inner join an iterator of messages.
+ */
+ def innerJoin[U: ClassTag, VD2: ClassTag]
+ (iter: Iterator[Product2[VertexID, U]])
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ innerJoin(createUsingIndex(iter))(f)
+ }
+
+ /**
+ * Has a similar effect to aggregateUsingIndex((a, b) => a)
+ */
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
+ : VertexPartition[VD2] = {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD2](capacity)
+ iter.foreach { case (vid, vdata) =>
+ val pos = index.getPos(vid)
+ if (pos >= 0) {
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ }
+ new VertexPartition[VD2](index, newValues, newMask)
+ }
+
+ /**
+ * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
+ * the partition, hidden by the bitmask.
+ */
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD](capacity)
+ System.arraycopy(values, 0, newValues, 0, newValues.length)
+ iter.foreach { case (vid, vdata) =>
+ val pos = index.getPos(vid)
+ if (pos >= 0) {
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+
+ def aggregateUsingIndex[VD2: ClassTag](
+ iter: Iterator[Product2[VertexID, VD2]],
+ reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD2](capacity)
+ iter.foreach { product =>
+ val vid = product._1
+ val vdata = product._2
+ val pos = index.getPos(vid)
+ if (pos >= 0) {
+ if (newMask.get(pos)) {
+ newValues(pos) = reduceFunc(newValues(pos), vdata)
+ } else { // otherwise just store the new value
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ }
+ }
+ new VertexPartition[VD2](index, newValues, newMask)
+ }
+
+ def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
+ val newActiveSet = new VertexSet
+ iter.foreach(newActiveSet.add(_))
+ new VertexPartition(index, values, mask, Some(newActiveSet))
+ }
+
+ /**
+ * Construct a new VertexPartition whose index contains only the vertices in the mask.
+ */
+ def reindex(): VertexPartition[VD] = {
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ val arbitraryMerge = (a: VD, b: VD) => a
+ for ((k, v) <- this.iterator) {
+ hashMap.setMerge(k, v, arbitraryMerge)
+ }
+ new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
+ }
+
+ def iterator: Iterator[(VertexID, VD)] =
+ mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+
+ def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
+}
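A VertexPartition is column-oriented: a shared hash index from vertex id to slot, a parallel values array, and a BitSet mask marking which slots are live, so operations such as filter and diff allocate only a new mask while reusing the index and values. Here is a minimal self-contained sketch of that layout using java.util.BitSet; the real class relies on Spark's internal OpenHashSet/BitSet, and the names and types below are illustrative only.

import java.util.BitSet

// Illustrative column-oriented vertex store: slot -> (id, value) plus a visibility mask.
// Mask-only operations reuse `ids` and `values` without copying them.
class TinyVertexPartition(val ids: Array[Long], val values: Array[Int], val mask: BitSet) {

  /** Keep only vertices satisfying the predicate; only a new mask is allocated. */
  def filter(pred: (Long, Int) => Boolean): TinyVertexPartition = {
    val newMask = new BitSet(ids.length)
    var i = mask.nextSetBit(0)
    while (i >= 0) {
      if (pred(ids(i), values(i))) newMask.set(i)
      i = mask.nextSetBit(i + 1)
    }
    new TinyVertexPartition(ids, values, newMask) // same arrays, new mask
  }

  /** Merge incoming (id, value) messages that hit the index, like aggregateUsingIndex. */
  def aggregate(msgs: Iterator[(Long, Int)], reduce: (Int, Int) => Int): TinyVertexPartition = {
    val slotOf = ids.zipWithIndex.toMap // stand-in for the hash-index lookup
    val newMask = new BitSet(ids.length)
    val newValues = new Array[Int](ids.length)
    msgs.foreach { case (id, v) =>
      slotOf.get(id).foreach { slot =>
        newValues(slot) = if (newMask.get(slot)) reduce(newValues(slot), v) else v
        newMask.set(slot)
      }
    }
    new TinyVertexPartition(ids, newValues, newMask)
  }

  def iterator: Iterator[(Long, Int)] =
    Iterator.iterate(mask.nextSetBit(0))(i => mask.nextSetBit(i + 1))
      .takeWhile(_ >= 0)
      .map(i => (ids(i), values(i)))
}

object TinyVertexPartitionDemo {
  def main(args: Array[String]): Unit = {
    val live = new BitSet(3); live.set(0, 3) // all three slots visible
    val part = new TinyVertexPartition(Array(10L, 20L, 30L), Array(1, 2, 3), live)
    println(part.filter((id, v) => v % 2 == 1).iterator.toList) // List((10,1), (30,3))
    println(part.aggregate(Iterator((20L, 5), (20L, 7), (99L, 1)), _ + _).iterator.toList) // List((20,12))
  }
}

Sharing the index is also why leftJoin and innerJoin above take their fast path only when both partitions have the same index.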
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
new file mode 100644
index 0000000000..96f0d91c9b
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -0,0 +1,22 @@
+package org.apache.spark
+
+import org.apache.spark.util.collection.OpenHashSet
+
+
+package object graphx {
+
+ type VertexID = Long
+
+ // TODO: Consider using Char.
+ type PartitionID = Int
+
+ type VertexSet = OpenHashSet[VertexID]
+
+ // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+ type VertexIdToIndexMap = OpenHashSet[VertexID]
+
+ /**
+ * Return the default null-like value for a data type T.
+ */
+ def nullValue[T] = null.asInstanceOf[T]
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala
new file mode 100644
index 0000000000..81332e0800
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala
@@ -0,0 +1,76 @@
+///// This file creates circular dependencies between the examples, bagel, and graphx projects
+
+// package org.apache.spark.graphx.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+
+// import org.apache.spark.examples.bagel
+// //import org.apache.spark.bagel.examples._
+// import org.apache.spark.graphx._
+
+
+// object BagelTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala
new file mode 100644
index 0000000000..24262640ab
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala
@@ -0,0 +1,75 @@
+///// This file creates circular dependencies between the examples, bagel, and graphx projects
+
+
+// package org.apache.spark.graphx.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+// import org.apache.spark.bagel.examples._
+// import org.apache.spark.graphx._
+
+
+// object SparkTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
new file mode 100644
index 0000000000..ec8d534333
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -0,0 +1,114 @@
+package org.apache.spark.graphx.util
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.util.Utils
+
+import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
+import org.objectweb.asm.Opcodes._
+
+
+
+private[spark] object BytecodeUtils {
+
+ /**
+ * Test whether the given closure invokes the specified method in the specified class.
+ */
+ def invokedMethod(closure: AnyRef, targetClass: Class[_], targetMethod: String): Boolean = {
+ if (_invokedMethod(closure.getClass, "apply", targetClass, targetMethod)) {
+ true
+ } else {
+ // look at closures enclosed in this closure
+ for (f <- closure.getClass.getDeclaredFields
+ if f.getType.getName.startsWith("scala.Function")) {
+ f.setAccessible(true)
+ if (invokedMethod(f.get(closure), targetClass, targetMethod)) {
+ return true
+ }
+ }
+ return false
+ }
+ }
+
+ private def _invokedMethod(cls: Class[_], method: String,
+ targetClass: Class[_], targetMethod: String): Boolean = {
+
+ val seen = new HashSet[(Class[_], String)]
+ var stack = List[(Class[_], String)]((cls, method))
+
+ while (stack.nonEmpty) {
+ val (c, m) = stack.head
+ stack = stack.tail
+ seen.add((c, m))
+ val finder = new MethodInvocationFinder(c.getName, m)
+ getClassReader(c).accept(finder, 0)
+ for (classMethod <- finder.methodsInvoked) {
+ //println(classMethod)
+ if (classMethod._1 == targetClass && classMethod._2 == targetMethod) {
+ return true
+ } else if (!seen.contains(classMethod)) {
+ stack = classMethod :: stack
+ }
+ }
+ }
+ return false
+ }
+
+ /**
+ * Get an ASM class reader for a given class from the JAR that loaded it.
+ */
+ private def getClassReader(cls: Class[_]): ClassReader = {
+ // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
+ val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
+ val resourceStream = cls.getResourceAsStream(className)
+ // todo: Fixme - continuing with earlier behavior ...
+ if (resourceStream == null) return new ClassReader(resourceStream)
+
+ val baos = new ByteArrayOutputStream(128)
+ Utils.copyStream(resourceStream, baos, true)
+ new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+ }
+
+ /**
+ * Given the class name, return whether we should look into the class or not. This is used to
+ * skip examining a large quantity of Java or Scala classes that we know for sure wouldn't access
+ * the closures. Note that the class name is expected in ASM style (i.e. use "/" instead of ".").
+ */
+ private def skipClass(className: String): Boolean = {
+ val c = className
+ c.startsWith("java/") || c.startsWith("scala/") || c.startsWith("javax/")
+ }
+
+ /**
+ * Find the set of methods invoked by the specified method in the specified class.
+ * For example, after running the visitor
+ * MethodInvocationFinder("spark/graph/Foo", "test"),
+ * its methodsInvoked variable will contain the set of methods invoked directly by
+ * Foo.test(). Interface invocations are not returned as part of the result set because we cannot
+ * determine the actual method invoked by inspecting the bytecode.
+ */
+ private class MethodInvocationFinder(className: String, methodName: String)
+ extends ClassVisitor(ASM4) {
+
+ val methodsInvoked = new HashSet[(Class[_], String)]
+
+ override def visitMethod(access: Int, name: String, desc: String,
+ sig: String, exceptions: Array[String]): MethodVisitor = {
+ if (name == methodName) {
+ new MethodVisitor(ASM4) {
+ override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) {
+ if (op == INVOKEVIRTUAL || op == INVOKESPECIAL || op == INVOKESTATIC) {
+ if (!skipClass(owner)) {
+ methodsInvoked.add((Class.forName(owner.replace("/", ".")), name))
+ }
+ }
+ }
+ }
+ } else {
+ null
+ }
+ }
+ }
+}
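invokedMethod walks the bytecode of the closure's apply methods, and transitively of everything they call, to determine whether a particular method of a particular class is ever invoked. A hedged usage sketch follows, assuming Scala 2.10-style anonymous-class closures and a caller placed under org.apache.spark so that the private[spark] object is visible; the Widget class and its poke method are made up for illustration.

// Hypothetical caller placed under org.apache.spark so the private[spark] object is
// visible; Widget and its poke method are made up for illustration.
package org.apache.spark.graphx.util

class Widget { def poke(): Int = 42 }

object BytecodeUtilsSketch {
  def main(args: Array[String]): Unit = {
    val touches: Widget => Int = w => w.poke() + 1 // closure that calls Widget.poke
    val ignores: Widget => Int = _ => 0            // closure that never calls it

    // Expected to print true: the compiled apply() of `touches` contains an
    // INVOKEVIRTUAL of Widget.poke, which the visitor records.
    println(BytecodeUtils.invokedMethod(touches, classOf[Widget], "poke"))

    // Expected to print false: no call to poke, directly or in nested closures.
    println(BytecodeUtils.invokedMethod(ignores, classOf[Widget], "poke"))
  }
}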
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
new file mode 100644
index 0000000000..57117241ad
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -0,0 +1,282 @@
+package org.apache.spark.graphx.util
+
+import scala.annotation.tailrec
+import scala.math._
+import scala.reflect.ClassTag
+import scala.util._
+
+import org.apache.spark._
+import org.apache.spark.serializer._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.Graph
+import org.apache.spark.graphx.Edge
+import org.apache.spark.graphx.impl.GraphImpl
+
+/**
+ * @todo cleanup and modularize code
+ */
+object GraphGenerators {
+
+ val RMATa = 0.45
+ val RMATb = 0.15
+ val RMATc = 0.15
+ val RMATd = 0.25
+
+ def main(args: Array[String]) {
+
+
+ val serializer = "org.apache.spark.serializer.KryoSerializer"
+ System.setProperty("spark.serializer", serializer)
+ //System.setProperty("spark.shuffle.compress", "false")
+ System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator")
+ val host = "local[4]"
+ val sc = new SparkContext(host, "Lognormal graph generator")
+
+ val lnGraph = logNormalGraph(sc, 10000)
+
+ val rmat = rmatGraph(sc, 1000, 3000)
+
+ //for (v <- lnGraph.vertices) {
+ // println(v.id + ":\t" + v.data)
+ //}
+
+ val times = 100000
+ //val nums = (1 to times).flatMap { n => List(sampleLogNormal(4.0, 1.3, times)) }.toList
+ //val avg = nums.sum / nums.length
+ //val sumSquares = nums.foldLeft(0.0) {(total, next) =>
+ // (total + math.pow((next - avg), 2)) }
+ //val stdev = math.sqrt(sumSquares/(nums.length - 1))
+
+ //println("avg: " + avg + "+-" + stdev)
+
+
+ //for (i <- 1 to 1000) {
+ // println(sampleLogNormal(4.0, 1.3, 1000))
+ //}
+
+ sc.stop()
+
+ }
+
+
+ // Right now it just generates a bunch of edges where
+ // the edge data is the weight (default 1)
+ def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
+ // based on Pregel settings
+ val mu = 4
+ val sigma = 1.3
+ //val vertsAndEdges = (0 until numVertices).flatMap { src => {
+
+ val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
+ src => (src, sampleLogNormal(mu, sigma, numVertices))
+ }
+
+ val edges = vertices.flatMap{
+ v => generateRandomEdges(v._1.toInt, v._2, numVertices)
+ }
+
+ Graph(vertices, edges, 0)
+ //println("Vertices:")
+ //for (v <- vertices) {
+ // println(v.id)
+ //}
+
+ //println("Edges")
+ //for (e <- edges) {
+ // println(e.src, e.dst, e.data)
+ //}
+
+ }
+
+
+ def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
+ val rand = new Random()
+ var dsts: Set[Int] = Set()
+ while (dsts.size < numEdges) {
+ val nextDst = rand.nextInt(maxVertexID)
+ if (nextDst != src) {
+ dsts += nextDst
+ }
+ }
+ dsts.map {dst => Edge[Int](src, dst, 1) }.toArray
+ }
+
+
+ /**
+ * Randomly samples from a log normal distribution whose corresponding
+ * normal distribution has the given mean and standard deviation.
+ * It uses the formula X = exp(mu + sigma*Z) where Z ~ N(0, 1); the
+ * resulting lognormal distribution then has mean
+ * m = e^(mu+sigma^2/2) and standard deviation
+ * s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))].
+ *
+ * @param mu the mean of the underlying normal distribution
+ * @param sigma the standard deviation of the underlying normal distribution
+ * @param maxVal exclusive upper bound on the value of the sample
+ */
+ def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
+ val rand = new Random()
+ val m = math.exp(mu+(sigma*sigma)/2.0)
+ val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma))
+ // Z ~ N(0, 1)
+ var X: Double = maxVal
+
+ while (X >= maxVal) {
+ val Z = rand.nextGaussian()
+ //X = math.exp((m + s*Z))
+ X = math.exp((mu + sigma*Z))
+ }
+ math.round(X.toFloat)
+ }
+
+
+
+ def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int] = {
+ // let N = requestedNumVertices
+ // the number of vertices is 2^n where n=ceil(log2[N])
+ // This ensures that the 4 quadrants are the same size at all recursion levels
+ val numVertices = math.round(math.pow(2.0, math.ceil(math.log(requestedNumVertices)/math.log(2.0)))).toInt
+ var edges: Set[Edge[Int]] = Set()
+ while (edges.size < numEdges) {
+ if (edges.size % 100 == 0) {
+ println(edges.size + " edges")
+ }
+ edges += addEdge(numVertices)
+
+ }
+ val graph = outDegreeFromEdges(sc.parallelize(edges.toList))
+ graph
+
+ }
+
+ def outDegreeFromEdges[ED: ClassTag](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
+
+ val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
+ .reduceByKey(_ + _)
+ .map{ case (vid, degree) => (vid, degree) }
+ Graph(vertices, edges, 0)
+ }
+
+ /**
+ * @param numVertices the total number of vertices in the graph (used to determine
+ * the dimensions of the adjacency matrix)
+ */
+ def addEdge(numVertices: Int): Edge[Int] = {
+ //val (src, dst) = chooseCell(numVertices/2.0, numVertices/2.0, numVertices/2.0)
+ val v = math.round(numVertices.toFloat/2.0).toInt
+
+ val (src, dst) = chooseCell(v, v, v)
+ Edge[Int](src, dst, 1)
+ }
+
+
+ /**
+ * This method recursively subdivides the adjacency matrix into quadrants
+ * until it picks a single cell. The naming conventions used here match
+ * those of the R-MAT paper. The number of nodes in the graph is a power of 2.
+ * The adjacency matrix looks like:
+ *
+ * dst ->
+ * (x,y) *************** _
+ * | | | |
+ * | a | b | |
+ * src | | | |
+ * | *************** | T
+ * \|/ | | | |
+ * | c | d | |
+ * | | | |
+ * *************** -
+ *
+ * where this represents the subquadrant of the adj matrix currently being
+ * subdivided. (x,y) represent the upper left hand corner of the subquadrant,
+ * and T represents the side length (guaranteed to be a power of 2).
+ *
+ * After choosing the next level subquadrant, we get the resulting sets
+ * of parameters:
+ * quad = a, x'=x, y'=y, T'=T/2
+ * quad = b, x'=x+T/2, y'=y, T'=T/2
+ * quad = c, x'=x, y'=y+T/2, T'=T/2
+ * quad = d, x'=x+T/2, y'=y+T/2, T'=T/2
+ *
+ * @param x x-coordinate of the upper left corner of the current subquadrant
+ * @param y y-coordinate of the upper left corner of the current subquadrant
+ * @param t the side length of the current subquadrant
+ */
+ @tailrec
+ def chooseCell(x: Int, y: Int, t: Int): (Int, Int) = {
+ if (t <= 1)
+ (x,y)
+ else {
+ val newT = math.round(t.toFloat/2.0).toInt
+ pickQuadrant(RMATa, RMATb, RMATc, RMATd) match {
+ case 0 => chooseCell(x, y, newT)
+ case 1 => chooseCell(x+newT, y, newT)
+ case 2 => chooseCell(x, y+newT, newT)
+ case 3 => chooseCell(x+newT, y+newT, newT)
+ }
+ }
+ }
+
+ // TODO(crankshaw) turn result into an enum (or case class for pattern matching}
+ def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
+ if (a+b+c+d != 1.0) {
+ throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0")
+ }
+ val rand = new Random()
+ val result = rand.nextDouble()
+ result match {
+ case x if x < a => 0 // 0 corresponds to quadrant a
+ case x if (x >= a && x < a+b) => 1 // 1 corresponds to b
+ case x if (x >= a+b && x < a+b+c) => 2 // 2 corresponds to c
+ case _ => 3 // 3 corresponds to d
+ }
+ }
+
+
+
+ /**
+ * Create a `rows` by `cols` grid graph with each vertex connected to its
+ * row+1 and col+1 neighbors. Vertex ids are assigned in row major
+ * order.
+ *
+ * @param sc the spark context in which to construct the graph
+ * @param rows the number of rows
+ * @param cols the number of columns
+ *
+ * @return A graph containing vertices with the row and column ids
+ * as their attributes and all edge attributes set to 1.0.
+ */
+ def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
+ // Convert row column address into vertex ids (row major order)
+ def sub2ind(r: Int, c: Int): VertexID = r * cols + c
+
+ val vertices: RDD[(VertexID, (Int,Int))] =
+ sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
+ val edges: RDD[Edge[Double]] =
+ vertices.flatMap{ case (vid, (r,c)) =>
+ (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
+ (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+ }.map{ case (src, dst) => Edge(src, dst, 1.0) }
+ Graph(vertices, edges)
+ } // end of gridGraph
+
+ /**
+ * Create a star graph with vertex 0 being the center.
+ *
+ * @param sc the spark context in which to construct the graph
+ * @param nverts the number of vertices in the star
+ *
+ * @return A star graph containing `nverts` vertices with vertex 0
+ * being the center vertex.
+ */
+ def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
+ val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+ Graph.fromEdgeTuples(edges, 1)
+ } // end of starGraph
+
+
+
+} // end of Graph Generators
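rmatGraph above places each edge by recursively descending into one of the four quadrants of the adjacency matrix with probabilities RMATa..RMATd until a single cell remains, exactly as the chooseCell diagram describes. The following is a self-contained sketch of just that recursion in plain Scala, with no Spark dependency; the probabilities, seed, and sizes are illustrative.

import scala.annotation.tailrec
import scala.util.Random

// Standalone R-MAT cell chooser: recursively pick one quadrant of a 2^k x 2^k
// adjacency matrix until a single (src, dst) cell remains.
object RmatSketch {
  val (a, b, c, d) = (0.45, 0.15, 0.15, 0.25) // quadrant probabilities, must sum to 1.0
  val rand = new Random(42)

  def pickQuadrant(): Int = {
    val r = rand.nextDouble()
    if (r < a) 0 else if (r < a + b) 1 else if (r < a + b + c) 2 else 3
  }

  @tailrec
  def chooseCell(x: Int, y: Int, side: Int): (Int, Int) =
    if (side <= 1) (x, y)
    else {
      val half = side / 2
      pickQuadrant() match {
        case 0 => chooseCell(x, y, half)               // quadrant a: keep upper-left
        case 1 => chooseCell(x + half, y, half)        // quadrant b
        case 2 => chooseCell(x, y + half, half)        // quadrant c
        case _ => chooseCell(x + half, y + half, half) // quadrant d
      }
    }

  def main(args: Array[String]): Unit = {
    val numVertices = 16 // a power of two, so quadrants stay equal-sized
    Seq.fill(10)(chooseCell(0, 0, numVertices)).foreach { case (src, dst) =>
      println(s"$src -> $dst")
    }
  }
}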
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala
new file mode 100644
index 0000000000..7a79d33350
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.graphx.util
+
+
+object HashUtils {
+
+ /**
+ * Compute a 64-bit hash value for the given string.
+ * See http://stackoverflow.com/questions/1660501/what-is-a-good-64bit-hash-function-in-java-for-textual-strings
+ */
+ def hash(str: String): Long = {
+ var h = 1125899906842597L
+ val len = str.length
+ var i = 0
+
+ while (i < len) {
+ h = 31 * h + str(i)
+ i += 1
+ }
+ h
+ }
+}
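hash is the familiar multiply-by-31 polynomial hash, accumulated in a 64-bit Long seeded with a large odd constant rather than the 32-bit Int used by String.hashCode. A small sketch showing the same recurrence written as a fold and checking it against HashUtils.hash; the sample strings are arbitrary.

import org.apache.spark.graphx.util.HashUtils

object HashUtilsSketch {
  // Same recurrence as HashUtils.hash, written as a fold:
  // h_0 = 1125899906842597L, h_{i+1} = 31 * h_i + str(i)
  def foldHash(str: String): Long =
    str.foldLeft(1125899906842597L)((h, ch) => 31 * h + ch)

  def main(args: Array[String]): Unit = {
    val samples = Seq("", "spark", "graphx", "vertex-42") // arbitrary test strings
    samples.foreach { s =>
      assert(HashUtils.hash(s) == foldHash(s)) // the two formulations agree
      println(s"$s -> ${HashUtils.hash(s)}")
    }
  }
}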
diff --git a/graphx/src/test/resources/log4j.properties b/graphx/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..896936d8c4
--- /dev/null
+++ b/graphx/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file graphx/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=graphx/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
new file mode 100644
index 0000000000..cc281fce99
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -0,0 +1,92 @@
+package org.apache.spark.graphx
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.rdd._
+import org.scalatest.FunSuite
+
+class GraphOpsSuite extends FunSuite with LocalSparkContext {
+
+ test("aggregateNeighbors") {
+ withSpark { sc =>
+ val n = 3
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
+
+ val indegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.In)
+ assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
+
+ val outdegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.Out)
+ assert(outdegrees.collect().toSet === Set((0, n)))
+
+ val noVertexValues = star.aggregateNeighbors[Int](
+ (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
+ (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
+ EdgeDirection.In)
+ assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
+ }
+ }
+
+ test("joinVertices") {
+ withSpark { sc =>
+ val vertices =
+ sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+ val g: Graph[String, String] = Graph(vertices, edges)
+
+ val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
+
+ val v = g1.vertices.collect().toSet
+ assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
+ }
+ }
+
+ test("collectNeighborIds") {
+ withSpark { sc =>
+ val chain = (0 until 100).map(x => (x, (x+1)%100) )
+ val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
+ val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
+ assert(nbrs.count === chain.size)
+ assert(graph.numVertices === nbrs.count)
+ nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
+ nbrs.collect.foreach { case (vid, nbrs) =>
+ val s = nbrs.toSet
+ assert(s.contains((vid + 1) % 100))
+ assert(s.contains(if (vid > 0) vid - 1 else 99 ))
+ }
+ }
+ }
+
+ test ("filter") {
+ withSpark { sc =>
+ val n = 5
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+ val graph: Graph[Int, Int] = Graph(vertices, edges)
+ val filteredGraph = graph.filter(
+ graph => {
+ val degrees: VertexRDD[Int] = graph.outDegrees
+ graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
+ },
+ vpred = (vid: VertexID, deg:Int) => deg > 0
+ )
+
+ val v = filteredGraph.vertices.collect().toSet
+ assert(v === Set((0,0)))
+
+ // the map is necessary because of object-reuse in the edge iterator
+ val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+ assert(e.isEmpty)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
new file mode 100644
index 0000000000..094fa722a0
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -0,0 +1,272 @@
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.rdd._
+
+class GraphSuite extends FunSuite with LocalSparkContext {
+
+ def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ }
+
+ test("Graph.fromEdgeTuples") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L)
+ val doubleRing = ring ++ ring
+ val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
+ assert(graph.edges.count() === doubleRing.size)
+ assert(graph.edges.collect.forall(e => e.attr == 1))
+
+ // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
+ val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
+ assert(uniqueGraph.edges.count() === ring.size)
+ assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
+ }
+ }
+
+ test("Graph.fromEdges") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+ val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)
+ assert(graph.edges.count() === ring.size)
+ }
+ }
+
+ test("Graph.apply") {
+ withSpark { sc =>
+ val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
+ val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
+ val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val graph = Graph(vertices, edges, false)
+ assert( graph.edges.count() === rawEdges.size )
+ // Vertices not explicitly provided but referenced by edges should be created automatically
+ assert( graph.vertices.count() === 100)
+ graph.triplets.map { et =>
+ assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
+ assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
+ }
+ }
+ }
+
+ test("triplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
+ (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
+ }
+ }
+
+ test("partitionBy") {
+ withSpark { sc =>
+ def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+ def nonemptyParts(graph: Graph[Int, Int]) = {
+ graph.edges.partitionsRDD.mapPartitions { iter =>
+ Iterator(iter.next()._2.iterator.toList)
+ }.filter(_.nonEmpty)
+ }
+ val identicalEdges = List((0L, 1L), (0L, 1L))
+ val canonicalEdges = List((0L, 1L), (1L, 0L))
+ val sameSrcEdges = List((0L, 1L), (0L, 2L))
+
+ // The two edges start out in different partitions
+ for (edges <- List(identicalEdges, canonicalEdges, sameSrcEdges)) {
+ assert(nonemptyParts(mkGraph(edges)).count === 2)
+ }
+ // partitionBy(RandomVertexCut) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1)
+ // partitionBy(EdgePartition1D) puts same-source edges in the same partition
+ assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
+ // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
+ // the same partition
+ assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+ // partitionBy(EdgePartition2D) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
+
+ // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
+ // partitions
+ val n = 100
+ val p = 100
+ val verts = 1 to n
+ val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
+ verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
+ assert(graph.edges.partitions.length === p)
+ val partitionedGraph = graph.partitionBy(EdgePartition2D)
+ assert(graph.edges.partitions.length === p)
+ val bound = 2 * math.sqrt(p)
+ // Each vertex should be replicated to at most 2 * sqrt(p) partitions
+ val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ // This should not be true for the default hash partitioning
+ val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+ }
+ }
+
+ test("mapVertices") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ // mapVertices preserving type
+ val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
+ // mapVertices changing type
+ val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
+ }
+ }
+
+ test("mapEdges") {
+ withSpark { sc =>
+ val n = 3
+ val star = starGraph(sc, n)
+ val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
+
+ val edges = starWithEdgeAttrs.edges.collect()
+ assert(edges.size === n)
+ assert(edges.toSet === (1 to n).map(x => Edge(0, x, x)).toSet)
+ }
+ }
+
+ test("mapTriplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
+ (1L to n).map(x => Edge(0, x, "vv")).toSet)
+ }
+ }
+
+ test("reverse") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
+ }
+ }
+
+ test("subgraph") {
+ withSpark { sc =>
+ // Create a star graph of 10 vertices.
+ val n = 10
+ val star = starGraph(sc, n)
+ // Take only vertices whose vids are even
+ val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+
+ // We should have 5 vertices.
+ assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
+
+ // And 4 edges.
+ assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
+ }
+ }
+
+ test("mask") {
+ withSpark { sc =>
+ val n = 5
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+ val graph: Graph[Int, Int] = Graph(vertices, edges)
+
+ val subgraph = graph.subgraph(
+ e => e.dstId != 4L,
+ (vid, vdata) => vid != 3L
+ ).mapVertices((vid, vdata) => -1).mapEdges(e => -1)
+
+ val projectedGraph = graph.mask(subgraph)
+
+ val v = projectedGraph.vertices.collect().toSet
+ assert(v === Set((0,0), (1,1), (2,2), (4,4), (5,5)))
+
+ // the map is necessary because of object-reuse in the edge iterator
+ val e = projectedGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+ assert(e === Set(Edge(0,1,1), Edge(0,2,2), Edge(0,5,5)))
+
+ }
+ }
+
+ test("groupEdges") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x =>
+ List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
+ test("mapReduceTriplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n).mapVertices { (_, _) => 0 }
+ val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
+ val neighborDegreeSums = starDeg.mapReduceTriplets(
+ edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
+ (a: Int, b: Int) => a + b)
+ assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
+
+ // activeSetOpt
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
+ val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
+ val vids = complete.mapVertices((vid, attr) => vid).cache()
+ val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
+ val numEvenNeighbors = vids.mapReduceTriplets(et => {
+ // Map function should only run on edges with destination in the active set
+ if (et.dstId % 2 != 0) {
+ throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
+ }
+ Iterator((et.srcId, 1))
+ }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
+
+ // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ring = Graph.fromEdgeTuples(ringEdges, 0).mapVertices((vid, attr) => vid).cache()
+ val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
+ // Map function should only run on edges with source in the active set
+ if (et.srcId % 2 != 1) {
+ throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
+ }
+ Iterator((et.dstId, 1))
+ }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
+
+ }
+ }
+
+ test("outerJoinVertices") {
+ withSpark { sc =>
+ val n = 5
+ val reverseStar = starGraph(sc, n).reverse
+ // outerJoinVertices changing type
+ val reverseStarDegrees =
+ reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+ val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => a + b).collect.toSet
+ assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
+ // outerJoinVertices preserving type
+ val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
+ val newReverseStar =
+ reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
+ assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+ (0 to n).map(x => "v%d".format(x)).toSet)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
new file mode 100644
index 0000000000..6aec2ea8a9
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.graphx
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkContext
+
+
+/**
+ * Provides a method to run tests against a {@link SparkContext} variable that is correctly
+ * stopped after each test.
+ */
+trait LocalSparkContext {
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
+ def withSpark[T](f: SparkContext => T) = {
+ val sc = new SparkContext("local", "test")
+ try {
+ f(sc)
+ } finally {
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
new file mode 100644
index 0000000000..429622357f
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -0,0 +1,41 @@
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd._
+
+class PregelSuite extends FunSuite with LocalSparkContext {
+
+ test("1 iteration") {
+ withSpark { sc =>
+ val n = 5
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ val result = Pregel(star, 0)(
+ (vid, attr, msg) => attr,
+ et => Iterator.empty,
+ (a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly"))
+ assert(result.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
+ test("chain propagation") {
+ withSpark { sc =>
+ val n = 5
+ val chain = Graph.fromEdgeTuples(
+ sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
+ 0).cache()
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+ val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
+ assert(chainWithSeed.vertices.collect.toSet ===
+ Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
+ val result = Pregel(chainWithSeed, 0)(
+ (vid, attr, msg) => math.max(msg, attr),
+ et => Iterator((et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => math.max(a, b))
+ assert(result.vertices.collect.toSet ===
+ chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet)
+ }
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
new file mode 100644
index 0000000000..3ba412c1f8
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -0,0 +1,183 @@
+package org.apache.spark.graphx
+
+import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.impl.MsgRDDFunctions._
+import org.apache.spark.serializer.SerializationStream
+
+
+class SerializerSuite extends FunSuite with LocalSparkContext {
+
+ test("IntVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("LongVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("DoubleVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("IntAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Int) = inStrm.readObject()
+ val inMsg2: (VertexID, Int) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("LongAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 1L << 32)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Long) = inStrm.readObject()
+ val inMsg2: (VertexID, Long) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("DoubleAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 5.0)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Double) = inStrm.readObject()
+ val inMsg2: (VertexID, Double) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("TestShuffleVertexBroadcastMsg") {
+ withSpark { sc =>
+ val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
+ new VertexBroadcastMsg[Int](pid, pid, pid)
+ }
+ bmsgs.partitionBy(new HashPartitioner(3)).collect()
+ }
+ }
+
+ test("variable long encoding") {
+ def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
+ val bout = new ByteArrayOutputStream
+ val stream = new ShuffleSerializationStream(bout) {
+ def writeObject[T](t: T): SerializationStream = {
+ writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive)
+ this
+ }
+ }
+ stream.writeObject(v)
+
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val dstream = new ShuffleDeserializationStream(bin) {
+ def readObject[T](): T = {
+ readVarLong(optimizePositive).asInstanceOf[T]
+ }
+ }
+ val read = dstream.readObject[Long]()
+ assert(read === v)
+ }
+
+ // Test all variable encoding code paths (each branch uses 7 bits, i.e. 1L << 7 difference)
+ val d = Random.nextLong() % 128
+ Seq[Long](0, (1L << 0) + d, (1L << 7) + d, (1L << 14) + d, (1L << 21) + d, (1L << 28) + d,
+ (1L << 35) + d, (1L << 42) + d, (1L << 49) + d, (1L << 56) + d, (1L << 63) + d).foreach { number =>
+ testVarLongEncoding(number, optimizePositive = false)
+ testVarLongEncoding(number, optimizePositive = true)
+ testVarLongEncoding(-number, optimizePositive = false)
+ testVarLongEncoding(-number, optimizePositive = true)
+ }
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
new file mode 100644
index 0000000000..573b708e89
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -0,0 +1,85 @@
+package org.apache.spark.graphx
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.rdd._
+import org.scalatest.FunSuite
+
+class VertexRDDSuite extends FunSuite with LocalSparkContext {
+
+ def vertices(sc: SparkContext, n: Int) = {
+ VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
+ }
+
+ test("filter") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ assert(evens.count === (0 to n).filter(_ % 2 == 0).size)
+ }
+ }
+
+ test("mapValues") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val negatives = verts.mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
+ assert(negatives.count === n + 1)
+ }
+ }
+
+ test("diff") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val flipEvens = verts.mapValues(x => if (x % 2 == 0) -x else x)
+ // diff should keep only the changed vertices
+ assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
+ // diff should keep the vertex values from `other`
+ assert(flipEvens.diff(verts).map(_._2).collect().toSet === (2 to n by 2).toSet)
+ }
+ }
+
+ test("leftJoin") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ // leftJoin with another VertexRDD
+ assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
+ // leftJoin with an RDD
+ val evensRDD = evens.map(identity)
+ assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
+ }
+ }
+
+ test("innerJoin") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ // innerJoin with another VertexRDD
+ assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet)
+ // innerJoin with an RDD
+ val evensRDD = evens.map(identity)
+ assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet)
+ }
+ }
+
+ test("aggregateUsingIndex") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val messageTargets = (0 to n) ++ (0 to n by 2)
+ val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
+ assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
+ (0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..5e2ecfcde9
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
@@ -0,0 +1,83 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ assert(maxCCid === 0)
+ }
+ } // end of Grid connected components
+
+
+ test("Reverse Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+      val ccSum = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum // all labels should be 0
+      assert(ccSum === 0)
+    }
+  } // end of reverse Grid connected components
+
+
+ test("Chain Connected Components") {
+ withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val chain2 = (10 until 20).map(x => (x, x + 1))
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect()
+      for ((id, cc) <- vertices) {
+        if (id < 10) { assert(cc === 0) }
+ else { assert(cc === 10) }
+ }
+ val ccMap = vertices.toMap
+ for (id <- 0 until 20) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of chain connected components
+
+ test("Reverse Chain Connected Components") {
+ withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val chain2 = (10 until 20).map(x => (x, x + 1))
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s, d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect
+      for ((id, cc) <- vertices) {
+ if (id < 10) {
+ assert(cc === 0)
+ } else {
+ assert(cc === 10)
+ }
+ }
+ val ccMap = vertices.toMap
+      for (id <- 0 until 20) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of reverse chain connected components
+
+}
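
As the chain tests above rely on, the label that ConnectedComponents.run assigns to each vertex is the smallest vertex id in its component. A minimal sketch, assuming a SparkContext `sc` with GraphX on the classpath:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.ConnectedComponents

    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (5L, 6L)))
    val graph = Graph.fromEdgeTuples(rawEdges, 1)
    val cc = ConnectedComponents.run(graph).vertices.collect().toMap
    // cc == Map(1L -> 1L, 2L -> 1L, 3L -> 1L, 5L -> 5L, 6L -> 5L)
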
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
new file mode 100644
index 0000000000..e365b1e230
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
@@ -0,0 +1,126 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import org.apache.spark.graphx.util.GraphGenerators
+
+
+object GridPageRank {
+ def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+ val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+ val outDegree = Array.fill(nRows * nCols)(0)
+ // Convert row column address into vertex ids (row major order)
+ def sub2ind(r: Int, c: Int): Int = r * nCols + c
+ // Make the grid graph
+ for (r <- 0 until nRows; c <- 0 until nCols) {
+ val ind = sub2ind(r,c)
+ if (r+1 < nRows) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r+1,c)) += ind
+ }
+ if (c+1 < nCols) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r,c+1)) += ind
+ }
+ }
+ // compute the pagerank
+ var pr = Array.fill(nRows * nCols)(resetProb)
+ for (iter <- 0 until nIter) {
+ val oldPr = pr
+ pr = new Array[Double](nRows * nCols)
+ for (ind <- 0 until (nRows * nCols)) {
+ pr(ind) = resetProb + (1.0 - resetProb) *
+ inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+ }
+ }
+ (0L until (nRows * nCols)).zip(pr)
+ }
+
+}
+
+
+class PageRankSuite extends FunSuite with LocalSparkContext {
+
+ def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+    a.leftJoin(b) { case (id, ra, rbOpt) => (ra - rbOpt.getOrElse(0.0)) * (ra - rbOpt.getOrElse(0.0)) }
+      .map { case (id, error) => error }.sum
+ }
+
+ test("Star PageRank") {
+ withSpark { sc =>
+ val nVertices = 100
+ val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+ val resetProb = 0.15
+ val errorTol = 1.0e-5
+
+ val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+ val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+
+ // Static PageRank should only take 2 iterations to converge
+ val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+ if (pr1 != pr2) 1 else 0
+ }.map { case (vid, test) => test }.sum
+ assert(notMatching === 0)
+
+ val staticErrors = staticRanks2.map { case (vid, pr) =>
+ val correct = (vid > 0 && pr == resetProb) ||
+ (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+ if (!correct) 1 else 0
+ }
+ assert(staticErrors.sum === 0)
+
+ val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+ assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
+ }
+ } // end of test Star PageRank
+
+
+
+ test("Grid PageRank") {
+ withSpark { sc =>
+ val rows = 10
+ val cols = 10
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 50
+ val errorTol = 1.0e-5
+ val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+ val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+ val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+ assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+ assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
+ }
+ } // end of Grid PageRank
+
+
+ test("Chain PageRank") {
+ withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x + 1))
+      val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
+ val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 10
+ val errorTol = 1.0e-5
+
+ val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+ }
+ }
+}
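
The GridPageRank reference above applies the same per-iteration update as the static implementation: pr(i) = resetProb + (1 - resetProb) * sum over in-neighbors j of oldPr(j) / outDegree(j). For the star graph this can be checked by hand, which is what the staticErrors assertion in "Star PageRank" encodes; a small sketch of that arithmetic (the names are illustrative):

    val resetProb = 0.15
    val nVertices = 100
    // Leaves have no in-edges, so their rank stays at resetProb after every iteration
    val leafRank = resetProb
    // Each of the (nVertices - 1) leaves has out-degree 1 and contributes resetProb to the center
    val centerRank = resetProb + (1.0 - resetProb) * resetProb * (nVertices - 1)
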
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
new file mode 100644
index 0000000000..06604198d7
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
+
+ test("Test SVD++ with mean square error on training set") {
+ withSpark { sc =>
+ val svdppErr = 8.0
+ val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+ val fields = line.split(",")
+ Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
+ }
+ val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+      val (graph, u) = SVDPlusPlus.run(edges, conf)
+      val err = graph.vertices.collect.map { case (vid, vd) =>
+        if (vid % 2 == 1) vd._4 else 0.0
+      }.reduce(_ + _) / graph.triplets.collect.size
+ assert(err <= svdppErr)
+ }
+ }
+
+}
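
When experimenting interactively it can be easier to build the bipartite rating graph inline rather than reading mllib/data/als/test.data. A hedged sketch, assuming a SparkContext `sc` and that SVDPlusPlusConf lives next to SVDPlusPlus in the algorithms package (as the test's usage suggests); the rating triples are invented, and the conf values are copied verbatim from the test without restating what each parameter means:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.{SVDPlusPlus, SVDPlusPlusConf}

    // (user, item, rating) triples; users map to even vertex ids and items to odd ones,
    // following the same convention as the test above
    val ratings = sc.parallelize(Seq((1L, 1L, 5.0), (1L, 2L, 1.0), (2L, 1L, 4.0)))
    val edges = ratings.map { case (user, item, rating) => Edge(user * 2, item * 2 + 1, rating) }
    val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
    val (graph, u) = SVDPlusPlus.run(edges, conf)
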
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..696b80944e
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
@@ -0,0 +1,57 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Island Strongly Connected Components") {
+ withSpark { sc =>
+ val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
+ val edges = sc.parallelize(Seq.empty[Edge[Int]])
+ val graph = Graph(vertices, edges)
+ val sccGraph = StronglyConnectedComponents.run(graph, 5)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(id == scc)
+ }
+ }
+ }
+
+ test("Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(0L == scc)
+ }
+ }
+ }
+
+ test("2 Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val edges =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
+ Array(6L -> 0L, 5L -> 7L)
+ val rawEdges = sc.parallelize(edges)
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ if (id < 3)
+ assert(0L == scc)
+ else if (id < 6)
+ assert(3L == scc)
+ else
+ assert(id == scc)
+ }
+ }
+ }
+
+}
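
As with connected components, the tests above show each vertex labeled with the smallest vertex id in its strongly connected component. A minimal sketch, assuming a SparkContext `sc` and that the second argument to run is the iteration cap used by the tests:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.StronglyConnectedComponents

    // A single directed edge is not a cycle, so each vertex forms its own component
    val rawEdges = sc.parallelize(Seq((0L, 1L)))
    val graph = Graph.fromEdgeTuples(rawEdges, -1)
    val scc = StronglyConnectedComponents.run(graph, 10).vertices.collect().toMap
    // scc == Map(0L -> 0L, 1L -> 1L)
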
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
new file mode 100644
index 0000000000..0e59912754
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
@@ -0,0 +1,73 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class TriangleCountSuite extends FunSuite with LocalSparkContext {
+
+ test("Count a single triangle") {
+ withSpark { sc =>
+      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+ test("Count two triangles") {
+ withSpark { sc =>
+ val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+ val rawEdges = sc.parallelize(triangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 2)
+ } else {
+ assert(count === 1)
+ }
+ }
+ }
+ }
+
+ test("Count two triangles with bi-directed edges") {
+ withSpark { sc =>
+ val triangles =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+      val revTriangles = triangles.map { case (a, b) => (b, a) }
+ val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 4)
+ } else {
+ assert(count === 2)
+ }
+ }
+ }
+ }
+
+ test("Count a single triangle with duplicate edges") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+}
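
The per-vertex value returned by TriangleCount.run is the number of triangles passing through that vertex, so each triangle is counted once per member vertex (and twice per member when its edges are bi-directed, as the third test shows). A rough sketch, assuming a SparkContext `sc` and a graph without bi-directed or duplicate edges:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.TriangleCount

    val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L, 2L -> 3L))
    val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
    val counts = TriangleCount.run(graph).vertices
    // Summing the per-vertex counts and dividing by 3 gives the number of distinct triangles
    val totalTriangles = counts.map(_._2).reduce(_ + _) / 3 // 1 for this graph
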
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
new file mode 100644
index 0000000000..eb82436f09
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -0,0 +1,76 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class EdgePartitionSuite extends FunSuite {
+
+ test("reverse") {
+ val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges)
+ assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges)
+ }
+
+ test("map") {
+ val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList ===
+ edges.map(e => e.copy(attr = e.srcId + e.dstId)))
+ }
+
+ test("groupEdges") {
+ val edges = List(
+ Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
+ val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
+ }
+
+ test("indexIterator") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
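+    // toEdgePartition sorts the shuffled edges, so the assertions below can rely on
+    // getting them back in (src, dst) order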
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
+
+ test("innerJoin") {
+ def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+ val builder = new EdgePartitionBuilder[A]
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
+ builder.toEdgePartition
+ }
+ val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+ val a = makeEdgePartition(aList)
+ val b = makeEdgePartition(bList)
+
+ assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
+ List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
new file mode 100644
index 0000000000..d37d64e8c8
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -0,0 +1,113 @@
+package org.apache.spark.graphx.impl
+
+import org.apache.spark.graphx._
+import org.scalatest.FunSuite
+
+class VertexPartitionSuite extends FunSuite {
+
+ test("isDefined, filter") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).filter { (vid, attr) => vid == 0 }
+ assert(vp.isDefined(0))
+ assert(!vp.isDefined(1))
+ assert(!vp.isDefined(2))
+ assert(!vp.isDefined(-1))
+ }
+
+ test("isActive, numActives, replaceActives") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
+ .filter { (vid, attr) => vid == 0 }
+ .replaceActives(Iterator(0, 2, 0))
+ assert(vp.isActive(0))
+ assert(!vp.isActive(1))
+ assert(vp.isActive(2))
+ assert(!vp.isActive(-1))
+    assert(vp.numActives === Some(2))
+ }
+
+ test("map") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
+ assert(vp(0) === 2)
+ }
+
+ test("diff") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2 = vp.filter { (vid, attr) => vid <= 1 }
+ val vp3a = vp.map { (vid, attr) => 2 }
+ val vp3b = VertexPartition(vp3a.iterator)
+ // diff with same index
+ val diff1 = vp2.diff(vp3a)
+ assert(diff1(0) === 2)
+ assert(diff1(1) === 2)
+ assert(diff1(2) === 2)
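+    // apply() reads the underlying value array directly, while isDefined consults the mask
+    // of changed vertices, so vertex 2 has a value here but is not part of the diff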
+ assert(!diff1.isDefined(2))
+ // diff with different indexes
+ val diff2 = vp2.diff(vp3b)
+ assert(diff2(0) === 2)
+ assert(diff2(1) === 2)
+ assert(diff2(2) === 2)
+ assert(!diff2.isDefined(2))
+ }
+
+ test("leftJoin") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 }
+ val vp2b = VertexPartition(vp2a.iterator)
+ // leftJoin with same index
+ val join1 = vp.leftJoin(vp2a) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join1.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ // leftJoin with different indexes
+ val join2 = vp.leftJoin(vp2b) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join2.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ // leftJoin an iterator
+ val join3 = vp.leftJoin(vp2a.iterator) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join3.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ }
+
+ test("innerJoin") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 }
+ val vp2b = VertexPartition(vp2a.iterator)
+ // innerJoin with same index
+ val join1 = vp.innerJoin(vp2a) { (vid, a, b) => b }
+ assert(join1.iterator.toSet === Set((0L, 2), (1L, 2)))
+ // innerJoin with different indexes
+ val join2 = vp.innerJoin(vp2b) { (vid, a, b) => b }
+ assert(join2.iterator.toSet === Set((0L, 2), (1L, 2)))
+ // innerJoin an iterator
+ val join3 = vp.innerJoin(vp2a.iterator) { (vid, a, b) => b }
+ assert(join3.iterator.toSet === Set((0L, 2), (1L, 2)))
+ }
+
+ test("createUsingIndex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val elems = List((0L, 2), (2L, 2), (3L, 2))
+ val vp2 = vp.createUsingIndex(elems.iterator)
+ assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2)))
+ assert(vp.index === vp2.index)
+ }
+
+ test("innerJoinKeepLeft") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val elems = List((0L, 2), (2L, 2), (3L, 2))
+ val vp2 = vp.innerJoinKeepLeft(elems.iterator)
+ assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2)))
+ assert(vp2(1) === 1)
+ }
+
+ test("aggregateUsingIndex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val messages = List((0L, "a"), (2L, "b"), (0L, "c"), (3L, "d"))
+ val vp2 = vp.aggregateUsingIndex[String](messages.iterator, _ + _)
+ assert(vp2.iterator.toSet === Set((0L, "ac"), (2L, "b")))
+ }
+
+ test("reindex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2 = vp.filter { (vid, attr) => vid <= 1 }
+ val vp3 = vp2.reindex()
+ assert(vp2.iterator.toSet === vp3.iterator.toSet)
+ assert(vp2(2) === 1)
+ assert(vp3.index.getPos(2) === -1)
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
new file mode 100644
index 0000000000..11db339750
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
@@ -0,0 +1,93 @@
+package org.apache.spark.graphx.util
+
+import org.scalatest.FunSuite
+
+
+class BytecodeUtilsSuite extends FunSuite {
+
+ import BytecodeUtilsSuite.TestClass
+
+ test("closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+
+ val c2 = {e: TestClass => println(e.foo); println(e.bar); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+
+ val c3 = {e: TestClass => println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ val c3 = {e: TestClass => c2(e) }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method") {
+ def zoo(e: TestClass) {
+ println(e.baz)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method which uses another closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass) {
+ c2(e)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("nested closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass, c: TestClass => Unit) {
+ c(e)
+ }
+ val c1 = {e: TestClass => zoo(e, c2)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ // The following doesn't work yet, because the byte code doesn't contain any information
+ // about what exactly "c" is.
+// test("invoke interface") {
+// val c1 = {e: TestClass => c(e)}
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+// }
+
+ private val c = {e: TestClass => println(e.baz)}
+}
+
+
+object BytecodeUtilsSuite {
+ class TestClass(val foo: Int, val bar: Long) {
+ def baz: Boolean = false
+ }
+}
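
For context, BytecodeUtils.invokedMethod is the inspection GraphX uses internally to learn whether a user closure touches a given method (for example, whether a triplet map function reads the source or destination vertex attribute, so that attributes a closure never reads need not be shipped). A rough spark-shell sketch of calling it directly, assuming EdgeTriplet exposes srcAttr and dstAttr accessors as in the library:

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.util.BytecodeUtils

    // Illustrative closure over an EdgeTriplet: only the destination attribute is read
    val mapFunc = (et: EdgeTriplet[Int, Int]) => et.dstAttr
    BytecodeUtils.invokedMethod(mapFunc, classOf[EdgeTriplet[Int, Int]], "srcAttr") // expected: false
    BytecodeUtils.invokedMethod(mapFunc, classOf[EdgeTriplet[Int, Int]], "dstAttr") // expected: true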