aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala9
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Analytics.scala27
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Graph.scala146
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphLab.scala37
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala4
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphOps.scala20
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/Pregel.scala27
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala55
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala31
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala224
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala686
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala22
14 files changed, 764 insertions, 555 deletions
diff --git a/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala
index 65c6963b71..358ab57b0c 100644
--- a/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/IndexedRDDFunctions.scala
@@ -50,6 +50,32 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K
/**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
+ override def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
+ val cleanF = self.index.rdd.context.clean(f)
+ val newValues = self.index.rdd.zipPartitions(self.valuesRDD){ (keysIter, valuesIter) =>
+ val index = keysIter.next()
+ assert(keysIter.hasNext() == false)
+ val oldValues = valuesIter.next()
+ assert(valuesIter.hasNext() == false)
+ // Allocate the result array with one slot per slot of the old values block
+ val newValues: Array[Seq[U]] = new Array[Seq[U]](oldValues.size)
+ // Apply the cleaned function to every (key, value); null slots are left null
+ for( (k,i) <- index ) {
+ if(oldValues(i) != null) {
+ newValues(i) = oldValues(i).map( v => cleanF(k,v) )
+ }
+ }
+ Array(newValues.toSeq).iterator
+ }
+ new IndexedRDD[K,U](self.index, newValues)
+ }
+
+
+
+ /**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
index 79a007a939..8d2e9782c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
@@ -53,6 +53,8 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI
rdd.persist(newLevel)
return this
}
+
+ def partitioner: Partitioner = rdd.partitioner.get
}
@@ -85,6 +87,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
override val partitioner = index.rdd.partitioner
+
+
+
/**
* The actual partitions are defined by the tuples.
*/
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 435ddb9e94..569d74ae7a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -393,6 +393,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
new MappedValuesRDD(self, cleanF)
}
+
+ /**
+ * Pass each key-value pair through a function of (key, value) that computes the new value,
+ * without changing the keys; since keys are untouched the original partitioning is retained.
+ */
+ def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = {
+ self.mapPartitions(iter => iter.map{ case (k,v) => (k, f(k,v)) }, preservesPartitioning = true)
+ }
+
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index b411c60cee..49498fbcd4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -44,9 +44,9 @@ object Analytics extends Logging {
numIter: Int,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
- val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double)](graph.outDegrees,
- (vertex, deg) => (deg.getOrElse(0), 1.0)
- )
+ val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+ (vid, vdata, deg) => (deg.getOrElse(0), 1.0)
+ }
println("Vertex Replication: " + pagerankGraph.replication)
@@ -59,11 +59,11 @@ object Analytics extends Logging {
Pregel.iterate[(Int, Double), ED, Double](pagerankGraph)(
- (vertex, a: Double) => (vertex.data._1, (resetProb + (1.0 - resetProb) * a)), // apply
+ (vid, data, a: Double) => (data._1, (resetProb + (1.0 - resetProb) * a)), // apply
(me_id, edge) => Some(edge.src.data._2 / edge.src.data._1), // gather
(a: Double, b: Double) => a + b, // merge
1.0,
- numIter).mapVertices{ case Vertex(id, (outDeg, r)) => r }
+ numIter).mapVertices{ case (id, (outDeg, r)) => r }
}
/**
@@ -74,18 +74,19 @@ object Analytics extends Logging {
maxIter: Int = Integer.MAX_VALUE,
resetProb: Double = 0.15) = {
// Compute the out degree of each vertex
- val pagerankGraph = graph.leftJoinVertices[Int, (Int, Double, Double)](graph.outDegrees,
- (vertex, degIter) => (degIter.sum, 1.0, 1.0)
- )
+ val pagerankGraph = graph.outerJoinVertices(graph.outDegrees){
+ (id, data, degIter) => (degIter.sum, 1.0, 1.0)
+ }
+
// Run PageRank
GraphLab.iterate(pagerankGraph)(
(me_id, edge) => edge.src.data._2 / edge.src.data._1, // gather
(a: Double, b: Double) => a + b,
- (vertex, a: Option[Double]) =>
- (vertex.data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), vertex.data._2), // apply
+ (id, data, a: Option[Double]) =>
+ (data._1, (resetProb + (1.0 - resetProb) * a.getOrElse(0.0)), data._2), // apply
(me_id, edge) => math.abs(edge.src.data._3 - edge.src.data._2) > tol, // scatter
- maxIter).mapVertices { case Vertex(vid, data) => data._2 }
+ maxIter).mapVertices { case (vid, data) => data._2 }
}
@@ -96,12 +97,12 @@ object Analytics extends Logging {
* that vertex.
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = {
- val ccGraph = graph.mapVertices { case Vertex(vid, _) => vid }
+ val ccGraph = graph.mapVertices { case (vid, _) => vid }
GraphLab.iterate(ccGraph)(
(me_id, edge) => edge.otherVertex(me_id).data, // gather
(a: Vid, b: Vid) => math.min(a, b), // merge
- (v, a: Option[Vid]) => math.min(v.data, a.getOrElse(Long.MaxValue)), // apply
+ (id, data, a: Option[Vid]) => math.min(data, a.getOrElse(Long.MaxValue)), // apply
(me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter
gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both
)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 61032bf0be..39c699ce8b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -2,6 +2,7 @@ package org.apache.spark.graph
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.ClosureCleaner
@@ -33,7 +34,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
*
* @todo should vertices return tuples instead of vertex objects?
*/
- def vertices: RDD[Vertex[VD]]
+ def vertices: RDD[(Vid,VD)]
/**
* Get the Edges and their data as an RDD. The entries in the RDD contain
@@ -101,7 +102,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
*
*/
- def mapVertices[VD2: ClassManifest](map: Vertex[VD] => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED]
/**
* Construct a new graph where each the value of each edge is transformed by
@@ -149,13 +150,13 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
- /**
- * Remove edges conntecting vertices that are not in the graph.
- *
- * @todo remove this function and ensure that for a graph G=(V,E):
- * if (u,v) in E then u in V and v in V
- */
- def correctEdges(): Graph[VD, ED]
+ // /**
+ // * Remove edges connecting vertices that are not in the graph.
+ // *
+ // * @todo remove this function and ensure that for a graph G=(V,E):
+ // * if (u,v) in E then u in V and v in V
+ // */
+ // def correctEdges(): Graph[VD, ED]
/**
* Construct a new graph with all the edges reversed. If this graph contains
@@ -183,8 +184,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @return the subgraph containing only the vertices and edges that satisfy the
* predicates.
*/
- def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true),
- vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED]
+ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
// /**
@@ -200,51 +201,55 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
+ def mapReduceTriplets[A: ClassManifest](
+ mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+ reduceFunc: (A, A) => A)
+ : RDD[(Vid, A)]
- /**
- * This function is used to compute a statistic for the neighborhood of each
- * vertex.
- *
- * This is one of the core functions in the Graph API in that enables
- * neighborhood level computation. For example this function can be used to
- * count neighbors satisfying a predicate or implement PageRank.
- *
- * @note The returned RDD may contain fewer entries than their are vertices
- * in the graph. This is because some vertices may not have neighbors or the
- * map function may return None for all neighbors.
- *
- * @param mapFunc the function applied to each edge adjacent to each vertex.
- * The mapFunc can optionally return None in which case it does not
- * contribute to the final sum.
- * @param mergeFunc the function used to merge the results of each map
- * operation.
- * @param direction the direction of edges to consider (e.g., In, Out, Both).
- * @tparam VD2 The returned type of the aggregation operation.
- *
- * @return A Spark.RDD containing tuples of vertex identifiers and thee
- * resulting value. Note that the returned RDD may contain fewer vertices
- * than in the original graph since some vertices may not have neighbors or
- * the map function could return None for all neighbors.
- *
- * @example We can use this function to compute the average follower age for
- * each user
- * {{{
- * val graph: Graph[Int,Int] = loadGraph()
- * val averageFollowerAge: RDD[(Int, Int)] =
- * graph.aggregateNeighbors[(Int,Double)](
- * (vid, edge) => (edge.otherVertex(vid).data, 1),
- * (a, b) => (a._1 + b._1, a._2 + b._2),
- * EdgeDirection.In)
- * .mapValues{ case (sum,followers) => sum.toDouble / followers}
- * }}}
- *
- */
- def aggregateNeighbors[A: ClassManifest](
- mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
- mergeFunc: (A, A) => A,
- direction: EdgeDirection)
- : Graph[(VD, Option[A]), ED]
+ // /**
+ // * This function is used to compute a statistic for the neighborhood of each
+ // * vertex.
+ // *
+ // * This is one of the core functions in the Graph API in that enables
+ // * neighborhood level computation. For example this function can be used to
+ // * count neighbors satisfying a predicate or implement PageRank.
+ // *
+ // * @note The returned RDD may contain fewer entries than there are vertices
+ // * in the graph. This is because some vertices may not have neighbors or the
+ // * map function may return None for all neighbors.
+ // *
+ // * @param mapFunc the function applied to each edge adjacent to each vertex.
+ // * The mapFunc can optionally return None in which case it does not
+ // * contribute to the final sum.
+ // * @param mergeFunc the function used to merge the results of each map
+ // * operation.
+ // * @param direction the direction of edges to consider (e.g., In, Out, Both).
+ // * @tparam VD2 The returned type of the aggregation operation.
+ // *
+ // * @return A Spark.RDD containing tuples of vertex identifiers and the
+ // * resulting value. Note that the returned RDD may contain fewer vertices
+ // * than in the original graph since some vertices may not have neighbors or
+ // * the map function could return None for all neighbors.
+ // *
+ // * @example We can use this function to compute the average follower age for
+ // * each user
+ // * {{{
+ // * val graph: Graph[Int,Int] = loadGraph()
+ // * val averageFollowerAge: RDD[(Int, Int)] =
+ // * graph.aggregateNeighbors[(Int,Double)](
+ // * (vid, edge) => (edge.otherVertex(vid).data, 1),
+ // * (a, b) => (a._1 + b._1, a._2 + b._2),
+ // * EdgeDirection.In)
+ // * .mapValues{ case (sum,followers) => sum.toDouble / followers}
+ // * }}}
+ // *
+ // */
+ // def aggregateNeighbors[A: ClassManifest](
+ // mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ // mergeFunc: (A, A) => A,
+ // direction: EdgeDirection)
+ // : Graph[(VD, Option[A]), ED]
/**
* This function is used to compute a statistic for the neighborhood of each
@@ -291,9 +296,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def aggregateNeighbors[A: ClassManifest](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
- default: A, // Should this be a function or a value?
direction: EdgeDirection)
- : Graph[(VD, Option[A]), ED]
+ : RDD[(Vid, A)]
/**
@@ -328,9 +332,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
* @todo Is leftJoinVertices the right name?
*/
- def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
- table: RDD[(Vid, U)],
- mapFunc: (Vertex[VD], Option[U]) => VD2)
+ def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
+ (mapFunc: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED]
/**
@@ -366,10 +369,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* graph.joinVertices(tbl)( (v, row) => row )
* }}}
*/
- def joinVertices[U: ClassManifest](
- table: RDD[(Vid, U)],
- mapFunc: (Vertex[VD], U) => VD)
- : Graph[VD, ED]
+ def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
+ : Graph[VD, ED] = {
+ ClosureCleaner.clean(mapFunc)
+ def uf(id: Vid, data: VD, o: Option[U]): VD = o match {
+ case Some(u) => mapFunc(id, data, u)
+ case None => data
+ }
+ outerJoinVertices(table)(uf)
+ }
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
@@ -391,16 +399,16 @@ object Graph {
rawEdges.map { case (s, t) => Edge(s, t, 1) }
}
// Determine unique vertices
- val vertices: RDD[Vertex[Int]] = edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }
- .reduceByKey(_ + _)
- .map{ case (id, deg) => Vertex(id, deg) }
+ val vertices: RDD[(Vid, Int)] =
+ edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
+
// Return graph
- new GraphImpl(vertices, edges)
+ GraphImpl(vertices, edges)
}
def apply[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
- new GraphImpl(vertices, edges)
+ vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges)
}
implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 01f24a1302..ccb1bd8e5d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -36,7 +36,7 @@ object GraphLab {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
- applyFunc: (Vertex[VD], Option[A]) => VD,
+ applyFunc: (Vid, VD, Option[A]) => VD,
scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
numIter: Int = Integer.MAX_VALUE,
gatherDirection: EdgeDirection = EdgeDirection.In,
@@ -45,7 +45,7 @@ object GraphLab {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
- case Vertex(id, data) => (true, data)
+ case (id, data) => (true, data)
}.cache()
// The gather function wrapper strips the active attribute and
@@ -64,9 +64,9 @@ object GraphLab {
// The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices
- def apply(v: Vertex[((Boolean, VD), Option[A])]): (Boolean, VD) = {
- val ((active, vData), accum) = v.data
- if (active) (true, applyFunc(Vertex(v.id, vData), accum))
+ def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
+ val (active, vData) = data
+ if (active) (true, applyFunc(vid, vData, accum))
else (false, vData)
}
@@ -89,9 +89,9 @@ object GraphLab {
}
// Used to set the active status of vertices for the next round
- def applyActive(v: Vertex[((Boolean, VD), Option[Boolean])]): (Boolean, VD) = {
- val ((prevActive, vData), newActive) = v.data
- (newActive.getOrElse(false), vData)
+ def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
+ val (prevActive, vData) = data
+ (newActive, vData)
}
// Main Loop ---------------------------------------------------------------------
@@ -99,29 +99,32 @@ object GraphLab {
var numActive = activeGraph.numVertices
while (i < numIter && numActive > 0) {
- val gathered: Graph[((Boolean, VD), Option[A]), ED] =
+ // Gather
+ val gathered: RDD[(Vid, A)] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
- val applied: Graph[(Boolean, VD), ED] = gathered.mapVertices(apply).cache()
+ // Apply
+ activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
- activeGraph = applied.cache()
+
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
// activeGraph: Graph[(Boolean, VD), ED]
- val scattered: Graph[((Boolean, VD), Option[Boolean]), ED] =
+ val scattered: RDD[(Vid, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
- val newActiveGraph: Graph[(Boolean, VD), ED] =
- scattered.mapVertices(applyActive)
- activeGraph = newActiveGraph.cache()
+ // Vertices that received no scatter signal are absent from `scattered`. joinVertices would
+ // leave their active flag unchanged, so outer-join and mark them inactive explicitly.
+ activeGraph = activeGraph.outerJoinVertices(scattered){
+ (vid, data, newActive) => applyActive(vid, data, newActive.getOrElse(false))
+ }.cache()
- numActive = activeGraph.vertices.map(v => if (v.data._1) 1 else 0).reduce(_ + _)
+ // Calculate the number of active vertices
+ numActive = activeGraph.vertices.map{
+ case (vid, data) => if (data._1) 1 else 0
+ }.reduce(_ + _)
println("Number active vertices: " + numActive)
i += 1
}
// Remove the active attribute from the vertex data before returning the graph
- activeGraph.mapVertices(v => v.data._2)
+ activeGraph.mapVertices{case (vid, data) => data._2 }
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 4d7ca1268d..903e407b2d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -48,7 +48,7 @@ object GraphLoader {
def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
.reduceByKey(_ + _)
- .map{ case (vid, degree) => Vertex(vid, degree) }
- new GraphImpl[Int, ED](vertices, edges)
+ .map{ case (vid, degree) => (vid, degree) }
+ GraphImpl(vertices, edges)
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 9e8cc0a6d5..23c783ba3a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -1,7 +1,7 @@
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
-
+import org.apache.spark.SparkContext._
class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
@@ -16,22 +16,18 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
- val graph: Graph[(VD, Option[Array[Vid]]), ED] = g.aggregateNeighbors(
+ val nbrs = g.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertex(vid).id)),
(a, b) => a ++ b,
edgeDirection)
- graph.vertices.map(v => {
- val (_, neighborIds) = v.data
- (v.id, neighborIds.getOrElse(Array()))
- })
+
+ g.vertices.leftOuterJoin(nbrs).mapValues{
+ case (_, Some(nbrs)) => nbrs
+ case (_, None) => Array.empty[Vid]
+ }
}
private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
- val degreeGraph: Graph[(VD, Option[Int]), ED] =
- g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
- degreeGraph.vertices.map(v => {
- val (_, degree) = v.data
- (v.id, degree.getOrElse(0))
- })
+ g.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 09bcc67c8c..93c9c09ee3 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -6,7 +6,7 @@ import org.apache.spark.rdd.RDD
object Pregel {
def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
- vprog: (Vertex[VD], A) => VD,
+ vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
@@ -19,25 +19,26 @@ object Pregel {
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge)
- def runProg(vertexWithMsgs: Vertex[(VD, Option[A])]): VD = {
- val (vData, msg) = vertexWithMsgs.data
- val v = Vertex(vertexWithMsgs.id, vData)
+ def runProg(id: Vid, data: (VD, Option[A]) ): VD = {
+ val (vData, msg) = data
msg match {
- case Some(m) => vprog(v, m)
- case None => v.data
+ case Some(m) => vprog(id, vData, m)
+ case None => vData
}
}
- var graphWithMsgs: Graph[(VD, Option[A]), ED] =
- g.mapVertices(v => (v.data, Some(initialMsg)))
+ // Receive the first set of messages; mapVertices returns a new graph, so keep the result
+ g = g.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
while (i < numIter) {
- val newGraph: Graph[VD, ED] = graphWithMsgs.mapVertices(runProg).cache()
- graphWithMsgs = newGraph.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
+ // compute the messages
+ val messages = g.aggregateNeighbors(mapF, mergeMsg, EdgeDirection.In)
+ // receive the messages
+ g = g.joinVertices(messages)(vprog)
+ // count the iteration
i += 1
}
- graphWithMsgs.mapVertices(vertexWithMsgs => vertexWithMsgs.data match {
- case (vData, _) => vData
- })
+ // Return the final graph
+ g
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index 3d218f27b1..f0d9080d97 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,9 +1,6 @@
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
-
-import it.unimi.dsi.fastutil.ints.IntArrayList
-
import org.apache.spark.graph._
@@ -11,29 +8,43 @@ import org.apache.spark.graph._
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] {
+class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
+ val srcIds: Array[Vid],
+ val dstIds: Array[Vid],
+ val data: Array[ED]
+ ){
- private var _data: Array[ED] = _
- private var _dataBuilder = ArrayBuilder.make[ED]
+ // private var _data: Array[ED] = _
+ // private var _dataBuilder = ArrayBuilder.make[ED]
- val srcIds = new VertexArrayList
- val dstIds = new VertexArrayList
+ // var srcIds = new VertexArrayList
+ // var dstIds = new VertexArrayList
- def data: Array[ED] = _data
+ def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
- /** Add a new edge to the partition. */
- def add(src: Vid, dst: Vid, d: ED) {
- srcIds.add(src)
- dstIds.add(dst)
- _dataBuilder += d
+ def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+ val newData = new Array[ED2](data.size)
+ val edge = new Edge[ED]()
+ for(i <- 0 until data.size){
+ edge.src = srcIds(i)
+ edge.dst = dstIds(i)
+ edge.data = data(i)
+ newData(i) = f(edge)
+ }
+ new EdgePartition(srcIds, dstIds, newData)
}
- def trim() {
- srcIds.trim()
- dstIds.trim()
- _data = _dataBuilder.result()
+ def foreach(f: Edge[ED] => Unit) {
+ val edge = new Edge[ED]
+ for(i <- 0 until data.size){
+ edge.src = srcIds(i)
+ edge.dst = dstIds(i)
+ edge.data = data(i)
+ f(edge)
+ }
}
+
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
@@ -43,11 +54,13 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
override def hasNext: Boolean = pos < EdgePartition.this.size
override def next(): Edge[ED] = {
- edge.src = srcIds.get(pos)
- edge.dst = dstIds.get(pos)
- edge.data = _data(pos)
+ edge.src = srcIds(pos)
+ edge.dst = dstIds(pos)
+ edge.data = data(pos)
pos += 1
edge
}
}
}
+
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
new file mode 100644
index 0000000000..f2d07d55c6
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -0,0 +1,31 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.mutable.ArrayBuilder
+import org.apache.spark.graph._
+
+
+private[graph]
+class EdgePartitionBuilder[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
+ED: ClassManifest]{
+ val srcIds = new VertexArrayList
+ val dstIds = new VertexArrayList
+ var dataBuilder = ArrayBuilder.make[ED]
+
+
+ /** Add a new edge to the partition. */
+ def add(src: Vid, dst: Vid, d: ED) {
+ srcIds.add(src)
+ dstIds.add(dst)
+ dataBuilder += d
+ }
+
+ def toEdgePartition: EdgePartition[ED] = {
+ new EdgePartition(srcIds.toLongArray(), dstIds.toLongArray(), dataBuilder.result())
+ }
+
+
+}
+
+
+
+
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
index 1cd48120a1..6779f4aa09 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
@@ -1,112 +1,112 @@
-package org.apache.spark.graph.impl
-
-import scala.collection.mutable
-
-import org.apache.spark.Aggregator
-import org.apache.spark.Partition
-import org.apache.spark.SparkEnv
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Dependency
-import org.apache.spark.OneToOneDependency
-import org.apache.spark.ShuffleDependency
-import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-
-
-private[graph]
-class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition)
- extends Partition {
- override val index: Int = idx
- override def hashCode(): Int = idx
-}
-
-
-/**
- * A RDD that brings together edge data with its associated vertex data.
- */
-private[graph]
-class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest](
- vTableReplicated: RDD[(Vid, VD)],
- eTable: RDD[(Pid, EdgePartition[ED])])
- extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) {
-
- //println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions)
- //println("9757984589347598734549" + eTable.partitioner.get.numPartitions)
-
- assert(vTableReplicated.partitioner == eTable.partitioner)
-
- override def getDependencies: List[Dependency[_]] = {
- List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated))
- }
-
- override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) {
- i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
- }
-
- override val partitioner = eTable.partitioner
-
- override def getPreferredLocations(s: Partition) =
- eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart)
-
- override def compute(s: Partition, context: TaskContext)
- : Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = {
-
- val split = s.asInstanceOf[EdgeTripletPartition]
-
- // Fetch the vertices and put them in a hashmap.
- // TODO: use primitive hashmaps for primitive VD types.
- val vmap = new VertexHashMap[VD]//(1000000)
- vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) }
-
- val (pid, edgePartition) = eTable.iterator(split.ePart, context).next()
- .asInstanceOf[(Pid, EdgePartition[ED])]
-
- // Return an iterator that looks up the hash map to find matching vertices for each edge.
- val iter = new Iterator[EdgeTriplet[VD, ED]] {
- private var pos = 0
- private val e = new EdgeTriplet[VD, ED]
- e.src = new Vertex[VD]
- e.dst = new Vertex[VD]
-
- override def hasNext: Boolean = pos < edgePartition.size
- override def next() = {
- e.src.id = edgePartition.srcIds.getLong(pos)
- // assert(vmap.containsKey(e.src.id))
- e.src.data = vmap.get(e.src.id)
-
- e.dst.id = edgePartition.dstIds.getLong(pos)
- // assert(vmap.containsKey(e.dst.id))
- e.dst.data = vmap.get(e.dst.id)
-
- //println("Iter called: " + pos)
- e.data = edgePartition.data(pos)
- pos += 1
- e
- }
-
- override def toList: List[EdgeTriplet[VD, ED]] = {
- val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
- for (i <- (0 until edgePartition.size)) {
- val currentEdge = new EdgeTriplet[VD, ED]
- currentEdge.src = new Vertex[VD]
- currentEdge.dst = new Vertex[VD]
- currentEdge.src.id = edgePartition.srcIds.getLong(i)
- // assert(vmap.containsKey(e.src.id))
- currentEdge.src.data = vmap.get(currentEdge.src.id)
-
- currentEdge.dst.id = edgePartition.dstIds.getLong(i)
- // assert(vmap.containsKey(e.dst.id))
- currentEdge.dst.data = vmap.get(currentEdge.dst.id)
-
- currentEdge.data = edgePartition.data(i)
- //println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data)
- //println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data)
- lb += currentEdge
- }
- lb.toList
- }
- }
- Iterator((vmap, iter))
- }
-}
+// package org.apache.spark.graph.impl
+
+// import scala.collection.mutable
+
+// import org.apache.spark.Aggregator
+// import org.apache.spark.Partition
+// import org.apache.spark.SparkEnv
+// import org.apache.spark.TaskContext
+// import org.apache.spark.rdd.RDD
+// import org.apache.spark.Dependency
+// import org.apache.spark.OneToOneDependency
+// import org.apache.spark.ShuffleDependency
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.graph._
+
+
+// private[graph]
+// class EdgeTripletPartition(idx: Int, val vPart: Partition, val ePart: Partition)
+// extends Partition {
+// override val index: Int = idx
+// override def hashCode(): Int = idx
+// }
+
+
+// /**
+// * A RDD that brings together edge data with its associated vertex data.
+// */
+// private[graph]
+// class EdgeTripletRDD[VD: ClassManifest, ED: ClassManifest](
+// vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]],
+// eTable: IndexedRDD[Pid, EdgePartition[ED]])
+// extends RDD[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])](eTable.context, Nil) {
+
+// //println("ddshfkdfhds" + vTableReplicated.partitioner.get.numPartitions)
+// //println("9757984589347598734549" + eTable.partitioner.get.numPartitions)
+
+// assert(vTableReplicated.partitioner == eTable.partitioner)
+
+// override def getDependencies: List[Dependency[_]] = {
+// List(new OneToOneDependency(eTable), new OneToOneDependency(vTableReplicated))
+// }
+
+// override def getPartitions = Array.tabulate[Partition](eTable.partitions.size) {
+// i => new EdgeTripletPartition(i, eTable.partitions(i), vTableReplicated.partitions(i))
+// }
+
+// override val partitioner = eTable.partitioner
+
+// override def getPreferredLocations(s: Partition) =
+// eTable.preferredLocations(s.asInstanceOf[EdgeTripletPartition].ePart)
+
+// override def compute(s: Partition, context: TaskContext)
+// : Iterator[(VertexHashMap[VD], Iterator[EdgeTriplet[VD, ED]])] = {
+
+// val split = s.asInstanceOf[EdgeTripletPartition]
+
+// // Fetch the vertices and put them in a hashmap.
+// // TODO: use primitive hashmaps for primitive VD types.
+// val vmap = new VertexHashMap[VD]//(1000000)
+// vTableReplicated.iterator(split.vPart, context).foreach { v => vmap.put(v._1, v._2) }
+
+// val (pid, edgePartition) = eTable.iterator(split.ePart, context).next()
+// .asInstanceOf[(Pid, EdgePartition[ED])]
+
+// // Return an iterator that looks up the hash map to find matching vertices for each edge.
+// val iter = new Iterator[EdgeTriplet[VD, ED]] {
+// private var pos = 0
+// private val e = new EdgeTriplet[VD, ED]
+// e.src = new Vertex[VD]
+// e.dst = new Vertex[VD]
+
+// override def hasNext: Boolean = pos < edgePartition.size
+// override def next() = {
+// e.src.id = edgePartition.srcIds.getLong(pos)
+// // assert(vmap.containsKey(e.src.id))
+// e.src.data = vmap.get(e.src.id)
+
+// e.dst.id = edgePartition.dstIds.getLong(pos)
+// // assert(vmap.containsKey(e.dst.id))
+// e.dst.data = vmap.get(e.dst.id)
+
+// //println("Iter called: " + pos)
+// e.data = edgePartition.data(pos)
+// pos += 1
+// e
+// }
+
+// override def toList: List[EdgeTriplet[VD, ED]] = {
+// val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
+// for (i <- (0 until edgePartition.size)) {
+// val currentEdge = new EdgeTriplet[VD, ED]
+// currentEdge.src = new Vertex[VD]
+// currentEdge.dst = new Vertex[VD]
+// currentEdge.src.id = edgePartition.srcIds.getLong(i)
+// // assert(vmap.containsKey(e.src.id))
+// currentEdge.src.data = vmap.get(currentEdge.src.id)
+
+// currentEdge.dst.id = edgePartition.dstIds.getLong(i)
+// // assert(vmap.containsKey(e.dst.id))
+// currentEdge.dst.data = vmap.get(currentEdge.dst.id)
+
+// currentEdge.data = edgePartition.data(i)
+// //println("Iter: " + pos + " " + e.src.id + " " + e.dst.id + " " + e.data)
+// //println("List: " + i + " " + currentEdge.src.id + " " + currentEdge.dst.id + " " + currentEdge.data)
+// lb += currentEdge
+// }
+// lb.toList
+// }
+// }
+// Iterator((vmap, iter))
+// }
+// }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e178df3841..45dc863a6b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -2,12 +2,18 @@ package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
+import scala.collection.mutable
+
import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
+import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.IndexedRDD
+import org.apache.spark.rdd.RDDIndex
+
import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._
@@ -18,112 +24,224 @@ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
* A Graph RDD that supports computation on graphs.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
- val numVertexPartitions: Int,
- val numEdgePartitions: Int,
- _rawVertices: RDD[Vertex[VD]],
- _rawEdges: RDD[Edge[ED]],
- _rawVTable: RDD[(Vid, (VD, Array[Pid]))],
- _rawETable: RDD[(Pid, EdgePartition[ED])])
+ val vTable: IndexedRDD[Vid, VD],
+ val vid2pid: IndexedRDD[Vid, Pid],
+ val eTable: IndexedRDD[Pid, EdgePartition[ED]])
extends Graph[VD, ED] {
- def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
- this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
- }
- def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
- if (_cached) {
- new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
- .cache()
- } else {
- new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
- }
+ /**
+ * The vertex data after it has been replicated to the edge partitions.
+ *
+ * Each entry pairs a partition id (Pid) with a VertexHashMap from vertex
+ * id to vertex data. The map is built from the (vid, pid) pairs in vid2pid
+ * and the vertex data in vTable; the result is partitioned with eTable's
+ * partitioner and indexed with eTable.index.
+ *
+ * NOTE(review): vdatas.head fails if a vertex id listed in vid2pid has no
+ * data in vTable, and eTable.partitioner.get fails if eTable has no
+ * partitioner -- confirm both are guaranteed by how the tables are built.
+ */
+ val vTableReplicated: IndexedRDD[Pid, VertexHashMap[VD]] = {
+ // Cogroup vTable with vid2pid so that each vertex id is paired with its
+ // data (vdatas) and the partition ids it is mapped to (pids).
+ vTable.cogroup(vid2pid)
+ .flatMap { case (vid, (vdatas, pids)) =>
+ pids.iterator.map {
+ pid => MessageToPartition(pid, (vid, vdatas.head))
+ }
+ }
+ .partitionBy(eTable.partitioner.get) //@todo assert edge table has partitioner
+ .mapPartitionsWithIndex( (pid, iter) => {
+ // Collect this partition's (vid, data) messages into one hash map
+ val vmap = new VertexHashMap[VD]
+ for( msg <- iter ) { vmap.put(msg.data._1, msg.data._2) }
+ Array((pid, vmap)).iterator
+ }, preservesPartitioning = true)
+ .indexed(eTable.index)
}
- def withVertexPartitioner(numVertexPartitions: Int) = {
- withPartitioner(numVertexPartitions, numEdgePartitions)
- }
- def withEdgePartitioner(numEdgePartitions: Int) = {
- withPartitioner(numVertexPartitions, numEdgePartitions)
- }
- protected var _cached = false
+
+
+
+ // def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
+ // this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
+ // }
+
+ // def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
+ // if (_cached) {
+ // new GraphImpl(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
+ // .cache()
+ // } else {
+ // new GraphImpl(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
+ // }
+ // }
+
+ // def withVertexPartitioner(numVertexPartitions: Int) = {
+ // withPartitioner(numVertexPartitions, numEdgePartitions)
+ // }
+
+ // def withEdgePartitioner(numEdgePartitions: Int) = {
+ // withPartitioner(numVertexPartitions, numEdgePartitions)
+ // }
+
+
override def cache(): Graph[VD, ED] = {
eTable.cache()
+ vid2pid.cache()
vTable.cache()
- _cached = true
+ // @todo: should we cache the replicated data?
+ vTableReplicated.cache()
this
}
override def replication(): Double = {
- val rep = vTable.map{ case (_, (_, a)) => a.size }.sum
+ val rep = vid2pid.groupByKey().map(kv => kv._2.size).sum
rep / vTable.count
}
override def balance(): Array[Int] = {
- eTable.map{ case (_, epart) => epart.data.size }.collect
+ eTable.map{ case (pid, epart) => epart.data.size }.collect
}
override def reverse: Graph[VD, ED] = {
- newGraph(vertices, edges.map{ case Edge(s, t, e) => Edge(t, s, e) })
+ val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]]
+ new GraphImpl(vTable, vid2pid, etable)
}
/** Return a RDD of vertices. */
- override def vertices: RDD[Vertex[VD]] = {
- if (!_cached && _rawVertices != null) {
- _rawVertices
- } else {
- vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
- }
- }
+ override def vertices: RDD[(Vid, VD)] = vTable
+
/** Return a RDD of edges. */
override def edges: RDD[Edge[ED]] = {
- if (!_cached && _rawEdges != null) {
- _rawEdges
- } else {
- eTable.mapPartitions { iter => iter.next()._2.iterator }
- }
+ eTable.mapPartitions { iter => iter.next()._2.iterator }
}
/** Return a RDD that brings edges with its source and destination vertices together. */
override def triplets: RDD[EdgeTriplet[VD, ED]] = {
- new EdgeTripletRDD(vTableReplicated, eTable).mapPartitions { part => part.next()._2 }
+ vTableReplicated.join(eTable)
+ .mapPartitions{ iter =>
+ val (pid, (vmap, edgePartition)) = iter.next()
+ assert(iter.hasNext == false) // exactly one (pid, (vmap, edgePartition)) entry is expected per partition
+ // Return an iterator that looks up the hash map to find matching vertices for each edge.
+ // NOTE: next() refills and returns the same EdgeTriplet `e` on every call; toList allocates fresh ones.
+ new Iterator[EdgeTriplet[VD, ED]] {
+ private var pos = 0
+ private val e = new EdgeTriplet[VD, ED]
+ e.src = new Vertex[VD]
+ e.dst = new Vertex[VD]
+
+ override def hasNext: Boolean = pos < edgePartition.size
+ override def next() = {
+ e.src.id = edgePartition.srcIds(pos)
+ // assert(vmap.containsKey(e.src.id))
+ e.src.data = vmap.get(e.src.id)
+ e.dst.id = edgePartition.dstIds(pos)
+ // assert(vmap.containsKey(e.dst.id))
+ e.dst.data = vmap.get(e.dst.id)
+ //println("Iter called: " + pos)
+ e.data = edgePartition.data(pos)
+ pos += 1
+ e
+ }
+
+ override def toList: List[EdgeTriplet[VD, ED]] = {
+ val lb = new mutable.ListBuffer[EdgeTriplet[VD,ED]]
+ for (i <- (0 until edgePartition.size)) {
+ val currentEdge = new EdgeTriplet[VD, ED]
+ currentEdge.src = new Vertex[VD]
+ currentEdge.dst = new Vertex[VD]
+ currentEdge.src.id = edgePartition.srcIds(i)
+ // assert(vmap.containsKey(currentEdge.src.id))
+ currentEdge.src.data = vmap.get(currentEdge.src.id)
+
+ currentEdge.dst.id = edgePartition.dstIds(i)
+ // assert(vmap.containsKey(currentEdge.dst.id))
+ currentEdge.dst.data = vmap.get(currentEdge.dst.id)
+
+ currentEdge.data = edgePartition.data(i)
+ lb += currentEdge
+ }
+ lb.toList
+ }
+ } // end of iterator
+ } // end of map partition
}
- override def mapVertices[VD2: ClassManifest](f: Vertex[VD] => VD2): Graph[VD2, ED] = {
- newGraph(vertices.map(v => Vertex(v.id, f(v))), edges)
+ override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
+ .asInstanceOf[IndexedRDD[Vid, VD2]]
+ new GraphImpl(newVTable, vid2pid, eTable)
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
- newGraph(vertices, edges.map(e => Edge(e.src, e.dst, f(e))))
+ val newETable = eTable.mapValues(eBlock => eBlock.map(f))
+ .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+ new GraphImpl(vTable, vid2pid, newETable)
}
+
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
Graph[VD, ED2] = {
- newGraph(vertices, triplets.map(e => Edge(e.src.id, e.dst.id, f(e))))
+ val newETable = eTable.join(vTableReplicated).mapValues{
+ case (edgePartition, vmap) =>
+ val et = new EdgeTriplet[VD, ED]
+ et.src = new Vertex[VD]
+ et.dst = new Vertex[VD]
+
+ edgePartition.map{e =>
+ et.data = e.data
+ et.src.id = e.src
+ et.src.data = vmap(e.src)
+ et.dst.id = e.dst
+ et.dst.data = vmap(e.dst)
+ f(et)
+ }
+ }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+ new GraphImpl(vTable, vid2pid, newETable)
}
- override def correctEdges(): Graph[VD, ED] = {
- val sc = vertices.context
- val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
- val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
- Graph(vertices, newEdges)
- }
+ // override def correctEdges(): Graph[VD, ED] = {
+ // val sc = vertices.context
+ // val vset = sc.broadcast(vertices.map(_.id).collect().toSet)
+ // val newEdges = edges.filter(e => vset.value.contains(e.src) && vset.value.contains(e.dst))
+ // Graph(vertices, newEdges)
+ // }
- override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (_ => true),
- vpred: Vertex[VD] => Boolean = (_ => true) ): Graph[VD, ED] = {
+ override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (Vid, VD) => Boolean = ((a,b) => true) ): Graph[VD, ED] = {
+
+ /// @todo: The following code behaves deterministically on each
+ /// vertex predicate but uses additional space. Should we switch to
+ /// this version?
+ // val predGraph = mapVertices(v => (v.data, vpred(v)))
+ // val newETable = predGraph.triplets.filter(t =>
+ // if(v.src.data._2 && v.dst.data._2) {
+ // val src = Vertex(t.src.id, t.src.data._1)
+ // val dst = Vertex(t.dst.id, t.dst.data._1)
+ // epred(new EdgeTriplet[VD, ED](src, dst, t.data))
+ // } else { false })
+
+ // val newVTable = predGraph.vertices.filter(v => v.data._1)
+ // .map(v => (v.id, v.data._1)).indexed()
+
+ // Reuse the partitioner (but not the index) from this graph
+ val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner)
+
- // Restrict the set of vertices to those that satisfy the vertex predicate
- val newVertices = vertices.filter(vpred)
// Restrict the set of edges to those that satisfy the vertex and the edge predicate.
- val newEdges = triplets.filter(t => vpred(t.src) && vpred(t.dst) && epred(t))
- .map( t => Edge(t.src.id, t.dst.id, t.data) )
+ val newETable = createETable(
+ triplets.filter(
+ t => vpred( t.src.id, t.src.data ) && vpred( t.dst.id, t.dst.data ) && epred(t)
+ )
+ .map( t => Edge(t.src.id, t.dst.id, t.data) ),
+ eTable.index.partitioner.numPartitions
+ )
+
+ // Construct the Vid2Pid map. Here we assume that the filter operation
+ // behaves deterministically.
+ // @todo reindex the vertex and edge tables
+ val newVid2Pid = createVid2Pid(newETable, newVTable.index)
- new GraphImpl(newVertices, newEdges)
+ new GraphImpl(newVTable, newVid2Pid, newETable)
}
@@ -135,10 +253,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// TODO(crankshaw) is there a better way to do this using RDD.groupBy()
// functions?
- override def groupEdgeTriplets[ED2: ClassManifest](f: Iterator[EdgeTriplet[VD,ED]] => ED2 ):
+ override def groupEdgeTriplets[ED2: ClassManifest](
+ f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
//override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
- Graph[VD,ED2] = {
-
+
// I think that
// myRDD.mapPartitions { part =>
// val (vmap, edges) = part.next()
@@ -169,7 +287,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
// convert the resulting map back to a list of tuples
.toList
- // TODO(crankshaw) needs an iterator over the tuples? Why can't I map over the list?
+ // TODO(crankshaw) needs an iterator over the tuples?
+ // Why can't I map over the list?
.toIterator
// map over those tuples that contain src and dst info plus the
// new edge data to make my new edges
@@ -185,7 +304,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// and http://stackoverflow.com/questions/6998676/converting-a-scala-map-to-a-list
}
- newGraph(vertices, newEdges)
+
+ // @todo eliminate the need to call createETable
+ val newETable = createETable(newEdges,
+ eTable.index.partitioner.numPartitions)
+
+ new GraphImpl(vTable, vid2pid, newETable)
}
@@ -202,11 +326,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
.toList
.toIterator
.map { case ((src, dst), data) => Edge(src, dst, data) }
-
-
}
- newGraph(vertices, newEdges)
+ // @todo eliminate the need to call createETable
+ val newETable = createETable(newEdges,
+ eTable.index.partitioner.numPartitions)
+ new GraphImpl(vTable, vid2pid, newETable)
}
@@ -215,156 +340,90 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
- override def aggregateNeighbors[A: ClassManifest](
- mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
- reduceFunc: (A, A) => A,
- default: A,
- gatherDirection: EdgeDirection)
- : Graph[(VD, Option[A]), ED] = {
+ override def mapReduceTriplets[A: ClassManifest](
+ mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+ reduceFunc: (A, A) => A)
+ : RDD[(Vid, A)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
- val newVTable = vTableReplicated.mapPartitions({ part =>
- part.map { v => (v._1, MutableTuple2(v._2, Option.empty[A])) }
- }, preservesPartitioning = true)
-
- val newVertices: RDD[(Vid, A)] =
- new EdgeTripletRDD[MutableTuple2[VD, Option[A]], ED](newVTable, eTable)
- .mapPartitions { part =>
- val (vmap, edges) = part.next()
- val edgeSansAcc = new EdgeTriplet[VD, ED]()
- edgeSansAcc.src = new Vertex[VD]
- edgeSansAcc.dst = new Vertex[VD]
- edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[A]], ED] =>
- edgeSansAcc.data = e.data
- edgeSansAcc.src.data = e.src.data._1
- edgeSansAcc.dst.data = e.dst.data._1
- edgeSansAcc.src.id = e.src.id
- edgeSansAcc.dst.id = e.dst.id
- if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
- e.dst.data._2 =
- if (e.dst.data._2.isEmpty) {
- mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
- } else {
- val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
- if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
- }
- }
- if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
- e.dst.data._2 =
- if (e.dst.data._2.isEmpty) {
- mapFunc(edgeSansAcc.src.id, edgeSansAcc)
- } else {
- val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
- if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
- }
- }
- }
- vmap.long2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
- (entry.getLongKey(), entry.getValue()._2)
+ val newVTable: RDD[(Vid, A)] =
+ vTableReplicated.join(eTable).flatMap{
+ case (pid, (vmap, edgePartition)) =>
+ val aggMap = new VertexHashMap[A]
+ val et = new EdgeTriplet[VD, ED]
+ et.src = new Vertex[VD]
+ et.dst = new Vertex[VD]
+ edgePartition.foreach{e =>
+ et.data = e.data
+ et.src.id = e.src
+ et.src.data = vmap(e.src)
+ et.dst.id = e.dst
+ et.dst.data = vmap(e.dst)
+ mapFunc(et).foreach{case (vid, a) =>
+ if(aggMap.containsKey(vid)) {
+ aggMap.put(vid, reduceFunc(aggMap.get(vid), a))
+ } else { aggMap.put(vid, a) }
}
}
- .map{ case (vid, aOpt) => (vid, aOpt.get) }
- .combineByKey((v: A) => v, reduceFunc, null, vertexPartitioner, false)
+ // Return the aggregate map
+ aggMap.long2ObjectEntrySet().fastIterator().map{
+ entry => (entry.getLongKey(), entry.getValue())
+ }
+ }
+ .indexed(vTable.index).reduceByKey(reduceFunc)
- this.leftJoinVertices(newVertices, (v: Vertex[VD], a: Option[A]) => (v.data, a))
+ newVTable
}
- /**
- * Same as aggregateNeighbors but map function can return none and there is no default value.
- * As a consequence, the resulting table may be much smaller than the set of vertices.
- */
- override def aggregateNeighbors[A: ClassManifest](
- mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
- reduceFunc: (A, A) => A,
- gatherDirection: EdgeDirection): Graph[(VD, Option[A]), ED] = {
+ def aggregateNeighbors[A: ClassManifest](
+ mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ reduceFunc: (A, A) => A,
+ dir: EdgeDirection)
+ : RDD[(Vid, A)] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
- val newVTable = vTableReplicated.mapPartitions({ part =>
- part.map { v => (v._1, MutableTuple2(v._2, Option.empty[A])) }
- }, preservesPartitioning = true)
-
- val newVertices: RDD[(Vid, A)] =
- new EdgeTripletRDD[MutableTuple2[VD, Option[A]], ED](newVTable, eTable)
- .mapPartitions { part =>
- val (vmap, edges) = part.next()
- val edgeSansAcc = new EdgeTriplet[VD, ED]()
- edgeSansAcc.src = new Vertex[VD]
- edgeSansAcc.dst = new Vertex[VD]
- edges.foreach { e: EdgeTriplet[MutableTuple2[VD, Option[A]], ED] =>
- edgeSansAcc.data = e.data
- edgeSansAcc.src.data = e.src.data._1
- edgeSansAcc.dst.data = e.dst.data._1
- edgeSansAcc.src.id = e.src.id
- edgeSansAcc.dst.id = e.dst.id
- if (gatherDirection == EdgeDirection.In || gatherDirection == EdgeDirection.Both) {
- e.dst.data._2 =
- if (e.dst.data._2.isEmpty) {
- mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
- } else {
- val tmp = mapFunc(edgeSansAcc.dst.id, edgeSansAcc)
- if (!tmp.isEmpty) Some(reduceFunc(e.dst.data._2.get, tmp.get)) else e.dst.data._2
- }
- }
- if (gatherDirection == EdgeDirection.Out || gatherDirection == EdgeDirection.Both) {
- e.src.data._2 =
- if (e.src.data._2.isEmpty) {
- mapFunc(edgeSansAcc.src.id, edgeSansAcc)
- } else {
- val tmp = mapFunc(edgeSansAcc.src.id, edgeSansAcc)
- if (!tmp.isEmpty) Some(reduceFunc(e.src.data._2.get, tmp.get)) else e.src.data._2
- }
- }
- }
- vmap.long2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
- (entry.getLongKey(), entry.getValue()._2)
- }
- }
- .map{ case (vid, aOpt) => (vid, aOpt.get) }
- .combineByKey((v: A) => v, reduceFunc, null, vertexPartitioner, false)
+ // Define a new map function over edge triplets
+ def mf(et: EdgeTriplet[VD,ED]): Array[(Vid, A)] = {
+ // Compute the message to the dst vertex
+ val dstA =
+ if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
+ mapFunc(et.dst.id, et)
+ } else { Option.empty[A] }
+ // Compute the message to the source vertex
+ val srcA =
+ if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
+ mapFunc(et.src.id, et)
+ } else { Option.empty[A] }
+ // construct the return array
+ (srcA, dstA) match {
+ case (None, None) => Array.empty[(Vid, A)]
+ case (Some(src),None) => Array((et.src.id, src))
+ case (None, Some(dst)) => Array((et.dst.id, dst))
+ case (Some(src), Some(dst)) =>
+ Array((et.src.id, src), (et.dst.id, dst))
+ }
+ }
- this.leftJoinVertices(newVertices, (v: Vertex[VD], a: Option[A]) => (v.data, a))
+ mapReduceTriplets(mf, reduceFunc)
}
- override def leftJoinVertices[U: ClassManifest, VD2: ClassManifest](
- updates: RDD[(Vid, U)],
- updateF: (Vertex[VD], Option[U]) => VD2)
- : Graph[VD2, ED] = {
-
- ClosureCleaner.clean(updateF)
- val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
- iter.map { case (vid, ((vdata, pids), update)) =>
- val newVdata = updateF(Vertex(vid, vdata), update)
- (vid, (newVdata, pids))
- }
- }, preservesPartitioning = true).cache()
- new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
- }
- override def joinVertices[U: ClassManifest](
- updates: RDD[(Vid, U)],
- updateF: (Vertex[VD], U) => VD)
- : Graph[VD, ED] = {
+ override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
+ (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
+ : Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
- val newVTable = vTable.leftOuterJoin(updates).mapPartitions({ iter =>
- iter.map { case (vid, ((vdata, pids), update)) =>
- if (update.isDefined) {
- val newVdata = updateF(Vertex(vid, vdata), update.get)
- (vid, (newVdata, pids))
- } else {
- (vid, (vdata, pids))
- }
- }
- }, preservesPartitioning = true).cache()
-
- new GraphImpl(newVTable.partitions.length, eTable.partitions.length, null, null, newVTable, eTable)
+ val newVTable = vTable.leftOuterJoin(updates).mapValuesWithKeys{
+ case (vid, (data, other)) => updateF(vid, data, other)
+ }.asInstanceOf[IndexedRDD[Vid,VD2]]
+ new GraphImpl(newVTable, vid2pid, eTable)
}
@@ -372,48 +431,129 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Internals hidden from callers
//////////////////////////////////////////////////////////////////////////////////////////////////
- // TODO: Support non-hash partitioning schemes.
- protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
- protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
- /** Create a new graph but keep the current partitioning scheme. */
- protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
- vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
- (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
- }
+
+
+
+ // /** Create a new graph but keep the current partitioning scheme. */
+ // protected def newGraph[VD2: ClassManifest, ED2: ClassManifest](
+ // vertices: RDD[Vertex[VD2]], edges: RDD[Edge[ED2]]): Graph[VD2, ED2] = {
+ // (new GraphImpl[VD2, ED2](vertices, edges)).withPartitioner(numVertexPartitions, numEdgePartitions)
+ // }
+
+ // protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
+ // if (_rawETable == null) {
+ // createETable(_rawEdges, numEdgePartitions)
+ // } else {
+ // _rawETable
+ // }
+ // }
+
+ // protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
+ // if (_rawVTable == null) {
+ // createVTable(_rawVertices, eTable, numVertexPartitions)
+ // } else {
+ // _rawVTable
+ // }
+ // }
+
+ // protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
+ // // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
+ // // the shuffle id so we can use it on the slave.
+ // vTable
+ // .flatMap { case (vid, (vdata, pids)) =>
+ // pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+ // }
+ // .partitionBy(edgePartitioner)
+ // .mapPartitions({ part =>
+ // part.map { message => (message.data._1, message.data._2) }
+ // }, preservesPartitioning = true)
+ // }
+}
- protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
- if (_rawETable == null) {
- createETable(_rawEdges, numEdgePartitions)
- } else {
- _rawETable
- }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+object GraphImpl {
+
+def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]):
+ GraphImpl[VD,ED] = {
+
+ apply(vertices, edges,
+ vertices.context.defaultParallelism, edges.context.defaultParallelism)
}
- protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
- if (_rawVTable == null) {
- createVTable(_rawVertices, eTable, numVertexPartitions)
- } else {
- _rawVTable
- }
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
+ numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = {
+
+ val vtable = vertices.indexed(numVPart)
+ val etable = createETable(edges, numEPart)
+ val vid2pid = createVid2Pid(etable, vtable.index)
+
+ new GraphImpl(vtable, vid2pid, etable)
}
- protected lazy val vTableReplicated: RDD[(Vid, VD)] = {
- // Join vid2pid and vTable, generate a shuffle dependency on the joined result, and get
- // the shuffle id so we can use it on the slave.
- vTable
- .flatMap { case (vid, (vdata, pids)) =>
- pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+
+
+ /**
+ * Create the edge table RDD, which is much more efficient for Java heap storage than the
+ * normal edges data structure (RDD[(Vid, Vid, ED)]).
+ *
+ * The edge table contains multiple partitions, and each partition contains only one RDD
+ * key-value pair: the key is the partition id, and the value is an EdgePartition object
+ * containing all the edges in a partition.
+ *
+ * Each edge is assigned a partition id by edgePartitionFunction2D, shuffled with a
+ * HashPartitioner over numPartitions, and collected per partition by an
+ * EdgePartitionBuilder; the resulting RDD is then indexed.
+ *
+ * @param edges the edges to place into the table
+ * @param numPartitions the number of partitions, used both by the partition function and
+ * by the HashPartitioner
+ * @return an IndexedRDD holding one (partition id, EdgePartition) pair per partition
+ */
+ protected def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]], numPartitions: Int)
+ : IndexedRDD[Pid, EdgePartition[ED]] = {
+ val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
+ edges
+ .map { e =>
+ // Pick the target partition; the 2D function is active, the 1D and canonical 2D calls are disabled.
+ // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
+ val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
+ //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
+
+ // TODO: should we be using a 3-tuple or an optimized class here?
+ MessageToPartition(part, (e.src, e.dst, e.data))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex({ (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }, preservesPartitioning = true).indexed()
}
-}
-object GraphImpl {
+ protected def createVid2Pid[ED: ClassManifest](
+ eTable: IndexedRDD[Pid, EdgePartition[ED]],
+ vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Pid] = {
+ eTable.mapPartitions { iter =>
+ val (pid, edgePartition) = iter.next()
+ val vSet = new VertexSet
+ edgePartition.foreach(e => {vSet.add(e.src); vSet.add(e.dst)})
+ vSet.iterator.map { vid => (vid.toLong, pid) }
+ }.indexed(vTableIndex)
+ }
protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
@@ -500,70 +640,44 @@ object GraphImpl {
}
- /**
- * Create the edge table RDD, which is much more efficient for Java heap storage than the
- * normal edges data structure (RDD[(Vid, Vid, ED)]).
- *
- * The edge table contains multiple partitions, and each partition contains only one RDD
- * key-value pair: the key is the partition id, and the value is an EdgePartition object
- * containing all the edges in a partition.
- */
- protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
- : RDD[(Pid, EdgePartition[ED])] = {
- val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
- edges
- .map { e =>
- // Random partitioning based on the source vertex id.
- // val part: Pid = edgePartitionFunction1D(e.src, e.dst, numPartitions)
- val part: Pid = edgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
- //val part: Pid = canonicalEdgePartitionFunction2D(e.src, e.dst, numPartitions, ceilSqrt)
- // Should we be using 3-tuple or an optimized class
- MessageToPartition(part, (e.src, e.dst, e.data))
- // (math.abs(e.src) % numPartitions, (e.src, e.dst, e.data))
- }
- .partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex({ (pid, iter) =>
- val edgePartition = new EdgePartition[ED]
- iter.foreach { message =>
- val data = message.data
- edgePartition.add(data._1, data._2, data._3)
- }
- edgePartition.trim()
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true)
- }
- protected def createVTable[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[Vertex[VD]],
- eTable: RDD[(Pid, EdgePartition[ED])],
- numPartitions: Int)
- : RDD[(Vid, (VD, Array[Pid]))] = {
- val partitioner = new HashPartitioner(numPartitions)
+ // protected def createVTable[VD: ClassManifest, ED: ClassManifest](
+ // eTable: IndexedRDD[Pid, EdgePartition[ED]],
+ // vid2pid: Index
+ // vertices: RDD[Vertex[VD]],
- // A key-value RDD. The key is a vertex id, and the value is a list of
- // partitions that contains edges referencing the vertex.
- val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter =>
- val (pid, edgePartition) = iter.next()
- val vSet = new VertexSet
- var i = 0
- while (i < edgePartition.srcIds.size) {
- vSet.add(edgePartition.srcIds.getLong(i))
- vSet.add(edgePartition.dstIds.getLong(i))
- i += 1
- }
- vSet.iterator.map { vid => (vid.toLong, pid) }
- }.groupByKey(partitioner)
-
- vertices
- .map { v => (v.id, v.data) }
- .partitionBy(partitioner)
- .leftOuterJoin(vid2pid)
- .mapValues {
- case (vdata, None) => (vdata, Array.empty[Pid])
- case (vdata, Some(pids)) => (vdata, pids.toArray)
- }
- }
+ // default: VD) : IndexedRDD[Vid, VD] = {
+
+ // // Compute all the vertices in the edge table.
+ // val vid2pid = createVid2Pid(eTable)
+
+ // // Compute all the
+ // vertices.map(v => (v.id, v.data)).cogroup(vids)
+
+ // // A key-value RDD. The key is a vertex id, and the value is a list of
+ // // partitions that contains edges referencing the vertex.
+ // val vid2pid : RDD[(Vid, Seq[Pid])] = eTable.mapPartitions { iter =>
+ // val (pid, edgePartition) = iter.next()
+ // val vSet = new VertexSet
+ // var i = 0
+ // while (i < edgePartition.srcIds.size) {
+ // vSet.add(edgePartition.srcIds.getLong(i))
+ // vSet.add(edgePartition.dstIds.getLong(i))
+ // i += 1
+ // }
+ // vSet.iterator.map { vid => (vid.toLong, pid) }
+ // }.groupByKey(partitioner)
+
+ // vertices
+ // .map { v => (v.id, v.data) }
+ // .partitionBy(partitioner)
+ // .leftOuterJoin(vid2pid)
+ // .mapValues {
+ // case (vdata, None) => (vdata, Array.empty[Pid])
+ // case (vdata, Some(pids)) => (vdata, pids.toArray)
+ // }
+ // }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index d0583c48a8..01a04e9c39 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -37,7 +37,7 @@ object GraphGenerators {
val host = "local[4]"
val sc = new SparkContext(host, "Lognormal graph generator")
- val lnGraph = lognormalGraph(sc, 10000)
+ val lnGraph = logNormalGraph(sc, 10000)
val rmat = rmatGraph(sc, 1000, 3000)
@@ -69,19 +69,21 @@ object GraphGenerators {
// Right now it just generates a bunch of edges where
// the edge data is the weight (default 1)
- def lognormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
+ def logNormalGraph(sc: SparkContext, numVertices: Int): GraphImpl[Int, Int] = {
// based on Pregel settings
val mu = 4
val sigma = 1.3
//val vertsAndEdges = (0 until numVertices).flatMap { src => {
- val vertices = (0 until numVertices).flatMap { src =>
- Array(Vertex(src, sampleLogNormal(mu, sigma, numVertices))) }
- val edges = vertices.flatMap( { v =>
- generateRandomEdges(v.id.toInt, v.data, numVertices) })
-
+ val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{
+ src => (src, sampleLogNormal(mu, sigma, numVertices))
+ }
- new GraphImpl[Int, Int](sc.parallelize(vertices), sc.parallelize(edges))
+ val edges = vertices.flatMap{
+ v => generateRandomEdges(v._1.toInt, v._2, numVertices)
+ }
+
+ GraphImpl(vertices, edges)
//println("Vertices:")
//for (v <- vertices) {
// println(v.id)
@@ -161,8 +163,8 @@ object GraphGenerators {
val vertices = edges.flatMap { edge => List((edge.src, 1)) }
.reduceByKey(_ + _)
- .map{ case (vid, degree) => Vertex(vid, degree) }
- new GraphImpl[Int, ED](vertices, edges)
+ .map{ case (vid, degree) => (vid, degree) }
+ GraphImpl(vertices, edges)
}
/**