aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-11-12 13:49:20 -0800
committerReynold Xin <rxin@databricks.com>2014-11-12 13:50:01 -0800
commit127c19b449315bdeba758e48371291c61abf0952 (patch)
treea97c0842f7050458251c79d3549b90d6c15f8298 /graphx
parentdbac77ebda28e1aa7b74831601fabd615b539358 (diff)
downloadspark-127c19b449315bdeba758e48371291c61abf0952.tar.gz
spark-127c19b449315bdeba758e48371291c61abf0952.tar.bz2
spark-127c19b449315bdeba758e48371291c61abf0952.zip
[SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD
This discourages users from calling the VertexRDD and EdgeRDD constructor and makes it easier for future changes to ensure backward compatibility. Author: Ankur Dave <ankurdave@gmail.com> Closes #2530 from ankurdave/SPARK-3666 and squashes the following commits: d681f45 [Ankur Dave] Define getPartitions and compute in abstract class for MIMA 1472390 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666 24201d4 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666 cbe15f2 [Ankur Dave] Remove specialized annotation from VertexRDD and EdgeRDD 931b587 [Ankur Dave] Use abstract class instead of trait for binary compatibility 9ba4ec4 [Ankur Dave] Mark (Vertex|Edge)RDDImpl constructors package-private 620e603 [Ankur Dave] Extract VertexRDD interface and move implementation to VertexRDDImpl 55b6398 [Ankur Dave] Extract EdgeRDD interface and move implementation to EdgeRDDImpl (cherry picked from commit a5ef58113667ff73562ce6db381cff96a0b354b0) Signed-off-by: Reynold Xin <rxin@databricks.com>
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala111
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala190
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala124
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala205
4 files changed, 386 insertions, 244 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 5267560b3e..869ef15893 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -17,14 +17,18 @@
package org.apache.spark.graphx
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.Dependency
+import org.apache.spark.Partition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.EdgeRDDImpl
/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,30 +36,13 @@ import org.apache.spark.graphx.impl.EdgePartitionBuilder
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
* `impl.ReplicatedVertexView`.
*/
-class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
- val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
- val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
- extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
-
- override def setName(_name: String): this.type = {
- if (partitionsRDD.name != null) {
- partitionsRDD.setName(partitionsRDD.name + ", " + _name)
- } else {
- partitionsRDD.setName(_name)
- }
- this
- }
- setName("EdgeRDD")
+abstract class EdgeRDD[ED, VD](
+ @transient sc: SparkContext,
+ @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
- override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+ private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
- /**
- * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
- * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
- * partitioner that allows co-partitioning with `partitionsRDD`.
- */
- override val partitioner =
- partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+ override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
@@ -66,45 +53,6 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
}
}
- override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
-
- /**
- * Persists the edge partitions at the specified storage level, ignoring any existing target
- * storage level.
- */
- override def persist(newLevel: StorageLevel): this.type = {
- partitionsRDD.persist(newLevel)
- this
- }
-
- override def unpersist(blocking: Boolean = true): this.type = {
- partitionsRDD.unpersist(blocking)
- this
- }
-
- /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
- override def cache(): this.type = {
- partitionsRDD.persist(targetStorageLevel)
- this
- }
-
- /** The number of edges in the RDD. */
- override def count(): Long = {
- partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
- }
-
- private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
- f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
- this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
- if (iter.hasNext) {
- val (pid, ep) = iter.next()
- Iterator(Tuple2(pid, f(pid, ep)))
- } else {
- Iterator.empty
- }
- }, preservesPartitioning = true))
- }
-
/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
@@ -112,22 +60,19 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
- def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
- mapEdgePartitions((pid, part) => part.map(f))
+ def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
- def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+ def reverse: EdgeRDD[ED, VD]
/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
- vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
- mapEdgePartitions((pid, part) => part.filter(epred, vpred))
- }
+ vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]
/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -140,22 +85,14 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2, _])
- (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
- val ed2Tag = classTag[ED2]
- val ed3Tag = classTag[ED3]
- this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
- (thisIter, otherIter) =>
- val (pid, thisEPart) = thisIter.next()
- val (_, otherEPart) = otherIter.next()
- Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
- })
- }
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
+
+ private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+ f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
- /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+ /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
- partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
- new EdgeRDD(partitionsRDD, this.targetStorageLevel)
- }
+ partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]
/**
* Changes the target storage level while preserving all other properties of the
@@ -164,11 +101,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
- private[graphx] def withTargetStorageLevel(
- targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
- new EdgeRDD(this.partitionsRDD, targetStorageLevel)
- }
-
+ private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
}
object EdgeRDD {
@@ -197,6 +130,6 @@ object EdgeRDD {
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
- new EdgeRDD(edgePartitions)
+ new EdgeRDDImpl(edgePartitions)
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 12216d9d33..f8be17669d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
+import org.apache.spark.graphx.impl.VertexRDDImpl
/**
* Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
@@ -53,62 +54,16 @@ import org.apache.spark.graphx.impl.VertexAttributeBlock
*
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
-class VertexRDD[@specialized VD: ClassTag](
- val partitionsRDD: RDD[ShippableVertexPartition[VD]],
- val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
- extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+abstract class VertexRDD[VD](
+ @transient sc: SparkContext,
+ @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
- require(partitionsRDD.partitioner.isDefined)
+ implicit protected def vdTag: ClassTag[VD]
- /**
- * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
- * VertexRDD will be based on a different index and can no longer be quickly joined with this
- * RDD.
- */
- def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
-
- override val partitioner = partitionsRDD.partitioner
+ private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
- override protected def getPreferredLocations(s: Partition): Seq[String] =
- partitionsRDD.preferredLocations(s)
-
- override def setName(_name: String): this.type = {
- if (partitionsRDD.name != null) {
- partitionsRDD.setName(partitionsRDD.name + ", " + _name)
- } else {
- partitionsRDD.setName(_name)
- }
- this
- }
- setName("VertexRDD")
-
- /**
- * Persists the vertex partitions at the specified storage level, ignoring any existing target
- * storage level.
- */
- override def persist(newLevel: StorageLevel): this.type = {
- partitionsRDD.persist(newLevel)
- this
- }
-
- override def unpersist(blocking: Boolean = true): this.type = {
- partitionsRDD.unpersist(blocking)
- this
- }
-
- /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
- override def cache(): this.type = {
- partitionsRDD.persist(targetStorageLevel)
- this
- }
-
- /** The number of vertices in the RDD. */
- override def count(): Long = {
- partitionsRDD.map(_.size.toLong).reduce(_ + _)
- }
-
/**
* Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
@@ -117,21 +72,27 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
+ * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
+ * VertexRDD will be based on a different index and can no longer be quickly joined with this
+ * RDD.
+ */
+ def reindex(): VertexRDD[VD]
+
+ /**
* Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/
private[graphx] def mapVertexPartitions[VD2: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
- : VertexRDD[VD2] = {
- val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
- this.withPartitionsRDD(newPartitionsRDD)
- }
-
+ : VertexRDD[VD2]
/**
* Restricts the vertex set to the set of vertices satisfying the given predicate. This operation
* preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask
* rather than allocating new memory.
*
+ * It is declared and defined here to allow refining the return type from `RDD[(VertexId, VD)]` to
+ * `VertexRDD[VD]`.
+ *
* @param pred the user defined predicate, which takes a tuple to conform to the
* `RDD[(VertexId, VD)]` interface
*/
@@ -147,8 +108,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD
*/
- def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
- this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+ def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
/**
* Maps each vertex attribute, additionally supplying the vertex ID.
@@ -159,23 +119,13 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD. The resulting VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
- this.mapVertexPartitions(_.map(f))
+ def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
/**
* Hides vertices that are the same between `this` and `other`; for vertices that are different,
* keeps the values from `other`.
*/
- def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
- val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
- ) { (thisIter, otherIter) =>
- val thisPart = thisIter.next()
- val otherPart = otherIter.next()
- Iterator(thisPart.diff(otherPart))
- }
- this.withPartitionsRDD(newPartitionsRDD)
- }
+ def diff(other: VertexRDD[VD]): VertexRDD[VD]
/**
* Left joins this RDD with another VertexRDD with the same index. This function will fail if
@@ -192,16 +142,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
- val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
- ) { (thisIter, otherIter) =>
- val thisPart = thisIter.next()
- val otherPart = otherIter.next()
- Iterator(thisPart.leftJoin(otherPart)(f))
- }
- this.withPartitionsRDD(newPartitionsRDD)
- }
+ (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
/**
* Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -222,37 +163,14 @@ class VertexRDD[@specialized VD: ClassTag](
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: RDD[(VertexId, VD2)])
(f: (VertexId, VD, Option[VD2]) => VD3)
- : VertexRDD[VD3] = {
- // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
- // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
- other match {
- case other: VertexRDD[_] =>
- leftZipJoin(other)(f)
- case _ =>
- this.withPartitionsRDD[VD3](
- partitionsRDD.zipPartitions(
- other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
- (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
- }
- )
- }
- }
+ : VertexRDD[VD3]
/**
* Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See
* [[innerJoin]] for the behavior of the join.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
- (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
- val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
- ) { (thisIter, otherIter) =>
- val thisPart = thisIter.next()
- val otherPart = otherIter.next()
- Iterator(thisPart.innerJoin(otherPart)(f))
- }
- this.withPartitionsRDD(newPartitionsRDD)
- }
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
/**
* Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
@@ -266,21 +184,7 @@ class VertexRDD[@specialized VD: ClassTag](
* `this` and `other`, with values supplied by `f`
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
- (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
- // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
- // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
- other match {
- case other: VertexRDD[_] =>
- innerZipJoin(other)(f)
- case _ =>
- this.withPartitionsRDD(
- partitionsRDD.zipPartitions(
- other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
- (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
- }
- )
- }
- }
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
/**
* Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
@@ -294,38 +198,20 @@ class VertexRDD[@specialized VD: ClassTag](
* messages.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
- val shuffled = messages.partitionBy(this.partitioner.get)
- val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
- thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
- }
- this.withPartitionsRDD[VD2](parts)
- }
+ messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
/**
* Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding
* [[EdgeRDD]].
*/
- def reverseRoutingTables(): VertexRDD[VD] =
- this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+ def reverseRoutingTables(): VertexRDD[VD]
/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
- def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
- val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
- val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
- (partIter, routingTableIter) =>
- val routingTable =
- if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
- partIter.map(_.withRoutingTable(routingTable))
- }
- this.withPartitionsRDD(vertexPartitions)
- }
+ def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD]
/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[VD2: ClassTag](
- partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
- new VertexRDD(partitionsRDD, this.targetStorageLevel)
- }
+ partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2]
/**
* Changes the target storage level while preserving all other properties of the
@@ -335,20 +221,14 @@ class VertexRDD[@specialized VD: ClassTag](
* [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
*/
private[graphx] def withTargetStorageLevel(
- targetStorageLevel: StorageLevel): VertexRDD[VD] = {
- new VertexRDD(this.partitionsRDD, targetStorageLevel)
- }
+ targetStorageLevel: StorageLevel): VertexRDD[VD]
/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
private[graphx] def shipVertexAttributes(
- shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
- partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
- }
+ shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])]
/** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */
- private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
- partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
- }
+ private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])]
} // end of VertexRDD
@@ -374,7 +254,7 @@ object VertexRDD {
val vertexPartitions = vPartitioned.mapPartitions(
iter => Iterator(ShippableVertexPartition(iter)),
preservesPartitioning = true)
- new VertexRDD(vertexPartitions)
+ new VertexRDDImpl(vertexPartitions)
}
/**
@@ -419,7 +299,7 @@ object VertexRDD {
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc))
}
- new VertexRDD(vertexPartitions)
+ new VertexRDDImpl(vertexPartitions)
}
/**
@@ -441,10 +321,10 @@ object VertexRDD {
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
}, preservesPartitioning = true)
- new VertexRDD(vertexPartitions)
+ new VertexRDDImpl(vertexPartitions)
}
- private def createRoutingTables(
+ private[graphx] def createRoutingTables(
edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
new file mode 100644
index 0000000000..4100a85d17
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graphx._
+
+class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
+ override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
+ extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ override def setName(_name: String): this.type = {
+ if (partitionsRDD.name != null) {
+ partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+ } else {
+ partitionsRDD.setName(_name)
+ }
+ this
+ }
+ setName("EdgeRDD")
+
+ /**
+ * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
+ * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
+ * partitioner that allows co-partitioning with `partitionsRDD`.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
+ override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+
+ /**
+ * Persists the edge partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
+ override def persist(newLevel: StorageLevel): this.type = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ override def unpersist(blocking: Boolean = true): this.type = {
+ partitionsRDD.unpersist(blocking)
+ this
+ }
+
+ /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
+ /** The number of edges in the RDD. */
+ override def count(): Long = {
+ partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+ }
+
+ override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
+ mapEdgePartitions((pid, part) => part.map(f))
+
+ override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+ override def filter(
+ epred: EdgeTriplet[VD, ED] => Boolean,
+ vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+ mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+ }
+
+ override def innerJoin[ED2: ClassTag, ED3: ClassTag]
+ (other: EdgeRDD[ED2, _])
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
+ val ed2Tag = classTag[ED2]
+ val ed3Tag = classTag[ED3]
+ this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ (thisIter, otherIter) =>
+ val (pid, thisEPart) = thisIter.next()
+ val (_, otherEPart) = otherIter.next()
+ Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+ })
+ }
+
+ override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+ f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+ this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+ if (iter.hasNext) {
+ val (pid, ep) = iter.next()
+ Iterator(Tuple2(pid, f(pid, ep)))
+ } else {
+ Iterator.empty
+ }
+ }, preservesPartitioning = true))
+ }
+
+ override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+ partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+ new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
+ }
+
+ override private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+ new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel)
+ }
+
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
new file mode 100644
index 0000000000..08405629bc
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graphx._
+
+class VertexRDDImpl[VD] private[graphx] (
+ val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
+ (implicit override protected val vdTag: ClassTag[VD])
+ extends VertexRDD[VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+
+ require(partitionsRDD.partitioner.isDefined)
+
+ override def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
+
+ override val partitioner = partitionsRDD.partitioner
+
+ override protected def getPreferredLocations(s: Partition): Seq[String] =
+ partitionsRDD.preferredLocations(s)
+
+ override def setName(_name: String): this.type = {
+ if (partitionsRDD.name != null) {
+ partitionsRDD.setName(partitionsRDD.name + ", " + _name)
+ } else {
+ partitionsRDD.setName(_name)
+ }
+ this
+ }
+ setName("VertexRDD")
+
+ /**
+ * Persists the vertex partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
+ override def persist(newLevel: StorageLevel): this.type = {
+ partitionsRDD.persist(newLevel)
+ this
+ }
+
+ override def unpersist(blocking: Boolean = true): this.type = {
+ partitionsRDD.unpersist(blocking)
+ this
+ }
+
+ /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
+ /** The number of vertices in the RDD. */
+ override def count(): Long = {
+ partitionsRDD.map(_.size).reduce(_ + _)
+ }
+
+ override private[graphx] def mapVertexPartitions[VD2: ClassTag](
+ f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
+ : VertexRDD[VD2] = {
+ val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
+ this.withPartitionsRDD(newPartitionsRDD)
+ }
+
+ override def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
+
+ override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
+ this.mapVertexPartitions(_.map(f))
+
+ override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.diff(otherPart))
+ }
+ this.withPartitionsRDD(newPartitionsRDD)
+ }
+
+ override def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.leftJoin(otherPart)(f))
+ }
+ this.withPartitionsRDD(newPartitionsRDD)
+ }
+
+ override def leftJoin[VD2: ClassTag, VD3: ClassTag]
+ (other: RDD[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3)
+ : VertexRDD[VD3] = {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ leftZipJoin(other)(f)
+ case _ =>
+ this.withPartitionsRDD[VD3](
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+ (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ override def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+ val newPartitionsRDD = partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true
+ ) { (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.innerJoin(otherPart)(f))
+ }
+ this.withPartitionsRDD(newPartitionsRDD)
+ }
+
+ override def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
+ // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
+ // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
+ other match {
+ case other: VertexRDD[_] =>
+ innerZipJoin(other)(f)
+ case _ =>
+ this.withPartitionsRDD(
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+ (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
+ }
+ )
+ }
+ }
+
+ override def aggregateUsingIndex[VD2: ClassTag](
+ messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+ val shuffled = messages.partitionBy(this.partitioner.get)
+ val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
+ thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
+ }
+ this.withPartitionsRDD[VD2](parts)
+ }
+
+ override def reverseRoutingTables(): VertexRDD[VD] =
+ this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+
+ override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+ val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
+ val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+ (partIter, routingTableIter) =>
+ val routingTable =
+ if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+ partIter.map(_.withRoutingTable(routingTable))
+ }
+ this.withPartitionsRDD(vertexPartitions)
+ }
+
+ override private[graphx] def withPartitionsRDD[VD2: ClassTag](
+ partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+ new VertexRDDImpl(partitionsRDD, this.targetStorageLevel)
+ }
+
+ override private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+ new VertexRDDImpl(this.partitionsRDD, targetStorageLevel)
+ }
+
+ override private[graphx] def shipVertexAttributes(
+ shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
+ partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst)))
+ }
+
+ override private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = {
+ partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds()))
+ }
+
+}