diff options
4 files changed, 62 insertions, 96 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 69f27601ce..0121cb1449 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -232,7 +232,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * {{{ * graph.filter( * graph => { - * val degrees: VertexSetRDD[Int] = graph.outDegrees + * val degrees: VertexRDD[Int] = graph.outDegrees * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} * }, * vpred = (vid: VertexID, deg:Int) => deg > 0 diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 971e2615d4..3ef9d6e9cf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -27,11 +27,10 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx.impl.MsgRDDFunctions import org.apache.spark.graphx.impl.VertexPartition - /** - * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is - * only one entry for each vertex and by pre-indexing the entries for fast, - * efficient joins. + * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is only one entry for + * each vertex and by pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the + * same index can be joined efficiently. * * @tparam VD the vertex attribute associated with each vertex in the set. 
* @@ -46,7 +45,7 @@ import org.apache.spark.graphx.impl.VertexPartition * val vset2 = VertexRDD(someData, reduceFunc) * // Finally we can use the VertexRDD to index another dataset * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile) - * val vset3 = VertexRDD(otherData, vset.index) + * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b } * // Now we can construct very fast joins between the two sets * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) * }}} @@ -61,32 +60,18 @@ class VertexRDD[@specialized VD: ClassTag]( partitionsRDD.setName("VertexRDD") /** - * Construct a new VertexRDD that is indexed by only the keys in the RDD. - * The resulting VertexRDD will be based on a different index and can - * no longer be quickly joined with this RDD. + * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting + * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD. */ def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) - /** - * The partitioner is defined by the index. - */ override val partitioner = partitionsRDD.partitioner - /** - * The actual partitions are defined by the tuples. - */ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions - /** - * The preferred locations are computed based on the preferred - * locations of the tuples. - */ override protected def getPreferredLocations(s: Partition): Seq[String] = partitionsRDD.preferredLocations(s) - /** - * Caching a VertexRDD causes the index and values to be cached separately. - */ override def persist(newLevel: StorageLevel): VertexRDD[VD] = { partitionsRDD.persist(newLevel) this @@ -103,20 +88,20 @@ class VertexRDD[@specialized VD: ClassTag]( this } - /** Return the number of vertices in this set. */ + /** The number of vertices in the RDD. 
*/ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _) } /** - * Provide the `RDD[(VertexID, VD)]` equivalent output. + * Provides the `RDD[(VertexID, VD)]` equivalent output. */ override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = { firstParent[VertexPartition[VD]].iterator(part, context).next.iterator } /** - * Return a new VertexRDD by applying a function to each VertexPartition of this RDD. + * Applies a function to each [[impl.VertexPartition]] of this RDD and returns a new VertexRDD. */ def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2]) : VertexRDD[VD2] = { @@ -126,51 +111,43 @@ class VertexRDD[@specialized VD: ClassTag]( /** - * Restrict the vertex set to the set of vertices satisfying the - * given predicate. - * - * @param pred the user defined predicate, which takes a tuple to conform to - * the RDD[(VertexID, VD)] interface + * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation + * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask + * rather than allocating new memory. * - * @note The vertex set preserves the original index structure - * which means that the returned RDD can be easily joined with - * the original vertex-set. Furthermore, the filter only - * modifies the bitmap index and so no new values are allocated. + * @param pred the user defined predicate, which takes a tuple to conform to the + * `RDD[(VertexID, VD)]` interface */ override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = this.mapVertexPartitions(_.filter(Function.untupled(pred))) /** - * Pass each vertex attribute through a map function and retain the - * original RDD's partitioning and index. + * Maps each vertex attribute, preserving the index. 
* * @tparam VD2 the type returned by the map function * * @param f the function applied to each value in the RDD - * @return a new VertexRDD with values obtained by applying `f` to - * each of the entries in the original VertexRDD. The resulting - * VertexRDD retains the same index. + * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the + * original VertexRDD */ def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map((vid, attr) => f(attr))) /** - * Pass each vertex attribute through a map function and retain the - * original RDD's partitioning and index. + * Maps each vertex attribute, additionally supplying the vertex ID. * * @tparam VD2 the type returned by the map function * - * @param f the function applied to each value in the RDD - * @return a new VertexRDD with values obtained by applying `f` to - * each of the entries in the original VertexRDD. The resulting - * VertexRDD retains the same index. + * @param f the function applied to each ID-value pair in the RDD + * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the + * original VertexRDD. The resulting VertexRDD retains the same index. */ def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) /** - * Hides vertices that are the same between this and other. For vertices that are different, keeps - * the values from `other`. + * Hides vertices that are the same between `this` and `other`. For vertices that are different, + * keeps the values from `other`. */ def diff(other: VertexRDD[VD]): VertexRDD[VD] = { val newPartitionsRDD = partitionsRDD.zipPartitions( @@ -184,22 +161,17 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Left join this VertexSet with another VertexSet which has the - * same Index. This function will fail if both VertexSets do not - * share the same index. 
The resulting vertex set contains an entry - * for each vertex in this set. If the other VertexSet is missing - * any vertex in this VertexSet then a `None` attribute is generated - * - * @tparam VD2 the attribute type of the other VertexSet - * @tparam VD3 the attribute type of the resulting VertexSet + * Left joins this RDD with another VertexRDD with the same index. This function will fail if both + * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each + * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`. * - * @param other the other VertexSet with which to join. - * @param f the function mapping a vertex id and its attributes in - * this and the other vertex set to a new vertex attribute. - * @return a VertexRDD containing all the vertices in this - * VertexSet with `None` attributes used for Vertices missing in the - * other VertexSet. + * @tparam VD2 the attribute type of the other VertexRDD + * @tparam VD3 the attribute type of the resulting VertexRDD * + * @param other the other VertexRDD with which to join. + * @param f the function mapping a vertex id and its attributes in this and the other vertex set + * to a new vertex attribute. + * @return a VertexRDD containing the results of `f` */ def leftZipJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { @@ -214,29 +186,25 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Left join this VertexRDD with an RDD containing vertex attribute - * pairs. If the other RDD is backed by a VertexRDD with the same - * index than the efficient leftZipJoin implementation is used. The - * resulting vertex set contains an entry for each vertex in this - * set. If the other VertexRDD is missing any vertex in this - * VertexRDD then a `None` attribute is generated. - * - * If there are duplicates, the vertex is picked at random. 
+ * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is + * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is + * used. The resulting vertex set contains an entry for each vertex in this set. If `other` is + * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex + * is picked arbitrarily. * * @tparam VD2 the attribute type of the other VertexRDD * @tparam VD3 the attribute type of the resulting VertexRDD * - * @param other the other VertexRDD with which to join. - * @param f the function mapping a vertex id and its attributes in - * this and the other vertex set to a new vertex attribute. - * @return a VertexRDD containing all the vertices in this - * VertexRDD with the attribute emitted by f. + * @param other the other VertexRDD with which to join + * @param f the function mapping a vertex id and its attributes in this and the other vertex set + * to a new vertex attribute. + * @return a VertexRDD containing all the vertices in this VertexRDD with the attributes emitted + * by `f`. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: RDD[(VertexID, VD2)]) (f: (VertexID, VD, Option[VD2]) => VD3) - : VertexRDD[VD3] = - { + : VertexRDD[VD3] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient leftZipJoin other match { @@ -255,8 +223,8 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other` - * must have the same index. + * Same effect as `leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }`, but `this` and + * `other` must have the same index. 
 */ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { @@ -271,8 +239,9 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Replace vertices with corresponding vertices in `other`, and drop vertices without a - * corresponding vertex in `other`. + * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is + * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is + * used. */ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { @@ -294,12 +263,11 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is - * co-indexed with this one. + * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a + * VertexRDD co-indexed with `this`. */ def aggregateUsingIndex[VD2: ClassTag]( - messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = - { + messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => val vertexPartition: VertexPartition[VD] = thisIter.next() @@ -312,12 +280,12 @@ class VertexRDD[@specialized VD: ClassTag]( /** - * The VertexRDD singleton is used to construct VertexRDDs + * The VertexRDD singleton is used to construct VertexRDDs. */ object VertexRDD { /** - * Construct a vertex set from an RDD of vertex-attribute pairs. + * Construct a `VertexRDD` from an RDD of vertex-attribute pairs. * Duplicate entries are removed arbitrarily. * * @tparam VD the vertex attribute type @@ -336,16 +304,15 @@ object VertexRDD { } /** - * Construct a vertex set from an RDD of vertex-attribute pairs. 
- * Duplicate entries are merged using mergeFunc. + * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using + * `mergeFunc`. * * @tparam VD the vertex attribute type * * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = - { + def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) @@ -356,9 +323,12 @@ object VertexRDD { new VertexRDD(vertexPartitions) } + /** + * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using + * `defaultVal` otherwise. + */ def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD) - : VertexRDD[VD] = - { + : VertexRDD[VD] = { VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => value.getOrElse(default) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index d4f08497a2..ca64e9af66 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -6,10 +6,7 @@ import scala.util.Sorting import org.apache.spark.graphx._ import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} - -//private[graph] class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) { - var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. 
*/ diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 79fd962ffd..c5258360da 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -5,7 +5,6 @@ import scala.reflect.ClassTag import org.apache.spark.graphx._ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap - /** * The Iterator type returned when constructing edge triplets. This class technically could be * an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to |