aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-18 13:00:58 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-18 13:00:58 -0800
commit9193a8f7887b919cc62dc308fbdd2d3d92d8a746 (patch)
treeb96177c6690dc2d33c469058b6f655bbd0496e7e /graph/src
parentcb20175f97f2cf95af228d2bac8d21fa92122b2f (diff)
parent3fd2e09ffb8718f347f9fa1fb057d8738ce73c80 (diff)
downloadspark-9193a8f7887b919cc62dc308fbdd2d3d92d8a746.tar.gz
spark-9193a8f7887b919cc62dc308fbdd2d3d92d8a746.tar.bz2
spark-9193a8f7887b919cc62dc308fbdd2d3d92d8a746.zip
Merge remote-tracking branch 'upstream/master' into add_project_to_graph
Conflicts: graph/src/main/scala/org/apache/spark/graph/Graph.scala graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Analytics.scala429
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Edge.scala9
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala67
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala16
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Graph.scala170
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala6
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphLab.scala22
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala64
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphOps.scala21
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala8
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Pregel.scala131
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala378
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala593
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala37
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala205
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala158
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala76
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala99
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala42
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala41
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala562
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala42
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala112
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala233
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala248
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala76
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/package.scala10
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala4
-rw-r--r--graph/src/test/resources/log4j.properties28
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala146
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala119
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala43
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala81
33 files changed, 2348 insertions, 1928 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 042bcd9825..2012dadb2f 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -1,6 +1,8 @@
package org.apache.spark.graph
import org.apache.spark._
+import org.apache.spark.graph.algorithms._
+
/**
* The Analytics object contains a collection of basic graph analytics
@@ -12,272 +14,6 @@ import org.apache.spark._
*/
object Analytics extends Logging {
- /**
- * Run PageRank for a fixed number of iterations returning a graph
- * with vertex attributes containing the PageRank and edge
- * attributes the normalized edge weight.
- *
- * The following PageRank fixed point is computed for each vertex.
- *
- * {{{
- * var PR = Array.fill(n)( 1.0 )
- * val oldPR = Array.fill(n)( 1.0 )
- * for( iter <- 0 until numIter ) {
- * swap(oldPR, PR)
- * for( i <- 0 until n ) {
- * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
- * }
- * }
- * }}}
- *
- * where `alpha` is the random reset probability (typically 0.15),
- * `inNbrs[i]` is the set of neighbors whick link to `i` and
- * `outDeg[j]` is the out degree of vertex `j`.
- *
- * Note that this is not the "normalized" PageRank and as a
- * consequence pages that have no inlinks will have a PageRank of
- * alpha.
- *
- * @tparam VD the original vertex attribute (not used)
- * @tparam ED the original edge attribute (not used)
- *
- * @param graph the graph on which to compute PageRank
- * @param numIter the number of iterations of PageRank to run
- * @param resetProb the random reset probability (alpha)
- *
- * @return the graph containing with each vertex containing the
- * PageRank and each edge containing the normalized weight.
- *
- */
- def pagerank[VD: Manifest, ED: Manifest](
- graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
- Graph[Double, Double] = {
-
- /**
- * Initialize the pagerankGraph with each edge attribute having
- * weight 1/outDegree and each vertex with attribute 1.0.
- */
- val pagerankGraph: Graph[Double, Double] = graph
- // Associate the degree with each vertex
- .outerJoinVertices(graph.outDegrees){
- (vid, vdata, deg) => deg.getOrElse(0)
- }
- // Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr )
- // Set the vertex attributes to the initial pagerank values
- .mapVertices( (id, attr) => 1.0 )
-
- // Display statistics about pagerank
- println(pagerankGraph.statistics)
-
- // Define the three functions needed to implement PageRank in the GraphX
- // version of Pregel
- def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
- resetProb + (1.0 - resetProb) * msgSum
- def sendMessage(edge: EdgeTriplet[Double, Double]) =
- Iterator((edge.dstId, edge.srcAttr * edge.attr))
- def messageCombiner(a: Double, b: Double): Double = a + b
- // The initial message received by all vertices in PageRank
- val initialMessage = 0.0
-
- // Execute pregel for a fixed number of iterations.
- Pregel(pagerankGraph, initialMessage, numIter)(
- vertexProgram, sendMessage, messageCombiner)
- }
-
-
- /**
- * Run a dynamic version of PageRank returning a graph with vertex
- * attributes containing the PageRank and edge attributes containing
- * the normalized edge weight.
- *
- * {{{
- * var PR = Array.fill(n)( 1.0 )
- * val oldPR = Array.fill(n)( 0.0 )
- * while( max(abs(PR - oldPr)) > tol ) {
- * swap(oldPR, PR)
- * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
- * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
- * }
- * }
- * }}}
- *
- * where `alpha` is the random reset probability (typically 0.15),
- * `inNbrs[i]` is the set of neighbors whick link to `i` and
- * `outDeg[j]` is the out degree of vertex `j`.
- *
- * Note that this is not the "normalized" PageRank and as a
- * consequence pages that have no inlinks will have a PageRank of
- * alpha.
- *
- * @tparam VD the original vertex attribute (not used)
- * @tparam ED the original edge attribute (not used)
- *
- * @param graph the graph on which to compute PageRank
- * @param tol the tolerance allowed at convergence (smaller => more
- * accurate).
- * @param resetProb the random reset probability (alpha)
- *
- * @return the graph containing with each vertex containing the
- * PageRank and each edge containing the normalized weight.
- */
- def deltaPagerank[VD: Manifest, ED: Manifest](
- graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
- Graph[Double, Double] = {
-
- /**
- * Initialize the pagerankGraph with each edge attribute
- * having weight 1/outDegree and each vertex with attribute 1.0.
- */
- val pagerankGraph: Graph[(Double, Double), Double] = graph
- // Associate the degree with each vertex
- .outerJoinVertices(graph.outDegrees){
- (vid, vdata, deg) => deg.getOrElse(0)
- }
- // Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr )
- // Set the vertex attributes to (initalPR, delta = 0)
- .mapVertices( (id, attr) => (0.0, 0.0) )
-
- // Display statistics about pagerank
- println(pagerankGraph.statistics)
-
- // Define the three functions needed to implement PageRank in the GraphX
- // version of Pregel
- def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
- val (oldPR, lastDelta) = attr
- val newPR = oldPR + (1.0 - resetProb) * msgSum
- (newPR, newPR - oldPR)
- }
- def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
- if (edge.srcAttr._2 > tol) {
- Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
- } else {
- Iterator.empty
- }
- }
- def messageCombiner(a: Double, b: Double): Double = a + b
- // The initial message received by all vertices in PageRank
- val initialMessage = resetProb / (1.0 - resetProb)
-
- // Execute a dynamic version of Pregel.
- Pregel(pagerankGraph, initialMessage)(
- vertexProgram, sendMessage, messageCombiner)
- .mapVertices( (vid, attr) => attr._1 )
- } // end of deltaPageRank
-
-
- /**
- * Compute the connected component membership of each vertex and
- * return an RDD with the vertex value containing the lowest vertex
- * id in the connected component containing that vertex.
- *
- * @tparam VD the vertex attribute type (discarded in the
- * computation)
- * @tparam ED the edge attribute type (preserved in the computation)
- *
- * @param graph the graph for which to compute the connected
- * components
- *
- * @return a graph with vertex attributes containing the smallest
- * vertex in each connected component
- */
- def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]):
- Graph[Vid, ED] = {
- val ccGraph = graph.mapVertices { case (vid, _) => vid }
-
- def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
- if (edge.srcAttr < edge.dstAttr) {
- Iterator((edge.dstId, edge.srcAttr))
- } else if (edge.srcAttr > edge.dstAttr) {
- Iterator((edge.srcId, edge.dstAttr))
- } else {
- Iterator.empty
- }
- }
- val initialMessage = Long.MaxValue
- Pregel(ccGraph, initialMessage)(
- (id, attr, msg) => math.min(attr, msg),
- sendMessage,
- (a,b) => math.min(a,b)
- )
- } // end of connectedComponents
-
-
- /**
- * Compute the number of triangles passing through each vertex.
- *
- * The algorithm is relatively straightforward and can be computed in
- * three steps:
- *
- * 1) Compute the set of neighbors for each vertex
- * 2) For each edge compute the intersection of the sets and send the
- * count to both vertices.
- * 3) Compute the sum at each vertex and divide by two since each
- * triangle is counted twice.
- *
- *
- * @param graph a graph with `sourceId` less than `destId`
- * @tparam VD
- * @tparam ED
- * @return
- */
- def triangleCount[VD: ClassManifest, ED: ClassManifest](rawGraph: Graph[VD,ED]):
- Graph[Int, ED] = {
- // Remove redundant edges
- val graph = rawGraph.groupEdges( (a,b) => a ).cache
-
- // Construct set representations of the neighborhoods
- val nbrSets: VertexSetRDD[VertexSet] =
- graph.collectNeighborIds(EdgeDirection.Both).mapValuesWithKeys { (vid, nbrs) =>
- val set = new VertexSet(4)
- var i = 0
- while (i < nbrs.size) {
- // prevent self cycle
- if(nbrs(i) != vid) {
- set.add(nbrs(i))
- }
- i += 1
- }
- set
- }
- // join the sets with the graph
- val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
- (vid, _, optSet) => optSet.getOrElse(null)
- }
- // Edge function computes intersection of smaller vertex with larger vertex
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = {
- assert(et.srcAttr != null)
- assert(et.dstAttr != null)
- val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
- (et.srcAttr, et.dstAttr)
- } else {
- (et.dstAttr, et.srcAttr)
- }
- val iter = smallSet.iterator()
- var counter: Int = 0
- while (iter.hasNext) {
- val vid = iter.next
- if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
- }
- Iterator((et.srcId, counter), (et.dstId, counter))
- }
- // compute the intersection along edges
- val counters: VertexSetRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
- // Merge counters with the graph and divide by two since each triangle is counted twice
- graph.outerJoinVertices(counters) {
- (vid, _, optCounter: Option[Int]) =>
- val dblCount = optCounter.getOrElse(0)
- // double count should be even (divisible by two)
- assert((dblCount & 1) == 0)
- dblCount / 2
- }
-
- } // end of TriangleCount
-
-
-
-
def main(args: Array[String]) = {
val host = args(0)
val taskType = args(1)
@@ -301,10 +37,10 @@ object Analytics extends Logging {
def pickPartitioner(v: String): PartitionStrategy = {
v match {
- case "RandomVertexCut" => RandomVertexCut()
- case "EdgePartition1D" => EdgePartition1D()
- case "EdgePartition2D" => EdgePartition2D()
- case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut()
+ case "RandomVertexCut" => RandomVertexCut
+ case "EdgePartition1D" => EdgePartition1D
+ case "EdgePartition2D" => EdgePartition2D
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
}
}
@@ -318,102 +54,91 @@ object Analytics extends Logging {
taskType match {
case "pagerank" => {
- var numIter = Int.MaxValue
- var isDynamic = false
var tol:Float = 0.001F
var outFname = ""
var numVPart = 4
var numEPart = 4
- var partitionStrategy: PartitionStrategy = RandomVertexCut()
+ var partitionStrategy: Option[PartitionStrategy] = None
options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
println("======================================")
println("| PageRank |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- if(isDynamic) println(" \t |-> Tolerance: " + tol)
- println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "PageRank(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
- val startTime = System.currentTimeMillis
- logInfo("GRAPHX: starting tasks")
- logInfo("GRAPHX: Number of vertices " + graph.vertices.count)
- logInfo("GRAPHX: Number of edges " + graph.edges.count)
+ println("GRAPHX: Number of vertices " + graph.vertices.count)
+ println("GRAPHX: Number of edges " + graph.edges.count)
//val pr = Analytics.pagerank(graph, numIter)
- val pr = if(isDynamic) Analytics.deltaPagerank(graph, tol, numIter)
- else Analytics.pagerank(graph, numIter)
- logInfo("GRAPHX: Total rank: " + pr.vertices.map{ case (id,r) => r }.reduce(_+_) )
+ val pr = PageRank.runStandalone(graph, tol)
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+
if (!outFname.isEmpty) {
- println("Saving pageranks of pages to " + outFname)
- pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
}
- logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
sc.stop()
}
case "cc" => {
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: PartitionStrategy = RandomVertexCut()
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
- println("======================================")
- println("| Connected Components |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- println(" \tNumIter: " + numIter)
- println("======================================")
-
- val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
- val cc = Analytics.connectedComponents(graph)
- println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
- sc.stop()
- }
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+ }
case "triangles" => {
var numVPart = 4
var numEPart = 4
- var partitionStrategy: PartitionStrategy = RandomVertexCut()
+ // TriangleCount requires the graph to be partitioned
+ var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
case ("numEPart", v) => numEPart = v.toInt
@@ -426,8 +151,8 @@ object Analytics extends Logging {
println("--------------------------------------")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
- val triangles = Analytics.triangleCount(graph)
+ minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
case (vid,data) => data.toLong
}.reduce(_+_) / 3)
@@ -537,42 +262,6 @@ object Analytics extends Logging {
}
// /**
- // * Compute the PageRank of a graph returning the pagerank of each vertex as an RDD
- // */
- // def dynamicPagerank[VD: Manifest, ED: Manifest](graph: Graph[VD, ED],
- // tol: Double, maxIter: Int = 10) = {
- // // Compute the out degree of each vertex
- // val pagerankGraph = graph.updateVertices[Int, (Int, Double, Double)](graph.outDegrees,
- // (vertex, degIter) => (degIter.sum, 1.0, 1.0)
- // )
-
- // // Run PageRank
- // GraphLab.iterateGAS(pagerankGraph)(
- // (me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
- // (a: Double, b: Double) => a + b,
- // (vertex, a: Option[Double]) =>
- // (vertex.data._1, (0.15 + 0.85 * a.getOrElse(0.0)), vertex.data._2), // apply
- // (me_id, edge) => math.abs(edge.src.data._2 - edge.dst.data._1) > tol, // scatter
- // maxIter).mapVertices { case Vertex(vid, data) => Vertex(vid, data._2) }
- // }
-
- // /**
- // * Compute the connected component membership of each vertex
- // * and return an RDD with the vertex value containing the
- // * lowest vertex id in the connected component containing
- // * that vertex.
- // */
- // def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = {
- // val ccGraph = graph.mapVertices { case Vertex(vid, _) => Vertex(vid, vid) }
- // GraphLab.iterateGA[Int, ED, Int](ccGraph)(
- // (me_id, edge) => edge.otherVertex(me_id).data, // gather
- // (a: Int, b: Int) => math.min(a, b), // merge
- // (v, a: Option[Int]) => math.min(v.data, a.getOrElse(Integer.MAX_VALUE)), // apply
- // numIter,
- // gatherDirection = EdgeDirection.Both)
- // }
-
- // /**
// * Compute the shortest path to a set of markers
// */
// def shortestPath[VD: Manifest](graph: Graph[VD, Double], sources: List[Int], numIter: Int) = {
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
index 509a734338..7e8ae7c790 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -18,7 +18,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
var dstId: Vid = 0,
/**
* The attribute associated with the edge.
- */
+ */
var attr: ED = nullValue[ED]) {
/**
@@ -30,7 +30,6 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
def otherVertexId(vid: Vid): Vid =
if (srcId == vid) dstId else { assert(dstId == vid); srcId }
-
/**
* Return the relative direction of the edge to the corresponding
* vertex.
@@ -41,5 +40,11 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
*/
def relativeDirection(vid: Vid): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
+}
+object Edge {
+ def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
+ override def compare(a: Edge[ED], b: Edge[ED]): Int =
+ Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
+ }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
new file mode 100644
index 0000000000..ee368ebb41
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -0,0 +1,67 @@
+package org.apache.spark.graph
+
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+
+class EdgeRDD[@specialized ED: ClassManifest](
+ val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
+ extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ partitionsRDD.setName("EdgeRDD")
+
+ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+ /**
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+ * co-partitioning with partitionsRDD.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
+ override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
+ val edgePartition = partitionsRDD.compute(split, context).next()._2
+ edgePartition.iterator
+ }
+
+ override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+
+ /**
+ * Caching a VertexRDD causes the index and values to be cached separately.
+ */
+ override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def cache(): EdgeRDD[ED] = persist()
+
+ def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2])
+ : EdgeRDD[ED2]= {
+ new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
+ val (pid, ep) = iter.next()
+ Iterator(Tuple2(pid, f(ep)))
+ }, preservesPartitioning = true))
+ }
+
+ def zipEdgePartitions[T: ClassManifest, U: ClassManifest]
+ (other: RDD[T])
+ (f: (EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = {
+ partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) =>
+ val (_, edgePartition) = ePartIter.next()
+ f(edgePartition, otherIter)
+ }
+ }
+
+ def collectVids(): RDD[Vid] = {
+ partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
+ }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index aace6e54fe..76768489ee 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -1,12 +1,14 @@
package org.apache.spark.graph
+import org.apache.spark.graph.impl.VertexPartition
+
/**
* An edge triplet represents two vertices and edge along with their
* attributes.
*
* @tparam VD the type of the vertex attribute.
* @tparam ED the type of the edge attribute
- *
+ *
* @todo specialize edge triplet for basic types, though when I last
* tried specializing I got a warning about inherenting from a type
* that is not a trait.
@@ -14,20 +16,23 @@ package org.apache.spark.graph
class EdgeTriplet[VD, ED] extends Edge[ED] {
// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
-
+
/**
* The source vertex attribute
*/
- var srcAttr: VD = _ //nullValue[VD]
+ var srcAttr: VD = _ //nullValue[VD]
/**
* The destination vertex attribute
*/
- var dstAttr: VD = _ //nullValue[VD]
+ var dstAttr: VD = _ //nullValue[VD]
+
+ var srcStale: Boolean = false
+ var dstStale: Boolean = false
/**
- * Set the edge properties of this triplet.
+ * Set the edge properties of this triplet.
*/
protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
srcId = other.srcId
@@ -54,4 +59,5 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
def vertexAttr(vid: Vid): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
+ override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 144ca1b788..e544650963 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,8 +1,10 @@
package org.apache.spark.graph
+import org.apache.spark.graph.impl._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+
/**
* The Graph abstractly represents a graph with arbitrary objects
* associated with vertices and edges. The graph provides basic
@@ -32,7 +34,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @see Vertex for the vertex type.
*
*/
- val vertices: VertexSetRDD[VD]
+ val vertices: VertexRDD[VD]
/**
* Get the Edges and their data as an RDD. The entries in the RDD
@@ -84,6 +86,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def cache(): Graph[VD, ED]
/**
+ * Repartition the edges in the graph according to partitionStrategy.
+ */
+ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
+
+ /**
* Compute statistics describing the graph representation.
*/
def statistics: Map[String, Any]
@@ -162,7 +169,6 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* Construct a new graph with all the edges reversed. If this graph
* contains an edge from a to b then the returned graph contains an
* edge from b to a.
- *
*/
def reverse: Graph[VD, ED]
@@ -200,18 +206,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def mask[VD2: ClassManifest, ED2: ClassManifest](other: Graph[VD2, ED2]): Graph[VD, ED]
/**
- * This function merges multiple edges between two vertices into a
- * single Edge. See
- * [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more
- * detail.
+ * This function merges multiple edges between two vertices into a single Edge. For correct
+ * results, the graph must have been partitioned using partitionBy.
*
* @tparam ED2 the type of the resulting edge data after grouping.
*
- * @param f the user supplied commutative associative function to merge
- * edge attributes for duplicate edges.
+ * @param f the user supplied commutative associative function to merge edge attributes for
+ * duplicate edges.
*
- * @return Graph[VD,ED2] The resulting graph with a single Edge for
- * each source, dest vertex pair.
+ * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair.
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
@@ -232,6 +235,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* be commutative and assosciative and is used to combine the output
* of the map phase.
*
+ * @param activeSet optionally, a set of "active" vertices and a direction of edges to consider
+ * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on
+ * edges originating from vertices in the active set. `activeSet` must have the same index as the
+ * graph's vertices.
+ *
* @example We can use this function to compute the inDegree of each
* vertex
* {{{
@@ -249,8 +257,9 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*/
def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
- reduceFunc: (A, A) => A)
- : VertexSetRDD[A]
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
+ : VertexRDD[A]
/**
* Join the vertices with an RDD and then apply a function from the
@@ -294,52 +303,30 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
/**
- * The Graph object contains a collection of routines used to
- * construct graphs from RDDs.
- *
+ * The Graph object contains a collection of routines used to construct graphs from RDDs.
*/
object Graph {
- import org.apache.spark.graph.impl._
- import org.apache.spark.SparkContext._
-
/**
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
- * @param rawEdges the RDD containing the set of edges in the graph
- *
- * @return a graph with edge attributes containing the count of duplicate edges.
- */
- def apply[VD: ClassManifest](rawEdges: RDD[(Vid, Vid)], defaultValue: VD): Graph[VD, Int] = {
- Graph(rawEdges, defaultValue, false, RandomVertexCut())
- }
-
- /**
- * Construct a graph from a collection of edges encoded as vertex id
- * pairs.
- *
* @param rawEdges a collection of edges in (src,dst) form.
- * @param uniqueEdges if multiple identical edges are found they are
- * combined and the edge attribute is set to the sum. Otherwise
- * duplicate edges are treated as separate.
- *
- * @return a graph with edge attributes containing either the count
- * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex
- * attributes containing the total degree of each vertex.
+ * @param uniqueEdges if multiple identical edges are found they are combined and the edge
+ * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
+ * uniqueEdges, a [[PartitionStrategy]] must be provided.
*
+ * @return a graph with edge attributes containing either the count of duplicate edges or 1
+ * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
- def apply[VD: ClassManifest](
+ def fromEdgeTuples[VD: ClassManifest](
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
- uniqueEdges: Boolean,
- partitionStrategy: PartitionStrategy):
- Graph[VD, Int] = {
+ uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue, partitionStrategy)
- if (uniqueEdges) {
- graph.groupEdges((a,b) => a+b)
- } else {
- graph
+ val graph = GraphImpl(edges, defaultValue)
+ uniqueEdges match {
+ case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
+ case None => graph
}
}
@@ -352,107 +339,40 @@ object Graph {
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
- def apply[VD: ClassManifest, ED: ClassManifest](
+ def fromEdges[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
- Graph(edges, defaultValue, RandomVertexCut())
- }
-
- /**
- * Construct a graph from a collection of edges.
- *
- * @param edges the RDD containing the set of edges in the graph
- * @param defaultValue the default vertex attribute to use for each vertex
- *
- * @return a graph with edge attributes described by `edges` and vertices
- * given by all vertices in `edges` with value `defaultValue`
- */
- def apply[VD: ClassManifest, ED: ClassManifest](
- edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue, partitionStrategy)
+ GraphImpl(edges, defaultValue)
}
/**
* Construct a graph from a collection attributed vertices and
- * edges.
- *
- * @note Duplicate vertices are removed arbitrarily and missing
- * vertices (vertices in the edge collection that are not in the
- * vertex collection) are replaced by null vertex attributes.
- *
- * @tparam VD the vertex attribute type
- * @tparam ED the edge attribute type
- * @param vertices the "set" of vertices and their attributes
- * @param edges the collection of edges in the graph
- *
- */
- def apply[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[(Vid,VD)],
- edges: RDD[Edge[ED]]): Graph[VD, ED] = {
- val defaultAttr: VD = null.asInstanceOf[VD]
- Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a, RandomVertexCut())
- }
-
- /**
- * Construct a graph from a collection attributed vertices and
- * edges. Duplicate vertices are combined using the `mergeFunc` and
- * vertices found in the edge collection but not in the input
- * vertices are the default attribute `defautVertexAttr`.
- *
- * @note Duplicate vertices are removed arbitrarily .
- *
- * @tparam VD the vertex attribute type
- * @tparam ED the edge attribute type
- * @param vertices the "set" of vertices and their attributes
- * @param edges the collection of edges in the graph
- * @param defaultVertexAttr the default vertex attribute to use for
- * vertices that are mentioned in `edges` but not in `vertices`
- *
- */
- def apply[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[(Vid,VD)],
- edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): Graph[VD, ED] = {
- Graph(vertices, edges, defaultVertexAttr, (a,b) => a, RandomVertexCut())
- }
-
- /**
- * Construct a graph from a collection attributed vertices and
- * edges. Duplicate vertices are combined using the `mergeFunc` and
+ * edges. Duplicate vertices are picked arbitrarily and
* vertices found in the edge collection but not in the input
- * vertices are the default attribute `defautVertexAttr`.
+ * vertices are the default attribute.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for
- * vertices that are mentioned in `edges` but not in `vertices
- * @param mergeFunc the function used to merge duplicate vertices
- * in the `vertices` collection.
+ * vertices that are mentioned in edges but not in vertices
* @param partitionStrategy the partition strategy to use when
* partitioning the edges.
- *
*/
def apply[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[(Vid,VD)],
+ vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD,
- mergeFunc: (VD, VD) => VD,
- partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc, partitionStrategy)
+ defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr)
}
/**
- * The implicit graphToGraphOPs function extracts the GraphOps
- * member from a graph.
+ * The implicit graphToGraphOPs function extracts the GraphOps member from a graph.
*
- * To improve modularity the Graph type only contains a small set of
- * basic operations. All the convenience operations are defined in
- * the GraphOps class which may be shared across multiple graph
- * implementations.
+ * To improve modularity the Graph type only contains a small set of basic operations. All the
+ * convenience operations are defined in the GraphOps class which may be shared across multiple
+ * graph implementations.
*/
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
} // end of Graph object
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 6f18e46ab2..b8c1b5b0f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -5,22 +5,22 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
-import org.apache.spark.graph._
+import org.apache.spark.util.BoundedPriorityQueue
+
class GraphKryoRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
kryo.register(classOf[Edge[Object]])
- kryo.register(classOf[MutableTuple2[Object, Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[AggregationMsg[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
kryo.register(classOf[PartitionStrategy])
+ kryo.register(classOf[BoundedPriorityQueue[Object]])
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index bf1f4168da..5618ce6272 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -24,6 +24,8 @@ object GraphLab {
* @param scatterFunc Executed after the apply function the scatter function takes
* a triplet and signals whether the neighboring vertex program
* must be recomputed.
+ * @param startVertices predicate to determine which vertices to start the computation on.
+ * these will be the active vertices in the first iteration.
* @param numIter The maximum number of iterations to run.
* @param gatherDirection The direction of edges to consider during the gather phase
* @param scatterDirection The direction of edges to consider during the scatter phase
@@ -40,12 +42,13 @@ object GraphLab {
(gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vid, VD, Option[A]) => VD,
- scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = {
+ scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
- case (id, data) => (true, data)
+ case (id, data) => (startVertices(id, data), data)
}.cache()
// The gather function wrapper strips the active attribute and
@@ -86,9 +89,9 @@ object GraphLab {
}
// Used to set the active status of vertices for the next round
- def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
+ def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
val (prevActive, vData) = data
- (newActive, vData)
+ (newActiveOpt.getOrElse(false), vData)
}
// Main Loop ---------------------------------------------------------------------
@@ -110,7 +113,7 @@ object GraphLab {
val scattered: RDD[(Vid, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
- activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache()
+ activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
// Calculate the number of active vertices
numActive = activeGraph.vertices.map{
@@ -124,12 +127,3 @@ object GraphLab {
activeGraph.mapVertices{case (vid, data) => data._2 }
}
}
-
-
-
-
-
-
-
-
-
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index d97c028faa..a69bfde532 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,12 +1,13 @@
package org.apache.spark.graph
-import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.graph.impl.GraphImpl
+import java.util.{Arrays => JArrays}
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
-object GraphLoader {
+object GraphLoader extends Logging {
/**
* Load an edge list from file initializing the Graph
@@ -25,8 +26,7 @@ object GraphLoader {
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut()):
+ minEdgePartitions: Int = 1):
Graph[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
@@ -43,7 +43,7 @@ object GraphLoader {
Edge(source, target, edata)
})
val defaultVertexAttr = 1
- Graph(edges, defaultVertexAttr, partitionStrategy)
+ Graph.fromEdges(edges, defaultVertexAttr)
}
/**
@@ -73,31 +73,39 @@ object GraphLoader {
* @tparam ED
* @return
*/
- def edgeListFile[ED: ClassManifest](
+ def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut()):
+ minEdgePartitions: Int = 1):
Graph[Int, Int] = {
- // Parse the edge data table
- val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
- iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
- val lineArray = line.split("\\s+")
- if(lineArray.length < 2) {
- println("Invalid line: " + line)
- assert(false)
- }
- val source = lineArray(0).trim.toLong
- val target = lineArray(1).trim.toLong
- if (canonicalOrientation && target > source) {
- Edge(target, source, 1)
- } else {
- Edge(source, target, 1)
+ val startTime = System.currentTimeMillis
+
+ // Parse the edge data table directly into edge partitions
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[Int]
+ iter.foreach { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if (lineArray.length < 2) {
+ logWarning("Invalid line: " + line)
+ }
+ val srcId = lineArray(0).toLong
+ val dstId = lineArray(1).toLong
+ if (canonicalOrientation && dstId > srcId) {
+ builder.add(dstId, srcId, 1)
+ } else {
+ builder.add(srcId, dstId, 1)
+ }
}
- })
- val defaultVertexAttr = 1
- Graph(edges, defaultVertexAttr, partitionStrategy)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }.cache()
+ edges.count()
+
+ logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
} // end of edgeListFile
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index a49eddc1df..091c778275 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -2,7 +2,6 @@ package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
-import org.apache.spark.util.ClosureCleaner
import org.apache.spark.SparkException
@@ -35,14 +34,14 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* RDD.
* @note Vertices with no in edges are not returned in the resulting RDD.
*/
- lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In)
+ lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
/**
* Compute the out-degree of each vertex in the Graph returning an RDD.
* @note Vertices with no out edges are not returned in the resulting RDD.
*/
- lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
+ lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
/**
@@ -50,7 +49,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @note Vertices with no edges are not returned in the resulting
* RDD.
*/
- lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both)
+ lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both)
/**
@@ -59,7 +58,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @param edgeDirection the direction along which to collect
* neighboring vertex attributes.
*/
- private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
+ private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
if (edgeDirection == EdgeDirection.In) {
graph.mapReduceTriplets(et => Iterator((et.dstId,1)), _ + _)
} else if (edgeDirection == EdgeDirection.Out) {
@@ -114,10 +113,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
dir: EdgeDirection)
- : VertexSetRDD[A] = {
-
- ClosureCleaner.clean(mapFunc)
- ClosureCleaner.clean(reduceFunc)
+ : VertexRDD[A] = {
// Define a new map function over edge triplets
val mf = (et: EdgeTriplet[VD,ED]) => {
@@ -140,7 +136,6 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
}
}
- ClosureCleaner.clean(mf)
graph.mapReduceTriplets(mf, reduceFunc)
} // end of aggregateNeighbors
@@ -154,7 +149,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
- VertexSetRDD[Array[Vid]] = {
+ VertexRDD[Array[Vid]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Both) {
graph.mapReduceTriplets[Array[Vid]](
@@ -190,7 +185,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
- VertexSetRDD[ Array[(Vid, VD)] ] = {
+ VertexRDD[ Array[(Vid, VD)] ] = {
val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
@@ -233,14 +228,12 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
*/
def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
: Graph[VD, ED] = {
- ClosureCleaner.clean(mapFunc)
val uf = (id: Vid, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
- ClosureCleaner.clean(uf)
graph.outerJoinVertices(table)(uf)
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
index cf65f50657..293a9d588a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -50,7 +50,7 @@ sealed trait PartitionStrategy extends Serializable {
*
*
*/
-case class EdgePartition2D() extends PartitionStrategy {
+case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: Vid = 1125899906842597L
@@ -61,7 +61,7 @@ case class EdgePartition2D() extends PartitionStrategy {
}
-case class EdgePartition1D() extends PartitionStrategy {
+case object EdgePartition1D extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val mixingPrime: Vid = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
@@ -73,7 +73,7 @@ case class EdgePartition1D() extends PartitionStrategy {
* Assign edges to an aribtrary machine corresponding to a
* random vertex cut.
*/
-case class RandomVertexCut() extends PartitionStrategy {
+case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
math.abs((src, dst).hashCode()) % numParts
}
@@ -85,7 +85,7 @@ case class RandomVertexCut() extends PartitionStrategy {
* function ensures that edges of opposite direction between the same two vertices
* will end up on the same partition.
*/
-case class CanonicalRandomVertexCut() extends PartitionStrategy {
+case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 29d6225f33..285e857b69 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -1,7 +1,5 @@
package org.apache.spark.graph
-import org.apache.spark.rdd.RDD
-
/**
* This object implements a Pregel-like bulk-synchronous
@@ -13,10 +11,6 @@ import org.apache.spark.rdd.RDD
* execution while also exposing greater flexibility for graph based
* computation.
*
- * This object present several variants of the bulk synchronous
- * execution that differ only in the edge direction along which
- * messages are sent and whether a fixed number of iterations is used.
- *
* @example We can use the Pregel abstraction to implement PageRank
* {{{
* val pagerankGraph: Graph[Double, Double] = graph
@@ -43,7 +37,6 @@ import org.apache.spark.rdd.RDD
*/
object Pregel {
-
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
@@ -58,7 +51,8 @@ object Pregel {
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
- * This function iterates a fixed number (`numIter`) of iterations.
+ * This function iterates until there are no remaining messages, or
+ * for maxIterations iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
@@ -69,7 +63,7 @@ object Pregel {
* @param initialMsg the message each vertex will receive at the on
* the first iteration.
*
- * @param numIter the number of iterations to run this computation.
+ * @param maxIterations the maximum number of iterations to run for.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
@@ -91,123 +85,36 @@ object Pregel {
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
- (graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
+ (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
vprog: (Vid, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
- // Receive the first set of messages
- var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)).cache
-
- var i = 0
- while (i < numIter) {
- // compute the messages
- val messages = g.mapReduceTriplets(sendMsg, mergeMsg)
- // receive the messages
- g = g.joinVertices(messages)(vprog).cache
- // count the iteration
- i += 1
- }
- // Return the final graph
- g
- } // end of apply
-
-
- /**
- * Execute a Pregel-like iterative vertex-parallel abstraction. The
- * user-defined vertex-program `vprog` is executed in parallel on
- * each vertex receiving any inbound messages and computing a new
- * value for the vertex. The `sendMsg` function is then invoked on
- * all out-edges and is used to compute an optional message to the
- * destination vertex. The `mergeMsg` function is a commutative
- * associative function used to combine messages destined to the
- * same vertex.
- *
- * On the first iteration all vertices receive the `initialMsg` and
- * on subsequent iterations if a vertex does not receive a message
- * then the vertex-program is not invoked.
- *
- * This function iterates until there are no remaining messages.
- *
- * @tparam VD the vertex data type
- * @tparam ED the edge data type
- * @tparam A the Pregel message type
- *
- * @param graph the input graph.
- *
- * @param initialMsg the message each vertex will receive at the on
- * the first iteration.
- *
- * @param numIter the number of iterations to run this computation.
- *
- * @param vprog the user-defined vertex program which runs on each
- * vertex and receives the inbound message and computes a new vertex
- * value. On the first iteration the vertex program is invoked on
- * all vertices and is passed the default message. On subsequent
- * iterations the vertex program is only invoked on those vertices
- * that receive messages.
- *
- * @param sendMsg a user supplied function that is applied to out
- * edges of vertices that received messages in the current
- * iteration.
- *
- * @param mergeMsg a user supplied function that takes two incoming
- * messages of type A and merges them into a single message of type
- * A. ''This function must be commutative and associative and
- * ideally the size of A should not increase.''
- *
- * @return the resulting graph at the end of the computation
- *
- */
- def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
- (graph: Graph[VD, ED], initialMsg: A)(
- vprog: (Vid, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
- mergeMsg: (A, A) => A)
- : Graph[VD, ED] = {
-
- def vprogFun(id: Vid, attr: (VD, Boolean), msgOpt: Option[A]): (VD, Boolean) = {
- msgOpt match {
- case Some(msg) => (vprog(id, attr._1, msg), true)
- case None => (attr._1, false)
- }
- }
-
- def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Iterator[(Vid, A)] = {
- if(edge.srcAttr._2) {
- val et = new EdgeTriplet[VD, ED]
- et.srcId = edge.srcId
- et.srcAttr = edge.srcAttr._1
- et.dstId = edge.dstId
- et.dstAttr = edge.dstAttr._1
- et.attr = edge.attr
- sendMsg(et)
- } else {
- Iterator.empty
- }
- }
-
- var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
+ var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
// compute the messages
- var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
- var activeMessages = messages.count
+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
+ var activeMessages = messages.count()
// Loop
var i = 0
- while (activeMessages > 0) {
- // receive the messages
- g = g.outerJoinVertices(messages)(vprogFun)
+ while (activeMessages > 0 && i < maxIterations) {
+ // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Update the graph with the new vertices.
+ g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+
val oldMessages = messages
- // compute the messages
- messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
- activeMessages = messages.count
+ // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
+ // get to send messages.
+ messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+ activeMessages = messages.count()
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
// count the iteration
i += 1
}
- // Return the final graph
- g.mapVertices((id, attr) => attr._1)
+
+ g
} // end of apply
} // end of class Pregel
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
new file mode 100644
index 0000000000..90ac6dc61d
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graph
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graph.impl.MsgRDDFunctions
+import org.apache.spark.graph.impl.VertexPartition
+
+
+/**
+ * A `VertexRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is
+ * only one entry for each vertex and by pre-indexing the entries for fast,
+ * efficient joins.
+ *
+ * @tparam VD the vertex attribute associated with each vertex in the set.
+ *
+ * To construct a `VertexRDD` use the singleton object:
+ *
+ * @example Construct a `VertexRDD` from a plain RDD
+ * {{{
+ * // Construct an intial vertex set
+ * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val vset = VertexRDD(someData)
+ * // If there were redundant values in someData we would use a reduceFunc
+ * val vset2 = VertexRDD(someData, reduceFunc)
+ * // Finally we can use the VertexRDD to index another dataset
+ * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val vset3 = VertexRDD(otherData, vset.index)
+ * // Now we can construct very fast joins between the two sets
+ * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
+ * }}}
+ *
+ */
+class VertexRDD[@specialized VD: ClassManifest](
+ val partitionsRDD: RDD[VertexPartition[VD]])
+ extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ require(partitionsRDD.partitioner.isDefined)
+
+ partitionsRDD.setName("VertexRDD")
+
+ /**
+ * Construct a new VertexRDD that is indexed by only the keys in the RDD.
+ * The resulting VertexRDD will be based on a different index and can
+ * no longer be quickly joined with this RDD.
+ */
+ def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+
+ /**
+ * The partitioner is defined by the index.
+ */
+ override val partitioner = partitionsRDD.partitioner
+
+ /**
+ * The actual partitions are defined by the tuples.
+ */
+ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+ /**
+ * The preferred locations are computed based on the preferred
+ * locations of the tuples.
+ */
+ override protected def getPreferredLocations(s: Partition): Seq[String] =
+ partitionsRDD.preferredLocations(s)
+
+ /**
+ * Caching a VertexRDD causes the index and values to be cached separately.
+ */
+ override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ override def cache(): VertexRDD[VD] = persist()
+
+ /** Return the number of vertices in this set. */
+ override def count(): Long = {
+ partitionsRDD.map(_.size).reduce(_ + _)
+ }
+
+ /**
+ * Provide the `RDD[(Vid, VD)]` equivalent output.
+ */
+ override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = {
+ partitionsRDD.compute(part, context).next().iterator
+ }
+
+ /**
+ * Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
+ */
+ def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2])
+ : VertexRDD[VD2] = {
+ val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
+ new VertexRDD(newPartitionsRDD)
+ }
+
+ /**
+ * Return a new VertexRDD by applying a function to corresponding
+ * VertexPartitions of this VertexRDD and another one.
+ */
+ def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest]
+ (other: VertexRDD[VD2])
+ (f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(f(thisPart, otherPart))
+ }
+ new VertexRDD(newPartitionsRDD)
+ }
+
+ /**
+ * Restrict the vertex set to the set of vertices satisfying the
+ * given predicate.
+ *
+ * @param pred the user defined predicate, which takes a tuple to conform to
+ * the RDD[(Vid, VD)] interface
+ *
+ * @note The vertex set preserves the original index structure
+ * which means that the returned RDD can be easily joined with
+ * the original vertex-set. Furthermore, the filter only
+ * modifies the bitmap index and so no new values are allocated.
+ */
+ override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexRDD[VD] =
+ this.mapVertexPartitions(_.filter(Function.untupled(pred)))
+
+ /**
+ * Pass each vertex attribute through a map function and retain the
+ * original RDD's partitioning and index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each value in the RDD
+ * @return a new VertexRDD with values obtained by applying `f` to
+ * each of the entries in the original VertexRDD. The resulting
+ * VertexRDD retains the same index.
+ */
+ def mapValues[VD2: ClassManifest](f: VD => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+
+ /**
+ * Pass each vertex attribute through a map function and retain the
+ * original RDD's partitioning and index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each value in the RDD
+ * @return a new VertexRDD with values obtained by applying `f` to
+ * each of the entries in the original VertexRDD. The resulting
+ * VertexRDD retains the same index.
+ */
+ def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map(f))
+
+ /**
+ * Hides vertices that are the same between this and other. For vertices that are different, keeps
+ * the values from `other`.
+ */
+ def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+ this.zipVertexPartitions(other) { (thisPart, otherPart) =>
+ thisPart.diff(otherPart)
+ }
+ }
+
+ /**
+ * Inner join this VertexSet with another VertexSet which has the
+ * same Index. This function will fail if both VertexSets do not
+ * share the same index. The resulting vertex set will only contain
+ * vertices that are in both this and the other vertex set.
+ *
+ * @tparam VD2 the attribute type of the other VertexSet
+ * @tparam VD3 the attribute type of the resulting VertexSet
+ *
+ * @param other the other VertexSet with which to join.
+ * @param f the function mapping a vertex id and its attributes in
+ * this and the other vertex set to a new vertex attribute.
+ * @return a VertexRDD containing only the vertices in both this
+ * and the other VertexSet and with tuple attributes.
+ */
+ def zipJoin[VD2: ClassManifest, VD3: ClassManifest]
+ (other: VertexRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexRDD[VD3] = {
+ this.zipVertexPartitions(other) { (thisPart, otherPart) =>
+ thisPart.join(otherPart)(f)
+ }
+ }
+
+ /**
+ * Left join this VertexSet with another VertexSet which has the
+ * same Index. This function will fail if both VertexSets do not
+ * share the same index. The resulting vertex set contains an entry
+ * for each vertex in this set. If the other VertexSet is missing
+ * any vertex in this VertexSet then a `None` attribute is generated
+ *
+ * @tparam VD2 the attribute type of the other VertexSet
+ * @tparam VD3 the attribute type of the resulting VertexSet
+ *
+ * @param other the other VertexSet with which to join.
+ * @param f the function mapping a vertex id and its attributes in
+ * this and the other vertex set to a new vertex attribute.
+ * @return a VertexRDD containing all the vertices in this
+ * VertexSet with `None` attributes used for Vertices missing in the
+ * other VertexSet.
+ *
+ */
+ def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest]
+ (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ this.zipVertexPartitions(other) { (thisPart, otherPart) =>
+ thisPart.leftJoin(otherPart)(f)
+ }
+ }
+
+ /**
+ * Left join this VertexRDD with an RDD containing vertex attribute
+ * pairs. If the other RDD is backed by a VertexRDD with the same
+ * index than the efficient leftZipJoin implementation is used. The
+ * resulting vertex set contains an entry for each vertex in this
+ * set. If the other VertexRDD is missing any vertex in this
+ * VertexRDD then a `None` attribute is generated.
+ *
+ * If there are duplicates, the vertex is picked at random.
+ *
+ * @tparam VD2 the attribute type of the other VertexRDD
+ * @tparam VD3 the attribute type of the resulting VertexRDD
+ *
+ * @param other the other VertexRDD with which to join.
+ * @param f the function mapping a vertex id and its attributes in
+ * this and the other vertex set to a new vertex attribute.
+ * @return a VertexRDD containing all the vertices in this
+ * VertexRDD with the attribute emitted by f.
+ */
+ def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ (other: RDD[(Vid, VD2)])
+ (f: (Vid, VD, Option[VD2]) => VD3)
+ : VertexRDD[VD3] =
+ {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ leftZipJoin(other)(f)
+ case _ =>
+ new VertexRDD[VD3](
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true)
+ { (part, msgs) =>
+ val vertexPartition: VertexPartition[VD] = part.next()
+ Iterator(vertexPartition.leftJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ /**
+ * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
+ * must have the same index.
+ */
+ def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U])
+ (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ this.zipVertexPartitions(other) { (thisPart, otherPart) =>
+ thisPart.innerJoin(otherPart)(f)
+ }
+ }
+
+ /**
+ * Replace vertices with corresponding vertices in `other`, and drop vertices without a
+ * corresponding vertex in `other`.
+ */
+ def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)])
+ (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ innerZipJoin(other)(f)
+ case _ =>
+ new VertexRDD(
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true)
+ { (part, msgs) =>
+ val vertexPartition: VertexPartition[VD] = part.next()
+ Iterator(vertexPartition.innerJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ def aggregateUsingIndex[VD2: ClassManifest](
+ messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
+ {
+ val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+ val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
+ val vertexPartition: VertexPartition[VD] = thisIter.next()
+ Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+ }
+ new VertexRDD[VD2](parts)
+ }
+
+} // end of VertexRDD
+
+
+/**
+ * The VertexRDD singleton is used to construct VertexRDDs
+ */
+object VertexRDD {
+
+ /**
+ * Construct a vertex set from an RDD of vertex-attribute pairs.
+ * Duplicate entries are removed arbitrarily.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param rdd the collection of vertex-attribute pairs
+ */
+ def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ case Some(p) => rdd
+ case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ }
+ val vertexPartitions = partitioned.mapPartitions(
+ iter => Iterator(VertexPartition(iter)),
+ preservesPartitioning = true)
+ new VertexRDD(vertexPartitions)
+ }
+
+ /**
+ * Construct a vertex set from an RDD of vertex-attribute pairs.
+ * Duplicate entries are merged using mergeFunc.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param rdd the collection of vertex-attribute pairs
+ * @param mergeFunc the associative, commutative merge function.
+ */
+ def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
+ {
+ val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ case Some(p) => rdd
+ case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ }
+ val vertexPartitions = partitioned.mapPartitions(
+ iter => Iterator(VertexPartition(iter)),
+ preservesPartitioning = true)
+ new VertexRDD(vertexPartitions)
+ }
+
+ def apply[VD: ClassManifest](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
+ : VertexRDD[VD] =
+ {
+ VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
+ value.getOrElse(default)
+ }
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
deleted file mode 100644
index ed70402a6f..0000000000
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ /dev/null
@@ -1,593 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graph
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
-
-import org.apache.spark.graph.impl.AggregationMsg
-import org.apache.spark.graph.impl.MsgRDDFunctions._
-import org.apache.spark.graph.impl.VertexPartition
-
-/**
- * Maintains the per-partition mapping from vertex id to the corresponding
- * location in the per-partition values array. This class is meant to be an
- * opaque type.
- *
- */
-class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
- /**
- * The persist function behaves like the standard RDD persist
- */
- def persist(newLevel: StorageLevel): VertexSetIndex = {
- rdd.persist(newLevel)
- return this
- }
-
- /**
- * Returns the partitioner object of the underlying RDD. This is
- * used by the VertexSetRDD to partition the values RDD.
- */
- def partitioner: Partitioner = rdd.partitioner.get
-} // end of VertexSetIndex
-
-/**
- * A `VertexSetRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is
- * only one entry for each vertex and by pre-indexing the entries for fast,
- * efficient joins.
- *
- * @tparam VD the vertex attribute associated with each vertex in the set.
- *
- * To construct a `VertexSetRDD` use the singleton object:
- *
- * @example Construct a `VertexSetRDD` from a plain RDD
- * {{{
- * // Construct an intial vertex set
- * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
- * val vset = VertexSetRDD(someData)
- * // If there were redundant values in someData we would use a reduceFunc
- * val vset2 = VertexSetRDD(someData, reduceFunc)
- * // Finally we can use the VertexSetRDD to index another dataset
- * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
- * val vset3 = VertexSetRDD(otherData, vset.index)
- * // Now we can construct very fast joins between the two sets
- * val vset4: VertexSetRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
- * }}}
- *
- */
-class VertexSetRDD[@specialized VD: ClassManifest](
- @transient val partitionsRDD: RDD[VertexPartition[VD]])
- extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
-
- /**
- * The `VertexSetIndex` representing the layout of this `VertexSetRDD`.
- */
- def index = new VertexSetIndex(partitionsRDD.mapPartitions(_.map(_.index),
- preservesPartitioning = true))
-
- /**
- * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
- * The resulting VertexSet will be based on a different index and can
- * no longer be quickly joined with this RDD.
- */
- def reindex(): VertexSetRDD[VD] = VertexSetRDD(this)
-
- /**
- * An internal representation which joins the block indices with the values
- * This is used by the compute function to emulate `RDD[(Vid, VD)]`
- */
- protected[spark] val tuples = partitionsRDD.flatMap(_.iterator)
-
- /**
- * The partitioner is defined by the index.
- */
- override val partitioner = partitionsRDD.partitioner
-
- /**
- * The actual partitions are defined by the tuples.
- */
- override def getPartitions: Array[Partition] = tuples.partitions
-
- /**
- * The preferred locations are computed based on the preferred
- * locations of the tuples.
- */
- override def getPreferredLocations(s: Partition): Seq[String] =
- tuples.preferredLocations(s)
-
- /**
- * Caching a VertexSetRDD causes the index and values to be cached separately.
- */
- override def persist(newLevel: StorageLevel): VertexSetRDD[VD] = {
- partitionsRDD.persist(newLevel)
- this
- }
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def persist(): VertexSetRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- override def cache(): VertexSetRDD[VD] = persist()
-
- /**
- * Provide the `RDD[(Vid, VD)]` equivalent output.
- */
- override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] =
- tuples.compute(part, context)
-
- /**
- * Return a new VertexSetRDD by applying a function to each VertexPartition of
- * this RDD.
- */
- def mapVertexPartitions[VD2: ClassManifest](
- f: VertexPartition[VD] => VertexPartition[VD2]): VertexSetRDD[VD2] = {
- val cleanF = sparkContext.clean(f)
- val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
- new VertexSetRDD(newPartitionsRDD)
- }
-
- /**
- * Return a new VertexSetRDD by applying a function to corresponding
- * VertexPartitions of this VertexSetRDD and another one.
- */
- def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest]
- (other: VertexSetRDD[VD2])
- (f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexSetRDD[VD3] = {
- val cleanF = sparkContext.clean(f)
- val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
- ) {
- (thisIter, otherIter) =>
- val thisPart = thisIter.next()
- val otherPart = otherIter.next()
- Iterator(cleanF(thisPart, otherPart))
- }
- new VertexSetRDD(newPartitionsRDD)
- }
-
- /**
- * Restrict the vertex set to the set of vertices satisfying the
- * given predicate.
- *
- * @param pred the user defined predicate, which takes a tuple to conform to
- * the RDD[(Vid, VD)] interface
- *
- * @note The vertex set preserves the original index structure
- * which means that the returned RDD can be easily joined with
- * the original vertex-set. Furthermore, the filter only
- * modifies the bitmap index and so no new values are allocated.
- */
- override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexSetRDD[VD] =
- this.mapVertexPartitions(_.filter(Function.untupled(pred)))
-
- /**
- * Pass each vertex attribute through a map function and retain the
- * original RDD's partitioning and index.
- *
- * @tparam VD2 the type returned by the map function
- *
- * @param f the function applied to each value in the RDD
- * @return a new VertexSetRDD with values obtained by applying `f` to
- * each of the entries in the original VertexSet. The resulting
- * VertexSetRDD retains the same index.
- */
- def mapValues[VD2: ClassManifest](f: VD => VD2): VertexSetRDD[VD2] =
- this.mapVertexPartitions(_.map { case (vid, attr) => f(attr) })
-
- /**
- * Fill in missing values for all vertices in the index.
- *
- * @param missingValue the value to be used for vertices in the
- * index that don't currently have values.
- * @return A VertexSetRDD with a value for all vertices.
- */
- def fillMissing(missingValue: VD): VertexSetRDD[VD] = {
- this.mapVertexPartitions { part =>
- // Allocate a new values array with missing value as the default
- val newValues = Array.fill(part.values.size)(missingValue)
- // Copy over the old values
- part.mask.iterator.foreach { ind =>
- newValues(ind) = part.values(ind)
- }
- // Create a new mask with all vertices in the index
- val newMask = part.index.getBitSet
- new VertexPartition(part.index, newValues, newMask)
- }
- }
-
- /**
- * Pass each vertex attribute along with the vertex id through a map
- * function and retain the original RDD's partitioning and index.
- *
- * @tparam VD2 the type returned by the map function
- *
- * @param f the function applied to each vertex id and vertex
- * attribute in the RDD
- * @return a new VertexSet with values obtained by applying `f` to
- * each of the entries in the original VertexSet. The resulting
- * VertexSetRDD retains the same index.
- */
- def mapValuesWithKeys[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexSetRDD[VD2] = {
- this.mapVertexPartitions { part =>
- // Construct a view of the map transformation
- val newValues = new Array[VD2](part.index.capacity)
- part.mask.iterator.foreach { ind =>
- newValues(ind) = f(part.index.getValueSafe(ind), part.values(ind))
- }
- new VertexPartition(part.index, newValues, part.mask)
- }
- } // end of mapValuesWithKeys
-
- /**
- * Inner join this VertexSet with another VertexSet which has the
- * same Index. This function will fail if both VertexSets do not
- * share the same index. The resulting vertex set will only contain
- * vertices that are in both this and the other vertex set.
- *
- * @tparam VD2 the attribute type of the other VertexSet
- * @tparam VD3 the attribute type of the resulting VertexSet
- *
- * @param other the other VertexSet with which to join.
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a new vertex attribute.
- * @return a VertexSetRDD containing only the vertices in both this
- * and the other VertexSet and with tuple attributes.
- *
- */
- def zipJoin[VD2: ClassManifest, VD3: ClassManifest]
- (other: VertexSetRDD[VD2])(f: (Vid, VD, VD2) => VD3): VertexSetRDD[VD3] = {
- this.zipVertexPartitions(other) {
- (thisPart, otherPart) =>
- if (thisPart.index != otherPart.index) {
- throw new SparkException("can't zip join VertexSetRDDs with different indexes")
- }
- val newValues = new Array[VD3](thisPart.index.capacity)
- val newMask = thisPart.mask & otherPart.mask
- newMask.iterator.foreach { ind =>
- newValues(ind) =
- f(thisPart.index.getValueSafe(ind), thisPart.values(ind), otherPart.values(ind))
- }
- new VertexPartition(thisPart.index, newValues, newMask)
- }
- }
-
- /**
- * Inner join this VertexSet with another VertexSet which has the
- * same Index. This function will fail if both VertexSets do not
- * share the same index.
- *
- * @param other the vertex set to join with this vertex set
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a collection of tuples.
- * @tparam VD2 the type of the other vertex set attributes
- * @tparam VD3 the type of the tuples emitted by `f`
- * @return an RDD containing the tuples emitted by `f`
- */
- def zipJoinFlatMap[VD2: ClassManifest, VD3: ClassManifest]
- (other: VertexSetRDD[VD2])
- (f: (Vid, VD, VD2) => Iterator[VD3]): RDD[VD3] = {
- val cleanF = sparkContext.clean(f)
- partitionsRDD.zipPartitions(other.partitionsRDD) {
- (thisPartIter, otherPartIter) =>
- val thisPart = thisPartIter.next()
- val otherPart = otherPartIter.next()
- if (thisPart.index != otherPart.index) {
- throw new SparkException("can't zip join VertexSetRDDs with different indexes")
- }
- (thisPart.mask & otherPart.mask).iterator.flatMap { ind =>
- cleanF(thisPart.index.getValueSafe(ind), thisPart.values(ind), otherPart.values(ind))
- }
- }
- }
-
- /**
- * Left join this VertexSet with another VertexSet which has the
- * same Index. This function will fail if both VertexSets do not
- * share the same index. The resulting vertex set contains an entry
- * for each vertex in this set. If the other VertexSet is missing
- * any vertex in this VertexSet then a `None` attribute is generated
- *
- * @tparam VD2 the attribute type of the other VertexSet
- * @tparam VD3 the attribute type of the resulting VertexSet
- *
- * @param other the other VertexSet with which to join.
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a new vertex attribute.
- * @return a VertexSetRDD containing all the vertices in this
- * VertexSet with `None` attributes used for Vertices missing in the
- * other VertexSet.
- *
- */
- def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest]
- (other: VertexSetRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexSetRDD[VD3] = {
- this.zipVertexPartitions(other) {
- (thisPart, otherPart) =>
- if (thisPart.index != otherPart.index) {
- throw new SparkException("can't zip join VertexSetRDDs with different indexes")
- }
- val newValues = new Array[VD3](thisPart.index.capacity)
- thisPart.mask.iterator.foreach { ind =>
- val otherV = if (otherPart.mask.get(ind)) Option(otherPart.values(ind)) else None
- newValues(ind) = f(
- thisPart.index.getValueSafe(ind), thisPart.values(ind), otherV)
- }
- new VertexPartition(thisPart.index, newValues, thisPart.mask)
- }
- } // end of leftZipJoin
-
-
- /**
- * Left join this VertexSet with an RDD containing vertex attribute
- * pairs. If the other RDD is backed by a VertexSet with the same
- * index than the efficient leftZipJoin implementation is used. The
- * resulting vertex set contains an entry for each vertex in this
- * set. If the other VertexSet is missing any vertex in this
- * VertexSet then a `None` attribute is generated
- *
- * @tparam VD2 the attribute type of the other VertexSet
- * @tparam VD2 the attribute type of the resulting VertexSet
- *
- * @param other the other VertexSet with which to join.
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a new vertex attribute.
- * @param merge the function used combine duplicate vertex
- * attributes
- * @return a VertexSetRDD containing all the vertices in this
- * VertexSet with the attribute emitted by f.
- *
- */
- def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
- (other: RDD[(Vid, VD2)])
- (f: (Vid, VD, Option[VD2]) => VD3, merge: (VD2, VD2) => VD2 = (a: VD2, b: VD2) => a)
- : VertexSetRDD[VD3] = {
- // Test if the other vertex is a VertexSetRDD to choose the optimal
- // join strategy
- other match {
- // If the other set is a VertexSetRDD then we use the much more efficient
- // leftZipJoin
- case other: VertexSetRDD[_] => {
- leftZipJoin(other)(f)
- }
- case _ => {
- val indexedOther: VertexSetRDD[VD2] = VertexSetRDD(other, this.index, merge)
- leftZipJoin(indexedOther)(f)
- }
- }
- } // end of leftJoin
-
-} // end of VertexSetRDD
-
-
-/**
- * The VertexSetRDD singleton is used to construct VertexSets
- */
-object VertexSetRDD {
-
- /**
- * Construct a vertex set from an RDD of vertex-attribute pairs.
- * Duplicate entries are removed arbitrarily.
- *
- * @tparam VD the vertex attribute type
- *
- * @param rdd the collection of vertex-attribute pairs
- */
- def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)]): VertexSetRDD[VD] =
- apply(rdd, (a: VD, b: VD) => a)
-
- /**
- * Construct a vertex set from an RDD of vertex-attribute pairs
- * where duplicate entries are merged using the reduceFunc
- *
- * @tparam VD the vertex attribute type
- *
- * @param rdd the collection of vertex-attribute pairs
- * @param reduceFunc the function used to merge attributes of
- * duplicate vertices.
- */
- def apply[VD: ClassManifest](
- rdd: RDD[(Vid, VD)], reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] = {
- val cReduceFunc = rdd.context.clean(reduceFunc)
- // Preaggregate and shuffle if necessary
- val preAgg = rdd.partitioner match {
- case Some(p) => rdd
- case None =>
- val partitioner = new HashPartitioner(rdd.partitions.size)
- // Preaggregation.
- val aggregator = new Aggregator[Vid, VD, VD](v => v, cReduceFunc, cReduceFunc)
- rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
- }
-
- val partitionsRDD = preAgg.mapPartitions(iter => {
- val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD]
- for ((k, v) <- iter) {
- hashMap.setMerge(k, v, cReduceFunc)
- }
- val part = new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
- Iterator(part)
- }, preservesPartitioning = true).cache
- new VertexSetRDD(partitionsRDD)
- } // end of apply
-
- /**
- * Construct a vertex set from an RDD using an existing index.
- *
- * @note duplicate vertices are discarded arbitrarily
- *
- * @tparam VD the vertex attribute type
- * @param rdd the rdd containing vertices
- * @param indexPrototype a VertexSetRDD whose indexes will be reused. The
- * indexes must be a superset of the vertices in rdd
- * in RDD
- */
- def apply[VD: ClassManifest](
- rdd: RDD[(Vid, VD)], index: VertexSetIndex): VertexSetRDD[VD] =
- apply(rdd, index, (a: VD, b: VD) => a)
-
- /**
- * Construct a vertex set from an RDD using an existing index and a
- * user defined `combiner` to merge duplicate vertices.
- *
- * @tparam VD the vertex attribute type
- * @param rdd the rdd containing vertices
- * @param indexPrototype a VertexSetRDD whose indexes will be reused. The
- * indexes must be a superset of the vertices in rdd
- * @param reduceFunc the user defined reduce function used to merge
- * duplicate vertex attributes.
- */
- def apply[VD: ClassManifest](
- rdd: RDD[(Vid, VD)],
- index: VertexSetIndex,
- reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] =
- apply(rdd, index, (v: VD) => v, reduceFunc, reduceFunc)
-
- /**
- * Construct a vertex set from an RDD of Product2[Vid, VD]
- *
- * @tparam VD the vertex attribute type
- * @param rdd the rdd containing vertices
- * @param indexPrototype a VertexSetRDD whose indexes will be reused. The
- * indexes must be a superset of the vertices in rdd
- * @param reduceFunc the user defined reduce function used to merge
- * duplicate vertex attributes.
- */
- private[spark] def aggregate[VD: ClassManifest, VidVDPair <: Product2[Vid, VD] : ClassManifest](
- rdd: RDD[VidVDPair],
- index: VertexSetIndex,
- reduceFunc: (VD, VD) => VD): VertexSetRDD[VD] = {
-
- val cReduceFunc = rdd.context.clean(reduceFunc)
- assert(rdd.partitioner == Some(index.partitioner))
- // Use the index to build the new values table
- val partitionsRDD = index.rdd.zipPartitions(
- rdd, preservesPartitioning = true
- ) {
- (indexIter, tblIter) =>
- // There is only one map
- val index = indexIter.next()
- val mask = new BitSet(index.capacity)
- val values = new Array[VD](index.capacity)
- for (vertexPair <- tblIter) {
- // Get the location of the key in the index
- val pos = index.getPos(vertexPair._1)
- if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
- throw new SparkException("Error: Trying to bind an external index " +
- "to an RDD which contains keys that are not in the index.")
- } else {
- // Get the actual index
- val ind = pos & OpenHashSet.POSITION_MASK
- // If this value has already been seen then merge
- if (mask.get(ind)) {
- values(ind) = cReduceFunc(values(ind), vertexPair._2)
- } else { // otherwise just store the new value
- mask.set(ind)
- values(ind) = vertexPair._2
- }
- }
- }
- Iterator(new VertexPartition(index, values, mask))
- }
-
- new VertexSetRDD(partitionsRDD)
- }
-
- /**
- * Construct a vertex set from an RDD using an existing index and a
- * user defined `combiner` to merge duplicate vertices.
- *
- * @tparam VD the vertex attribute type
- * @param rdd the rdd containing vertices
- * @param index the index which must be a superset of the vertices
- * in RDD
- * @param createCombiner a user defined function to create a combiner
- * from a vertex attribute
- * @param mergeValue a user defined function to merge a vertex
- * attribute into an existing combiner
- * @param mergeCombiners a user defined function to merge combiners
- *
- */
- def apply[VD: ClassManifest, C: ClassManifest](
- rdd: RDD[(Vid, VD)],
- index: VertexSetIndex,
- createCombiner: VD => C,
- mergeValue: (C, VD) => C,
- mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
- val cCreateCombiner = rdd.context.clean(createCombiner)
- val cMergeValue = rdd.context.clean(mergeValue)
- val cMergeCombiners = rdd.context.clean(mergeCombiners)
- val partitioner = index.partitioner
- // Preaggregate and shuffle if necessary
- val partitioned =
- if (rdd.partitioner != Some(partitioner)) {
- // Preaggregation.
- val aggregator = new Aggregator[Vid, VD, C](cCreateCombiner, cMergeValue, cMergeCombiners)
- rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
- } else {
- rdd.mapValues(x => createCombiner(x))
- }
-
- aggregate(partitioned, index, mergeCombiners)
- } // end of apply
-
- /**
- * Construct an index of the unique vertices. The resulting index
- * can be used to build VertexSets over subsets of the vertices in
- * the input.
- */
- def makeIndex(
- keys: RDD[Vid], partitionerOpt: Option[Partitioner] = None): VertexSetIndex = {
- val partitioner = partitionerOpt match {
- case Some(p) => p
- case None => Partitioner.defaultPartitioner(keys)
- }
-
- val preAgg: RDD[(Vid, Unit)] = keys.mapPartitions(iter => {
- val keys = new VertexIdToIndexMap
- while (iter.hasNext) { keys.add(iter.next) }
- keys.iterator.map(k => (k, ()))
- }, preservesPartitioning = true).partitionBy(partitioner)
-
- val index = preAgg.mapPartitions(iter => {
- val index = new VertexIdToIndexMap
- while (iter.hasNext) { index.add(iter.next._1) }
- Iterator(index)
- }, preservesPartitioning = true).cache
-
- new VertexSetIndex(index)
- }
-
- /**
- * Create a VertexSetRDD with all vertices initialized to the default value.
- *
- * @param index an index over the set of vertices
- * @param defaultValue the default value to use when initializing the vertices
- * @tparam VD the type of the vertex attribute
- * @return
- */
- def apply[VD: ClassManifest](index: VertexSetIndex, defaultValue: VD): VertexSetRDD[VD] = {
- // Use the index to build the new values tables
- val partitionsRDD = index.rdd.mapPartitions(_.map { index =>
- val values = Array.fill(index.capacity)(defaultValue)
- val mask = index.getBitSet
- new VertexPartition(index, values, mask)
- }, preservesPartitioning = true)
- new VertexSetRDD(partitionsRDD)
- } // end of apply
-} // end of object VertexSetRDD
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
new file mode 100644
index 0000000000..7cd947d2ba
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
@@ -0,0 +1,37 @@
+package org.apache.spark.graph.algorithms
+
+import org.apache.spark.graph._
+
+
+object ConnectedComponents {
+ /**
+ * Compute the connected component membership of each vertex and return an RDD with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ *
+ * @param graph the graph for which to compute the connected components
+ *
+ * @return a graph with vertex attributes containing the smallest vertex in each
+ * connected component
+ */
+ def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[Vid, ED] = {
+ val ccGraph = graph.mapVertices { case (vid, _) => vid }
+
+ def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
+ if (edge.srcAttr < edge.dstAttr) {
+ Iterator((edge.dstId, edge.srcAttr))
+ } else if (edge.srcAttr > edge.dstAttr) {
+ Iterator((edge.srcId, edge.dstAttr))
+ } else {
+ Iterator.empty
+ }
+ }
+ val initialMessage = Long.MaxValue
+ Pregel(ccGraph, initialMessage)(
+ vprog = (id, attr, msg) => math.min(attr, msg),
+ sendMsg = sendMessage,
+ mergeMsg = (a, b) => math.min(a, b))
+ } // end of connectedComponents
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
new file mode 100644
index 0000000000..f77dffd7b4
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
@@ -0,0 +1,205 @@
+package org.apache.spark.graph.algorithms
+
+import org.apache.spark.Logging
+import org.apache.spark.graph._
+
+
+object PageRank extends Logging {
+
+ /**
+ * Run PageRank for a fixed number of iterations returning a graph
+ * with vertex attributes containing the PageRank and edge
+ * attributes the normalized edge weight.
+ *
+ * The following PageRank fixed point is computed for each vertex.
+ *
+ * {{{
+ * var PR = Array.fill(n)( 1.0 )
+ * val oldPR = Array.fill(n)( 1.0 )
+ * for( iter <- 0 until numIter ) {
+ * swap(oldPR, PR)
+ * for( i <- 0 until n ) {
+ * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+ * }
+ * }
+ * }}}
+ *
+ * where `alpha` is the random reset probability (typically 0.15),
+ * `inNbrs[i]` is the set of neighbors whick link to `i` and
+ * `outDeg[j]` is the out degree of vertex `j`.
+ *
+ * Note that this is not the "normalized" PageRank and as a consequence pages that have no
+ * inlinks will have a PageRank of alpha.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param numIter the number of iterations of PageRank to run
+ * @param resetProb the random reset probability (alpha)
+ *
+ * @return the graph containing with each vertex containing the PageRank and each edge
+ * containing the normalized weight.
+ *
+ */
+ def run[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
+ {
+
+ /**
+ * Initialize the pagerankGraph with each edge attribute having
+ * weight 1/outDegree and each vertex with attribute 1.0.
+ */
+ val pagerankGraph: Graph[Double, Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees){
+ (vid, vdata, deg) => deg.getOrElse(0)
+ }
+ // Set the weight on the edges based on the degree
+ .mapTriplets( e => 1.0 / e.srcAttr )
+ // Set the vertex attributes to the initial pagerank values
+ .mapVertices( (id, attr) => 1.0 )
+
+ // Display statistics about pagerank
+ logInfo(pagerankGraph.statistics.toString)
+
+ // Define the three functions needed to implement PageRank in the GraphX
+ // version of Pregel
+ def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ resetProb + (1.0 - resetProb) * msgSum
+ def sendMessage(edge: EdgeTriplet[Double, Double]) =
+ Iterator((edge.dstId, edge.srcAttr * edge.attr))
+ def messageCombiner(a: Double, b: Double): Double = a + b
+ // The initial message received by all vertices in PageRank
+ val initialMessage = 0.0
+
+ // Execute pregel for a fixed number of iterations.
+ Pregel(pagerankGraph, initialMessage, numIter)(
+ vertexProgram, sendMessage, messageCombiner)
+ }
+
+ /**
+ * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+ * PageRank and edge attributes containing the normalized edge weight.
+ *
+ * {{{
+ * var PR = Array.fill(n)( 1.0 )
+ * val oldPR = Array.fill(n)( 0.0 )
+ * while( max(abs(PR - oldPr)) > tol ) {
+ * swap(oldPR, PR)
+ * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
+ * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
+ * }
+ * }
+ * }}}
+ *
+ * where `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of
+ * neighbors whick link to `i` and `outDeg[j]` is the out degree of vertex `j`.
+ *
+ * Note that this is not the "normalized" PageRank and as a consequence pages that have no
+ * inlinks will have a PageRank of alpha.
+ *
+ * @tparam VD the original vertex attribute (not used)
+ * @tparam ED the original edge attribute (not used)
+ *
+ * @param graph the graph on which to compute PageRank
+ * @param tol the tolerance allowed at convergence (smaller => more * accurate).
+ * @param resetProb the random reset probability (alpha)
+ *
+ * @return the graph containing with each vertex containing the PageRank and each edge
+ * containing the normalized weight.
+ */
+ def runUntillConvergence[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
+ {
+ // Initialize the pagerankGraph with each edge attribute
+ // having weight 1/outDegree and each vertex with attribute 1.0.
+ val pagerankGraph: Graph[(Double, Double), Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) {
+ (vid, vdata, deg) => deg.getOrElse(0)
+ }
+ // Set the weight on the edges based on the degree
+ .mapTriplets( e => 1.0 / e.srcAttr )
+ // Set the vertex attributes to (initalPR, delta = 0)
+ .mapVertices( (id, attr) => (0.0, 0.0) )
+
+ // Display statistics about pagerank
+ logInfo(pagerankGraph.statistics.toString)
+
+ // Define the three functions needed to implement PageRank in the GraphX
+ // version of Pregel
+ def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ val (oldPR, lastDelta) = attr
+ val newPR = oldPR + (1.0 - resetProb) * msgSum
+ (newPR, newPR - oldPR)
+ }
+
+ def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
+ if (edge.srcAttr._2 > tol) {
+ Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
+ } else {
+ Iterator.empty
+ }
+ }
+
+ def messageCombiner(a: Double, b: Double): Double = a + b
+
+ // The initial message received by all vertices in PageRank
+ val initialMessage = resetProb / (1.0 - resetProb)
+
+ // Execute a dynamic version of Pregel.
+ Pregel(pagerankGraph, initialMessage)(vertexProgram, sendMessage, messageCombiner)
+ .mapVertices((vid, attr) => attr._1)
+ } // end of deltaPageRank
+
+ def runStandalone[VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): VertexRDD[Double] = {
+
+ // Initialize the ranks
+ var ranks: VertexRDD[Double] = graph.vertices.mapValues((vid, attr) => resetProb).cache()
+
+ // Initialize the delta graph where each vertex stores its delta and each edge knows its weight
+ var deltaGraph: Graph[Double, Double] =
+ graph.outerJoinVertices(graph.outDegrees)((vid, vdata, deg) => deg.getOrElse(0))
+ .mapTriplets(e => 1.0 / e.srcAttr)
+ .mapVertices((vid, degree) => resetProb).cache()
+ var numDeltas: Long = ranks.count()
+
+ var prevDeltas: Option[VertexRDD[Double]] = None
+
+ var i = 0
+ val weight = (1.0 - resetProb)
+ while (numDeltas > 0) {
+ // Compute new deltas. Only deltas that existed in the last round (i.e., were greater than
+ // `tol`) get to send messages; those that were less than `tol` would send messages less than
+ // `tol` as well.
+ val deltas = deltaGraph
+ .mapReduceTriplets[Double](
+ et => Iterator((et.dstId, et.srcAttr * et.attr * weight)),
+ _ + _,
+ prevDeltas.map((_, EdgeDirection.Out)))
+ .filter { case (vid, delta) => delta > tol }
+ .cache()
+ prevDeltas = Some(deltas)
+ numDeltas = deltas.count()
+ logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas))
+
+ // Update deltaGraph with the deltas
+ deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) =>
+ newOpt.getOrElse(old)
+ }.cache()
+
+ // Update ranks
+ ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) =>
+ oldRank + deltaOpt.getOrElse(0.0)
+ }
+ ranks.foreach(x => {}) // force the iteration for ease of debugging
+
+ i += 1
+ }
+
+ ranks
+ }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
new file mode 100644
index 0000000000..4ddf0b1fd5
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -0,0 +1,158 @@
+package org.apache.spark.graph.algorithms
+
+import org.apache.spark._
+import org.apache.spark.rdd._
+import org.apache.spark.graph._
+import scala.util.Random
+import org.apache.commons.math.linear._
+
+class VT ( // vertex type
+ var v1: RealVector, // v1: p for user node, q for item node
+ var v2: RealVector, // v2: pu + |N(u)|^(-0.5)*sum(y) for user node, y for item node
+ var bias: Double,
+ var norm: Double // only for user node
+) extends Serializable
+
+class Msg ( // message
+ var v1: RealVector,
+ var v2: RealVector,
+ var bias: Double
+) extends Serializable
+
+object Svdpp {
+ // implement SVD++ based on http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf
+
+ def run(edges: RDD[Edge[Double]]): Graph[VT, Double] = {
+ // defalut parameters
+ val rank = 10
+ val maxIters = 20
+ val minVal = 0.0
+ val maxVal = 5.0
+ val gamma1 = 0.007
+ val gamma2 = 0.007
+ val gamma6 = 0.005
+ val gamma7 = 0.015
+
+ def defaultF(rank: Int) = {
+ val v1 = new ArrayRealVector(rank)
+ val v2 = new ArrayRealVector(rank)
+ for (i <- 0 until rank) {
+ v1.setEntry(i, Random.nextDouble)
+ v2.setEntry(i, Random.nextDouble)
+ }
+ var vd = new VT(v1, v2, 0.0, 0.0)
+ vd
+ }
+
+ // calculate initial norm and bias
+ def mapF0(et: EdgeTriplet[VT, Double]): Iterator[(Vid, (Long, Double))] = {
+ assert(et.srcAttr != null && et.dstAttr != null)
+ Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr)))
+ }
+ def reduceF0(g1: (Long, Double), g2: (Long, Double)) = {
+ (g1._1 + g2._1, g1._2 + g2._2)
+ }
+ def updateF0(vid: Vid, vd: VT, msg: Option[(Long, Double)]) = {
+ if (msg.isDefined) {
+ vd.bias = msg.get._2 / msg.get._1
+ vd.norm = 1.0 / scala.math.sqrt(msg.get._1)
+ }
+ vd
+ }
+
+ // calculate global rating mean
+ val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+ val u = rs / rc // global rating mean
+
+ // make graph
+ var g = Graph.fromEdges(edges, defaultF(rank)).cache()
+
+ // calculate initial norm and bias
+ val t0 = g.mapReduceTriplets(mapF0, reduceF0)
+ g.outerJoinVertices(t0) {updateF0}
+
+ // phase 1
+ def mapF1(et: EdgeTriplet[VT, Double]): Iterator[(Vid, RealVector)] = {
+ assert(et.srcAttr != null && et.dstAttr != null)
+ Iterator((et.srcId, et.dstAttr.v2)) // sum up y of connected item nodes
+ }
+ def reduceF1(g1: RealVector, g2: RealVector) = {
+ g1.add(g2)
+ }
+ def updateF1(vid: Vid, vd: VT, msg: Option[RealVector]) = {
+ if (msg.isDefined) {
+ vd.v2 = vd.v1.add(msg.get.mapMultiply(vd.norm)) // pu + |N(u)|^(-0.5)*sum(y)
+ }
+ vd
+ }
+
+ // phase 2
+ def mapF2(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Msg)] = {
+ assert(et.srcAttr != null && et.dstAttr != null)
+ val usr = et.srcAttr
+ val itm = et.dstAttr
+ var p = usr.v1
+ var q = itm.v1
+ val itmBias = 0.0
+ val usrBias = 0.0
+ var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+ pred = math.max(pred, minVal)
+ pred = math.min(pred, maxVal)
+ val err = et.attr - pred
+ val y = (q.mapMultiply(err*usr.norm)).subtract((usr.v2).mapMultiply(gamma7))
+ val newP = (q.mapMultiply(err)).subtract(p.mapMultiply(gamma7)) // for each connected item q
+ val newQ = (usr.v2.mapMultiply(err)).subtract(q.mapMultiply(gamma7))
+ Iterator((et.srcId, new Msg(newP, y, err - gamma6*usr.bias)), (et.dstId, new Msg(newQ, y, err - gamma6*itm.bias)))
+ }
+ def reduceF2(g1: Msg, g2: Msg):Msg = {
+ g1.v1 = g1.v1.add(g2.v1)
+ g1.v2 = g1.v2.add(g2.v2)
+ g1.bias += g2.bias
+ g1
+ }
+ def updateF2(vid: Vid, vd: VT, msg: Option[Msg]) = {
+ if (msg.isDefined) {
+ vd.v1 = vd.v1.add(msg.get.v1.mapMultiply(gamma2))
+ if (vid % 2 == 1) { // item node update y
+ vd.v2 = vd.v2.add(msg.get.v2.mapMultiply(gamma2))
+ }
+ vd.bias += msg.get.bias*gamma1
+ }
+ vd
+ }
+
+ for (i <- 0 until maxIters) {
+ // phase 1
+ val t1: VertexRDD[RealVector] = g.mapReduceTriplets(mapF1, reduceF1)
+ g.outerJoinVertices(t1) {updateF1}
+ // phase 2
+ val t2: VertexRDD[Msg] = g.mapReduceTriplets(mapF2, reduceF2)
+ g.outerJoinVertices(t2) {updateF2}
+ }
+
+ // calculate error on training set
+ def mapF3(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Double)] = {
+ assert(et.srcAttr != null && et.dstAttr != null)
+ val usr = et.srcAttr
+ val itm = et.dstAttr
+ var p = usr.v1
+ var q = itm.v1
+ val itmBias = 0.0
+ val usrBias = 0.0
+ var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+ pred = math.max(pred, minVal)
+ pred = math.min(pred, maxVal)
+ val err = (et.attr - pred)*(et.attr - pred)
+ Iterator((et.dstId, err))
+ }
+ def updateF3(vid: Vid, vd: VT, msg: Option[Double]) = {
+ if (msg.isDefined && vid % 2 == 1) { // item sum up the errors
+ vd.norm = msg.get
+ }
+ vd
+ }
+ val t3: VertexRDD[Double] = g.mapReduceTriplets(mapF3, _ + _)
+ g.outerJoinVertices(t3) {updateF3}
+ g
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
new file mode 100644
index 0000000000..b1cd3c47d0
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
@@ -0,0 +1,76 @@
+package org.apache.spark.graph.algorithms
+
+import org.apache.spark.graph._
+
+
+object TriangleCount {
+ /**
+ * Compute the number of triangles passing through each vertex.
+ *
+ * The algorithm is relatively straightforward and can be computed in three steps:
+ *
+ * 1) Compute the set of neighbors for each vertex
+ * 2) For each edge compute the intersection of the sets and send the
+ * count to both vertices.
+ * 3) Compute the sum at each vertex and divide by two since each
+ * triangle is counted twice.
+ *
+ *
+ * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
+ * using Graph.partitionBy.
+ *
+ * @return
+ */
+ def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = {
+ // Remove redundant edges
+ val g = graph.groupEdges((a, b) => a).cache
+
+ // Construct set representations of the neighborhoods
+ val nbrSets: VertexRDD[VertexSet] =
+ g.collectNeighborIds(EdgeDirection.Both).mapValues { (vid, nbrs) =>
+ val set = new VertexSet(4)
+ var i = 0
+ while (i < nbrs.size) {
+ // prevent self cycle
+ if(nbrs(i) != vid) {
+ set.add(nbrs(i))
+ }
+ i += 1
+ }
+ set
+ }
+ // join the sets with the graph
+ val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+ (vid, _, optSet) => optSet.getOrElse(null)
+ }
+ // Edge function computes intersection of smaller vertex with larger vertex
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = {
+ assert(et.srcAttr != null)
+ assert(et.dstAttr != null)
+ val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
+ (et.srcAttr, et.dstAttr)
+ } else {
+ (et.dstAttr, et.srcAttr)
+ }
+ val iter = smallSet.iterator
+ var counter: Int = 0
+ while (iter.hasNext) {
+ val vid = iter.next
+ if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
+ }
+ Iterator((et.srcId, counter), (et.dstId, counter))
+ }
+ // compute the intersection along edges
+ val counters: VertexRDD[Int] = setGraph.mapReduceTriplets(edgeFunc, _ + _)
+ // Merge counters with the graph and divide by two since each triangle is counted twice
+ g.outerJoinVertices(counters) {
+ (vid, _, optCounter: Option[Int]) =>
+ val dblCount = optCounter.getOrElse(0)
+ // double count should be even (divisible by two)
+ assert((dblCount & 1) == 0)
+ dblCount / 2
+ }
+
+ } // end of TriangleCount
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index eb3fd60d74..bfdafcc542 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,29 +1,36 @@
package org.apache.spark.graph.impl
import org.apache.spark.graph._
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
/**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute).
+ * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
+ * clustered by src.
*
* @param srcIds the source vertex id of each edge
* @param dstIds the destination vertex id of each edge
* @param data the attribute associated with each edge
+ * @param index a clustered index on source vertex id
* @tparam ED the edge attribute type.
*/
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
val srcIds: Array[Vid],
val dstIds: Array[Vid],
- val data: Array[ED]) {
+ val data: Array[ED],
+ val index: PrimitiveKeyOpenHashMap[Vid, Int]) {
/**
* Reverse all the edges in this partition.
*
- * @note No new data structures are created.
- *
* @return a new edge partition with all edges reversed.
*/
- def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
+ def reverse: EdgePartition[ED] = {
+ val builder = new EdgePartitionBuilder(size)
+ for (e <- iterator) {
+ builder.add(e.dstId, e.srcId, e.attr)
+ }
+ builder.toEdgePartition
+ }
/**
* Construct a new edge partition by applying the function f to all
@@ -46,7 +53,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
newData(i) = f(edge)
i += 1
}
- new EdgePartition(srcIds, dstIds, newData)
+ new EdgePartition(srcIds, dstIds, newData, index)
}
/**
@@ -54,17 +61,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*
* @param f an external state mutating user defined function.
*/
- def foreach(f: Edge[ED] => Unit) {
- val edge = new Edge[ED]
- val size = data.size
- var i = 0
- while (i < size) {
- edge.srcId = srcIds(i)
- edge.dstId = dstIds(i)
- edge.attr = data(i)
- f(edge)
- i += 1
- }
+ def foreach(f: Edge[ED] => Unit) {
+ iterator.foreach(f)
}
/**
@@ -75,21 +73,29 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @return a new edge partition without duplicate edges
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
- // Aggregate all matching edges in a hashmap
- val agg = new OpenHashMap[(Vid,Vid), ED]
- foreach { e => agg.setMerge((e.srcId, e.dstId), e.attr, merge) }
- // Populate new srcId, dstId, and data, arrays
- val newSrcIds = new Array[Vid](agg.size)
- val newDstIds = new Array[Vid](agg.size)
- val newData = new Array[ED](agg.size)
+ val builder = new EdgePartitionBuilder[ED]
+ var firstIter: Boolean = true
+ var currSrcId: Vid = nullValue[Vid]
+ var currDstId: Vid = nullValue[Vid]
+ var currAttr: ED = nullValue[ED]
var i = 0
- agg.foreach { kv =>
- newSrcIds(i) = kv._1._1
- newDstIds(i) = kv._1._2
- newData(i) = kv._2
+ while (i < size) {
+ if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
+ currAttr = merge(currAttr, data(i))
+ } else {
+ if (i > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ currSrcId = srcIds(i)
+ currDstId = dstIds(i)
+ currAttr = data(i)
+ }
i += 1
}
- new EdgePartition(newSrcIds, newDstIds, newData)
+ if (size > 0) {
+ builder.add(currSrcId, currDstId, currAttr)
+ }
+ builder.toEdgePartition
}
/**
@@ -99,6 +105,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def size: Int = srcIds.size
+ /** The number of unique source vertices in the partition. */
+ def indexSize: Int = index.size
+
/**
* Get an iterator over the edges in this partition.
*
@@ -118,4 +127,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
edge
}
}
+
+ /**
+ * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
+ * iterator is generated using an index scan, so it is efficient at skipping edges that don't
+ * match srcIdPred.
+ */
+ def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] =
+ index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
+
+ /**
+ * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
+ * cluster must start at position `index`.
+ */
+ private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] {
+ private[this] val edge = new Edge[ED]
+ private[this] var pos = index
+
+ override def hasNext: Boolean = {
+ pos >= 0 && pos < EdgePartition.this.size && srcIds(pos) == srcId
+ }
+
+ override def next(): Edge[ED] = {
+ assert(srcIds(pos) == srcId)
+ edge.srcId = srcIds(pos)
+ edge.dstId = dstIds(pos)
+ edge.attr = data(pos)
+ pos += 1
+ edge
+ }
+ }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
index 355f8f0542..3876273369 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -1,27 +1,45 @@
package org.apache.spark.graph.impl
-import scala.collection.mutable.ArrayBuilder
+import scala.util.Sorting
+
import org.apache.spark.graph._
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
//private[graph]
-class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
-ED: ClassManifest]{
- val srcIds = new VertexArrayList
- val dstIds = new VertexArrayList
- var dataBuilder = ArrayBuilder.make[ED]
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest](size: Int = 64) {
+ var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
def add(src: Vid, dst: Vid, d: ED) {
- srcIds.add(src)
- dstIds.add(dst)
- dataBuilder += d
+ edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
- new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
+ val edgeArray = edges.trim().array
+ Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+ val srcIds = new Array[Vid](edgeArray.size)
+ val dstIds = new Array[Vid](edgeArray.size)
+ val data = new Array[ED](edgeArray.size)
+ val index = new PrimitiveKeyOpenHashMap[Vid, Int]
+ // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
+ // adding them to the index
+ if (edgeArray.length > 0) {
+ index.update(srcIds(0), 0)
+ var currSrcId: Vid = srcIds(0)
+ var i = 0
+ while (i < edgeArray.size) {
+ srcIds(i) = edgeArray(i).srcId
+ dstIds(i) = edgeArray(i).dstId
+ data(i) = edgeArray(i).attr
+ if (edgeArray(i).srcId != currSrcId) {
+ currSrcId = edgeArray(i).srcId
+ index.update(currSrcId, i)
+ }
+ i += 1
+ }
+ }
+ new EdgePartition(srcIds, dstIds, data, index)
}
-
-
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
new file mode 100644
index 0000000000..c9e1e08153
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
@@ -0,0 +1,41 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.graph._
+import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
+
+
+/**
+ * The Iterator type returned when constructing edge triplets. This class technically could be
+ * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
+ * debug / profile.
+ */
+private[impl]
+class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
+ val vidToIndex: VertexIdToIndexMap,
+ val vertexArray: Array[VD],
+ val edgePartition: EdgePartition[ED])
+ extends Iterator[EdgeTriplet[VD, ED]] {
+
+ // Current position in the array.
+ private var pos = 0
+
+ // A triplet object that this iterator.next() call returns. We reuse this object to avoid
+ // allocating too many temporary Java objects.
+ private val triplet = new EdgeTriplet[VD, ED]
+
+ private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+
+ override def hasNext: Boolean = pos < edgePartition.size
+
+ override def next() = {
+ triplet.srcId = edgePartition.srcIds(pos)
+ // assert(vmap.containsKey(e.src.id))
+ triplet.srcAttr = vmap(triplet.srcId)
+ triplet.dstId = edgePartition.dstIds(pos)
+ // assert(vmap.containsKey(e.dst.id))
+ triplet.dstAttr = vmap(triplet.dstId)
+ triplet.attr = edgePartition.data(pos)
+ pos += 1
+ triplet
+ }
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 96a36331f5..2fe02718e9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,66 +1,15 @@
package org.apache.spark.graph.impl
-import scala.collection.JavaConversions._
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
+import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
-import org.apache.spark.HashPartitioner
-import org.apache.spark.util.ClosureCleaner
-import org.apache.spark.SparkException
-
-import org.apache.spark.Partitioner
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.graph.util.BytecodeUtils
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{ShuffledRDD, RDD}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
-
-
-/**
- * The Iterator type returned when constructing edge triplets
- */
-class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
- val vidToIndex: VertexIdToIndexMap,
- val vertexArray: Array[VD],
- val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
-
- private var pos = 0
- private val et = new EdgeTriplet[VD, ED]
- private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
-
- override def hasNext: Boolean = pos < edgePartition.size
- override def next() = {
- et.srcId = edgePartition.srcIds(pos)
- // assert(vmap.containsKey(e.src.id))
- et.srcAttr = vmap(et.srcId)
- et.dstId = edgePartition.dstIds(pos)
- // assert(vmap.containsKey(e.dst.id))
- et.dstAttr = vmap(et.dstId)
- et.attr = edgePartition.data(pos)
- pos += 1
- et
- }
-
- override def toList: List[EdgeTriplet[VD, ED]] = {
- val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
- val currentEdge = new EdgeTriplet[VD, ED]
- for (i <- (0 until edgePartition.size)) {
- currentEdge.srcId = edgePartition.srcIds(i)
- // assert(vmap.containsKey(e.src.id))
- currentEdge.srcAttr = vmap(currentEdge.srcId)
- currentEdge.dstId = edgePartition.dstIds(i)
- // assert(vmap.containsKey(e.dst.id))
- currentEdge.dstAttr = vmap(currentEdge.dstId)
- currentEdge.attr = edgePartition.data(i)
- lb += currentEdge
- }
- lb.toList
- }
-} // end of Edge Triplet Iterator
+import org.apache.spark.util.ClosureCleaner
/**
@@ -74,59 +23,95 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
* destinations. `vertexPlacement` specifies where each vertex will be
* replicated. `vTableReplicated` stores the replicated vertex attributes, which
* are co-partitioned with the relevant edges.
+ *
+ * mask in vertices means filter
+ * mask in vTableReplicated means active
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
- @transient val vTable: VertexSetRDD[VD],
- @transient val eTable: RDD[(Pid, EdgePartition[ED])],
+ @transient val vertices: VertexRDD[VD],
+ @transient val edges: EdgeRDD[ED],
@transient val vertexPlacement: VertexPlacement,
- @transient val partitioner: PartitionStrategy)
+ @transient val vTableReplicated: VTableReplicated[VD])
extends Graph[VD, ED] {
- def this() = this(null, null, null, null)
-
- @transient val vTableReplicated: VTableReplicated[VD] =
- new VTableReplicated(vTable, eTable, vertexPlacement)
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED],
+ vertexPlacement: VertexPlacement) = {
+ this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
+ }
- /** Return a RDD of vertices. */
- @transient override val vertices = vTable
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED]) = {
+ this(vertices, edges, new VertexPlacement(edges, vertices))
+ }
- /** Return a RDD of edges. */
- @transient override val edges: RDD[Edge[ED]] =
- eTable.mapPartitions(_.next()._2.iterator, true)
+ /** Return a RDD that brings edges together with their source and destination vertices. */
+ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
+ val vdManifest = classManifest[VD]
+ val edManifest = classManifest[ED]
- /** Return a RDD that brings edges with its source and destination vertices together. */
- @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
- makeTriplets(vTableReplicated.bothAttrs, eTable)
+ edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) =>
+ val (_, vPart) = vPartIter.next()
+ new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
+ }
+ }
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
- vTable.persist(newLevel)
- eTable.persist(newLevel)
- vertexPlacement.persist(newLevel)
+ vertices.persist(newLevel)
+ edges.persist(newLevel)
this
}
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+ val numPartitions = edges.partitions.size
+ val edManifest = classManifest[ED]
+ val newEdges = new EdgeRDD(edges.map { e =>
+ val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+
+ // Should we be using 3-tuple or an optimized class
+ new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).cache())
+ new GraphImpl(vertices, newEdges)
+ }
+
override def statistics: Map[String, Any] = {
- val numVertices = this.numVertices
- val numEdges = this.numEdges
- val replicationRatioBothAttrs =
- vertexPlacement.bothAttrs.map(_.map(_.size).sum).sum / numVertices
- val replicationRatioSrcAttrOnly =
- vertexPlacement.srcAttrOnly.map(_.map(_.size).sum).sum / numVertices
- val replicationRatioDstAttrOnly =
- vertexPlacement.dstAttrOnly.map(_.map(_.size).sum).sum / numVertices
- val loadArray =
- eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
+ // Get the total number of vertices after replication, used to compute the replication ratio.
+ def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
+ vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
+ }
+
+ val numVertices = this.ops.numVertices
+ val numEdges = this.ops.numEdges
+ val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices
+ val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices
+ val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices
+ // One entry for each partition, indicate the total number of edges on that partition.
+ val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
- "Num Vertices" -> numVertices, "Num Edges" -> numEdges,
- "Replication (both)" -> replicationRatioBothAttrs,
- "Replication (src only)" -> replicationRatioSrcAttrOnly,
- "Replication (dest only)" -> replicationRatioDstAttrOnly,
+ "Num Vertices" -> numVertices,
+ "Num Edges" -> numEdges,
+ "Replication (both)" -> replicationRatioBoth,
+ "Replication (src only)" -> replicationRatioSrcOnly,
+ "Replication (dest only)" -> replicationRatioDstOnly,
"Load Array" -> loadArray,
- "Min Load" -> minLoad, "Max Load" -> maxLoad)
+ "Min Load" -> minLoad,
+ "Max Load" -> maxLoad)
}
/**
@@ -137,7 +122,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
rdd: RDD[_],
indent: String = "",
visited: Map[Int, String] = Map.empty[Int, String]) {
- if(visited.contains(rdd.id)) {
+ if (visited.contains(rdd.id)) {
println(indent + visited(rdd.id))
println(indent)
} else {
@@ -155,57 +140,79 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
}
println("eTable ------------------------------------------")
- traverseLineage(eTable, " ")
- var visited = Map(eTable.id -> "eTable")
+ traverseLineage(edges, " ")
+ var visited = Map(edges.id -> "eTable")
println("\n\nvTable ------------------------------------------")
- traverseLineage(vTable, " ", visited)
- visited += (vTable.id -> "vTable")
+ traverseLineage(vertices, " ", visited)
+ visited += (vertices.id -> "vTable")
println("\n\nvertexPlacement.bothAttrs -------------------------------")
traverseLineage(vertexPlacement.bothAttrs, " ", visited)
visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs")
- println("\n\nvTableReplicated.bothAttrs ----------------")
- traverseLineage(vTableReplicated.bothAttrs, " ", visited)
- visited += (vTableReplicated.bothAttrs.id -> "vTableReplicated.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
- } // end of print lineage
-
- override def reverse: Graph[VD, ED] = {
- val newETable = eTable.mapPartitions(_.map { case (pid, epart) => (pid, epart.reverse) },
- preservesPartitioning = true)
- new GraphImpl(vTable, newETable, vertexPlacement, partitioner)
+ } // end of printLineage
+
+ override def reverse: Graph[VD, ED] =
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated)
+
+ override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ if (classManifest[VD] equals classManifest[VD2]) {
+ // The map preserves type, so we can use incremental replication
+ val newVerts = vertices.mapVertexPartitions(_.map(f))
+ val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+ val newVTableReplicated = new VTableReplicated[VD2](
+ changedVerts, edges, vertexPlacement,
+ Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
+ new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ } else {
+ // The map does not preserve type, so we must re-replicate all vertices
+ new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement)
+ }
}
- override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] =
- new GraphImpl(vTable.mapVertexPartitions(_.map(f)), eTable, vertexPlacement, partitioner)
+ override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated)
- override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
- val newETable = eTable.mapPartitions(_.map { case (pid, epart) => (pid, epart.map(f)) },
- preservesPartitioning = true)
- new GraphImpl(vTable, newETable, vertexPlacement, partitioner)
+ override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
+ // manifest from GraphImpl (which would require serializing GraphImpl).
+ val vdManifest = classManifest[VD]
+ val newETable =
+ edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) =>
+ val (pid, vPart) = vTableReplicatedIter.next()
+ val et = new EdgeTriplet[VD, ED]
+ val newEdgePartition = edgePartition.map { e =>
+ et.set(e)
+ et.srcAttr = vPart(e.srcId)
+ et.dstAttr = vPart(e.dstId)
+ f(et)
+ }
+ Iterator((pid, newEdgePartition))
+ }
+ new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated)
}
- override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
- GraphImpl.mapTriplets(this, f)
-
override def subgraph(
- epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
- vpred: (Vid, VD) => Boolean = ((a,b) => true)): Graph[VD, ED] = {
+ epred: EdgeTriplet[VD, ED] => Boolean = x => true,
+ vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner (but not the index) from
// this graph
- val newVTable = vTable.mapVertexPartitions(_.filter(vpred).reindex())
+ val newVTable = vertices.mapVertexPartitions(_.filter(vpred).reindex())
- // Restrict the set of edges to those that satisfy the vertex and the edge predicate.
- val newETable = createETable(
- triplets.filter(t => vpred(t.srcId, t.srcAttr) && vpred(t.dstId, t.dstAttr) && epred(t))
- .map(t => Edge(t.srcId, t.dstId, t.attr)), partitioner)
+ val edManifest = classManifest[ED]
- // Construct the VertexPlacement map
- val newVertexPlacement = new VertexPlacement(newETable, newVTable)
+ val newETable = new EdgeRDD[ED](triplets.filter { et =>
+ vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
+ }.mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true)).cache()
- new GraphImpl(newVTable, newETable, newVertexPlacement, partitioner)
+ new GraphImpl(newVTable, newETable)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
@@ -213,10 +220,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
- val newETable =
- eTable.mapPartitions({ _.map(p => (p._1, p._2.groupEdges(merge))) },
- preservesPartitioning = true)
- new GraphImpl(vTable, newETable, vertexPlacement, partitioner)
+ val newETable = edges.mapEdgePartitions(_.groupEdges(merge))
+ new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -225,14 +230,91 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
- reduceFunc: (A, A) => A): VertexSetRDD[A] =
- GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
+ reduceFunc: (A, A) => A,
+ activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
+
+ ClosureCleaner.clean(mapFunc)
+ ClosureCleaner.clean(reduceFunc)
+
+ // For each vertex, replicate its attribute only to partitions where it is
+ // in the relevant position in an edge.
+ val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
+ val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+ val vs = activeSetOpt match {
+ case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
+ }
+ val activeDirectionOpt = activeSetOpt.map(_._2)
+
+ // Map and combine.
+ val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
+ val (_, vPart) = vTableReplicatedIter.next()
+
+ // Choose scan method
+ val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+ val edgeIter = activeDirectionOpt match {
+ case Some(EdgeDirection.Both) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ .filter(e => vPart.isActive(e.dstId))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
+ }
+ case Some(EdgeDirection.Out) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ } else {
+ edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
+ }
+ case Some(EdgeDirection.In) =>
+ edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
+ case None =>
+ edgePartition.iterator
+ }
+
+ // Scan edges and run the map function
+ val et = new EdgeTriplet[VD, ED]
+ val mapOutputs = edgeIter.flatMap { e =>
+ et.set(e)
+ if (mapUsesSrcAttr) {
+ et.srcAttr = vPart(e.srcId)
+ }
+ if (mapUsesDstAttr) {
+ et.dstAttr = vPart(e.dstId)
+ }
+ mapFunc(et)
+ }
+ // Note: This doesn't allow users to send messages to arbitrary vertices.
+ vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+ }
+
+ // do the final reduction reusing the index map
+ vertices.aggregateUsingIndex(preAgg, reduceFunc)
+ } // end of mapReduceTriplets
override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
- (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
- ClosureCleaner.clean(updateF)
- val newVTable = vTable.leftJoin(updates)(updateF)
- new GraphImpl(newVTable, eTable, vertexPlacement, partitioner)
+ (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+ if (classManifest[VD] equals classManifest[VD2]) {
+ // updateF preserves type, so we can use incremental replication
+ val newVerts = vertices.leftJoin(updates)(updateF)
+ val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+ val newVTableReplicated = new VTableReplicated[VD2](
+ changedVerts, edges, vertexPlacement,
+ Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
+ new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ } else {
+ // updateF does not preserve type, so we must re-replicate all vertices
+ val newVerts = vertices.leftJoin(updates)(updateF)
+ new GraphImpl(newVerts, edges, vertexPlacement)
+ }
+ }
+
+ private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+ try {
+ BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+ } catch {
+ case _: ClassNotFoundException => true // if we don't know, be conservative
+ }
}
} // end of class GraphImpl
@@ -241,70 +323,35 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
- val etable = createETable(edges, partitionStrategy).cache
- // Get the set of all vids
- val vids = etable.mapPartitions(iter => {
- val (pid, epart) = iter.next()
- assert(!iter.hasNext)
- epart.iterator.flatMap(e => Iterator(e.srcId, e.dstId))
- }, preservesPartitioning = true)
- // Index the set of all vids
- val index = VertexSetRDD.makeIndex(vids)
- // Index the vertices and fill in missing attributes with the default
- val vtable = VertexSetRDD(index, defaultValue)
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy)
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
+ {
+ fromEdgeRDD(createETable(edges), defaultVertexAttr)
}
- // def apply[VD: ClassManifest, ED: ClassManifest](
- // vertices: RDD[(Vid, VD)],
- // edges: RDD[Edge[ED]],
- // defaultVertexAttr: VD): GraphImpl[VD,ED] = {
- // apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
- // }
-
- // def apply[VD: ClassManifest, ED: ClassManifest](
- // vertices: RDD[(Vid, VD)],
- // edges: RDD[Edge[ED]],
- // defaultVertexAttr: VD,
- // partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
- // apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
- // }
-
- // def apply[VD: ClassManifest, ED: ClassManifest](
- // vertices: RDD[(Vid, VD)],
- // edges: RDD[Edge[ED]],
- // defaultVertexAttr: VD,
- // mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
- // apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
- // }
+ def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr)
+ }
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD,
- mergeFunc: (VD, VD) => VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
+ {
+ val etable = createETable(edges).cache()
- vertices.cache
- val etable = createETable(edges, partitionStrategy).cache
- // Get the set of all vids, preserving partitions
+ // Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
- val implicitVids = etable.flatMap {
- case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds)
- }.map(vid => (vid, ())).partitionBy(partitioner)
- val allVids = vertices.zipPartitions(implicitVids, preservesPartitioning = true) {
- (a, b) => a.map(_._1) ++ b.map(_._1)
+ val vPartitioned = vertices.partitionBy(partitioner)
+ val vidsFromEdges = collectVidsFromEdges(etable, partitioner)
+ val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
+ vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
- // Index the set of all vids
- val index = VertexSetRDD.makeIndex(allVids, Some(partitioner))
- // Index the vertices and fill in missing attributes with the default
- val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr)
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement, partitionStrategy)
+ val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr)
+
+ new GraphImpl(vtable, etable)
}
/**
@@ -315,39 +362,21 @@ object GraphImpl {
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
- protected def createETable[ED: ClassManifest](
- edges: RDD[Edge[ED]],
- partitionStrategy: PartitionStrategy): RDD[(Pid, EdgePartition[ED])] = {
- // Get the number of partitions
- val numPartitions = edges.partitions.size
-
- edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
-
- // Should we be using 3-tuple or an optimized class
- new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
- }
- .partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex( (pid, iter) => {
+ private def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+ val eTable = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
- iter.foreach { message =>
- val data = message.data
- builder.add(data._1, data._2, data._3)
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
}
- val edgePartition = builder.toEdgePartition
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true).cache()
+ Iterator((pid, builder.toEdgePartition))
+ }
+ new EdgeRDD(eTable)
}
- protected def makeTriplets[VD: ClassManifest, ED: ClassManifest](
- vTableReplicated: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))],
- eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
- eTable.zipPartitions(vTableReplicated) {
- (eTableIter, vTableReplicatedIter) =>
- val (_, edgePartition) = eTableIter.next()
- val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
- new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
- }
+ private def createETableFromEdgePartitions[ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = {
+ new EdgeRDD(edges)
}
def mask[VD: ClassManifest, ED: ClassManifest, VD2: ClassManifest, ED2: ClassManifest] (
@@ -382,95 +411,24 @@ object GraphImpl {
new GraphImpl(newVTable, newETable, newVertexPlacement, thisImpl.partitioner)
}
-
- protected def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
- g: GraphImpl[VD, ED],
- f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
- val newETable = g.eTable.zipPartitions(
- g.vTableReplicated.bothAttrs, preservesPartitioning = true
- ) {
- (eTableIter, vTableReplicatedIter) =>
- val (pid, edgePartition) = eTableIter.next()
- val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
- val et = new EdgeTriplet[VD, ED]
- val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
- val newEdgePartition = edgePartition.map { e =>
- et.set(e)
- et.srcAttr = vmap(e.srcId)
- et.dstAttr = vmap(e.dstId)
- f(et)
- }
- Iterator((pid, newEdgePartition))
- }
- new GraphImpl(g.vTable, newETable, g.vertexPlacement, g.partitioner)
- }
-
- protected def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
- g: GraphImpl[VD, ED],
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
- reduceFunc: (A, A) => A): VertexSetRDD[A] = {
- ClosureCleaner.clean(mapFunc)
- ClosureCleaner.clean(reduceFunc)
- // For each vertex, replicate its attribute only to partitions where it is
- // in the relevant position in an edge.
- val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
- val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
- // Map and preaggregate
- val preAgg = g.eTable.zipPartitions(
- g.vTableReplicated.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
- ) {
- (edgePartitionIter, vTableReplicatedIter) =>
- val (_, edgePartition) = edgePartitionIter.next()
- val (_, (vidToIndex, vertexArray)) = vTableReplicatedIter.next()
- assert(vidToIndex.capacity == vertexArray.size)
- val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
- // TODO(jegonzal): This doesn't allow users to send messages to arbitrary vertices.
- val msgArray = new Array[A](vertexArray.size)
- val msgBS = new BitSet(vertexArray.size)
- // Iterate over the partition
- val et = new EdgeTriplet[VD, ED]
-
- edgePartition.foreach { e =>
- et.set(e)
- if (mapFuncUsesSrcAttr) {
- et.srcAttr = vmap(e.srcId)
- }
- if (mapFuncUsesDstAttr) {
- et.dstAttr = vmap(e.dstId)
- }
- // TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
- // Also given we are only allowing zero, one, or two messages, we can completely unroll
- // the for loop.
- mapFunc(et).foreach { case (vid, msg) =>
- // verify that the vid is valid
- assert(vid == et.srcId || vid == et.dstId)
- // Get the index of the key
- val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
- // Populate the aggregator map
- if (msgBS.get(ind)) {
- msgArray(ind) = reduceFunc(msgArray(ind), msg)
- } else {
- msgArray(ind) = msg
- msgBS.set(ind)
- }
- }
- }
- // construct an iterator of tuples Iterator[(Vid, A)]
- msgBS.iterator.map { ind =>
- new AggregationMsg[A](vidToIndex.getValue(ind), msgArray(ind))
- }
- }.partitionBy(g.vTable.partitioner.get)
- // do the final reduction reusing the index map
- VertexSetRDD.aggregate(preAgg, g.vTable.index, reduceFunc)
+ private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+ edges: EdgeRDD[ED],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ edges.cache()
+ // Get the set of all vids
+ val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ // Create the VertexRDD.
+ val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vtable, edges)
}
- private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
- closure: AnyRef, attrName: String): Boolean = {
- try {
- BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
- } catch {
- case _: ClassNotFoundException => true // if we don't know, be conservative
- }
+ /** Collects all vids mentioned in edges and partitions them by partitioner. */
+ private def collectVidsFromEdges(
+ edges: EdgeRDD[_],
+ partitioner: Partitioner): RDD[(Vid, Int)] = {
+ // TODO: Consider doing map side distinct before shuffle.
+ new ShuffledRDD[Vid, Int, (Vid, Int)](
+ edges.collectVids.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VidMsgSerializer].getName)
}
-
} // end of object GraphImpl
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index d0a5adb85c..c2e452cc72 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -19,17 +19,6 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
}
-class AggregationMsg[@specialized(Int, Long, Double, Boolean) T](var vid: Vid, var data: T)
- extends Product2[Vid, T] {
-
- override def _1 = vid
-
- override def _2 = data
-
- override def canEqual(that: Any): Boolean = that.isInstanceOf[AggregationMsg[_]]
-}
-
-
/**
* A message used to send a specific value to a partition.
* @param partition index of the target partition.
@@ -65,23 +54,6 @@ class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcast
}
-class AggregationMessageRDDFunctions[T: ClassManifest](self: RDD[AggregationMsg[T]]) {
- def partitionBy(partitioner: Partitioner): RDD[AggregationMsg[T]] = {
- val rdd = new ShuffledRDD[Vid, T, AggregationMsg[T]](self, partitioner)
-
- // Set a custom serializer if the data is of int or double type.
- if (classManifest[T] == ClassManifest.Int) {
- rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Long) {
- rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Double) {
- rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
- }
- rdd
- }
-}
-
-
class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
/**
@@ -103,7 +75,17 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- implicit def rdd2aggMessageRDDFunctions[T: ClassManifest](rdd: RDD[AggregationMsg[T]]) = {
- new AggregationMessageRDDFunctions(rdd)
+ def partitionForAggregation[T: ClassManifest](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner)
+
+ // Set a custom serializer if the data is of int or double type.
+ if (classManifest[T] == ClassManifest.Int) {
+ rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
+ } else if (classManifest[T] == ClassManifest.Long) {
+ rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
+ } else if (classManifest[T] == ClassManifest.Double) {
+ rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
+ }
+ rdd
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
index 2e768e85cf..e4fa4a4421 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
@@ -3,8 +3,27 @@ package org.apache.spark.graph.impl
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
+import org.apache.spark.graph._
import org.apache.spark.serializer._
+class VidMsgSerializer extends Serializer {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
+ def writeObject[T](t: T) = {
+ val msg = t.asInstanceOf[(Vid, _)]
+ writeVarLong(msg._1, optimizePositive = false)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
+ override def readObject[T](): T = {
+ (readVarLong(optimizePositive = false), null).asInstanceOf[T]
+ }
+ }
+ }
+}
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
class IntVertexBroadcastMsgSerializer extends Serializer {
@@ -13,7 +32,7 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
- writeLong(msg.vid)
+ writeVarLong(msg.vid, optimizePositive = false)
writeInt(msg.data)
this
}
@@ -21,7 +40,9 @@ class IntVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
- new VertexBroadcastMsg[Int](0, readLong(), readInt()).asInstanceOf[T]
+ val a = readVarLong(optimizePositive = false)
+ val b = readInt()
+ new VertexBroadcastMsg[Int](0, a, b).asInstanceOf[T]
}
}
}
@@ -34,7 +55,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
- writeLong(msg.vid)
+ writeVarLong(msg.vid, optimizePositive = false)
writeLong(msg.data)
this
}
@@ -42,7 +63,7 @@ class LongVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
- val a = readLong()
+ val a = readVarLong(optimizePositive = false)
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
}
@@ -57,7 +78,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
- writeLong(msg.vid)
+ writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
this
}
@@ -65,7 +86,7 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = {
- val a = readLong()
+ val a = readVarLong(optimizePositive = false)
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
}
@@ -73,25 +94,24 @@ class DoubleVertexBroadcastMsgSerializer extends Serializer {
}
}
-
/** A special shuffle serializer for AggregationMessage[Int]. */
class IntAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[AggregationMsg[Int]]
- writeLong(msg.vid)
- writeUnsignedVarInt(msg.data)
+ val msg = t.asInstanceOf[(Vid, Int)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeUnsignedVarInt(msg._2)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
- val a = readLong()
+ val a = readVarLong(optimizePositive = false)
val b = readUnsignedVarInt()
- new AggregationMsg[Int](a, b).asInstanceOf[T]
+ (a, b).asInstanceOf[T]
}
}
}
@@ -103,9 +123,9 @@ class LongAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[AggregationMsg[Long]]
- writeVarLong(msg.vid, optimizePositive = false)
- writeVarLong(msg.data, optimizePositive = true)
+ val msg = t.asInstanceOf[(Vid, Long)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeVarLong(msg._2, optimizePositive = true)
this
}
}
@@ -114,22 +134,21 @@ class LongAggMsgSerializer extends Serializer {
override def readObject[T](): T = {
val a = readVarLong(optimizePositive = false)
val b = readVarLong(optimizePositive = true)
- new AggregationMsg[Long](a, b).asInstanceOf[T]
+ (a, b).asInstanceOf[T]
}
}
}
}
-
/** A special shuffle serializer for AggregationMessage[Double]. */
class DoubleAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[AggregationMsg[Double]]
- writeVarLong(msg.vid, optimizePositive = false)
- writeDouble(msg.data)
+ val msg = t.asInstanceOf[(Vid, Double)]
+ writeVarLong(msg._1, optimizePositive = false)
+ writeDouble(msg._2)
this
}
}
@@ -138,7 +157,7 @@ class DoubleAggMsgSerializer extends Serializer {
def readObject[T](): T = {
val a = readVarLong(optimizePositive = false)
val b = readDouble()
- new AggregationMsg[Double](a, b).asInstanceOf[T]
+ (a, b).asInstanceOf[T]
}
}
}
@@ -148,7 +167,7 @@ class DoubleAggMsgSerializer extends Serializer {
// Helper classes to shorten the implementation of those special serializers.
////////////////////////////////////////////////////////////////////////////////
-sealed abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
+abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
// The implementation should override this one.
def writeObject[T](t: T): SerializationStream
@@ -261,8 +280,7 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
override def close(): Unit = s.close()
}
-
-sealed abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
+abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
// The implementation should override this one.
def readObject[T](): T
@@ -277,7 +295,7 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
var i: Int = 0
def readOrThrow(): Int = {
val in = s.read()
- if (in < 0) throw new java.io.EOFException
+ if (in < 0) throw new EOFException
in & 0xFF
}
var b: Int = readOrThrow()
@@ -291,22 +309,45 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
}
def readVarLong(optimizePositive: Boolean): Long = {
- // TODO: unroll the while loop.
- var value: Long = 0L
- var i: Int = 0
def readOrThrow(): Int = {
val in = s.read()
- if (in < 0) throw new java.io.EOFException
+ if (in < 0) throw new EOFException
in & 0xFF
}
- var b: Int = readOrThrow()
- while ((b & 0x80) != 0) {
- value |= (b & 0x7F).toLong << i
- i += 7
- if (i > 63) throw new IllegalArgumentException("Variable length quantity is too long")
+ var b = readOrThrow()
+ var ret: Long = b & 0x7F
+ if ((b & 0x80) != 0) {
b = readOrThrow()
+ ret |= (b & 0x7F) << 7
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 14
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 21
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 28
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 35
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 42
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 49
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= b.toLong << 56
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
- val ret = value | (b.toLong << i)
if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret
}
@@ -329,7 +370,6 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
override def close(): Unit = s.close()
}
-
sealed trait ShuffleSerializerInstance extends SerializerInstance {
override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index 6cbac223f7..b9b2a4705b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -2,99 +2,180 @@ package org.apache.spark.graph.impl
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
+import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
import org.apache.spark.graph._
-import org.apache.spark.graph.impl.MsgRDDFunctions._
/**
- * Stores the vertex attribute values after they are replicated.
+ * A view of the vertices after they are shipped to the join sites specified in
+ * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated`
+ * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise,
+ * a fresh view is created.
+ *
+ * The view is always cached (i.e., once it is created, it remains materialized). This avoids
+ * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
+ * example.
*/
+private[impl]
class VTableReplicated[VD: ClassManifest](
- vTable: VertexSetRDD[VD],
- eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
- vertexPlacement: VertexPlacement) {
-
- val bothAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
- VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, true, true)
- val srcAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
- VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, true, false)
- val dstAttrOnly: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
- VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, false, true)
- val noAttrs: RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
- VTableReplicated.createVTableReplicated(vTable, eTable, vertexPlacement, false, false)
-
-
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean)
- : RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] =
- (includeSrcAttr, includeDstAttr) match {
+ updatedVerts: VertexRDD[VD],
+ edges: EdgeRDD[_],
+ vertexPlacement: VertexPlacement,
+ prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
+
+ /**
+ * Within each edge partition, create a local map from vid to an index into the attribute
+ * array. Each map contains a superset of the vertices that it will receive, because it stores
+ * vids from both the source and destination of edges. It must always include both source and
+ * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+ */
+ private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match {
+ case Some(prev) =>
+ prev.localVidMap
+ case None =>
+ edges.partitionsRDD.mapPartitions(_.map {
+ case (pid, epart) =>
+ val vidToIndex = new VertexIdToIndexMap
+ epart.foreach { e =>
+ vidToIndex.add(e.srcId)
+ vidToIndex.add(e.dstId)
+ }
+ (pid, vidToIndex)
+ }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap")
+ }
+
+ private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
+ private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
+ private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
+ private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+
+ def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+ (includeSrc, includeDst) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
case (false, true) => dstAttrOnly
case (false, false) => noAttrs
}
-}
+ }
+
+ def get(
+ includeSrc: Boolean,
+ includeDst: Boolean,
+ actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+
+ // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
+ // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
+ // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
+ // also shipped there.
+ val shippedActives = vertexPlacement.get(true, true)
+ .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
+ .partitionBy(edges.partitioner.get)
+ // Update vTableReplicated with shippedActives, setting activeness flags in the resulting
+ // VertexPartitions
+ get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
+ val (pid, vPart) = viewIter.next()
+ val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
+ Iterator((pid, newPart))
+ }
+ }
+
+ private def create(includeSrc: Boolean, includeDst: Boolean)
+ : RDD[(Pid, VertexPartition[VD])] = {
+ val vdManifest = classManifest[VD]
+
+ // Ship vertex attributes to edge partitions according to vertexPlacement
+ val verts = updatedVerts.partitionsRDD
+ val shippedVerts = vertexPlacement.get(includeSrc, includeDst)
+ .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
+ .partitionBy(edges.partitioner.get)
+ // TODO: Consider using a specialized shuffler.
+
+ prevVTableReplicated match {
+ case Some(vTableReplicated) =>
+ val prevView: RDD[(Pid, VertexPartition[VD])] =
+ vTableReplicated.get(includeSrc, includeDst)
-class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
+ // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting
+ // VertexPartitions
+ prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
+ val (pid, prevVPart) = prevViewIter.next()
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
+ Iterator((pid, newVPart))
+ }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
+
+ case None =>
+ // Within each edge partition, place the shipped vertex attributes into the correct
+ // locations specified in localVidMap
+ localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ val (pid, vidToIndex) = mapIter.next()
+ assert(!mapIter.hasNext)
+ // Populate the vertex array using the vidToIndex map
+ val vertexArray = vdManifest.newArray(vidToIndex.capacity)
+ for ((_, block) <- shippedVertsIter) {
+ for (i <- 0 until block.vids.size) {
+ val vid = block.vids(i)
+ val attr = block.attrs(i)
+ val ind = vidToIndex.getPos(vid)
+ vertexArray(ind) = attr
+ }
+ }
+ val newVPart = new VertexPartition(
+ vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
+ Iterator((pid, newVPart))
+ }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
+ }
+ }
+}
object VTableReplicated {
- protected def createVTableReplicated[VD: ClassManifest](
- vTable: VertexSetRDD[VD],
- eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
- vertexPlacement: VertexPlacement,
- includeSrcAttr: Boolean,
- includeDstAttr: Boolean): RDD[(Pid, (VertexIdToIndexMap, Array[VD]))] = {
- val placement = vertexPlacement.get(includeSrcAttr, includeDstAttr)
-
- // Send each edge partition the vertex attributes it wants, as specified in
- // vertexPlacement
- val msgsByPartition = placement.zipPartitions(vTable.partitionsRDD) {
- (pid2vidIter, vertexPartIter) =>
- val pid2vid = pid2vidIter.next()
- val vertexPart = vertexPartIter.next()
-
- val vmap = new PrimitiveKeyOpenHashMap(vertexPart.index, vertexPart.values)
- val output = new Array[(Pid, VertexAttributeBlock[VD])](pid2vid.size)
- for (pid <- 0 until pid2vid.size) {
- val block = new VertexAttributeBlock(pid2vid(pid), pid2vid(pid).map(vid => vmap(vid)))
- output(pid) = (pid, block)
- }
- output.iterator
- }.partitionBy(eTable.partitioner.get).cache()
-
- // Within each edge partition, create a local map from vid to an index into
- // the attribute array. Each map contains a superset of the vertices that it
- // will receive, because it stores vids from both the source and destination
- // of edges. It must always include both source and destination vids because
- // some operations, such as GraphImpl.mapReduceTriplets, rely on this.
- val localVidMap = eTable.mapPartitions(_.map {
- case (pid, epart) =>
- val vidToIndex = new VertexIdToIndexMap
- epart.foreach { e =>
- vidToIndex.add(e.srcId)
- vidToIndex.add(e.dstId)
+ protected def buildBuffer[VD: ClassManifest](
+ pid2vidIter: Iterator[Array[Array[Vid]]],
+ vertexPartIter: Iterator[VertexPartition[VD]]) = {
+ val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ val vertexPart: VertexPartition[VD] = vertexPartIter.next()
+
+ Iterator.tabulate(pid2vid.size) { pid =>
+ val vidsCandidate = pid2vid(pid)
+ val size = vidsCandidate.length
+ val vids = new PrimitiveVector[Vid](pid2vid(pid).size)
+ val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
+ var i = 0
+ while (i < size) {
+ val vid = vidsCandidate(i)
+ if (vertexPart.isDefined(vid)) {
+ vids += vid
+ attrs += vertexPart(vid)
}
- (pid, vidToIndex)
- }, preservesPartitioning = true).cache()
-
- // Within each edge partition, place the vertex attributes received from
- // msgsByPartition into the correct locations specified in localVidMap
- localVidMap.zipPartitions(msgsByPartition) {
- (mapIter, msgsIter) =>
- val (pid, vidToIndex) = mapIter.next()
- assert(!mapIter.hasNext)
- // Populate the vertex array using the vidToIndex map
- val vertexArray = new Array[VD](vidToIndex.capacity)
- for ((_, block) <- msgsIter) {
- for (i <- 0 until block.vids.size) {
- val vid = block.vids(i)
- val attr = block.attrs(i)
- val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
- vertexArray(ind) = attr
+ i += 1
+ }
+ (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
+ }
+ }
+
+ protected def buildActiveBuffer(
+ pid2vidIter: Iterator[Array[Array[Vid]]],
+ activePartIter: Iterator[VertexPartition[_]])
+ : Iterator[(Int, Array[Vid])] = {
+ val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ val activePart: VertexPartition[_] = activePartIter.next()
+
+ Iterator.tabulate(pid2vid.size) { pid =>
+ val vidsCandidate = pid2vid(pid)
+ val size = vidsCandidate.length
+ val actives = new PrimitiveVector[Vid](vidsCandidate.size)
+ var i = 0
+ while (i < size) {
+ val vid = vidsCandidate(i)
+ if (activePart.isDefined(vid)) {
+ actives += vid
}
+ i += 1
}
- Iterator((pid, (vidToIndex, vertexArray)))
- }.cache()
+ (pid, actives.trim().array)
+ }
}
+}
+class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
+ def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 9de57375e9..ccbc83c512 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -2,12 +2,60 @@ package org.apache.spark.graph.impl
import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
+import org.apache.spark.Logging
import org.apache.spark.graph._
-class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest](
+
+private[graph] object VertexPartition {
+
+ def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ iter.foreach { case (k, v) =>
+ map(k) = v
+ }
+ new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
+ }
+
+ def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
+ : VertexPartition[VD] =
+ {
+ val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ iter.foreach { case (k, v) =>
+ map.setMerge(k, v, mergeFunc)
+ }
+ new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
+ }
+}
+
+
+private[graph]
+class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
val index: VertexIdToIndexMap,
val values: Array[VD],
- val mask: BitSet) {
+ val mask: BitSet,
+ /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
+ private val activeSet: Option[VertexSet] = None)
+ extends Logging {
+
+ val capacity: Int = index.capacity
+
+ def size: Int = mask.cardinality()
+
+ /** Return the vertex attribute for the given vertex ID. */
+ def apply(vid: Vid): VD = values(index.getPos(vid))
+
+ def isDefined(vid: Vid): Boolean = {
+ val pos = index.getPos(vid)
+ pos >= 0 && mask.get(pos)
+ }
+
+ /** Look up vid in activeSet, throwing an exception if it is None. */
+ def isActive(vid: Vid): Boolean = {
+ activeSet.get.contains(vid)
+ }
+
+ /** The number of active vertices, if any exist. */
+ def numActives: Option[Int] = activeSet.map(_.size)
/**
* Pass each vertex attribute along with the vertex id through a map
@@ -19,48 +67,196 @@ class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double
* attribute in the RDD
*
* @return a new VertexPartition with values obtained by applying `f` to
- * each of the entries in the original VertexSet. The resulting
+ * each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
- val newValues = new Array[VD2](index.capacity)
- mask.iterator.foreach { ind =>
- newValues(ind) = f(index.getValueSafe(ind), values(ind))
+ val newValues = new Array[VD2](capacity)
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i))
+ i = mask.nextSetBit(i + 1)
}
new VertexPartition[VD2](index, newValues, mask)
}
/**
- * Restrict the vertex set to the set of vertices satisfying the
- * given predicate.
+ * Restrict the vertex set to the set of vertices satisfying the given predicate.
*
* @param pred the user defined predicate
*
- * @note The vertex set preserves the original index structure
- * which means that the returned RDD can be easily joined with
- * the original vertex-set. Furthermore, the filter only
- * modifies the bitmap index and so no new values are allocated.
+ * @note The vertex set preserves the original index structure which means that the returned
+ * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
+ * modifies the bitmap index and so no new values are allocated.
*/
def filter(pred: (Vid, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
- val newMask = new BitSet(index.capacity)
- // Iterate over the active bits in the old bitset and
- // evaluate the predicate
- var ind = mask.nextSetBit(0)
- while (ind >= 0) {
- val k = index.getValueSafe(ind)
- if (pred(k, values(ind))) {
- newMask.set(ind)
+ val newMask = new BitSet(capacity)
+ // Iterate over the active bits in the old mask and evaluate the predicate
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ if (pred(index.getValue(i), values(i))) {
+ newMask.set(i)
}
- ind = mask.nextSetBit(ind + 1)
+ i = mask.nextSetBit(i + 1)
}
new VertexPartition(index, values, newMask)
}
/**
- * Construct a new VertexPartition whose index contains only the vertices in
- * the mask.
+ * Hides vertices that are the same between this and other. For vertices that are different, keeps
+ * the values from `other`. The indices of `this` and `other` must be the same.
+ */
+ def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
+ if (index != other.index) {
+ logWarning("Diffing two VertexPartitions with different indexes is slow.")
+ diff(createUsingIndex(other.iterator))
+ } else {
+ val newMask = mask & other.mask
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ if (values(i) == other.values(i)) {
+ newMask.unset(i)
+ }
+ i = newMask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, other.values, newMask)
+ }
+ }
+
+ /** Inner join another VertexPartition. */
+ def join[VD2: ClassManifest, VD3: ClassManifest]
+ (other: VertexPartition[VD2])
+ (f: (Vid, VD, VD2) => VD3): VertexPartition[VD3] =
+ {
+ if (index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ join(createUsingIndex(other.iterator))(f)
+ } else {
+ val newValues = new Array[VD3](capacity)
+ val newMask = mask & other.mask
+
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i), other.values(i))
+ i = mask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+ }
+
+ /** Left outer join another VertexPartition. */
+ def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ (other: VertexPartition[VD2])
+ (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ if (index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ leftJoin(createUsingIndex(other.iterator))(f)
+ } else {
+ val newValues = new Array[VD3](capacity)
+
+ var i = mask.nextSetBit(0)
+ while (i >= 0) {
+ val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
+ newValues(i) = f(index.getValue(i), values(i), otherV)
+ i = mask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, mask)
+ }
+ }
+
+ /** Left outer join another iterator of messages. */
+ def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ (other: Iterator[(Vid, VD2)])
+ (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ leftJoin(createUsingIndex(other))(f)
+ }
+
+ /** Inner join another VertexPartition. */
+ def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U])
+ (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ if (index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ innerJoin(createUsingIndex(other.iterator))(f)
+ }
+ val newMask = mask & other.mask
+ val newValues = new Array[VD2](capacity)
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i), other.values(i))
+ i = newMask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+
+ /**
+ * Inner join an iterator of messages.
+ */
+ def innerJoin[U: ClassManifest, VD2: ClassManifest]
+ (iter: Iterator[Product2[Vid, U]])
+ (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ innerJoin(createUsingIndex(iter))(f)
+ }
+
+ /**
+ * Similar effect as aggregateUsingIndex((a, b) => a)
+ */
+ def createUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
+ : VertexPartition[VD2] = {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD2](capacity)
+ iter.foreach { case (vid, vdata) =>
+ val pos = index.getPos(vid)
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ new VertexPartition[VD2](index, newValues, newMask)
+ }
+
+ /**
+ * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
+ * the partition, hidden by the bitmask.
+ */
+ def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD](capacity)
+ System.arraycopy(values, 0, newValues, 0, newValues.length)
+ iter.foreach { case (vid, vdata) =>
+ val pos = index.getPos(vid)
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+
+ def aggregateUsingIndex[VD2: ClassManifest](
+ iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
+ {
+ val newMask = new BitSet(capacity)
+ val newValues = new Array[VD2](capacity)
+ iter.foreach { product =>
+ val vid = product._1
+ val vdata = product._2
+ val pos = index.getPos(vid)
+ if (newMask.get(pos)) {
+ newValues(pos) = reduceFunc(newValues(pos), vdata)
+ } else { // otherwise just store the new value
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ }
+ new VertexPartition[VD2](index, newValues, newMask)
+ }
+
+ def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = {
+ val newActiveSet = new VertexSet
+ iter.foreach(newActiveSet.add(_))
+ new VertexPartition(index, values, mask, Some(newActiveSet))
+ }
+
+ /**
+ * Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD]
@@ -68,8 +264,10 @@ class VertexPartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
}
- new VertexPartition(hashMap.keySet, hashMap._values, index.getBitSet)
+ new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
}
- def iterator = mask.iterator.map(ind => (index.getValueSafe(ind), values(ind)))
+ def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+
+ def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind))
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
index e8734df2ed..44a0a05f74 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
@@ -1,21 +1,18 @@
package org.apache.spark.graph.impl
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArrayBuilder
-
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-
-import org.apache.spark.graph._
+import org.apache.spark.util.collection.PrimitiveVector
/**
- * Stores the layout of replicated vertex attributes for GraphImpl. Tells each
- * partition of the vertex data where it should go.
+ * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is,
+ * the routing information for shipping vertex attributes to edge partitions. This is always cached
+ * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes
+ * and (possibly) once to ship the active-set information.
*/
-class VertexPlacement(
- eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
- vTable: VertexSetRDD[_]) {
+class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
@@ -30,43 +27,38 @@ class VertexPlacement(
case (false, false) => noAttrs
}
- def persist(newLevel: StorageLevel) {
- bothAttrs.persist(newLevel)
- srcAttrOnly.persist(newLevel)
- dstAttrOnly.persist(newLevel)
- noAttrs.persist(newLevel)
- }
-
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
- // Determine which vertices each edge partition needs by creating a mapping
- // from vid to pid
- val preAgg = eTable.mapPartitions { iter =>
- val (pid, edgePartition) = iter.next()
+ // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+ val vid2pid: RDD[(Vid, Pid)] = eTable.partitionsRDD.mapPartitions { iter =>
+ val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+ val numEdges = edgePartition.size
val vSet = new VertexSet
- if (includeSrcAttr || includeDstAttr) {
- edgePartition.foreach { e =>
- if (includeSrcAttr) vSet.add(e.srcId)
- if (includeDstAttr) vSet.add(e.dstId)
+ if (includeSrcAttr) { // Add src vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.srcIds(i))
+ i += 1
}
}
- vSet.iterator.map { vid => (vid.toLong, pid) }
- }
- // Aggregate the mappings to determine where each vertex should go
- val vid2pid = VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTable.index,
- (p: Pid) => ArrayBuffer(p),
- (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
- (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
- .mapValues(a => a.toArray)
- // Within each vertex partition, reorganize the placement information into
- // columnar format keyed on the destination partition
- val numPartitions = vid2pid.partitions.size
- vid2pid.mapPartitions { iter =>
- val pid2vid = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
- for ((vid, pids) <- iter) {
- pids.foreach { pid => pid2vid(pid) += vid }
+ if (includeDstAttr) { // Add dst vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.dstIds(i))
+ i += 1
+ }
}
- Iterator(pid2vid.map(_.result))
+ vSet.iterator.map { vid => (vid, pid) }
}
+
+ val numPartitions = vTable.partitions.size
+ vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter =>
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
+ for ((vid, pid) <- iter) {
+ pid2vid(pid) += vid
+ }
+
+ Iterator(pid2vid.map(_.trim().array))
+ }.cache().setName("VertexPlacement %s %s".format(includeSrcAttr, includeDstAttr))
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 7b53e9cce8..655ae53bf8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -6,10 +6,11 @@ import org.apache.spark.util.collection.OpenHashSet
package object graph {
type Vid = Long
+
+ // TODO: Consider using Char.
type Pid = Int
type VertexSet = OpenHashSet[Vid]
- type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
// type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
type VertexIdToIndexMap = OpenHashSet[Vid]
@@ -18,11 +19,4 @@ package object graph {
* Return the default null-like value for a data type T.
*/
def nullValue[T] = null.asInstanceOf[T]
-
-
- private[graph]
- case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
- @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
- var _1: U, var _2: V)
-
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index 4c17bab0c4..a1e285816b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -268,14 +268,14 @@ object GraphGenerators {
* Create a star graph with vertex 0 being the center.
*
* @param sc the spark context in which to construct the graph
- * @param the number of vertices in the star
+ * @param nverts the number of vertices in the star
*
* @return A star graph containing `nverts` vertices with vertex 0
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
- Graph(edges, 1)
+ Graph.fromEdgeTuples(edges, 1)
} // end of starGraph
diff --git a/graph/src/test/resources/log4j.properties b/graph/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..896936d8c4
--- /dev/null
+++ b/graph/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=graph/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index e1ff8df4ea..05ebe2b84d 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
+import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd._
import org.apache.spark.graph.LocalSparkContext._
@@ -50,35 +51,38 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+ def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+ a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
+ .map { case (id, error) => error }.sum
+ }
test("Star PageRank") {
withSpark(new SparkContext("local", "test")) { sc =>
val nVertices = 100
- val starGraph = GraphGenerators.starGraph(sc, nVertices)
+ val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
- val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
- val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
+ val errorTol = 1.0e-5
+
+ val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+ val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
- val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
- if (pr1 != pr2) { 1 } else { 0 }
+ // Static PageRank should only take 2 iterations to converge
+ val notMatching = staticRanks1.zipJoin(staticRanks2) { (vid, pr1, pr2) =>
+ if (pr1 != pr2) 1 else 0
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
- prGraph2.vertices.foreach(println(_))
- val errors = prGraph2.vertices.map { case (vid, pr) =>
+
+ val staticErrors = staticRanks2.map { case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) ||
- (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
- if ( !correct ) { 1 } else { 0 }
+ (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+ if (!correct) 1 else 0
}
- assert(errors.sum === 0)
+ assert(staticErrors.sum === 0)
- val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
- val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
- pr2Opt match {
- case Some(pr2) if(pr1 == pr2) => 0
- case _ => 1
- }
- }.map { case (vid, test) => test }.sum
- assert(errors2 === 0)
+ val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+ assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
}
} // end of test Star PageRank
@@ -86,31 +90,50 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Grid PageRank") {
withSpark(new SparkContext("local", "test")) { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
+ val rows = 10
+ val cols = 10
val resetProb = 0.15
- val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
- val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
- val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
- .map { case (id, error) => error }.sum
- prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
- println(error)
- assert(error < 1.0e-5)
- val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10,10, 50, resetProb))
- val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
- val b: Double = bOpt.get
- (a - b) * (a - b)
- }.map { case (id, error) => error }.sum
- prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach( println(_) )
- println(error2)
- assert(error2 < 1.0e-5)
+ val tol = 0.0001
+ val numIter = 50
+ val errorTol = 1.0e-5
+ val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+ val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+ val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+ assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+ assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
}
} // end of Grid PageRank
+ test("Chain PageRank") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+ val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 10
+ val errorTol = 1.0e-5
+
+ val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+ }
+ }
+
+
test("Grid Connected Components") {
withSpark(new SparkContext("local", "test")) { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
- val ccGraph = Analytics.connectedComponents(gridGraph).cache()
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
assert(maxCCid === 0)
}
@@ -119,8 +142,8 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Reverse Grid Connected Components") {
withSpark(new SparkContext("local", "test")) { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
- val ccGraph = Analytics.connectedComponents(gridGraph).cache()
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
assert(maxCCid === 0)
}
@@ -132,15 +155,14 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph(rawEdges, 1.0)
- val ccGraph = Analytics.connectedComponents(twoChains).cache()
- val vertices = ccGraph.vertices.collect
+ val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if(id < 10) { assert(cc === 0) }
else { assert(cc === 10) }
}
val ccMap = vertices.toMap
- println(ccMap)
for (id <- 0 until 20) {
if (id < 10) {
assert(ccMap(id) === 0)
@@ -156,8 +178,8 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph(rawEdges, true).reverse
- val ccGraph = Analytics.connectedComponents(twoChains).cache()
+ val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) {
if (id < 10) {
@@ -167,7 +189,6 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
}
val ccMap = vertices.toMap
- println(ccMap)
for ( id <- 0 until 20 ) {
if (id < 10) {
assert(ccMap(id) === 0)
@@ -181,8 +202,8 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Count a single triangle") {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
- val graph = Graph(rawEdges, true).cache
- val triangleCount = Analytics.triangleCount(graph)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
@@ -193,10 +214,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
- val graph = Graph(rawEdges, true).cache
- val triangleCount = Analytics.triangleCount(graph)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
- verts.collect.foreach { case (vid, count) =>
+ verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 2)
} else {
@@ -213,10 +234,10 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val revTriangles = triangles.map { case (a,b) => (b,a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
- val graph = Graph(rawEdges, true).cache
- val triangleCount = Analytics.triangleCount(graph)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
- verts.collect.foreach { case (vid, count) =>
+ verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
assert(count === 4)
} else {
@@ -230,10 +251,25 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
- val graph = Graph(rawEdges, true).cache
- val triangleCount = Analytics.triangleCount(graph)
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
+ val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
}
+
+ test("Test SVD++ with mean square error on training set") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val SvdppErr = 0.01
+ val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+ val fields = line.split(",")
+ Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
+ }
+ val graph = Svdpp.run(edges)
+ val err = graph.vertices.collect.map{ case (vid, vd) =>
+ if (vid % 2 == 1) { vd.norm } else { 0.0 }
+ }.reduce(_ + _) / graph.triplets.collect.size
+ assert(err < SvdppErr)
+ }
+ }
} // end of AnalyticsSuite
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index e70773118f..fae6eb5525 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -1,9 +1,12 @@
package org.apache.spark.graph
+import scala.util.Random
+
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
@@ -15,7 +18,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
val edges = sc.parallelize(rawEdges)
- val graph = Graph(edges, 1.0F)
+ val graph = Graph.fromEdgeTuples(edges, 1.0F)
assert(graph.edges.count() === rawEdges.size)
}
}
@@ -35,10 +38,45 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("core operations") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val n = 5
+ val star = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ // triplets
+ assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
+ (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
+ // reverse
+ val reverseStar = star.reverse
+ assert(reverseStar.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
+ // outerJoinVertices
+ val reverseStarDegrees =
+ reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+ val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => a + b).collect.toSet
+ assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+ // mapVertices preserving type
+ val mappedVAttrs = reverseStar.mapVertices((vid, attr) => attr + "2")
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
+ // mapVertices changing type
+ val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
+ // groupEdges
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
test("mapEdges") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
- val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "defaultValue")
+ val star = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
// map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
@@ -50,20 +88,49 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapReduceTriplets") {
withSpark(new SparkContext("local", "test")) { sc =>
- val n = 3
- val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
+ val n = 5
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
(a: Int, b: Int) => a + b)
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
+
+ // activeSetOpt
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
+ val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
+ val vids = complete.mapVertices((vid, attr) => vid).cache()
+ val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
+ val numEvenNeighbors = vids.mapReduceTriplets(et => {
+ // Map function should only run on edges with destination in the active set
+ if (et.dstId % 2 != 0) {
+ throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
+ }
+ Iterator((et.srcId, 1))
+ }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+ assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
+
+ // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
+ val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
+ .mapVertices((vid, attr) => vid).cache()
+ val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
+ // Map function should only run on edges with source in the active set
+ if (et.srcId % 2 != 1) {
+ throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
+ }
+ Iterator((et.dstId, 1))
+ }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
+
}
}
test("aggregateNeighbors") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 3
- val star = Graph(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
val indegrees = star.aggregateNeighbors(
(vid, edge) => Some(1),
@@ -103,7 +170,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val chain = (0 until 100).map(x => (x, (x+1)%100) )
val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val graph = Graph(rawEdges, 1.0)
+ val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
assert(nbrs.count === chain.size)
assert(graph.numVertices === nbrs.count)
@@ -165,18 +232,50 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("VertexSetRDD") {
withSpark(new SparkContext("local", "test")) { sc =>
- val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
- val b = VertexSetRDD(a).mapValues(x => -x)
- assert(b.count === 101)
+ val n = 100
+ val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
+ val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
+ assert(b.count === n + 1)
assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
- val c = VertexSetRDD(a, b.index)
+ val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val d = c.filter(q => ((q._2 % 2) == 0))
val e = a.filter(q => ((q._2 % 2) == 0))
assert(d.count === e.count)
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
+ val f = b.mapValues(x => if (x % 2 == 0) -x else x)
+ assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
+ }
+ }
+
+ test("subgraph") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // Create a star graph of 10 veritces.
+ val n = 10
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
+ // Take only vertices whose vids are even
+ val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+ // We should have 5 vertices.
+ assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
+
+ // And 4 edges.
+ assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
+ test("EdgePartition.sort") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
new file mode 100644
index 0000000000..0897d9783e
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.graph
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.rdd._
+
+class PregelSuite extends FunSuite with LocalSparkContext {
+
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+ test("1 iteration") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val n = 5
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ val result = Pregel(star, 0)(
+ (vid, attr, msg) => attr,
+ et => Iterator.empty,
+ (a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly"))
+ assert(result.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
+ test("chain propagation") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val n = 5
+ val chain = Graph.fromEdgeTuples(
+ sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
+ 0).cache()
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet)
+ val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
+ assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet)
+ val result = Pregel(chainWithSeed, 0)(
+ (vid, attr, msg) => math.max(msg, attr),
+ et => Iterator((et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => math.max(a, b))
+ assert(result.vertices.collect.toSet ===
+ chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet)
+ }
+ }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
index 6295f866b8..6b86f9b25d 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
@@ -1,13 +1,16 @@
package org.apache.spark.graph
+import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
+import org.apache.spark._
import org.apache.spark.graph.LocalSparkContext._
-import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.graph.impl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
-import org.apache.spark._
+import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
@@ -79,7 +82,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("IntAggMsgSerializer") {
- val outMsg = new AggregationMsg[Int](4, 5)
+ val outMsg = (4: Vid, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -87,12 +90,10 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: AggregationMsg[Int] = inStrm.readObject()
- val inMsg2: AggregationMsg[Int] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
+ val inMsg1: (Vid, Int) = inStrm.readObject()
+ val inMsg2: (Vid, Int) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
intercept[EOFException] {
inStrm.readObject()
@@ -100,7 +101,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("LongAggMsgSerializer") {
- val outMsg = new AggregationMsg[Long](4, 1L << 32)
+ val outMsg = (4: Vid, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -108,12 +109,10 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: AggregationMsg[Long] = inStrm.readObject()
- val inMsg2: AggregationMsg[Long] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
+ val inMsg1: (Vid, Long) = inStrm.readObject()
+ val inMsg2: (Vid, Long) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
intercept[EOFException] {
inStrm.readObject()
@@ -121,7 +120,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("DoubleAggMsgSerializer") {
- val outMsg = new AggregationMsg[Double](4, 5.0)
+ val outMsg = (4: Vid, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -129,12 +128,10 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
- val inMsg1: AggregationMsg[Double] = inStrm.readObject()
- val inMsg2: AggregationMsg[Double] = inStrm.readObject()
- assert(outMsg.vid === inMsg1.vid)
- assert(outMsg.vid === inMsg2.vid)
- assert(outMsg.data === inMsg1.data)
- assert(outMsg.data === inMsg2.data)
+ val inMsg1: (Vid, Double) = inStrm.readObject()
+ val inMsg2: (Vid, Double) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
intercept[EOFException] {
inStrm.readObject()
@@ -150,11 +147,35 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
- test("TestShuffleAggregationMsg") {
- withSpark(new SparkContext("local[2]", "test")) { sc =>
- val bmsgs = sc.parallelize(0 until 100, 10).map(pid => new AggregationMsg[Int](pid, pid))
- bmsgs.partitionBy(new HashPartitioner(3)).collect()
+ test("variable long encoding") {
+ def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
+ val bout = new ByteArrayOutputStream
+ val stream = new ShuffleSerializationStream(bout) {
+ def writeObject[T](t: T): SerializationStream = {
+ writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive)
+ this
+ }
+ }
+ stream.writeObject(v)
+
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val dstream = new ShuffleDeserializationStream(bin) {
+ def readObject[T](): T = {
+ readVarLong(optimizePositive).asInstanceOf[T]
+ }
+ }
+ val read = dstream.readObject[Long]()
+ assert(read === v)
}
- }
-} \ No newline at end of file
+ // Test all variable encoding code path (each branch uses 7 bits, i.e. 1L << 7 difference)
+ val d = Random.nextLong() % 128
+ Seq[Long](0, 1L << 0 + d, 1L << 7 + d, 1L << 14 + d, 1L << 21 + d, 1L << 28 + d, 1L << 35 + d,
+ 1L << 42 + d, 1L << 49 + d, 1L << 56 + d, 1L << 63 + d).foreach { number =>
+ testVarLongEncoding(number, optimizePositive = false)
+ testVarLongEncoding(number, optimizePositive = true)
+ testVarLongEncoding(-number, optimizePositive = false)
+ testVarLongEncoding(-number, optimizePositive = true)
+ }
+ }
+}