author    Ankur Dave <ankurdave@gmail.com>    2014-05-10 14:48:07 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-05-10 14:48:07 -0700
commit    905173df57b90f90ebafb22e43f55164445330e6 (patch)
tree      805b4c6a80d206268807bfea23554e280f0b9eee
parent    6c2691d0a0ed46a8b8093e05a4708706cf187168 (diff)
Unify GraphImpl RDDs + other graph load optimizations
This PR makes the following changes, primarily in e4fbd329aef85fe2c38b0167255d2a712893d683:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).

2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.

3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.

4. *Join elimination for mapTriplets.*

5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #497 from ankurdave/unify-rdds and squashes the following commits:

332ab43 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
4933e2e [Ankur Dave] Exclude RoutingTable from binary compatibility check
5ba8789 [Ankur Dave] Add GraphX upgrade guide from Spark 0.9.1
13ac845 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into unify-rdds
a04765c [Ankur Dave] Remove unnecessary toOps call
57202e8 [Ankur Dave] Replace case with pair parameter
75af062 [Ankur Dave] Add explicit return types
04d3ae5 [Ankur Dave] Convert implicit parameter to context bound
c88b269 [Ankur Dave] Revert upgradeIterator to if-in-a-loop
0d3584c [Ankur Dave] EdgePartition.size should be val
2a928b2 [Ankur Dave] Set locality wait
10b3596 [Ankur Dave] Clean up public API
ae36110 [Ankur Dave] Fix style errors
e4fbd32 [Ankur Dave] Unify GraphImpl RDDs + other graph load optimizations
d6d60e2 [Ankur Dave] In GraphLoader, coalesce to minEdgePartitions
62c7b78 [Ankur Dave] In Analytics, take PageRank numIter
d64e8d4 [Ankur Dave] Log current Pregel iteration
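To make change 5 above concrete, here is a minimal sketch, not taken from the patch, of the bookkeeping that lets an upgrade ship only the attribute side the triplet view is still missing. The `ViewState` and `ShipPlan` names are invented for illustration; in the actual change this logic lives in `ReplicatedVertexView#upgrade`, which keeps similar flags for the source and destination sides.

    object UpgradeSketch {
      final case class ShipPlan(shipSrc: Boolean, shipDst: Boolean)

      // Which vertex attribute sides are already replicated into the edge partitions.
      final case class ViewState(hasSrcAttrs: Boolean, hasDstAttrs: Boolean) {
        // Compute what still needs shipping for a request, plus the state after shipping.
        def upgrade(includeSrc: Boolean, includeDst: Boolean): (ShipPlan, ViewState) = {
          val plan = ShipPlan(
            shipSrc = includeSrc && !hasSrcAttrs,
            shipDst = includeDst && !hasDstAttrs)
          (plan, ViewState(hasSrcAttrs || includeSrc, hasDstAttrs || includeDst))
        }
      }

      def main(args: Array[String]): Unit = {
        // Source attributes were shipped earlier (e.g. by a mapTriplets that only read
        // srcAttr); now both sides are requested, so only dst attributes should move.
        val (plan, _) = ViewState(hasSrcAttrs = true, hasDstAttrs = false)
          .upgrade(includeSrc = true, includeDst = true)
        assert(plan == ShipPlan(shipSrc = false, shipDst = true))
      }
    }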
-rw-r--r--  docs/graphx-programming-guide.md | 22
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 56
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala | 8
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala | 10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 17
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 166
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala | 132
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 18
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala | 50
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 344
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala | 21
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala | 238
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala | 82
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala | 158
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala | 29
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala | 149
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala | 269
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala | 91
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala | 245
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala | 8
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 10
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala | 48
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala | 10
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala | 11
-rw-r--r--  project/MimaBuild.scala | 2
28 files changed, 1353 insertions, 851 deletions
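For users upgrading from Spark 0.9.1, the guide added to docs/graphx-programming-guide.md (first hunk below) amounts to one signature change: `EdgeRDD` now carries the vertex attribute type as a second parameter. The following is a hedged sketch of how caller code changes; the `Graph[Int, Double]` instance and the `summarize` helper are invented for illustration and are not part of the patch.

    import org.apache.spark.graphx.{EdgeRDD, Graph, VertexRDD}

    def summarize(graph: Graph[Int, Double]): Long = {
      val vertices: VertexRDD[Int] = graph.vertices          // unchanged from 0.9.1
      // Spark 0.9.1:      val edges: EdgeRDD[Double]      = graph.edges
      // After this patch: val edges: EdgeRDD[Double, Int] = graph.edges
      val edges: EdgeRDD[Double, Int] = graph.edges
      vertices.count() + edges.count()
    }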
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 07be8ba58e..42ab27bf55 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -86,6 +86,12 @@ support the [Bagel API](api/scala/index.html#org.apache.spark.bagel.package) and
[Bagel programming guide](bagel-programming-guide.html). However, we encourage Bagel users to
explore the new GraphX API and comment on issues that may complicate the transition from Bagel.
+## Upgrade Guide from Spark 0.9.1
+
+GraphX in Spark {{site.SPARK_VERSION}} contains one user-facing interface change from Spark 0.9.1. [`EdgeRDD`][EdgeRDD] may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type `Graph[VD, ED]` are of type `EdgeRDD[ED, VD]` rather than `EdgeRDD[ED]`.
+
+[EdgeRDD]: api/scala/index.html#org.apache.spark.graphx.EdgeRDD
+
# Getting Started
To get started you first need to import Spark and GraphX into your project, as follows:
@@ -145,12 +151,12 @@ the vertices and edges of the graph:
{% highlight scala %}
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED, VD]
}
{% endhighlight %}
-The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
-VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
+The classes `VertexRDD[VD]` and `EdgeRDD[ED, VD]` extend and are optimized versions of `RDD[(VertexID,
+VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED, VD]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
@@ -302,7 +308,7 @@ class Graph[VD, ED] {
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
- val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED, VD]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -908,7 +914,7 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
## EdgeRDDs
-The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
+The `EdgeRDD[ED, VD]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one
of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within
each partition, edge attributes and adjacency structure, are stored separately enabling maximum
reuse when changing attribute values.
@@ -918,11 +924,11 @@ reuse when changing attribute values.
The three additional functions exposed by the `EdgeRDD` are:
{% highlight scala %}
// Transform the edge attributes while preserving the structure
-def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
+def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
// Reverse the edges reusing both attributes and structure
-def reverse: EdgeRDD[ED]
+def reverse: EdgeRDD[ED, VD]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
{% endhighlight %}
In most applications we have found that operations on the `EdgeRDD` are accomplished through the
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fa78ca99b8..a8fc095072 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.graphx.impl.EdgePartition
+
/**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
- * for performance.
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
+ * partition for performance. It may additionally store the vertex attributes associated with each
+ * edge to provide the triplet view. Shipping of the vertex attributes is managed by
+ * `impl.ReplicatedVertexView`.
*/
-class EdgeRDD[@specialized ED: ClassTag](
- val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
partitionsRDD.setName("EdgeRDD")
@@ -45,8 +48,12 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
- p.next._2.iterator.map(_.copy())
+ val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+ if (p.hasNext) {
+ p.next._2.iterator.map(_.copy())
+ } else {
+ Iterator.empty
+ }
}
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -61,11 +68,15 @@ class EdgeRDD[@specialized ED: ClassTag](
this
}
- private[graphx] def mapEdgePartitions[ED2: ClassTag](
- f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
- new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
- val (pid, ep) = iter.next()
- Iterator(Tuple2(pid, f(pid, ep)))
+ private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+ f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+ new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+ if (iter.hasNext) {
+ val (pid, ep) = iter.next()
+ Iterator(Tuple2(pid, f(pid, ep)))
+ } else {
+ Iterator.empty
+ }
}, preservesPartitioning = true))
}
@@ -76,7 +87,7 @@ class EdgeRDD[@specialized ED: ClassTag](
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
- def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+ def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))
/**
@@ -84,7 +95,14 @@ class EdgeRDD[@specialized ED: ClassTag](
*
* @return a new EdgeRDD containing all the edges reversed
*/
- def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+ def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+ /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
+ def filter(
+ epred: EdgeTriplet[VD, ED] => Boolean,
+ vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+ mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+ }
/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -96,19 +114,15 @@ class EdgeRDD[@specialized ED: ClassTag](
* with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
- (other: EdgeRDD[ED2])
- (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ (other: EdgeRDD[ED2, _])
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
- new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
-
- private[graphx] def collectVertexIds(): RDD[VertexId] = {
- partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
- }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index dfc6a80158..9d473d5ebd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+ def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 5039586890..dc5dac4fda 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* along with their vertex data.
*
*/
- @transient val edges: EdgeRDD[ED]
+ @transient val edges: EdgeRDD[ED, VD]
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index dd380d8c18..d295d0127a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
import com.esotericsoftware.kryo.Kryo
-import org.apache.spark.graphx.impl._
import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx.impl._
/**
* Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
+ kryo.register(classOf[RoutingTableMessage])
kryo.register(classOf[(VertexId, Object)])
- kryo.register(classOf[EdgePartition[Object]])
+ kryo.register(classOf[EdgePartition[Object, Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 18858466db..389490c139 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
- * @param minEdgePartitions the number of partitions for the
- * the edge RDD
+ * @param minEdgePartitions the number of partitions for the edge RDD
*/
def edgeListFile(
sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
val startTime = System.currentTimeMillis
// Parse the edge data table directly into edge partitions
- val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
- val builder = new EdgePartitionBuilder[Int]
+ val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+ val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
- }.cache()
+ }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 4997fbc3cb..edd5b79da1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -18,11 +18,13 @@
package org.apache.spark.graphx
import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import scala.util.Random
+
import org.apache.spark.SparkException
-import org.apache.spark.graphx.lib._
+import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
-import scala.util.Random
+
+import org.apache.spark.graphx.lib._
/**
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,22 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
- @transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+ @transient lazy val inDegrees: VertexRDD[Int] =
+ degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
- @transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+ @transient lazy val outDegrees: VertexRDD[Int] =
+ degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
- @transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+ @transient lazy val degrees: VertexRDD[Int] =
+ degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
/**
* Computes the neighboring vertex degrees.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index ac07a594a1..4572eab287 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx
import scala.reflect.ClassTag
+import org.apache.spark.Logging
/**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
* }}}
*
*/
-object Pregel {
+object Pregel extends Logging {
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
@@ -142,6 +143,9 @@ object Pregel {
// hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
// vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
activeMessages = messages.count()
+
+ logInfo("Pregel finished iteration " + i)
+
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f0fc605c88..8c62897037 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -24,8 +24,11 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.graphx.impl.MsgRDDFunctions
-import org.apache.spark.graphx.impl.VertexPartition
+import org.apache.spark.graphx.impl.RoutingTablePartition
+import org.apache.spark.graphx.impl.ShippableVertexPartition
+import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
+import org.apache.spark.graphx.impl.VertexRDDFunctions._
/**
* Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -33,6 +36,9 @@ import org.apache.spark.graphx.impl.VertexPartition
* joined efficiently. All operations except [[reindex]] preserve the index. To construct a
* `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
*
+ * Additionally, stores routing information to enable joining the vertex attributes with an
+ * [[EdgeRDD]].
+ *
* @example Construct a `VertexRDD` from a plain RDD:
* {{{
* // Construct an initial vertex set
@@ -50,13 +56,11 @@ import org.apache.spark.graphx.impl.VertexPartition
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
class VertexRDD[@specialized VD: ClassTag](
- val partitionsRDD: RDD[VertexPartition[VD]])
+ val partitionsRDD: RDD[ShippableVertexPartition[VD]])
extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
- partitionsRDD.setName("VertexRDD")
-
/**
* Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
* VertexRDD will be based on a different index and can no longer be quickly joined with this
@@ -71,6 +75,16 @@ class VertexRDD[@specialized VD: ClassTag](
override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)
+ override def setName(_name: String): this.type = {
+ if (partitionsRDD.name != null) {
+ partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+ } else {
+ partitionsRDD.setName(_name)
+ }
+ this
+ }
+ setName("VertexRDD")
+
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -90,14 +104,14 @@ class VertexRDD[@specialized VD: ClassTag](
* Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
- firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
+ firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
}
/**
* Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/
private[graphx] def mapVertexPartitions[VD2: ClassTag](
- f: VertexPartition[VD] => VertexPartition[VD2])
+ f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
@@ -208,10 +222,8 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
new VertexRDD[VD3](
partitionsRDD.zipPartitions(
- other.partitionBy(this.partitioner.get), preservesPartitioning = true)
- { (part, msgs) =>
- val vertexPartition: VertexPartition[VD] = part.next()
- Iterator(vertexPartition.leftJoin(msgs)(f))
+ other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+ (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
}
)
}
@@ -254,10 +266,8 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
new VertexRDD(
partitionsRDD.zipPartitions(
- other.partitionBy(this.partitioner.get), preservesPartitioning = true)
- { (part, msgs) =>
- val vertexPartition: VertexPartition[VD] = part.next()
- Iterator(vertexPartition.innerJoin(msgs)(f))
+ other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
+ (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
}
)
}
@@ -276,14 +286,31 @@ class VertexRDD[@specialized VD: ClassTag](
*/
def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
- val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
+ val shuffled = messages.copartitionWithVertices(this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
- val vertexPartition: VertexPartition[VD] = thisIter.next()
- Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc))
+ thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
new VertexRDD[VD2](parts)
}
+ /**
+ * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
+ * [[EdgeRDD]].
+ */
+ def reverseRoutingTables(): VertexRDD[VD] =
+ this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+ /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
+ private[graphx] def shipVertexAttributes(
+ shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+ partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+ }
+
+ /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
+ private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+ partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+ }
+
} // end of VertexRDD
@@ -293,52 +320,101 @@ class VertexRDD[@specialized VD: ClassTag](
object VertexRDD {
/**
- * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
- * Duplicate entries are removed arbitrarily.
+ * Constructs a standalone `VertexRDD` (one that is not set up for efficient joins with an
+ * [[EdgeRDD]]) from an RDD of vertex-attribute pairs. Duplicate entries are removed arbitrarily.
*
* @tparam VD the vertex attribute type
*
- * @param rdd the collection of vertex-attribute pairs
+ * @param vertices the collection of vertex-attribute pairs
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
- val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
- case Some(p) => rdd
- case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+ case Some(p) => vertices
+ case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
}
- val vertexPartitions = partitioned.mapPartitions(
- iter => Iterator(VertexPartition(iter)),
+ val vertexPartitions = vPartitioned.mapPartitions(
+ iter => Iterator(ShippableVertexPartition(iter)),
preservesPartitioning = true)
new VertexRDD(vertexPartitions)
}
/**
- * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
- * `mergeFunc`.
+ * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+ * removed arbitrarily. The resulting `VertexRDD` will be joinable with `edges`, and any missing
+ * vertices referred to by `edges` will be created with the attribute `defaultVal`.
*
* @tparam VD the vertex attribute type
*
- * @param rdd the collection of vertex-attribute pairs
- * @param mergeFunc the associative, commutative merge function.
+ * @param vertices the collection of vertex-attribute pairs
+ * @param edges the [[EdgeRDD]] that these vertices may be joined with
+ * @param defaultVal the vertex attribute to use when creating missing vertices
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
- val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
- case Some(p) => rdd
- case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
+ def apply[VD: ClassTag](
+ vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
+ VertexRDD(vertices, edges, defaultVal, (a, b) => b)
+ }
+
+ /**
+ * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate vertex entries are
+ * merged using `mergeFunc`. The resulting `VertexRDD` will be joinable with `edges`, and any
+ * missing vertices referred to by `edges` will be created with the attribute `defaultVal`.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param vertices the collection of vertex-attribute pairs
+ * @param edges the [[EdgeRDD]] that these vertices may be joined with
+ * @param defaultVal the vertex attribute to use when creating missing vertices
+ * @param mergeFunc the commutative, associative duplicate vertex attribute merge function
+ */
+ def apply[VD: ClassTag](
+ vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+ ): VertexRDD[VD] = {
+ val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
+ case Some(p) => vertices
+ case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
+ }
+ val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
+ val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
+ (vertexIter, routingTableIter) =>
+ val routingTable =
+ if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+ Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal))
}
- val vertexPartitions = partitioned.mapPartitions(
- iter => Iterator(VertexPartition(iter)),
- preservesPartitioning = true)
new VertexRDD(vertexPartitions)
}
/**
- * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
- * `defaultVal` otherwise.
+ * Constructs a `VertexRDD` containing all vertices referred to in `edges`. The vertices will be
+ * created with the attribute `defaultVal`. The resulting `VertexRDD` will be joinable with
+ * `edges`.
+ *
+ * @tparam VD the vertex attribute type
+ *
+ * @param edges the [[EdgeRDD]] referring to the vertices to create
+ * @param numPartitions the desired number of partitions for the resulting `VertexRDD`
+ * @param defaultVal the vertex attribute to use when creating missing vertices
*/
- def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
- : VertexRDD[VD] = {
- VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
- value.getOrElse(default)
- }
+ def fromEdges[VD: ClassTag](
+ edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+ val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
+ val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
+ val routingTable =
+ if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+ Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
+ }, preservesPartitioning = true)
+ new VertexRDD(vertexPartitions)
+ }
+
+ private def createRoutingTables(
+ edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+ // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+ val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
+ Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
+ .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")
+
+ val numEdgePartitions = edges.partitions.size
+ vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
+ iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
+ preservesPartitioning = true)
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index b7c472e905..871e81f8d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,39 +17,86 @@
package org.apache.spark.graphx.impl
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
- * A collection of edges stored in 3 large columnar arrays (src, dst, attribute). The arrays are
- * clustered by src.
+ * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
+ * edges are stored in 3 large columnar arrays (src, dst, attribute). The arrays are clustered by
+ * src. There is an optional active vertex set for filtering computation on the edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the vertex attribute type
*
* @param srcIds the source vertex id of each edge
* @param dstIds the destination vertex id of each edge
* @param data the attribute associated with each edge
* @param index a clustered index on source vertex id
- * @tparam ED the edge attribute type.
+ * @param vertices a map from referenced vertex ids to their corresponding attributes. Must
+ * contain all vertex ids from `srcIds` and `dstIds`, though not necessarily valid attributes for
+ * those vertex ids. The mask is not used.
+ * @param activeSet an optional active vertex set for filtering computation on the edges
*/
private[graphx]
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
+class EdgePartition[
+ @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
@transient val srcIds: Array[VertexId],
@transient val dstIds: Array[VertexId],
@transient val data: Array[ED],
- @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
+ @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
+ @transient val vertices: VertexPartition[VD],
+ @transient val activeSet: Option[VertexSet] = None
+ ) extends Serializable {
+
+ /** Return a new `EdgePartition` with the specified edge data. */
+ def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = {
+ new EdgePartition(srcIds, dstIds, data_, index, vertices, activeSet)
+ }
+
+ /** Return a new `EdgePartition` with the specified vertex partition. */
+ def withVertices[VD2: ClassTag](
+ vertices_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = {
+ new EdgePartition(srcIds, dstIds, data, index, vertices_, activeSet)
+ }
+
+ /** Return a new `EdgePartition` with the specified active set, provided as an iterator. */
+ def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = {
+ val newActiveSet = new VertexSet
+ iter.foreach(newActiveSet.add(_))
+ new EdgePartition(srcIds, dstIds, data, index, vertices, Some(newActiveSet))
+ }
+
+ /** Return a new `EdgePartition` with the specified active set. */
+ def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = {
+ new EdgePartition(srcIds, dstIds, data, index, vertices, activeSet_)
+ }
+
+ /** Return a new `EdgePartition` with updates to vertex attributes specified in `iter`. */
+ def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
+ this.withVertices(vertices.innerJoinKeepLeft(iter))
+ }
+
+ /** Look up vid in activeSet, throwing an exception if it is None. */
+ def isActive(vid: VertexId): Boolean = {
+ activeSet.get.contains(vid)
+ }
+
+ /** The number of active vertices, if any exist. */
+ def numActives: Option[Int] = activeSet.map(_.size)
/**
* Reverse all the edges in this partition.
*
* @return a new edge partition with all edges reversed.
*/
- def reverse: EdgePartition[ED] = {
- val builder = new EdgePartitionBuilder(size)
+ def reverse: EdgePartition[ED, VD] = {
+ val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD])
for (e <- iterator) {
builder.add(e.dstId, e.srcId, e.attr)
}
- builder.toEdgePartition
+ builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
}
/**
@@ -64,7 +111,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @return a new edge partition with the result of the function `f`
* applied to each edge
*/
- def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = {
val newData = new Array[ED2](data.size)
val edge = new Edge[ED]()
val size = data.size
@@ -76,7 +123,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
newData(i) = f(edge)
i += 1
}
- new EdgePartition(srcIds, dstIds, newData, index)
+ this.withData(newData)
}
/**
@@ -91,7 +138,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @tparam ED2 the type of the new attribute
* @return a new edge partition with the attribute values replaced
*/
- def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
+ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = {
// Faster than iter.toArray, because the expected size is known.
val newData = new Array[ED2](data.size)
var i = 0
@@ -100,7 +147,23 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
i += 1
}
assert(newData.size == i)
- new EdgePartition(srcIds, dstIds, newData, index)
+ this.withData(newData)
+ }
+
+ /**
+ * Construct a new edge partition containing only the edges matching `epred` and where both
+ * vertices match `vpred`.
+ */
+ def filter(
+ epred: EdgeTriplet[VD, ED] => Boolean,
+ vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
+ val filtered = tripletIterator().filter(et =>
+ vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et))
+ val builder = new EdgePartitionBuilder[ED, VD]
+ for (e <- filtered) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
}
/**
@@ -119,8 +182,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @param merge a commutative associative merge operation
* @return a new edge partition without duplicate edges
*/
- def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
- val builder = new EdgePartitionBuilder[ED]
+ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
+ val builder = new EdgePartitionBuilder[ED, VD]
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
var currAttr: ED = null.asInstanceOf[ED]
@@ -141,11 +204,11 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
if (size > 0) {
builder.add(currSrcId, currDstId, currAttr)
}
- builder.toEdgePartition
+ builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
}
/**
- * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+ * Apply `f` to all edges present in both `this` and `other` and return a new `EdgePartition`
* containing the resulting edges.
*
* If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
@@ -155,9 +218,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* once.
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
- (other: EdgePartition[ED2])
- (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
- val builder = new EdgePartitionBuilder[ED3]
+ (other: EdgePartition[ED2, _])
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = {
+ val builder = new EdgePartitionBuilder[ED3, VD]
var i = 0
var j = 0
// For i = index of each edge in `this`...
@@ -175,7 +238,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
}
i += 1
}
- builder.toEdgePartition
+ builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet)
}
/**
@@ -183,7 +246,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*
* @return size of the partition
*/
- def size: Int = srcIds.size
+ val size: Int = srcIds.size
/** The number of unique source vertices in the partition. */
def indexSize: Int = index.size
@@ -212,9 +275,34 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
}
/**
+ * Get an iterator over the edge triplets in this partition.
+ *
+ * It is safe to keep references to the objects from this iterator.
+ */
+ def tripletIterator(
+ includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = {
+ new EdgeTripletIterator(this, includeSrc, includeDst)
+ }
+
+ /**
+ * Upgrade the given edge iterator into a triplet iterator.
+ *
+ * Be careful not to keep references to the objects from this iterator. To improve GC performance
+ * the same object is re-used in `next()`.
+ */
+ def upgradeIterator(
+ edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
+ : Iterator[EdgeTriplet[VD, ED]] = {
+ new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
+ }
+
+ /**
* Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
+ *
+ * Be careful not to keep references to the objects from this iterator. To improve GC performance
+ * the same object is re-used in `next()`.
*/
def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 63ccccb056..ecb49bef42 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -20,12 +20,14 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import scala.util.Sorting
+import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
+
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.PrimitiveVector
private[graphx]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
+ size: Int = 64) {
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
@@ -33,7 +35,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
edges += Edge(src, dst, d)
}
- def toEdgePartition: EdgePartition[ED] = {
+ def toEdgePartition: EdgePartition[ED, VD] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
val srcIds = new Array[VertexId](edgeArray.size)
@@ -57,6 +59,14 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
i += 1
}
}
- new EdgePartition(srcIds, dstIds, data, index)
+
+ // Create and populate a VertexPartition with vids from the edges, but no attributes
+ val vidsIter = srcIds.iterator ++ dstIds.iterator
+ val vertexIds = new OpenHashSet[VertexId]
+ vidsIter.foreach(vid => vertexIds.add(vid))
+ val vertices = new VertexPartition(
+ vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet)
+
+ new EdgePartition(srcIds, dstIds, data, index, vertices)
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 220a89d73d..ebb0b9418d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -23,32 +23,62 @@ import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
/**
- * The Iterator type returned when constructing edge triplets. This class technically could be
- * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
- * debug / profile.
+ * The Iterator type returned when constructing edge triplets. This could be an anonymous class in
+ * EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
*/
private[impl]
class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
- val vidToIndex: VertexIdToIndexMap,
- val vertexArray: Array[VD],
- val edgePartition: EdgePartition[ED])
+ val edgePartition: EdgePartition[ED, VD],
+ val includeSrc: Boolean,
+ val includeDst: Boolean)
extends Iterator[EdgeTriplet[VD, ED]] {
// Current position in the array.
private var pos = 0
- private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
-
override def hasNext: Boolean = pos < edgePartition.size
override def next() = {
val triplet = new EdgeTriplet[VD, ED]
triplet.srcId = edgePartition.srcIds(pos)
- triplet.srcAttr = vmap(triplet.srcId)
+ if (includeSrc) {
+ triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+ }
triplet.dstId = edgePartition.dstIds(pos)
- triplet.dstAttr = vmap(triplet.dstId)
+ if (includeDst) {
+ triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+ }
triplet.attr = edgePartition.data(pos)
pos += 1
triplet
}
}
+
+/**
+ * An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
+ * class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
+ * profile.
+ */
+private[impl]
+class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
+ val edgeIter: Iterator[Edge[ED]],
+ val edgePartition: EdgePartition[ED, VD],
+ val includeSrc: Boolean,
+ val includeDst: Boolean)
+ extends Iterator[EdgeTriplet[VD, ED]] {
+
+ private val triplet = new EdgeTriplet[VD, ED]
+
+ override def hasNext = edgeIter.hasNext
+
+ override def next() = {
+ triplet.set(edgeIter.next())
+ if (includeSrc) {
+ triplet.srcAttr = edgePartition.vertices(triplet.srcId)
+ }
+ if (includeDst) {
+ triplet.dstAttr = edgePartition.vertices(triplet.dstId)
+ }
+ triplet
+ }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 9eabccdee4..2f2d0e03fd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,54 +19,45 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
-import org.apache.spark.util.collection.PrimitiveVector
-import org.apache.spark.{HashPartitioner, Partitioner}
+import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.storage.StorageLevel
+
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl._
import org.apache.spark.graphx.impl.MsgRDDFunctions._
import org.apache.spark.graphx.util.BytecodeUtils
-import org.apache.spark.rdd.{ShuffledRDD, RDD}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.ClosureCleaner
/**
- * A graph that supports computation on graphs.
+ * An implementation of [[org.apache.spark.graphx.Graph]] to support computation on graphs.
*
- * Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
- * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
- * vertex attributes are replicated to the edge partitions where they appear as sources or
- * destinations. `routingTable` stores the routing information for shipping vertex attributes to
- * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
- * using the routing table.
+ * Graphs are represented using two RDDs: `vertices`, which contains vertex attributes and the
+ * routing information for shipping vertex attributes to edge partitions, and
+ * `replicatedVertexView`, which contains edges and the vertex attributes mentioned by each edge.
*/
class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val vertices: VertexRDD[VD],
- @transient val edges: EdgeRDD[ED],
- @transient val routingTable: RoutingTable,
- @transient val replicatedVertexView: ReplicatedVertexView[VD])
+ @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
extends Graph[VD, ED] with Serializable {
/** Default constructor is provided to support serialization */
- protected def this() = this(null, null, null, null)
+ protected def this() = this(null, null)
+
+ @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
/** Return a RDD that brings edges together with their source and destination vertices. */
- @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
- val vdTag = classTag[VD]
- val edTag = classTag[ED]
- edges.partitionsRDD.zipPartitions(
- replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
- val (pid, ePart) = ePartIter.next()
- val (_, vPart) = vPartIter.next()
- new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
- }
+ @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
+ replicatedVertexView.upgrade(vertices, true, true)
+ replicatedVertexView.edges.partitionsRDD.mapPartitions(_.flatMap {
+ case (pid, part) => part.tripletIterator()
+ })
}
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
vertices.persist(newLevel)
- edges.persist(newLevel)
+ replicatedVertexView.edges.persist(newLevel)
this
}
@@ -74,14 +65,15 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
- replicatedVertexView.unpersist(blocking)
+ // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
this
}
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- val numPartitions = edges.partitions.size
+ val numPartitions = replicatedVertexView.edges.partitions.size
val edTag = classTag[ED]
- val newEdges = new EdgeRDD(edges.map { e =>
+ val vdTag = classTag[VD]
+ val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
@@ -89,105 +81,79 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex( { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]()(edTag)
+ val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
iter.foreach { message =>
val data = message.data
builder.add(data._1, data._2, data._3)
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
- }, preservesPartitioning = true).cache())
- GraphImpl(vertices, newEdges)
+ }, preservesPartitioning = true))
+ GraphImpl.fromExistingRDDs(vertices, newEdges)
}
override def reverse: Graph[VD, ED] = {
- val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
- GraphImpl(vertices, newETable)
+ new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
}
override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
+ vertices.cache()
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newReplicatedVertexView = new ReplicatedVertexView[VD2](
- changedVerts, edges, routingTable,
- Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
- new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+ val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+ .updateVertices(changedVerts)
+ new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
- GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+ GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
}
}
override def mapEdges[ED2: ClassTag](
f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
- val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
- new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
+ val newEdges = replicatedVertexView.edges
+ .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
+ new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
override def mapTriplets[ED2: ClassTag](
f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
- val newEdgePartitions =
- edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
- (ePartIter, vTableReplicatedIter) =>
- val (ePid, edgePartition) = ePartIter.next()
- val (vPid, vPart) = vTableReplicatedIter.next()
- assert(!vTableReplicatedIter.hasNext)
- assert(ePid == vPid)
- val et = new EdgeTriplet[VD, ED]
- val inputIterator = edgePartition.iterator.map { e =>
- et.set(e)
- et.srcAttr = vPart(e.srcId)
- et.dstAttr = vPart(e.dstId)
- et
- }
- // Apply the user function to the vertex partition
- val outputIter = f(ePid, inputIterator)
- // Consume the iterator to update the edge attributes
- val newEdgePartition = edgePartition.map(outputIter)
- Iterator((ePid, newEdgePartition))
- }
- new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
+ vertices.cache()
+ val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr")
+ val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr")
+ replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+ val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
+ part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr)))
+ }
+ new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ vertices.cache()
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
-
- // Filter the edges
- val edTag = classTag[ED]
- val newEdges = new EdgeRDD[ED](triplets.filter { et =>
- vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
- }.mapPartitionsWithIndex( { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]()(edTag)
- iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
- val edgePartition = builder.toEdgePartition
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true)).cache()
-
- // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
- // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
- // an edge.
- new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
- } // end of subgraph
+ // Filter the triplets. We must always upgrade the triplet view fully because vpred always runs
+ // on both src and dst vertices
+ replicatedVertexView.upgrade(vertices, true, true)
+ val newEdges = replicatedVertexView.edges.filter(epred, vpred)
+ new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
+ }
override def mask[VD2: ClassTag, ED2: ClassTag] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
- val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
- // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
- // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
- // an edge.
- new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
+ val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+ new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
}
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
- ClosureCleaner.clean(merge)
- val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge))
- new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+ val newEdges = replicatedVertexView.edges.mapEdgePartitions(
+ (pid, part) => part.groupEdges(merge))
+ new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
// ///////////////////////////////////////////////////////////////////////////////////////////////
@@ -199,68 +165,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
- ClosureCleaner.clean(mapFunc)
- ClosureCleaner.clean(reduceFunc)
+ vertices.cache()
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
- val vs = activeSetOpt match {
+ replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr)
+ val view = activeSetOpt match {
case Some((activeSet, _)) =>
- replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ replicatedVertexView.withActiveSet(activeSet)
case None =>
- replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
+ replicatedVertexView
}
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
- val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
- val (ePid, edgePartition) = ePartIter.next()
- val (vPid, vPart) = vPartIter.next()
- assert(!vPartIter.hasNext)
- assert(ePid == vPid)
- // Choose scan method
- val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
- val edgeIter = activeDirectionOpt match {
- case Some(EdgeDirection.Both) =>
- if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
- .filter(e => vPart.isActive(e.dstId))
- } else {
- edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
- }
- case Some(EdgeDirection.Either) =>
- // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
- // the index here. Instead we have to scan all edges and then do the filter.
- edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
- case Some(EdgeDirection.Out) =>
- if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
- } else {
- edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
- }
- case Some(EdgeDirection.In) =>
- edgePartition.iterator.filter(e => vPart.isActive(e.dstId))
- case _ => // None
- edgePartition.iterator
- }
-
- // Scan edges and run the map function
- val et = new EdgeTriplet[VD, ED]
- val mapOutputs = edgeIter.flatMap { e =>
- et.set(e)
- if (mapUsesSrcAttr) {
- et.srcAttr = vPart(e.srcId)
- }
- if (mapUsesDstAttr) {
- et.dstAttr = vPart(e.dstId)
+ val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
+ case (pid, edgePartition) =>
+ // Choose scan method
+ val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
+ val edgeIter = activeDirectionOpt match {
+ case Some(EdgeDirection.Both) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+ .filter(e => edgePartition.isActive(e.dstId))
+ } else {
+ edgePartition.iterator.filter(e =>
+ edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId))
+ }
+ case Some(EdgeDirection.Either) =>
+ // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
+ // the index here. Instead we have to scan all edges and then do the filter.
+ edgePartition.iterator.filter(e =>
+ edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId))
+ case Some(EdgeDirection.Out) =>
+ if (activeFraction < 0.8) {
+ edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId))
+ } else {
+ edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId))
+ }
+ case Some(EdgeDirection.In) =>
+ edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId))
+ case _ => // None
+ edgePartition.iterator
}
- mapFunc(et)
- }
- // Note: This doesn't allow users to send messages to arbitrary vertices.
- vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
- }
+
+ // Scan edges and run the map function
+ val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr)
+ .flatMap(mapFunc(_))
+ // Note: This doesn't allow users to send messages to arbitrary vertices.
+ edgePartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc).iterator
+ }).setName("GraphImpl.mapReduceTriplets - preAgg")
// do the final reduction reusing the index map
vertices.aggregateUsingIndex(preAgg, reduceFunc)
@@ -268,20 +224,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
- (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
- {
+ (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
+ vertices.cache()
// updateF preserves type, so we can use incremental replication
- val newVerts = vertices.leftJoin(other)(updateF)
+ val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newReplicatedVertexView = new ReplicatedVertexView[VD2](
- changedVerts, edges, routingTable,
- Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
- new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
+ val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
+ .updateVertices(changedVerts)
+ new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
- GraphImpl(newVerts, edges, routingTable)
+ GraphImpl(newVerts, replicatedVertexView.edges)
}
}
@@ -298,73 +253,68 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
object GraphImpl {
+ /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] =
- {
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
}
+ /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
- edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
+ edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
+ /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] =
- {
- val edgeRDD = createEdgeRDD(edges).cache()
-
- // Get the set of all vids
- val partitioner = Partitioner.defaultPartitioner(vertices)
- val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
- val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
- vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
- }
-
- val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
-
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+ val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
GraphImpl(vertexRDD, edgeRDD)
}
+ /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
- edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
- // Cache RDDs that are referenced multiple times
- edges.cache()
-
- GraphImpl(vertices, edges, new RoutingTable(edges, vertices))
+ edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+ // Convert the vertex partitions in edges to the correct type
+ val newEdges = edges.mapEdgePartitions(
+ (pid, part) => part.withVertices(part.vertices.map(
+ (vid, attr) => null.asInstanceOf[VD])))
+ GraphImpl.fromExistingRDDs(vertices, newEdges)
}
- def apply[VD: ClassTag, ED: ClassTag](
+ /**
+ * Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
+ * vertices.
+ */
+ def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
- edges: EdgeRDD[ED],
- routingTable: RoutingTable): GraphImpl[VD, ED] = {
- // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we
- // don't cache it explicitly.
- vertices.cache()
- edges.cache()
-
- new GraphImpl(
- vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+ edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
+ new GraphImpl(vertices, new ReplicatedVertexView(edges))
}
/**
- * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
- * data structure (RDD[(VertexId, VertexId, ED)]).
- *
- * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
- * pair: the key is the partition id, and the value is an EdgePartition object containing all the
- * edges in a partition.
+ * Create a graph from an EdgeRDD with the correct vertex type, setting missing vertices to
+ * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
*/
- private def createEdgeRDD[ED: ClassTag](
- edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
+ edges: EdgeRDD[ED, VD],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ edges.cache()
+ val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
+ fromExistingRDDs(vertices, edges)
+ }
+
+ /** Create an EdgeRDD from a set of edges. */
+ private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
+ edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]
+ val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -373,24 +323,4 @@ object GraphImpl {
new EdgeRDD(edgePartitions)
}
- private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
- edges: EdgeRDD[ED],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- edges.cache()
- // Get the set of all vids
- val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
- // Create the VertexRDD.
- val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
- GraphImpl(vertices, edges)
- }
-
- /** Collects all vids mentioned in edges and partitions them by partitioner. */
- private def collectVertexIdsFromEdges(
- edges: EdgeRDD[_],
- partitioner: Partitioner): RDD[(VertexId, Int)] = {
- // TODO: Consider doing map side distinct before shuffle.
- new ShuffledRDD[VertexId, Int, (VertexId, Int)](
- edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
- .setSerializer(new VertexIdMsgSerializer)
- }
} // end of object GraphImpl
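For orientation, a minimal sketch (not part of this commit) of the public-API path these constructors serve, assuming a Spark 1.0-era GraphX build; the graph data and object name are illustrative:

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    object GraphBuildSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "graph-build-sketch")
        val vertices = sc.parallelize(Seq((1L, 1.0), (2L, 2.0)))
        val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
        // Vertex 3 appears only in an edge, so it receives the default attribute. Graph.apply
        // delegates to GraphImpl.apply, which now derives the vertex set (and its routing
        // table) from the EdgeRDD rather than doing a separate shuffle.
        val graph = Graph(vertices, edges, defaultVertexAttr = 0.0)
        // The map function reads only srcAttr, so only source attributes need to be shipped
        // to the edge partitions before the aggregation runs.
        val sums: VertexRDD[Double] = graph.mapReduceTriplets[Double](
          et => Iterator((et.dstId, et.srcAttr)),
          (a, b) => a + b)
        sums.collect().foreach(println)
        sc.stop()
      }
    }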
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index c45ba3d2f8..1c6d7e59e9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -89,7 +89,6 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
}
-
private[graphx]
object MsgRDDFunctions {
implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
@@ -99,18 +98,28 @@ object MsgRDDFunctions {
implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
new VertexBroadcastMsgRDDFunctions(rdd)
}
+}
- def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
- val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
+private[graphx]
+class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) {
+ def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = {
+ val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner)
// Set a custom serializer if the data is of int or double type.
- if (classTag[T] == ClassTag.Int) {
+ if (classTag[VD] == ClassTag.Int) {
rdd.setSerializer(new IntAggMsgSerializer)
- } else if (classTag[T] == ClassTag.Long) {
+ } else if (classTag[VD] == ClassTag.Long) {
rdd.setSerializer(new LongAggMsgSerializer)
- } else if (classTag[T] == ClassTag.Double) {
+ } else if (classTag[VD] == ClassTag.Double) {
rdd.setSerializer(new DoubleAggMsgSerializer)
}
rdd
}
}
+
+private[graphx]
+object VertexRDDFunctions {
+ implicit def rdd2VertexRDDFunctions[VD: ClassTag](rdd: RDD[(VertexId, VD)]) = {
+ new VertexRDDFunctions(rdd)
+ }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index a8154b63ce..3a0bba1b93 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -21,192 +21,102 @@ import scala.reflect.{classTag, ClassTag}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
import org.apache.spark.graphx._
/**
- * A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
- * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
- * fresh view is created.
- *
- * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
- * constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
- * iterations' graphs for best GC performance. See the implementation of
- * [[org.apache.spark.graphx.Pregel]] for an example.
+ * Manages shipping vertex attributes to the edge partitions of an
+ * [[org.apache.spark.graphx.EdgeRDD]]. Vertex attributes may be partially shipped to construct a
+ * triplet view with vertex attributes on only one side, and they may be updated. An active vertex
+ * set may additionally be shipped to the edge partitions. Be careful not to store a reference to
+ * `edges`, since it may be modified when the attribute shipping level is upgraded.
*/
private[impl]
-class ReplicatedVertexView[VD: ClassTag](
- updatedVerts: VertexRDD[VD],
- edges: EdgeRDD[_],
- routingTable: RoutingTable,
- prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
+class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
+ var edges: EdgeRDD[ED, VD],
+ var hasSrcId: Boolean = false,
+ var hasDstId: Boolean = false) {
/**
- * Within each edge partition, create a local map from vid to an index into the attribute
- * array. Each map contains a superset of the vertices that it will receive, because it stores
- * vids from both the source and destination of edges. It must always include both source and
- * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
+ * Return a new `ReplicatedVertexView` with the specified `EdgeRDD`, which must have the same
+ * shipping level.
*/
- private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
- case Some(prevView) =>
- prevView.localVertexIdMap
- case None =>
- edges.partitionsRDD.mapPartitions(_.map {
- case (pid, epart) =>
- val vidToIndex = new VertexIdToIndexMap
- epart.foreach { e =>
- vidToIndex.add(e.srcId)
- vidToIndex.add(e.dstId)
- }
- (pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
- }
-
- private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
- private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
- private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
- private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
-
- def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
- bothAttrs.unpersist(blocking)
- srcAttrOnly.unpersist(blocking)
- dstAttrOnly.unpersist(blocking)
- noAttrs.unpersist(blocking)
- // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
- // without modification
- this
+ def withEdges[VD2: ClassTag, ED2: ClassTag](
+ edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+ new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
}
- def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
- (includeSrc, includeDst) match {
- case (true, true) => bothAttrs
- case (true, false) => srcAttrOnly
- case (false, true) => dstAttrOnly
- case (false, false) => noAttrs
- }
+ /**
+ * Return a new `ReplicatedVertexView` where edges are reversed and shipping levels are swapped to
+ * match.
+ */
+ def reverse() = {
+ val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
+ new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
- def get(
- includeSrc: Boolean,
- includeDst: Boolean,
- actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
- // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
- // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
- // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
- // also shipped there.
- val shippedActives = routingTable.get(true, true)
- .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
- .partitionBy(edges.partitioner.get)
- // Update the view with shippedActives, setting activeness flags in the resulting
- // VertexPartitions
- get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
- val (pid, vPart) = viewIter.next()
- val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
- Iterator((pid, newPart))
+ /**
+ * Upgrade the shipping level in-place to the specified levels by shipping vertex attributes from
+ * `vertices`. This operation modifies the `ReplicatedVertexView`, and callers can access `edges`
+ * afterwards to obtain the upgraded view.
+ */
+ def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
+ val shipSrc = includeSrc && !hasSrcId
+ val shipDst = includeDst && !hasDstId
+ if (shipSrc || shipDst) {
+ val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
+ vertices.shipVertexAttributes(shipSrc, shipDst)
+ .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
+ includeSrc, includeDst, shipSrc, shipDst))
+ .partitionBy(edges.partitioner.get)
+ val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ (ePartIter, shippedVertsIter) => ePartIter.map {
+ case (pid, edgePartition) =>
+ (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
+ }
+ })
+ edges = newEdges
+ hasSrcId = includeSrc
+ hasDstId = includeDst
}
}
- private def create(includeSrc: Boolean, includeDst: Boolean)
- : RDD[(PartitionID, VertexPartition[VD])] = {
- val vdTag = classTag[VD]
-
- // Ship vertex attributes to edge partitions according to vertexPlacement
- val verts = updatedVerts.partitionsRDD
- val shippedVerts = routingTable.get(includeSrc, includeDst)
- .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
+ /**
+ * Return a new `ReplicatedVertexView` where the `activeSet` in each edge partition contains only
+ * vertex ids present in `actives`. This ships a vertex id to all edge partitions where it is
+ * referenced, ignoring the attribute shipping level.
+ */
+ def withActiveSet(actives: VertexRDD[_]): ReplicatedVertexView[VD, ED] = {
+ val shippedActives = actives.shipVertexIds()
+ .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
.partitionBy(edges.partitioner.get)
- // TODO: Consider using a specialized shuffler.
-
- prevViewOpt match {
- case Some(prevView) =>
- // Update prevView with shippedVerts, setting staleness flags in the resulting
- // VertexPartitions
- prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
- (prevViewIter, shippedVertsIter) =>
- val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
- Iterator((pid, newVPart))
- }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
- case None =>
- // Within each edge partition, place the shipped vertex attributes into the correct
- // locations specified in localVertexIdMap
- localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
- val (pid, vidToIndex) = mapIter.next()
- assert(!mapIter.hasNext)
- // Populate the vertex array using the vidToIndex map
- val vertexArray = vdTag.newArray(vidToIndex.capacity)
- for ((_, block) <- shippedVertsIter) {
- for (i <- 0 until block.vids.size) {
- val vid = block.vids(i)
- val attr = block.attrs(i)
- val ind = vidToIndex.getPos(vid)
- vertexArray(ind) = attr
- }
- }
- val newVPart = new VertexPartition(
- vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
- Iterator((pid, newVPart))
- }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
- }
- }
-}
-
-private object ReplicatedVertexView {
- protected def buildBuffer[VD: ClassTag](
- pid2vidIter: Iterator[Array[Array[VertexId]]],
- vertexPartIter: Iterator[VertexPartition[VD]]) = {
- val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
- val vertexPart: VertexPartition[VD] = vertexPartIter.next()
-
- Iterator.tabulate(pid2vid.size) { pid =>
- val vidsCandidate = pid2vid(pid)
- val size = vidsCandidate.length
- val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
- val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
- var i = 0
- while (i < size) {
- val vid = vidsCandidate(i)
- if (vertexPart.isDefined(vid)) {
- vids += vid
- attrs += vertexPart(vid)
- }
- i += 1
+ val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+ (ePartIter, shippedActivesIter) => ePartIter.map {
+ case (pid, edgePartition) =>
+ (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
}
- (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
- }
+ })
+ new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
}
- protected def buildActiveBuffer(
- pid2vidIter: Iterator[Array[Array[VertexId]]],
- activePartIter: Iterator[VertexPartition[_]])
- : Iterator[(Int, Array[VertexId])] = {
- val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
- val activePart: VertexPartition[_] = activePartIter.next()
+ /**
+ * Return a new `ReplicatedVertexView` where the vertex attributes in the edge partitions are
+ * updated using `updates`. This ships a vertex attribute only to the edge partitions where it
+ * is in the position(s) specified by the attribute shipping level.
+ */
+ def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
+ val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
+ .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
+ hasSrcId, hasDstId))
+ .partitionBy(edges.partitioner.get)
- Iterator.tabulate(pid2vid.size) { pid =>
- val vidsCandidate = pid2vid(pid)
- val size = vidsCandidate.length
- val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
- var i = 0
- while (i < size) {
- val vid = vidsCandidate(i)
- if (activePart.isDefined(vid)) {
- actives += vid
- }
- i += 1
+ val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ (ePartIter, shippedVertsIter) => ePartIter.map {
+ case (pid, edgePartition) =>
+ (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
}
- (pid, actives.trim().array)
- }
+ })
+ new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
}
}
-
-private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
- extends Serializable {
- def iterator: Iterator[(VertexId, VD)] =
- (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
-}
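A rough usage sketch (not part of this commit) of the upgraded view as it would be driven from inside `org.apache.spark.graphx.impl`; the object and method names here are hypothetical, while the `ReplicatedVertexView` API is the one defined above:

    package org.apache.spark.graphx.impl

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._

    // Hypothetical helper: build a fresh view over an EdgeRDD and upgrade it in two steps to
    // illustrate incremental attribute shipping (the second call ships only dst attributes,
    // since src attributes are already present in the edge partitions).
    object ReplicatedVertexViewSketch {
      def buildUpgradedView[VD: ClassTag, ED: ClassTag](
          vertices: VertexRDD[VD], edgeRDD: EdgeRDD[ED, VD]): EdgeRDD[ED, VD] = {
        val view = new ReplicatedVertexView[VD, ED](edgeRDD)          // nothing shipped yet
        view.upgrade(vertices, includeSrc = true, includeDst = false) // ships src attributes
        view.upgrade(vertices, includeSrc = true, includeDst = true)  // ships only dst attributes
        // Read `view.edges` only after the upgrades; the field is replaced in place.
        view.edges
      }
    }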
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
deleted file mode 100644
index 022d5668e2..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.impl
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.graphx._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.PrimitiveVector
-
-/**
- * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
- * information for shipping vertex attributes to edge partitions. This is always cached because it
- * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
- * (possibly) once to ship the active-set information.
- */
-private[impl]
-class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
-
- val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
-
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
- (includeSrcAttr, includeDstAttr) match {
- case (true, true) => bothAttrs
- case (true, false) => srcAttrOnly
- case (false, true) => dstAttrOnly
- case (false, false) => noAttrs
- }
-
- private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
- // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
- val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
- val numEdges = edgePartition.size
- val vSet = new VertexSet
- if (includeSrcAttr) { // Add src vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.srcIds(i))
- i += 1
- }
- }
- if (includeDstAttr) { // Add dst vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.dstIds(i))
- i += 1
- }
- }
- vSet.iterator.map { vid => (vid, pid) }
- }
-
- val numEdgePartitions = edges.partitions.size
- vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
- for ((vid, pid) <- iter) {
- pid2vid(pid) += vid
- }
-
- Iterator(pid2vid.map(_.trim().array))
- }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
- }
-}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
new file mode 100644
index 0000000000..927e32ad0f
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.ShuffledRDD
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A message from the edge partition `pid` to the vertex partition containing `vid`, specifying
+ * that the edge partition references `vid` in the specified `position` (src, dst, or both).
+ */
+private[graphx]
+class RoutingTableMessage(
+ var vid: VertexId,
+ var pid: PartitionID,
+ var position: Byte)
+ extends Product2[VertexId, (PartitionID, Byte)] with Serializable {
+ override def _1 = vid
+ override def _2 = (pid, position)
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage]
+}
+
+private[graphx]
+class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
+ /** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD using the given `partitioner`. */
+ def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
+ new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner)
+ .setSerializer(new RoutingTableMessageSerializer)
+ }
+}
+
+private[graphx]
+object RoutingTableMessageRDDFunctions {
+ import scala.language.implicitConversions
+
+ implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
+ new RoutingTableMessageRDDFunctions(rdd)
+ }
+}
+
+private[graphx]
+object RoutingTablePartition {
+ val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty)
+
+ /** Generate a `RoutingTableMessage` for each vertex referenced in `edgePartition`. */
+ def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
+ : Iterator[RoutingTableMessage] = {
+ // Determine which positions each vertex id appears in using a map where the low 2 bits
+ // represent src and dst
+ val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+ edgePartition.srcIds.iterator.foreach { srcId =>
+ map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
+ }
+ edgePartition.dstIds.iterator.foreach { dstId =>
+ map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
+ }
+ map.iterator.map { vidAndPosition =>
+ new RoutingTableMessage(vidAndPosition._1, pid, vidAndPosition._2)
+ }
+ }
+
+ /** Build a `RoutingTablePartition` from `RoutingTableMessage`s. */
+ def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
+ : RoutingTablePartition = {
+ val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
+ val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+ val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
+ for (msg <- iter) {
+ pid2vid(msg.pid) += msg.vid
+ srcFlags(msg.pid) += (msg.position & 0x1) != 0
+ dstFlags(msg.pid) += (msg.position & 0x2) != 0
+ }
+
+ new RoutingTablePartition(pid2vid.zipWithIndex.map {
+ case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
+ })
+ }
+
+ /** Compact the given vector of Booleans into a BitSet. */
+ private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = {
+ val bitset = new BitSet(flags.size)
+ var i = 0
+ while (i < flags.size) {
+ if (flags(i)) {
+ bitset.set(i)
+ }
+ i += 1
+ }
+ bitset
+ }
+}
+
+/**
+ * Stores the locations of edge-partition join sites for each vertex attribute in a particular
+ * vertex partition. This provides routing information for shipping vertex attributes to edge
+ * partitions.
+ */
+private[graphx]
+class RoutingTablePartition(
+ private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+ /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
+ val numEdgePartitions: Int = routingTable.size
+
+ /** Returns the number of vertices that will be sent to the specified edge partition. */
+ def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
+
+ /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
+ def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
+
+ /** Returns a new RoutingTablePartition reflecting a reversal of all edge directions. */
+ def reverse: RoutingTablePartition = {
+ new RoutingTablePartition(routingTable.map {
+ case (vids, srcVids, dstVids) => (vids, dstVids, srcVids)
+ })
+ }
+
+ /**
+ * Runs `f` on each vertex id to be sent to the specified edge partition. Vertex ids can be
+ * filtered by the position they have in the edge partition.
+ */
+ def foreachWithinEdgePartition
+ (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean)
+ (f: VertexId => Unit) {
+ val (vidsCandidate, srcVids, dstVids) = routingTable(pid)
+ val size = vidsCandidate.length
+ if (includeSrc && includeDst) {
+ // Avoid checks for performance
+ vidsCandidate.iterator.foreach(f)
+ } else if (!includeSrc && !includeDst) {
+ // Do nothing
+ } else {
+ val relevantVids = if (includeSrc) srcVids else dstVids
+ relevantVids.iterator.foreach { i => f(vidsCandidate(i)) }
+ }
+ }
+}
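A small standalone sketch (not part of this commit) of the two-bit `position` encoding used by `RoutingTableMessage` above; plain Scala, no Spark required:

    object PositionEncodingSketch extends App {
      // Bit 0x1 marks the vertex as a source, bit 0x2 as a destination, 0x3 as both.
      def encode(isSrc: Boolean, isDst: Boolean): Byte =
        ((if (isSrc) 0x1 else 0x0) | (if (isDst) 0x2 else 0x0)).toByte

      val position = encode(isSrc = true, isDst = true)
      println((position & 0x1) != 0) // true: referenced as src
      println((position & 0x2) != 0) // true: referenced as dst
    }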
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index 1de42eeca1..033237f597 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -28,6 +28,35 @@ import org.apache.spark.graphx._
import org.apache.spark.serializer._
private[graphx]
+class RoutingTableMessageSerializer extends Serializer with Serializable {
+ override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
+
+ override def serializeStream(s: OutputStream): SerializationStream =
+ new ShuffleSerializationStream(s) {
+ def writeObject[T: ClassTag](t: T): SerializationStream = {
+ val msg = t.asInstanceOf[RoutingTableMessage]
+ writeVarLong(msg.vid, optimizePositive = false)
+ writeUnsignedVarInt(msg.pid)
+ // TODO: Write only the bottom two bits of msg.position
+ s.write(msg.position)
+ this
+ }
+ }
+
+ override def deserializeStream(s: InputStream): DeserializationStream =
+ new ShuffleDeserializationStream(s) {
+ override def readObject[T: ClassTag](): T = {
+ val a = readVarLong(optimizePositive = false)
+ val b = readUnsignedVarInt()
+ val c = s.read()
+ if (c == -1) throw new EOFException
+ new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T]
+ }
+ }
+ }
+}
+
+private[graphx]
class VertexIdMsgSerializer extends Serializer with Serializable {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
new file mode 100644
index 0000000000..f4e221d4e0
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/** Stores vertex attributes to ship to an edge partition. */
+private[graphx]
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
+ extends Serializable {
+ def iterator: Iterator[(VertexId, VD)] =
+ (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+}
+
+private[graphx]
+object ShippableVertexPartition {
+ /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] =
+ apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD])
+
+ /**
+ * Construct a `ShippableVertexPartition` from the given vertices with the specified routing
+ * table, filling in missing vertices mentioned in the routing table using `defaultVal`.
+ */
+ def apply[VD: ClassTag](
+ iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD)
+ : ShippableVertexPartition[VD] = {
+ val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
+ val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a)
+ new ShippableVertexPartition(index, values, mask, routingTable)
+ }
+
+ import scala.language.implicitConversions
+
+ /**
+ * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+ * `ShippableVertexPartition`.
+ */
+ implicit def shippablePartitionToOps[VD: ClassTag](partition: ShippableVertexPartition[VD]) =
+ new ShippableVertexPartitionOps(partition)
+
+ /**
+ * Implicit evidence that `ShippableVertexPartition` is a member of the
+ * `VertexPartitionBaseOpsConstructor` typeclass. This enables invoking `VertexPartitionBase`
+ * operations on a `ShippableVertexPartition` via an evidence parameter, as in
+ * [[VertexPartitionBaseOps]].
+ */
+ implicit object ShippableVertexPartitionOpsConstructor
+ extends VertexPartitionBaseOpsConstructor[ShippableVertexPartition] {
+ def toOps[VD: ClassTag](partition: ShippableVertexPartition[VD])
+ : VertexPartitionBaseOps[VD, ShippableVertexPartition] = shippablePartitionToOps(partition)
+ }
+}
+
+/**
+ * A map from vertex id to vertex attribute that additionally stores edge partition join sites for
+ * each vertex attribute, enabling joining with an [[org.apache.spark.graphx.EdgeRDD]].
+ */
+private[graphx]
+class ShippableVertexPartition[VD: ClassTag](
+ val index: VertexIdToIndexMap,
+ val values: Array[VD],
+ val mask: BitSet,
+ val routingTable: RoutingTablePartition)
+ extends VertexPartitionBase[VD] {
+
+ /** Return a new ShippableVertexPartition with the specified routing table. */
+ def withRoutingTable(routingTable_ : RoutingTablePartition): ShippableVertexPartition[VD] = {
+ new ShippableVertexPartition(index, values, mask, routingTable_)
+ }
+
+ /**
+ * Generate a `VertexAttributeBlock` for each edge partition keyed on the edge partition ID. The
+ * `VertexAttributeBlock` contains the vertex attributes from the current partition that are
+ * referenced in the specified positions in the edge partition.
+ */
+ def shipVertexAttributes(
+ shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
+ Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
+ val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
+ val vids = new PrimitiveVector[VertexId](initialSize)
+ val attrs = new PrimitiveVector[VD](initialSize)
+ var i = 0
+ routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
+ if (isDefined(vid)) {
+ vids += vid
+ attrs += this(vid)
+ }
+ i += 1
+ }
+ (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
+ }
+ }
+
+ /**
+ * Generate a `VertexId` array for each edge partition keyed on the edge partition ID. The array
+ * contains the visible vertex ids from the current partition that are referenced in the edge
+ * partition.
+ */
+ def shipVertexIds(): Iterator[(PartitionID, Array[VertexId])] = {
+ Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
+ val vids = new PrimitiveVector[VertexId](routingTable.partitionSize(pid))
+ var i = 0
+ routingTable.foreachWithinEdgePartition(pid, true, true) { vid =>
+ if (isDefined(vid)) {
+ vids += vid
+ }
+ i += 1
+ }
+ (pid, vids.trim().array)
+ }
+ }
+}
+
+private[graphx] class ShippableVertexPartitionOps[VD: ClassTag](self: ShippableVertexPartition[VD])
+ extends VertexPartitionBaseOps[VD, ShippableVertexPartition](self) {
+
+ def withIndex(index: VertexIdToIndexMap): ShippableVertexPartition[VD] = {
+ new ShippableVertexPartition(index, self.values, self.mask, self.routingTable)
+ }
+
+ def withValues[VD2: ClassTag](values: Array[VD2]): ShippableVertexPartition[VD2] = {
+ new ShippableVertexPartition(self.index, values, self.mask, self.routingTable)
+ }
+
+ def withMask(mask: BitSet): ShippableVertexPartition[VD] = {
+ new ShippableVertexPartition(self.index, self.values, mask, self.routingTable)
+ }
+}
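A local sketch (not part of this commit) of the shipping path described above: build a routing table for two edge partitions, attach it to a vertex partition, and list what `shipVertexAttributes` would send where. Since these classes are `private[graphx]`, the snippet assumes it is compiled inside the `org.apache.spark.graphx.impl` package; the vertex data is illustrative:

    package org.apache.spark.graphx.impl

    import org.apache.spark.graphx._

    object ShippingSketch extends App {
      val msgs = Iterator(
        new RoutingTableMessage(1L, 0, 0x1.toByte), // vertex 1 is a src in edge partition 0
        new RoutingTableMessage(2L, 0, 0x2.toByte), // vertex 2 is a dst in edge partition 0
        new RoutingTableMessage(2L, 1, 0x3.toByte)) // vertex 2 is both in edge partition 1
      val routingTable = RoutingTablePartition.fromMsgs(numEdgePartitions = 2, msgs)
      val part = ShippableVertexPartition(Iterator((1L, "a"), (2L, "b")), routingTable, "default")
      // Ship only source-side attributes: partition 0 receives vertex 1, partition 1 receives vertex 2.
      part.shipVertexAttributes(shipSrc = true, shipDst = false).foreach { case (pid, block) =>
        println(s"edge partition $pid gets ${block.iterator.toList}")
      }
    }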
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 7a54b413dc..f1d174720a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -19,260 +19,59 @@ package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
-import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
-import org.apache.spark.util.collection.BitSet
private[graphx] object VertexPartition {
-
- def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
- val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
- iter.foreach { case (k, v) =>
- map(k) = v
- }
- new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
- }
-
- def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
- : VertexPartition[VD] =
- {
- val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
- iter.foreach { case (k, v) =>
- map.setMerge(k, v, mergeFunc)
- }
- new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
- }
-}
-
-
-private[graphx]
-class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
- val index: VertexIdToIndexMap,
- val values: Array[VD],
- val mask: BitSet,
- /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */
- private val activeSet: Option[VertexSet] = None)
- extends Logging {
-
- val capacity: Int = index.capacity
-
- def size: Int = mask.cardinality()
-
- /** Return the vertex attribute for the given vertex ID. */
- def apply(vid: VertexId): VD = values(index.getPos(vid))
-
- def isDefined(vid: VertexId): Boolean = {
- val pos = index.getPos(vid)
- pos >= 0 && mask.get(pos)
- }
-
- /** Look up vid in activeSet, throwing an exception if it is None. */
- def isActive(vid: VertexId): Boolean = {
- activeSet.get.contains(vid)
+ /** Construct a `VertexPartition` from the given vertices. */
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+ : VertexPartition[VD] = {
+ val (index, values, mask) = VertexPartitionBase.initFrom(iter)
+ new VertexPartition(index, values, mask)
}
- /** The number of active vertices, if any exist. */
- def numActives: Option[Int] = activeSet.map(_.size)
+ import scala.language.implicitConversions
/**
- * Pass each vertex attribute along with the vertex id through a map
- * function and retain the original RDD's partitioning and index.
- *
- * @tparam VD2 the type returned by the map function
- *
- * @param f the function applied to each vertex id and vertex
- * attribute in the RDD
- *
- * @return a new VertexPartition with values obtained by applying `f` to
- * each of the entries in the original VertexRDD. The resulting
- * VertexPartition retains the same index.
+ * Implicit conversion to allow invoking `VertexPartitionBase` operations directly on a
+ * `VertexPartition`.
*/
- def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
- // Construct a view of the map transformation
- val newValues = new Array[VD2](capacity)
- var i = mask.nextSetBit(0)
- while (i >= 0) {
- newValues(i) = f(index.getValue(i), values(i))
- i = mask.nextSetBit(i + 1)
- }
- new VertexPartition[VD2](index, newValues, mask)
- }
-
- /**
- * Restrict the vertex set to the set of vertices satisfying the given predicate.
- *
- * @param pred the user defined predicate
- *
- * @note The vertex set preserves the original index structure which means that the returned
- * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
- * modifies the bitmap index and so no new values are allocated.
- */
- def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
- // Allocate the array to store the results into
- val newMask = new BitSet(capacity)
- // Iterate over the active bits in the old mask and evaluate the predicate
- var i = mask.nextSetBit(0)
- while (i >= 0) {
- if (pred(index.getValue(i), values(i))) {
- newMask.set(i)
- }
- i = mask.nextSetBit(i + 1)
- }
- new VertexPartition(index, values, newMask)
- }
+ implicit def partitionToOps[VD: ClassTag](partition: VertexPartition[VD]) =
+ new VertexPartitionOps(partition)
/**
- * Hides vertices that are the same between this and other. For vertices that are different, keeps
- * the values from `other`. The indices of `this` and `other` must be the same.
+ * Implicit evidence that `VertexPartition` is a member of the `VertexPartitionBaseOpsConstructor`
+ * typeclass. This enables invoking `VertexPartitionBase` operations on a `VertexPartition` via an
+ * evidence parameter, as in [[VertexPartitionBaseOps]].
*/
- def diff(other: VertexPartition[VD]): VertexPartition[VD] = {
- if (index != other.index) {
- logWarning("Diffing two VertexPartitions with different indexes is slow.")
- diff(createUsingIndex(other.iterator))
- } else {
- val newMask = mask & other.mask
- var i = newMask.nextSetBit(0)
- while (i >= 0) {
- if (values(i) == other.values(i)) {
- newMask.unset(i)
- }
- i = newMask.nextSetBit(i + 1)
- }
- new VertexPartition(index, other.values, newMask)
- }
- }
-
- /** Left outer join another VertexPartition. */
- def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexPartition[VD2])
- (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
- if (index != other.index) {
- logWarning("Joining two VertexPartitions with different indexes is slow.")
- leftJoin(createUsingIndex(other.iterator))(f)
- } else {
- val newValues = new Array[VD3](capacity)
-
- var i = mask.nextSetBit(0)
- while (i >= 0) {
- val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
- newValues(i) = f(index.getValue(i), values(i), otherV)
- i = mask.nextSetBit(i + 1)
- }
- new VertexPartition(index, newValues, mask)
- }
- }
-
- /** Left outer join another iterator of messages. */
- def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: Iterator[(VertexId, VD2)])
- (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
- leftJoin(createUsingIndex(other))(f)
- }
-
- /** Inner join another VertexPartition. */
- def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
- (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
- if (index != other.index) {
- logWarning("Joining two VertexPartitions with different indexes is slow.")
- innerJoin(createUsingIndex(other.iterator))(f)
- } else {
- val newMask = mask & other.mask
- val newValues = new Array[VD2](capacity)
- var i = newMask.nextSetBit(0)
- while (i >= 0) {
- newValues(i) = f(index.getValue(i), values(i), other.values(i))
- i = newMask.nextSetBit(i + 1)
- }
- new VertexPartition(index, newValues, newMask)
- }
- }
-
- /**
- * Inner join an iterator of messages.
- */
- def innerJoin[U: ClassTag, VD2: ClassTag]
- (iter: Iterator[Product2[VertexId, U]])
- (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
- innerJoin(createUsingIndex(iter))(f)
+ implicit object VertexPartitionOpsConstructor
+ extends VertexPartitionBaseOpsConstructor[VertexPartition] {
+ def toOps[VD: ClassTag](partition: VertexPartition[VD])
+ : VertexPartitionBaseOps[VD, VertexPartition] = partitionToOps(partition)
}
+}
- /**
- * Similar effect as aggregateUsingIndex((a, b) => a)
- */
- def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
- : VertexPartition[VD2] = {
- val newMask = new BitSet(capacity)
- val newValues = new Array[VD2](capacity)
- iter.foreach { case (vid, vdata) =>
- val pos = index.getPos(vid)
- if (pos >= 0) {
- newMask.set(pos)
- newValues(pos) = vdata
- }
- }
- new VertexPartition[VD2](index, newValues, newMask)
- }
+/** A map from vertex id to vertex attribute. */
+private[graphx] class VertexPartition[VD: ClassTag](
+ val index: VertexIdToIndexMap,
+ val values: Array[VD],
+ val mask: BitSet)
+ extends VertexPartitionBase[VD]
- /**
- * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
- * the partition, hidden by the bitmask.
- */
- def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
- val newMask = new BitSet(capacity)
- val newValues = new Array[VD](capacity)
- System.arraycopy(values, 0, newValues, 0, newValues.length)
- iter.foreach { case (vid, vdata) =>
- val pos = index.getPos(vid)
- if (pos >= 0) {
- newMask.set(pos)
- newValues(pos) = vdata
- }
- }
- new VertexPartition(index, newValues, newMask)
- }
+private[graphx] class VertexPartitionOps[VD: ClassTag](self: VertexPartition[VD])
+ extends VertexPartitionBaseOps[VD, VertexPartition](self) {
- def aggregateUsingIndex[VD2: ClassTag](
- iter: Iterator[Product2[VertexId, VD2]],
- reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
- val newMask = new BitSet(capacity)
- val newValues = new Array[VD2](capacity)
- iter.foreach { product =>
- val vid = product._1
- val vdata = product._2
- val pos = index.getPos(vid)
- if (pos >= 0) {
- if (newMask.get(pos)) {
- newValues(pos) = reduceFunc(newValues(pos), vdata)
- } else { // otherwise just store the new value
- newMask.set(pos)
- newValues(pos) = vdata
- }
- }
- }
- new VertexPartition[VD2](index, newValues, newMask)
+ def withIndex(index: VertexIdToIndexMap): VertexPartition[VD] = {
+ new VertexPartition(index, self.values, self.mask)
}
- def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
- val newActiveSet = new VertexSet
- iter.foreach(newActiveSet.add(_))
- new VertexPartition(index, values, mask, Some(newActiveSet))
+ def withValues[VD2: ClassTag](values: Array[VD2]): VertexPartition[VD2] = {
+ new VertexPartition(self.index, values, self.mask)
}
- /**
- * Construct a new VertexPartition whose index contains only the vertices in the mask.
- */
- def reindex(): VertexPartition[VD] = {
- val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
- val arbitraryMerge = (a: VD, b: VD) => a
- for ((k, v) <- this.iterator) {
- hashMap.setMerge(k, v, arbitraryMerge)
- }
- new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
+ def withMask(mask: BitSet): VertexPartition[VD] = {
+ new VertexPartition(self.index, self.values, mask)
}
-
- def iterator: Iterator[(VertexId, VD)] =
- mask.iterator.map(ind => (index.getValue(ind), values(ind)))
-
- def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
new file mode 100644
index 0000000000..8d9e0204d2
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+private[graphx] object VertexPartitionBase {
+ /**
+ * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+ * entries arbitrarily.
+ */
+ def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
+ : (VertexIdToIndexMap, Array[VD], BitSet) = {
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+ iter.foreach { pair =>
+ map(pair._1) = pair._2
+ }
+ (map.keySet, map._values, map.keySet.getBitSet)
+ }
+
+ /**
+ * Construct the constituents of a VertexPartitionBase from the given vertices, merging duplicate
+ * entries using `mergeFunc`.
+ */
+ def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
+ : (VertexIdToIndexMap, Array[VD], BitSet) = {
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+ iter.foreach { pair =>
+ map.setMerge(pair._1, pair._2, mergeFunc)
+ }
+ (map.keySet, map._values, map.keySet.getBitSet)
+ }
+}
+
+/**
+ * An abstract map from vertex id to vertex attribute. [[VertexPartition]] is the corresponding
+ * concrete implementation. [[VertexPartitionBaseOps]] provides a variety of operations for
+ * VertexPartitionBase and subclasses that provide implicit evidence of membership in the
+ * `VertexPartitionBaseOpsConstructor` typeclass (for example,
+ * [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag] {
+
+ def index: VertexIdToIndexMap
+ def values: Array[VD]
+ def mask: BitSet
+
+ val capacity: Int = index.capacity
+
+ def size: Int = mask.cardinality()
+
+ /** Return the vertex attribute for the given vertex ID. */
+ def apply(vid: VertexId): VD = values(index.getPos(vid))
+
+ def isDefined(vid: VertexId): Boolean = {
+ val pos = index.getPos(vid)
+ pos >= 0 && mask.get(pos)
+ }
+
+ def iterator: Iterator[(VertexId, VD)] =
+ mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+}
+
+/**
+ * A typeclass for subclasses of `VertexPartitionBase` representing the ability to wrap them in a
+ * `VertexPartitionBaseOps`.
+ */
+private[graphx] trait VertexPartitionBaseOpsConstructor[T[X] <: VertexPartitionBase[X]] {
+ def toOps[VD: ClassTag](partition: T[VD]): VertexPartitionBaseOps[VD, T]
+}
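A brief illustration (not part of this commit) of how the typeclass evidence above lets generic code work with any partition subclass; the helper object and function are hypothetical, while the types are the ones defined in this diff:

    package org.apache.spark.graphx.impl

    import scala.language.higherKinds
    import scala.reflect.ClassTag
    import org.apache.spark.graphx._

    object PartitionOpsSketch {
      // Count the vertices matching a predicate in any VertexPartitionBase subclass, using the
      // typeclass evidence to reach the shared filter() operation without knowing the concrete type.
      def countMatching[VD: ClassTag, T[X] <: VertexPartitionBase[X]: VertexPartitionBaseOpsConstructor](
          partition: T[VD])(pred: (VertexId, VD) => Boolean): Int = {
        val ops = implicitly[VertexPartitionBaseOpsConstructor[T]].toOps(partition)
        ops.filter(pred).size
      }
    }

For example, countMatching(VertexPartition(Iterator((1L, 2.0), (2L, 3.0))))((_, v) => v > 2.5) would return 1, and the same call works unchanged on a ShippableVertexPartition.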
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
new file mode 100644
index 0000000000..21ff615fec
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.language.higherKinds
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import org.apache.spark.Logging
+import org.apache.spark.util.collection.BitSet
+
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+
+/**
+ * A class containing additional operations for subclasses of VertexPartitionBase that provide
+ * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
+ * example, [[VertexPartition.VertexPartitionOpsConstructor]]).
+ */
+private[graphx] abstract class VertexPartitionBaseOps
+ [VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
+ (self: Self[VD])
+ extends Logging {
+
+ def withIndex(index: VertexIdToIndexMap): Self[VD]
+ def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
+ def withMask(mask: BitSet): Self[VD]
+
+ /**
+ * Pass each vertex attribute along with the vertex id through a map
+ * function and retain the original partition's index.
+ *
+ * @tparam VD2 the type returned by the map function
+ *
+ * @param f the function applied to each vertex id and vertex
+ * attribute in the RDD
+ *
+ * @return a new VertexPartition with values obtained by applying `f` to
+ * each of the entries in the original partition. The resulting
+ * VertexPartition retains the same index.
+ */
+ def map[VD2: ClassTag](f: (VertexId, VD) => VD2): Self[VD2] = {
+ // Construct a view of the map transformation
+ val newValues = new Array[VD2](self.capacity)
+ var i = self.mask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(self.index.getValue(i), self.values(i))
+ i = self.mask.nextSetBit(i + 1)
+ }
+ this.withValues(newValues)
+ }
+
+ /**
+ * Restrict the vertex set to the set of vertices satisfying the given predicate.
+ *
+ * @param pred the user defined predicate
+ *
+ * @note The vertex set preserves the original index structure which means that the returned
+ * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
+ * modifies the bitmap index and so no new values are allocated.
+ */
+ def filter(pred: (VertexId, VD) => Boolean): Self[VD] = {
+ // Allocate the array to store the results into
+ val newMask = new BitSet(self.capacity)
+ // Iterate over the active bits in the old mask and evaluate the predicate
+ var i = self.mask.nextSetBit(0)
+ while (i >= 0) {
+ if (pred(self.index.getValue(i), self.values(i))) {
+ newMask.set(i)
+ }
+ i = self.mask.nextSetBit(i + 1)
+ }
+ this.withMask(newMask)
+ }
+
+ /**
+ * Hides vertices that are the same in `this` and `other`; for vertices that differ, keeps the
+ * values from `other`. The indices of `this` and `other` should be the same; if they differ, the
+ * result is still correct but a slower path that rebuilds `other` on this index is taken.
+ */
+ def diff(other: Self[VD]): Self[VD] = {
+ if (self.index != other.index) {
+ logWarning("Diffing two VertexPartitions with different indexes is slow.")
+ diff(createUsingIndex(other.iterator))
+ } else {
+ val newMask = self.mask & other.mask
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ if (self.values(i) == other.values(i)) {
+ newMask.unset(i)
+ }
+ i = newMask.nextSetBit(i + 1)
+ }
+ this.withValues(other.values).withMask(newMask)
+ }
+ }
+
+ /** Left outer join another VertexPartition. */
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: Self[VD2])
+ (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+ if (self.index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ leftJoin(createUsingIndex(other.iterator))(f)
+ } else {
+ val newValues = new Array[VD3](self.capacity)
+
+ var i = self.mask.nextSetBit(0)
+ while (i >= 0) {
+ val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None
+ newValues(i) = f(self.index.getValue(i), self.values(i), otherV)
+ i = self.mask.nextSetBit(i + 1)
+ }
+ this.withValues(newValues)
+ }
+ }
+
+ /** Left outer join another iterator of messages. */
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: Iterator[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
+ leftJoin(createUsingIndex(other))(f)
+ }
+
+ /** Inner join another VertexPartition. */
+ def innerJoin[U: ClassTag, VD2: ClassTag]
+ (other: Self[U])
+ (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+ if (self.index != other.index) {
+ logWarning("Joining two VertexPartitions with different indexes is slow.")
+ innerJoin(createUsingIndex(other.iterator))(f)
+ } else {
+ val newMask = self.mask & other.mask
+ val newValues = new Array[VD2](self.capacity)
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
+ i = newMask.nextSetBit(i + 1)
+ }
+ this.withValues(newValues).withMask(newMask)
+ }
+ }
+
+ /**
+ * Inner join an iterator of messages.
+ */
+ def innerJoin[U: ClassTag, VD2: ClassTag]
+ (iter: Iterator[Product2[VertexId, U]])
+ (f: (VertexId, VD, U) => VD2): Self[VD2] = {
+ innerJoin(createUsingIndex(iter))(f)
+ }
+
+ /**
+ * Similar in effect to `aggregateUsingIndex((a, b) => a)`, except that for duplicate ids the
+ * last value wins rather than the first.
+ */
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
+ : Self[VD2] = {
+ val newMask = new BitSet(self.capacity)
+ val newValues = new Array[VD2](self.capacity)
+ iter.foreach { pair =>
+ val pos = self.index.getPos(pair._1)
+ if (pos >= 0) {
+ newMask.set(pos)
+ newValues(pos) = pair._2
+ }
+ }
+ this.withValues(newValues).withMask(newMask)
+ }
+
+ /**
+ * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
+ * the partition, hidden by the bitmask.
+ */
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): Self[VD] = {
+ val newMask = new BitSet(self.capacity)
+ val newValues = new Array[VD](self.capacity)
+ System.arraycopy(self.values, 0, newValues, 0, newValues.length)
+ iter.foreach { pair =>
+ val pos = self.index.getPos(pair._1)
+ if (pos >= 0) {
+ newMask.set(pos)
+ newValues(pos) = pair._2
+ }
+ }
+ this.withValues(newValues).withMask(newMask)
+ }
+
+ def aggregateUsingIndex[VD2: ClassTag](
+ iter: Iterator[Product2[VertexId, VD2]],
+ reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
+ val newMask = new BitSet(self.capacity)
+ val newValues = new Array[VD2](self.capacity)
+ iter.foreach { product =>
+ val vid = product._1
+ val vdata = product._2
+ val pos = self.index.getPos(vid)
+ if (pos >= 0) {
+ if (newMask.get(pos)) {
+ newValues(pos) = reduceFunc(newValues(pos), vdata)
+ } else { // otherwise just store the new value
+ newMask.set(pos)
+ newValues(pos) = vdata
+ }
+ }
+ }
+ this.withValues(newValues).withMask(newMask)
+ }
+
+ /**
+ * Construct a new VertexPartition whose index contains only the vertices in the mask.
+ */
+ def reindex(): Self[VD] = {
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+ val arbitraryMerge = (a: VD, b: VD) => a
+ for ((k, v) <- self.iterator) {
+ hashMap.setMerge(k, v, arbitraryMerge)
+ }
+ this.withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
+ }
+
+ /**
+ * Converts a vertex partition (in particular, one of type `Self`) into a
+ * `VertexPartitionBaseOps`. Within this class, this allows chaining the methods defined above,
+ * because these methods return a `Self` and this implicit conversion re-wraps that in a
+ * `VertexPartitionBaseOps`. This relies on the context bound on `Self`.
+ */
+ private implicit def toOps[VD2: ClassTag](
+ partition: Self[VD2]): VertexPartitionBaseOps[VD2, Self] = {
+ implicitly[VertexPartitionBaseOpsConstructor[Self]].toOps(partition)
+ }
+}
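For context (not part of the patch): the private toOps conversion at the end of the file is what lets the methods above chain even though each returns a bare Self[_]. A stripped-down sketch of the same pattern, using illustrative names (Part, OpsCtor, Ops) that do not appear in GraphX:

import scala.language.higherKinds
import scala.language.implicitConversions
import scala.reflect.ClassTag

trait Part[X] { def values: Array[X] }

// Typeclass: evidence that a Part subclass can be wrapped in Ops.
trait OpsCtor[T[X] <: Part[X]] { def toOps[A: ClassTag](p: T[A]): Ops[A, T] }

abstract class Ops[A: ClassTag, Self[X] <: Part[X] : OpsCtor](self: Self[A]) {
  def withValues[B: ClassTag](vs: Array[B]): Self[B]

  def map[B: ClassTag](f: A => B): Self[B] = withValues(self.values.map(f))

  // map returns a bare Self[B]; the implicit below re-wraps it so a second
  // map call can be chained, exactly as the GraphX methods above do.
  def mapTwice[B: ClassTag](f: A => B, g: B => B): Self[B] = this.map(f).map(g)

  private implicit def toOps[B: ClassTag](p: Self[B]): Ops[B, Self] =
    implicitly[OpsCtor[Self]].toOps(p)
}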
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index d901d4fe22..069e042ed9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -55,6 +55,7 @@ object Analytics extends Logging {
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+ .set("spark.locality.wait", "100000")
taskType match {
case "pagerank" =>
@@ -62,12 +63,14 @@ object Analytics extends Logging {
var outFname = ""
var numEPart = 4
var partitionStrategy: Option[PartitionStrategy] = None
+ var numIterOpt: Option[Int] = None
options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numEPart", v) => numEPart = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case ("numIter", v) => numIterOpt = Some(v.toInt)
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -84,7 +87,10 @@ object Analytics extends Logging {
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
- val pr = graph.pageRank(tol).vertices.cache()
+ val pr = (numIterOpt match {
+ case Some(numIter) => PageRank.run(graph, numIter)
+ case None => PageRank.runUntilConvergence(graph, tol)
+ }).vertices.cache()
println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 32b5fe4813..7b9bac5d9c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
+ verts.withFilter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -120,7 +120,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val part = iter.next()._2
Iterator((part.srcIds ++ part.dstIds).toSet)
}.collect
- assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) {
+ val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound)
+ val failure = verts.maxBy(id => partitionSets.count(_.contains(id)))
+ fail(("Replication bound test failed for %d/%d vertices. " +
+ "Example: vertex %d replicated to %d (> %f) partitions.").format(
+ numFailures, n, failure, partitionSets.count(_.contains(failure)), bound))
+ }
// This should not be true for the default hash partitioning
val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
val part = iter.next()._2
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index e135d1d7ad..d2e0c01bc3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -26,10 +26,16 @@ import org.apache.spark.graphx._
class EdgePartitionSuite extends FunSuite {
+ def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = {
+ val builder = new EdgePartitionBuilder[A, Int]
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
+ builder.toEdgePartition
+ }
+
test("reverse") {
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -40,7 +46,7 @@ class EdgePartitionSuite extends FunSuite {
test("map") {
val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -49,11 +55,22 @@ class EdgePartitionSuite extends FunSuite {
edges.map(e => e.copy(attr = e.srcId + e.dstId)))
}
+ test("filter") {
+ val edges = List(Edge(0, 1, 0), Edge(0, 2, 0), Edge(2, 0, 0))
+ val builder = new EdgePartitionBuilder[Int, Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ val filtered = edgePartition.filter(et => et.srcId == 0, (vid, attr) => vid == 0 || vid == 1)
+ assert(filtered.tripletIterator().toList.map(et => (et.srcId, et.dstId)) === List((0L, 1L)))
+ }
+
test("groupEdges") {
val edges = List(
Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- edges) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -61,11 +78,19 @@ class EdgePartitionSuite extends FunSuite {
assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
}
+ test("upgradeIterator") {
+ val edges = List((0, 1, 0), (1, 0, 0))
+ val verts = List((0L, 1), (1L, 2))
+ val part = makeEdgePartition(edges).updateVertices(verts.iterator)
+ assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList ===
+ part.tripletIterator().toList.map(_.toTuple))
+ }
+
test("indexIterator") {
val edgesFrom0 = List(Edge(0, 1, 0))
val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
val sortedEdges = edgesFrom0 ++ edgesFrom1
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Nothing]
for (e <- Random.shuffle(sortedEdges)) {
builder.add(e.srcId, e.dstId, e.attr)
}
@@ -77,11 +102,6 @@ class EdgePartitionSuite extends FunSuite {
}
test("innerJoin") {
- def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
- val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
- builder.toEdgePartition
- }
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
val a = makeEdgePartition(aList)
@@ -90,4 +110,14 @@ class EdgePartitionSuite extends FunSuite {
assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
}
+
+ test("isActive, numActives, replaceActives") {
+ val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition
+ .withActiveSet(Iterator(0L, 2L, 0L))
+ assert(ep.isActive(0))
+ assert(!ep.isActive(1))
+ assert(ep.isActive(2))
+ assert(!ep.isActive(-1))
+ assert(ep.numActives == Some(2))
+ }
}
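For context (not part of the patch): a small sketch of the builder and partition shapes these tests now rely on. EdgePartition is internal (private[graphx]) API, so this only compiles from inside the graphx package; the calls shown (add, toEdgePartition, updateVertices, tripletIterator) all appear in the tests above.

import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.EdgePartitionBuilder

// The builder now carries both the edge attribute type (ED = Int) and the
// vertex attribute type (VD = Double) that will be shipped into the partition.
val builder = new EdgePartitionBuilder[Int, Double]
builder.add(0L, 1L, 42)
val part = builder.toEdgePartition

// updateVertices stores vertex attributes alongside the edges, so
// tripletIterator() can expose srcAttr/dstAttr without a separate join.
val withVerts = part.updateVertices(Iterator((0L, 1.0), (1L, 2.0)))
withVerts.tripletIterator().foreach { t =>
  println((t.srcId, t.srcAttr, t.dstId, t.dstAttr, t.attr))
}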
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
index 9cbb2d2acd..49b2704390 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala
@@ -26,17 +26,11 @@ import org.apache.spark.graphx._
class EdgeTripletIteratorSuite extends FunSuite {
test("iterator.toList") {
- val builder = new EdgePartitionBuilder[Int]
+ val builder = new EdgePartitionBuilder[Int, Int]
builder.add(1, 2, 0)
builder.add(1, 3, 0)
builder.add(1, 4, 0)
- val vidmap = new VertexIdToIndexMap
- vidmap.add(1)
- vidmap.add(2)
- vidmap.add(3)
- vidmap.add(4)
- val vs = Array.fill(vidmap.capacity)(0)
- val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition)
+ val iter = new EdgeTripletIterator[Int, Int](builder.toEdgePartition, true, true)
val result = iter.toList.map(et => (et.srcId, et.dstId))
assert(result === Seq((1, 2), (1, 3), (1, 4)))
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index a048d13fd1..8bf1384d51 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -30,17 +30,6 @@ class VertexPartitionSuite extends FunSuite {
assert(!vp.isDefined(-1))
}
- test("isActive, numActives, replaceActives") {
- val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
- .filter { (vid, attr) => vid == 0 }
- .replaceActives(Iterator(0, 2, 0))
- assert(vp.isActive(0))
- assert(!vp.isActive(1))
- assert(vp.isActive(2))
- assert(!vp.isActive(-1))
- assert(vp.numActives == Some(2))
- }
-
test("map") {
val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
assert(vp(0) === 2)
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index efdb38e907..fafc9b36a7 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -76,6 +76,8 @@ object MimaBuild {
excludeSparkClass("util.XORShiftRandom") ++
excludeSparkClass("graphx.EdgeRDD") ++
excludeSparkClass("graphx.VertexRDD") ++
+ excludeSparkClass("graphx.impl.GraphImpl") ++
+ excludeSparkClass("graphx.impl.RoutingTable") ++
excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
excludeSparkClass("mllib.optimization.SquaredGradient") ++
excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++