author    Ankur Dave <ankurdave@gmail.com>    2014-11-17 11:06:31 -0800
committer Reynold Xin <rxin@databricks.com>   2014-11-17 11:06:31 -0800
commit    9ac2bb18ede2e9f73c255fa33445af89aaf8a000 (patch)
tree      f05ef7e6702a227fdcd37290c17afd0d42fc8f7a /graphx
parent    e7690ed20a2734b7ca88e78a60a8e75ba19e9d8b (diff)
[SPARK-4444] Drop VD type parameter from EdgeRDD
Due to vertex attribute caching, EdgeRDD previously took two type parameters: ED and VD. However, this is an implementation detail that should not be exposed in the interface, so this PR drops the VD type parameter. This requires removing the `filter` method from the EdgeRDD interface, because it depends on vertex attribute caching.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #3303 from ankurdave/edgerdd-drop-tparam and squashes the following commits:

38dca9b [Ankur Dave] Leave EdgeRDD.fromEdges public
fafeb51 [Ankur Dave] Drop VD type parameter from EdgeRDD
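For reference, a minimal sketch of what this change means for calling code, assuming a live SparkContext `sc`; the String edge attributes and Int vertex-attribute type here are hypothetical, not from the patch:

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Edge, EdgeRDD}

    object EdgeRddMigrationSketch {
      def run(sc: SparkContext): Unit = {
        val rawEdges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "likes")))

        // Before this patch, the vertex attribute type leaked into the public type:
        //   val edges: EdgeRDD[String, Int] = EdgeRDD.fromEdges[String, Int](rawEdges)
        // After the patch, the abstract type carries only ED. fromEdges still takes
        // VD (it builds the internal EdgePartitions) but returns the impl type,
        // which conforms to the one-parameter EdgeRDD:
        val edges: EdgeRDD[String] = EdgeRDD.fromEdges[String, Int](rawEdges)

        // Transformations that don't depend on vertex attributes stay on the
        // interface and keep the single type parameter:
        val reversed: EdgeRDD[String] = edges.reverse
        val lengths: EdgeRDD[Int] = edges.mapValues(e => e.attr.length)
      }
    }

The `filter` overload is not gone; as the EdgeRDDImpl diff below shows, it survives on the implementation class, where the VD parameter is still available.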
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala                  | 35
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala                    |  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala                | 10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala         | 24
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala           | 13
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala |  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala       |  2
7 files changed, 40 insertions(+), 50 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 869ef15893..cc70b396a8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.graphx
+import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.Dependency
@@ -36,16 +37,16 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
* `impl.ReplicatedVertexView`.
*/
-abstract class EdgeRDD[ED, VD](
+abstract class EdgeRDD[ED](
@transient sc: SparkContext,
@transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
- private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
+ private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+ val p = firstParent[(PartitionID, EdgePartition[ED, _])].iterator(part, context)
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
@@ -60,19 +61,14 @@ abstract class EdgeRDD[ED, VD](
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
- def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
+ def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
- def reverse: EdgeRDD[ED, VD]
-
- /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
- def filter(
- epred: EdgeTriplet[VD, ED] => Boolean,
- vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]
+ def reverse: EdgeRDD[ED]
/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -84,15 +80,8 @@ abstract class EdgeRDD[ED, VD](
* with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
- (other: EdgeRDD[ED2, _])
- (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
-
- private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
- f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
-
- /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
- private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
- partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]
+ (other: EdgeRDD[ED2])
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
/**
* Changes the target storage level while preserving all other properties of the
@@ -101,7 +90,7 @@ abstract class EdgeRDD[ED, VD](
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
- private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
+ private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
}
object EdgeRDD {
@@ -111,7 +100,7 @@ object EdgeRDD {
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
- def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+ def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
@@ -128,8 +117,8 @@ object EdgeRDD {
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
- def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
- edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+ private[graphx] def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+ edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDDImpl[ED, VD] = {
new EdgeRDDImpl(edgePartitions)
}
}
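A note on the `forSome` signature in the EdgeRDD diff above: the existential type keeps VD out of the public interface while each implementation still fixes it to a concrete type, which is why the file now imports scala.language.existentials. A self-contained sketch of the same pattern, with made-up names:

    import scala.language.existentials

    // The abstract member hides a type parameter from callers...
    abstract class Container {
      def payload: List[(Int, T)] forSome { type T }
    }

    // ...while the implementation conforms by instantiating T to a
    // concrete type (here T = String) that callers never see.
    class StringContainer extends Container {
      override def payload: List[(Int, String)] = List(1 -> "a", 2 -> "b")
    }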
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 2c1b9518a3..6377915435 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* along with their vertex data.
*
*/
- @transient val edges: EdgeRDD[ED, VD]
+ @transient val edges: EdgeRDD[ED]
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f8be17669d..1db3df03c8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -207,7 +207,7 @@ abstract class VertexRDD[VD](
def reverseRoutingTables(): VertexRDD[VD]
/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
- def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD]
+ def withEdges(edges: EdgeRDD[_]): VertexRDD[VD]
/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[VD2: ClassTag](
@@ -269,7 +269,7 @@ object VertexRDD {
* @param defaultVal the vertex attribute to use when creating missing vertices
*/
def apply[VD: ClassTag](
- vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
+ vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD): VertexRDD[VD] = {
VertexRDD(vertices, edges, defaultVal, (a, b) => a)
}
@@ -286,7 +286,7 @@ object VertexRDD {
* @param mergeFunc the commutative, associative duplicate vertex attribute merge function
*/
def apply[VD: ClassTag](
- vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
+ vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD, mergeFunc: (VD, VD) => VD
): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
@@ -314,7 +314,7 @@ object VertexRDD {
* @param defaultVal the vertex attribute to use when creating missing vertices
*/
def fromEdges[VD: ClassTag](
- edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
+ edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
val routingTable =
@@ -325,7 +325,7 @@ object VertexRDD {
}
private[graphx] def createRoutingTables(
- edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
+ edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 4100a85d17..a8169613b4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -28,7 +28,7 @@ import org.apache.spark.graphx._
class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
- extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+ extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
override def setName(_name: String): this.type = {
if (partitionsRDD.name != null) {
@@ -75,20 +75,20 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
}
- override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
+ override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
mapEdgePartitions((pid, part) => part.map(f))
- override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+ override def reverse: EdgeRDDImpl[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
- override def filter(
+ def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
- vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+ vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}
override def innerJoin[ED2: ClassTag, ED3: ClassTag]
- (other: EdgeRDD[ED2, _])
- (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
+ (other: EdgeRDD[ED2])
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDDImpl[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -99,8 +99,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
})
}
- override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
- f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+ def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+ f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
@@ -111,13 +111,13 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
}, preservesPartitioning = true))
}
- override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
- partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+ private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+ partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDDImpl[ED2, VD2] = {
new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
}
override private[graphx] def withTargetStorageLevel(
- targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+ targetStorageLevel: StorageLevel): EdgeRDDImpl[ED, VD] = {
new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 2b4636a6c6..0eae2a6738 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -43,7 +43,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
/** Default constructor is provided to support serialization */
protected def this() = this(null, null)
- @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
+ @transient override val edges: EdgeRDDImpl[ED, VD] = replicatedVertexView.edges
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
@@ -323,9 +323,10 @@ object GraphImpl {
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
- edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
+ edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
// Convert the vertex partitions in edges to the correct type
- val newEdges = edges.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
+ .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
GraphImpl.fromExistingRDDs(vertices, newEdges)
}
@@ -336,8 +337,8 @@ object GraphImpl {
*/
def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
- edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
- new GraphImpl(vertices, new ReplicatedVertexView(edges))
+ edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+ new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
}
/**
@@ -345,7 +346,7 @@ object GraphImpl {
* `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
*/
private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
- edges: EdgeRDD[ED, VD],
+ edges: EdgeRDDImpl[ED, VD],
defaultVertexAttr: VD,
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
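A note on the `asInstanceOf[EdgeRDDImpl[ED, _]]` casts introduced above: since the public EdgeRDD[ED] no longer carries VD, GraphImpl recovers the implementation type by downcasting. A hypothetical helper (not part of the patch) illustrating the pattern:

    import org.apache.spark.graphx.EdgeRDD
    import org.apache.spark.graphx.impl.EdgeRDDImpl

    object EdgeRDDCasts {
      // Sound only as long as EdgeRDDImpl remains the sole concrete
      // EdgeRDD subclass inside GraphX; the hidden VD comes back as a
      // wildcard, which is all the internal callers need.
      def asImpl[ED](edges: EdgeRDD[ED]): EdgeRDDImpl[ED, _] =
        edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    }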
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 86b366eb92..8ab255bd40 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -33,7 +33,7 @@ import org.apache.spark.graphx._
*/
private[impl]
class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
- var edges: EdgeRDD[ED, VD],
+ var edges: EdgeRDDImpl[ED, VD],
var hasSrcId: Boolean = false,
var hasDstId: Boolean = false) {
@@ -42,7 +42,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
* shipping level.
*/
def withEdges[VD2: ClassTag, ED2: ClassTag](
- edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
+ edges_ : EdgeRDDImpl[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 08405629bc..d92a55a189 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -172,7 +172,7 @@ class VertexRDDImpl[VD] private[graphx] (
override def reverseRoutingTables(): VertexRDD[VD] =
this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
- override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+ override def withEdges(edges: EdgeRDD[_]): VertexRDD[VD] = {
val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
(partIter, routingTableIter) =>