aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-10 15:42:44 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-10 15:48:20 -0800
commit57d7487d3da19df04de52235812fe7c8c24cc259 (patch)
tree1570fbbe57fbe8668226f6d31d0de12ea3885bc2
parent11dd35c28be9542d00ddb75660873525b2e22b43 (diff)
downloadspark-57d7487d3da19df04de52235812fe7c8c24cc259.tar.gz
spark-57d7487d3da19df04de52235812fe7c8c24cc259.tar.bz2
spark-57d7487d3da19df04de52235812fe7c8c24cc259.zip
Improve docs for VertexRDD
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala152
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala3
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala1
4 files changed, 62 insertions, 96 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 69f27601ce..0121cb1449 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -232,7 +232,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* {{{
* graph.filter(
* graph => {
- * val degrees: VertexSetRDD[Int] = graph.outDegrees
+ * val degrees: VertexRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
* vpred = (vid: VertexID, deg:Int) => deg > 0
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 971e2615d4..3ef9d6e9cf 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -27,11 +27,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.MsgRDDFunctions
import org.apache.spark.graphx.impl.VertexPartition
-
/**
- * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
- * only one entry for each vertex and by pre-indexing the entries for fast,
- * efficient joins.
+ * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is only one entry for
+ * each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the
+ * same index can be joined efficiently.
*
* @tparam VD the vertex attribute associated with each vertex in the set.
*
@@ -46,7 +45,7 @@ import org.apache.spark.graphx.impl.VertexPartition
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
* val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
- * val vset3 = VertexRDD(otherData, vset.index)
+ * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
* }}}
@@ -61,32 +60,18 @@ class VertexRDD[@specialized VD: ClassTag](
partitionsRDD.setName("VertexRDD")
/**
- * Construct a new VertexRDD that is indexed by only the keys in the RDD.
- * The resulting VertexRDD will be based on a different index and can
- * no longer be quickly joined with this RDD.
+ * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
+ * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
- /**
- * The partitioner is defined by the index.
- */
override val partitioner = partitionsRDD.partitioner
- /**
- * The actual partitions are defined by the tuples.
- */
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
- /**
- * The preferred locations are computed based on the preferred
- * locations of the tuples.
- */
override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)
- /**
- * Caching a VertexRDD causes the index and values to be cached separately.
- */
override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
partitionsRDD.persist(newLevel)
this
@@ -103,20 +88,20 @@ class VertexRDD[@specialized VD: ClassTag](
this
}
- /** Return the number of vertices in this set. */
+ /** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
}
/**
- * Provide the `RDD[(VertexID, VD)]` equivalent output.
+ * Provides the `RDD[(VertexID, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
/**
- * Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
+ * Applies a function to each [[impl.VertexPartition]] of this RDD and returns a new VertexRDD.
*/
def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
@@ -126,51 +111,43 @@ class VertexRDD[@specialized VD: ClassTag](
/**
- * Restrict the vertex set to the set of vertices satisfying the
- * given predicate.
- *
- * @param pred the user defined predicate, which takes a tuple to conform to
- * the RDD[(VertexID, VD)] interface
+ * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
+ * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
+ * rather than allocating new memory.
*
- * @note The vertex set preserves the original index structure
- * which means that the returned RDD can be easily joined with
- * the original vertex-set. Furthermore, the filter only
- * modifies the bitmap index and so no new values are allocated.
+ * @param pred the user defined predicate, which takes a tuple to conform to the
+ * `RDD[(VertexID, VD)]` interface
*/
override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
/**
- * Pass each vertex attribute through a map function and retain the
- * original RDD's partitioning and index.
+ * Maps each vertex attribute, preserving the index.
*
* @tparam VD2 the type returned by the map function
*
* @param f the function applied to each value in the RDD
- * @return a new VertexRDD with values obtained by applying `f` to
- * each of the entries in the original VertexRDD. The resulting
- * VertexRDD retains the same index.
+ * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
+ * original VertexRDD
*/
def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
/**
- * Pass each vertex attribute through a map function and retain the
- * original RDD's partitioning and index.
+ * Maps each vertex attribute, additionally supplying the vertex ID.
*
* @tparam VD2 the type returned by the map function
*
- * @param f the function applied to each value in the RDD
- * @return a new VertexRDD with values obtained by applying `f` to
- * each of the entries in the original VertexRDD. The resulting
- * VertexRDD retains the same index.
+ * @param f the function applied to each ID-value pair in the RDD
+ * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
+ * original VertexRDD. The resulting VertexRDD retains the same index.
*/
def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
- * Hides vertices that are the same between this and other. For vertices that are different, keeps
- * the values from `other`.
+ * Hides vertices that are the same between `this` and `other`. For vertices that are different,
+ * keeps the values from `other`.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
@@ -184,22 +161,17 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Left join this VertexSet with another VertexSet which has the
- * same Index. This function will fail if both VertexSets do not
- * share the same index. The resulting vertex set contains an entry
- * for each vertex in this set. If the other VertexSet is missing
- * any vertex in this VertexSet then a `None` attribute is generated
- *
- * @tparam VD2 the attribute type of the other VertexSet
- * @tparam VD3 the attribute type of the resulting VertexSet
+ * Left joins this RDD with another VertexRDD with the same index. This function will fail if both
+ * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
+ * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
- * @param other the other VertexSet with which to join.
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a new vertex attribute.
- * @return a VertexRDD containing all the vertices in this
- * VertexSet with `None` attributes used for Vertices missing in the
- * other VertexSet.
+ * @tparam VD2 the attribute type of the other VertexRDD
+ * @tparam VD3 the attribute type of the resulting VertexRDD
*
+ * @param other the other VertexRDD with which to join.
+ * @param f the function mapping a vertex id and its attributes in this and the other vertex set
+ * to a new vertex attribute.
+ * @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
@@ -214,29 +186,25 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Left join this VertexRDD with an RDD containing vertex attribute
- * pairs. If the other RDD is backed by a VertexRDD with the same
- * index than the efficient leftZipJoin implementation is used. The
- * resulting vertex set contains an entry for each vertex in this
- * set. If the other VertexRDD is missing any vertex in this
- * VertexRDD then a `None` attribute is generated.
- *
- * If there are duplicates, the vertex is picked at random.
+ * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
+ * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
+ * used. The resulting vertex set contains an entry for each vertex in this set. If `other` is
+ * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
+ * is picked arbitrarily.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
*
- * @param other the other VertexRDD with which to join.
- * @param f the function mapping a vertex id and its attributes in
- * this and the other vertex set to a new vertex attribute.
- * @return a VertexRDD containing all the vertices in this
- * VertexRDD with the attribute emitted by f.
+ * @param other the other VertexRDD with which to join
+ * @param f the function mapping a vertex id and its attributes in this and the other vertex set
+ * to a new vertex attribute.
+ * @return a VertexRDD containing all the vertices in this VertexRDD with the attributes emitted
+ * by `f`.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: RDD[(VertexID, VD2)])
(f: (VertexID, VD, Option[VD2]) => VD3)
- : VertexRDD[VD3] =
- {
+ : VertexRDD[VD3] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
other match {
@@ -255,8 +223,8 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
- * must have the same index.
+ * Same effect as `leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }`, but `this` and
+ * `other` must have the same index.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
@@ -271,8 +239,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Replace vertices with corresponding vertices in `other`, and drop vertices without a
- * corresponding vertex in `other`.
+ * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
+ * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
+ * used.
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
@@ -294,12 +263,11 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is
- * co-indexed with this one.
+ * Aggregates vertices in `message` that have the same ids using `reduceFunc`, returning a
+ * VertexRDD co-indexed with `this`.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
- {
+ messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -312,12 +280,12 @@ class VertexRDD[@specialized VD: ClassTag](
/**
- * The VertexRDD singleton is used to construct VertexRDDs
+ * The VertexRDD singleton is used to construct VertexRDDs.
*/
object VertexRDD {
/**
- * Construct a vertex set from an RDD of vertex-attribute pairs.
+ * Construct a `VertexRDD` from an RDD of vertex-attribute pairs.
* Duplicate entries are removed arbitrarily.
*
* @tparam VD the vertex attribute type
@@ -336,16 +304,15 @@ object VertexRDD {
}
/**
- * Construct a vertex set from an RDD of vertex-attribute pairs.
- * Duplicate entries are merged using mergeFunc.
+ * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using
+ * `mergeFunc`.
*
* @tparam VD the vertex attribute type
*
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
- {
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
@@ -356,9 +323,12 @@ object VertexRDD {
new VertexRDD(vertexPartitions)
}
+ /**
+ * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
+ * `defaultVal` otherwise.
+ */
def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
- : VertexRDD[VD] =
- {
+ : VertexRDD[VD] = {
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
value.getOrElse(default)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index d4f08497a2..ca64e9af66 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -6,10 +6,7 @@ import scala.util.Sorting
import org.apache.spark.graphx._
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
-
-//private[graph]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
-
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 79fd962ffd..c5258360da 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -5,7 +5,6 @@ import scala.reflect.ClassTag
import org.apache.spark.graphx._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
-
/**
* The Iterator type returned when constructing edge triplets. This class technically could be
* an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to