From 9ac2bb18ede2e9f73c255fa33445af89aaf8a000 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 17 Nov 2014 11:06:31 -0800 Subject: [SPARK-4444] Drop VD type parameter from EdgeRDD Due to vertex attribute caching, EdgeRDD previously took two type parameters: ED and VD. However, this is an implementation detail that should not be exposed in the interface, so this PR drops the VD type parameter. This requires removing the `filter` method from the EdgeRDD interface, because it depends on vertex attribute caching. Author: Ankur Dave Closes #3303 from ankurdave/edgerdd-drop-tparam and squashes the following commits: 38dca9b [Ankur Dave] Leave EdgeRDD.fromEdges public fafeb51 [Ankur Dave] Drop VD type parameter from EdgeRDD --- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 35 ++++++++-------------- .../main/scala/org/apache/spark/graphx/Graph.scala | 2 +- .../scala/org/apache/spark/graphx/VertexRDD.scala | 10 +++---- .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 24 +++++++-------- .../org/apache/spark/graphx/impl/GraphImpl.scala | 13 ++++---- .../spark/graphx/impl/ReplicatedVertexView.scala | 4 +-- .../apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- 7 files changed, 40 insertions(+), 50 deletions(-) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 869ef15893..cc70b396a8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.graphx +import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark.Dependency @@ -36,16 +37,16 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl * edge to provide the triplet view. Shipping of the vertex attributes is managed by * `impl.ReplicatedVertexView`. */ -abstract class EdgeRDD[ED, VD]( +abstract class EdgeRDD[ED]( @transient sc: SparkContext, @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) { - private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] + private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD } override protected def getPartitions: Array[Partition] = partitionsRDD.partitions override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { - val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) + val p = firstParent[(PartitionID, EdgePartition[ED, _])].iterator(part, context) if (p.hasNext) { p.next._2.iterator.map(_.copy()) } else { @@ -60,19 +61,14 @@ abstract class EdgeRDD[ED, VD]( * @param f the function from an edge to a new edge value * @return a new EdgeRDD containing the new edge values */ - def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] + def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] /** * Reverse all the edges in this RDD. * * @return a new EdgeRDD containing all the edges reversed */ - def reverse: EdgeRDD[ED, VD] - - /** Removes all edges but those matching `epred` and where both vertices match `vpred`. */ - def filter( - epred: EdgeTriplet[VD, ED] => Boolean, - vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] + def reverse: EdgeRDD[ED] /** * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same @@ -84,15 +80,8 @@ abstract class EdgeRDD[ED, VD]( * with values supplied by `f` */ def innerJoin[ED2: ClassTag, ED3: ClassTag] - (other: EdgeRDD[ED2, _]) - (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] - - private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( - f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] - - /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */ - private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( - partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] + (other: EdgeRDD[ED2]) + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] /** * Changes the target storage level while preserving all other properties of the @@ -101,7 +90,7 @@ abstract class EdgeRDD[ED, VD]( * This does not actually trigger a cache; to do this, call * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD. */ - private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] + private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED] } object EdgeRDD { @@ -111,7 +100,7 @@ object EdgeRDD { * @tparam ED the edge attribute type * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD */ - def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { + def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = { val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[ED, VD] iter.foreach { e => @@ -128,8 +117,8 @@ object EdgeRDD { * @tparam ED the edge attribute type * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD */ - def fromEdgePartitions[ED: ClassTag, VD: ClassTag]( - edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = { + private[graphx] def fromEdgePartitions[ED: ClassTag, VD: ClassTag]( + edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDDImpl[ED, VD] = { new EdgeRDDImpl(edgePartitions) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 2c1b9518a3..6377915435 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * along with their vertex data. * */ - @transient val edges: EdgeRDD[ED, VD] + @transient val edges: EdgeRDD[ED] /** * An RDD containing the edge triplets, which are edges along with the vertex data associated with diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index f8be17669d..1db3df03c8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -207,7 +207,7 @@ abstract class VertexRDD[VD]( def reverseRoutingTables(): VertexRDD[VD] /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */ - def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] + def withEdges(edges: EdgeRDD[_]): VertexRDD[VD] /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ private[graphx] def withPartitionsRDD[VD2: ClassTag]( @@ -269,7 +269,7 @@ object VertexRDD { * @param defaultVal the vertex attribute to use when creating missing vertices */ def apply[VD: ClassTag]( - vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = { + vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD): VertexRDD[VD] = { VertexRDD(vertices, edges, defaultVal, (a, b) => a) } @@ -286,7 +286,7 @@ object VertexRDD { * @param mergeFunc the commutative, associative duplicate vertex attribute merge function */ def apply[VD: ClassTag]( - vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD + vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD, mergeFunc: (VD, VD) => VD ): VertexRDD[VD] = { val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { case Some(p) => vertices @@ -314,7 +314,7 @@ object VertexRDD { * @param defaultVal the vertex attribute to use when creating missing vertices */ def fromEdges[VD: ClassTag]( - edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = { + edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = { val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions)) val vertexPartitions = routingTables.mapPartitions({ routingTableIter => val routingTable = @@ -325,7 +325,7 @@ object VertexRDD { } private[graphx] def createRoutingTables( - edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = { + edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap( Function.tupled(RoutingTablePartition.edgePartitionToMsgs))) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 4100a85d17..a8169613b4 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -28,7 +28,7 @@ import org.apache.spark.graphx._ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) - extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { override def setName(_name: String): this.type = { if (partitionsRDD.name != null) { @@ -75,20 +75,20 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( partitionsRDD.map(_._2.size.toLong).reduce(_ + _) } - override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] = + override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] = mapEdgePartitions((pid, part) => part.map(f)) - override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse) + override def reverse: EdgeRDDImpl[ED, VD] = mapEdgePartitions((pid, part) => part.reverse) - override def filter( + def filter( epred: EdgeTriplet[VD, ED] => Boolean, - vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = { + vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = { mapEdgePartitions((pid, part) => part.filter(epred, vpred)) } override def innerJoin[ED2: ClassTag, ED3: ClassTag] - (other: EdgeRDD[ED2, _]) - (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { + (other: EdgeRDD[ED2]) + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDDImpl[ED3, VD] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { @@ -99,8 +99,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( }) } - override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( - f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { + def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( + f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = { this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => if (iter.hasNext) { val (pid, ep) = iter.next() @@ -111,13 +111,13 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( }, preservesPartitioning = true)) } - override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( - partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = { + private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( + partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDDImpl[ED2, VD2] = { new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel) } override private[graphx] def withTargetStorageLevel( - targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = { + targetStorageLevel: StorageLevel): EdgeRDDImpl[ED, VD] = { new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 2b4636a6c6..0eae2a6738 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -43,7 +43,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( /** Default constructor is provided to support serialization */ protected def this() = this(null, null) - @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges + @transient override val edges: EdgeRDDImpl[ED, VD] = replicatedVertexView.edges /** Return a RDD that brings edges together with their source and destination vertices. */ @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = { @@ -323,9 +323,10 @@ object GraphImpl { */ def apply[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], - edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = { + edges: EdgeRDD[ED]): GraphImpl[VD, ED] = { // Convert the vertex partitions in edges to the correct type - val newEdges = edges.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD]) + val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]] + .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD]) GraphImpl.fromExistingRDDs(vertices, newEdges) } @@ -336,8 +337,8 @@ object GraphImpl { */ def fromExistingRDDs[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], - edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = { - new GraphImpl(vertices, new ReplicatedVertexView(edges)) + edges: EdgeRDD[ED]): GraphImpl[VD, ED] = { + new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) } /** @@ -345,7 +346,7 @@ object GraphImpl { * `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD. */ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( - edges: EdgeRDD[ED, VD], + edges: EdgeRDDImpl[ED, VD], defaultVertexAttr: VD, edgeStorageLevel: StorageLevel, vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 86b366eb92..8ab255bd40 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -33,7 +33,7 @@ import org.apache.spark.graphx._ */ private[impl] class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( - var edges: EdgeRDD[ED, VD], + var edges: EdgeRDDImpl[ED, VD], var hasSrcId: Boolean = false, var hasDstId: Boolean = false) { @@ -42,7 +42,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( * shipping level. */ def withEdges[VD2: ClassTag, ED2: ClassTag]( - edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = { + edges_ : EdgeRDDImpl[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = { new ReplicatedVertexView(edges_, hasSrcId, hasDstId) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 08405629bc..d92a55a189 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -172,7 +172,7 @@ class VertexRDDImpl[VD] private[graphx] ( override def reverseRoutingTables(): VertexRDD[VD] = this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse)) - override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = { + override def withEdges(edges: EdgeRDD[_]): VertexRDD[VD] = { val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get) val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) { (partIter, routingTableIter) => -- cgit v1.2.3