From a5ef58113667ff73562ce6db381cff96a0b354b0 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 12 Nov 2014 13:49:20 -0800 Subject: [SPARK-3666] Extract interfaces for EdgeRDD and VertexRDD This discourages users from calling the VertexRDD and EdgeRDD constructor and makes it easier for future changes to ensure backward compatibility. Author: Ankur Dave Closes #2530 from ankurdave/SPARK-3666 and squashes the following commits: d681f45 [Ankur Dave] Define getPartitions and compute in abstract class for MIMA 1472390 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666 24201d4 [Ankur Dave] Merge remote-tracking branch 'apache-spark/master' into SPARK-3666 cbe15f2 [Ankur Dave] Remove specialized annotation from VertexRDD and EdgeRDD 931b587 [Ankur Dave] Use abstract class instead of trait for binary compatibility 9ba4ec4 [Ankur Dave] Mark (Vertex|Edge)RDDImpl constructors package-private 620e603 [Ankur Dave] Extract VertexRDD interface and move implementation to VertexRDDImpl 55b6398 [Ankur Dave] Extract EdgeRDD interface and move implementation to EdgeRDDImpl --- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 111 +++-------- .../scala/org/apache/spark/graphx/VertexRDD.scala | 190 ++++--------------- .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 124 +++++++++++++ .../apache/spark/graphx/impl/VertexRDDImpl.scala | 205 +++++++++++++++++++++ 4 files changed, 386 insertions(+), 244 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 5267560b3e..869ef15893 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -17,14 +17,18 @@ package org.apache.spark.graphx -import scala.reflect.{classTag, ClassTag} +import scala.reflect.ClassTag -import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark.Dependency +import org.apache.spark.Partition +import org.apache.spark.SparkContext +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx.impl.EdgePartition import org.apache.spark.graphx.impl.EdgePartitionBuilder +import org.apache.spark.graphx.impl.EdgeRDDImpl /** * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each @@ -32,30 +36,13 @@ import org.apache.spark.graphx.impl.EdgePartitionBuilder * edge to provide the triplet view. Shipping of the vertex attributes is managed by * `impl.ReplicatedVertexView`. 
*/ -class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( - val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], - val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) - extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { - - override def setName(_name: String): this.type = { - if (partitionsRDD.name != null) { - partitionsRDD.setName(partitionsRDD.name + ", " + _name) - } else { - partitionsRDD.setName(_name) - } - this - } - setName("EdgeRDD") +abstract class EdgeRDD[ED, VD]( + @transient sc: SparkContext, + @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) { - override protected def getPartitions: Array[Partition] = partitionsRDD.partitions + private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] - /** - * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the - * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new - * partitioner that allows co-partitioning with `partitionsRDD`. - */ - override val partitioner = - partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + override protected def getPartitions: Array[Partition] = partitionsRDD.partitions override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) @@ -66,45 +53,6 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( } } - override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() - - /** - * Persists the edge partitions at the specified storage level, ignoring any existing target - * storage level. - */ - override def persist(newLevel: StorageLevel): this.type = { - partitionsRDD.persist(newLevel) - this - } - - override def unpersist(blocking: Boolean = true): this.type = { - partitionsRDD.unpersist(blocking) - this - } - - /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ - override def cache(): this.type = { - partitionsRDD.persist(targetStorageLevel) - this - } - - /** The number of edges in the RDD. */ - override def count(): Long = { - partitionsRDD.map(_._2.size.toLong).reduce(_ + _) - } - - private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( - f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { - this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => - if (iter.hasNext) { - val (pid, ep) = iter.next() - Iterator(Tuple2(pid, f(pid, ep))) - } else { - Iterator.empty - } - }, preservesPartitioning = true)) - } - /** * Map the values in an edge partitioning preserving the structure but changing the values. * @@ -112,22 +60,19 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * @param f the function from an edge to a new edge value * @return a new EdgeRDD containing the new edge values */ - def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] = - mapEdgePartitions((pid, part) => part.map(f)) + def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] /** * Reverse all the edges in this RDD. * * @return a new EdgeRDD containing all the edges reversed */ - def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse) + def reverse: EdgeRDD[ED, VD] /** Removes all edges but those matching `epred` and where both vertices match `vpred`. 
*/ def filter( epred: EdgeTriplet[VD, ED] => Boolean, - vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = { - mapEdgePartitions((pid, part) => part.filter(epred, vpred)) - } + vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] /** * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same @@ -140,22 +85,14 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2, _]) - (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { - val ed2Tag = classTag[ED2] - val ed3Tag = classTag[ED3] - this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { - (thisIter, otherIter) => - val (pid, thisEPart) = thisIter.next() - val (_, otherEPart) = otherIter.next() - Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) - }) - } + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] + + private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( + f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] - /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ + /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */ private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( - partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = { - new EdgeRDD(partitionsRDD, this.targetStorageLevel) - } + partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] /** * Changes the target storage level while preserving all other properties of the @@ -164,11 +101,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * This does not actually trigger a cache; to do this, call * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD. */ - private[graphx] def withTargetStorageLevel( - targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = { - new EdgeRDD(this.partitionsRDD, targetStorageLevel) - } - + private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] } object EdgeRDD { @@ -197,6 +130,6 @@ object EdgeRDD { */ def fromEdgePartitions[ED: ClassTag, VD: ClassTag]( edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = { - new EdgeRDD(edgePartitions) + new EdgeRDDImpl(edgePartitions) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 12216d9d33..f8be17669d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx.impl.RoutingTablePartition import org.apache.spark.graphx.impl.ShippableVertexPartition import org.apache.spark.graphx.impl.VertexAttributeBlock +import org.apache.spark.graphx.impl.VertexRDDImpl /** * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by @@ -53,62 +54,16 @@ import org.apache.spark.graphx.impl.VertexAttributeBlock * * @tparam VD the vertex attribute associated with each vertex in the set. 
*/ -class VertexRDD[@specialized VD: ClassTag]( - val partitionsRDD: RDD[ShippableVertexPartition[VD]], - val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) - extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { +abstract class VertexRDD[VD]( + @transient sc: SparkContext, + @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) { - require(partitionsRDD.partitioner.isDefined) + implicit protected def vdTag: ClassTag[VD] - /** - * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting - * VertexRDD will be based on a different index and can no longer be quickly joined with this - * RDD. - */ - def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex())) - - override val partitioner = partitionsRDD.partitioner + private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]] override protected def getPartitions: Array[Partition] = partitionsRDD.partitions - override protected def getPreferredLocations(s: Partition): Seq[String] = - partitionsRDD.preferredLocations(s) - - override def setName(_name: String): this.type = { - if (partitionsRDD.name != null) { - partitionsRDD.setName(partitionsRDD.name + ", " + _name) - } else { - partitionsRDD.setName(_name) - } - this - } - setName("VertexRDD") - - /** - * Persists the vertex partitions at the specified storage level, ignoring any existing target - * storage level. - */ - override def persist(newLevel: StorageLevel): this.type = { - partitionsRDD.persist(newLevel) - this - } - - override def unpersist(blocking: Boolean = true): this.type = { - partitionsRDD.unpersist(blocking) - this - } - - /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ - override def cache(): this.type = { - partitionsRDD.persist(targetStorageLevel) - this - } - - /** The number of vertices in the RDD. */ - override def count(): Long = { - partitionsRDD.map(_.size.toLong).reduce(_ + _) - } - /** * Provides the `RDD[(VertexId, VD)]` equivalent output. */ @@ -116,22 +71,28 @@ class VertexRDD[@specialized VD: ClassTag]( firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator } + /** + * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting + * VertexRDD will be based on a different index and can no longer be quickly joined with this + * RDD. + */ + def reindex(): VertexRDD[VD] + /** * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD. */ private[graphx] def mapVertexPartitions[VD2: ClassTag]( f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) - : VertexRDD[VD2] = { - val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) - this.withPartitionsRDD(newPartitionsRDD) - } - + : VertexRDD[VD2] /** * Restricts the vertex set to the set of vertices satisfying the given predicate. This operation * preserves the index for efficient joins with the original RDD, and it sets bits in the bitmask * rather than allocating new memory. * + * It is declared and defined here to allow refining the return type from `RDD[(VertexId, VD)]` to + * `VertexRDD[VD]`. 
+ * * @param pred the user defined predicate, which takes a tuple to conform to the * `RDD[(VertexId, VD)]` interface */ @@ -147,8 +108,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the * original VertexRDD */ - def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = - this.mapVertexPartitions(_.map((vid, attr) => f(attr))) + def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] /** * Maps each vertex attribute, additionally supplying the vertex ID. @@ -159,23 +119,13 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the * original VertexRDD. The resulting VertexRDD retains the same index. */ - def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = - this.mapVertexPartitions(_.map(f)) + def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] /** * Hides vertices that are the same between `this` and `other`; for vertices that are different, * keeps the values from `other`. */ - def diff(other: VertexRDD[VD]): VertexRDD[VD] = { - val newPartitionsRDD = partitionsRDD.zipPartitions( - other.partitionsRDD, preservesPartitioning = true - ) { (thisIter, otherIter) => - val thisPart = thisIter.next() - val otherPart = otherIter.next() - Iterator(thisPart.diff(otherPart)) - } - this.withPartitionsRDD(newPartitionsRDD) - } + def diff(other: VertexRDD[VD]): VertexRDD[VD] /** * Left joins this RDD with another VertexRDD with the same index. This function will fail if @@ -192,16 +142,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a VertexRDD containing the results of `f` */ def leftZipJoin[VD2: ClassTag, VD3: ClassTag] - (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { - val newPartitionsRDD = partitionsRDD.zipPartitions( - other.partitionsRDD, preservesPartitioning = true - ) { (thisIter, otherIter) => - val thisPart = thisIter.next() - val otherPart = otherIter.next() - Iterator(thisPart.leftJoin(otherPart)(f)) - } - this.withPartitionsRDD(newPartitionsRDD) - } + (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] /** * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is @@ -222,37 +163,14 @@ class VertexRDD[@specialized VD: ClassTag]( def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: RDD[(VertexId, VD2)]) (f: (VertexId, VD, Option[VD2]) => VD3) - : VertexRDD[VD3] = { - // Test if the other vertex is a VertexRDD to choose the optimal join strategy. - // If the other set is a VertexRDD then we use the much more efficient leftZipJoin - other match { - case other: VertexRDD[_] => - leftZipJoin(other)(f) - case _ => - this.withPartitionsRDD[VD3]( - partitionsRDD.zipPartitions( - other.partitionBy(this.partitioner.get), preservesPartitioning = true) { - (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f)) - } - ) - } - } + : VertexRDD[VD3] /** * Efficiently inner joins this VertexRDD with another VertexRDD sharing the same index. See * [[innerJoin]] for the behavior of the join. 
*/ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) - (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { - val newPartitionsRDD = partitionsRDD.zipPartitions( - other.partitionsRDD, preservesPartitioning = true - ) { (thisIter, otherIter) => - val thisPart = thisIter.next() - val otherPart = otherIter.next() - Iterator(thisPart.innerJoin(otherPart)(f)) - } - this.withPartitionsRDD(newPartitionsRDD) - } + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] /** * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is @@ -266,21 +184,7 @@ class VertexRDD[@specialized VD: ClassTag]( * `this` and `other`, with values supplied by `f` */ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) - (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { - // Test if the other vertex is a VertexRDD to choose the optimal join strategy. - // If the other set is a VertexRDD then we use the much more efficient innerZipJoin - other match { - case other: VertexRDD[_] => - innerZipJoin(other)(f) - case _ => - this.withPartitionsRDD( - partitionsRDD.zipPartitions( - other.partitionBy(this.partitioner.get), preservesPartitioning = true) { - (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f)) - } - ) - } - } + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] /** * Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a @@ -294,38 +198,20 @@ class VertexRDD[@specialized VD: ClassTag]( * messages. */ def aggregateUsingIndex[VD2: ClassTag]( - messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { - val shuffled = messages.partitionBy(this.partitioner.get) - val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => - thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc)) - } - this.withPartitionsRDD[VD2](parts) - } + messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] /** * Returns a new `VertexRDD` reflecting a reversal of all edge directions in the corresponding * [[EdgeRDD]]. */ - def reverseRoutingTables(): VertexRDD[VD] = - this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse)) + def reverseRoutingTables(): VertexRDD[VD] /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */ - def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = { - val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get) - val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) { - (partIter, routingTableIter) => - val routingTable = - if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty - partIter.map(_.withRoutingTable(routingTable)) - } - this.withPartitionsRDD(vertexPartitions) - } + def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ private[graphx] def withPartitionsRDD[VD2: ClassTag]( - partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = { - new VertexRDD(partitionsRDD, this.targetStorageLevel) - } + partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] /** * Changes the target storage level while preserving all other properties of the @@ -335,20 +221,14 @@ class VertexRDD[@specialized VD: ClassTag]( * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD. 
*/ private[graphx] def withTargetStorageLevel( - targetStorageLevel: StorageLevel): VertexRDD[VD] = { - new VertexRDD(this.partitionsRDD, targetStorageLevel) - } + targetStorageLevel: StorageLevel): VertexRDD[VD] /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */ private[graphx] def shipVertexAttributes( - shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = { - partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst))) - } + shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] /** Generates an RDD of vertex IDs suitable for shipping to the edge partitions. */ - private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = { - partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds())) - } + private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] } // end of VertexRDD @@ -374,7 +254,7 @@ object VertexRDD { val vertexPartitions = vPartitioned.mapPartitions( iter => Iterator(ShippableVertexPartition(iter)), preservesPartitioning = true) - new VertexRDD(vertexPartitions) + new VertexRDDImpl(vertexPartitions) } /** @@ -419,7 +299,7 @@ object VertexRDD { if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc)) } - new VertexRDD(vertexPartitions) + new VertexRDDImpl(vertexPartitions) } /** @@ -441,10 +321,10 @@ object VertexRDD { if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal)) }, preservesPartitioning = true) - new VertexRDD(vertexPartitions) + new VertexRDDImpl(vertexPartitions) } - private def createRoutingTables( + private[graphx] def createRoutingTables( edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap( diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala new file mode 100644 index 0000000000..4100a85d17 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +import org.apache.spark.graphx._ + +class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( + override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + + override def setName(_name: String): this.type = { + if (partitionsRDD.name != null) { + partitionsRDD.setName(partitionsRDD.name + ", " + _name) + } else { + partitionsRDD.setName(_name) + } + this + } + setName("EdgeRDD") + + /** + * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the + * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new + * partitioner that allows co-partitioning with `partitionsRDD`. + */ + override val partitioner = + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + + override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() + + /** + * Persists the edge partitions at the specified storage level, ignoring any existing target + * storage level. + */ + override def persist(newLevel: StorageLevel): this.type = { + partitionsRDD.persist(newLevel) + this + } + + override def unpersist(blocking: Boolean = true): this.type = { + partitionsRDD.unpersist(blocking) + this + } + + /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + + /** The number of edges in the RDD. 
*/ + override def count(): Long = { + partitionsRDD.map(_._2.size.toLong).reduce(_ + _) + } + + override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] = + mapEdgePartitions((pid, part) => part.map(f)) + + override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse) + + override def filter( + epred: EdgeTriplet[VD, ED] => Boolean, + vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = { + mapEdgePartitions((pid, part) => part.filter(epred, vpred)) + } + + override def innerJoin[ED2: ClassTag, ED3: ClassTag] + (other: EdgeRDD[ED2, _]) + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { + val ed2Tag = classTag[ED2] + val ed3Tag = classTag[ED3] + this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { + (thisIter, otherIter) => + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + }) + } + + override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( + f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { + this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => + if (iter.hasNext) { + val (pid, ep) = iter.next() + Iterator(Tuple2(pid, f(pid, ep))) + } else { + Iterator.empty + } + }, preservesPartitioning = true)) + } + + override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( + partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = { + new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel) + } + + override private[graphx] def withTargetStorageLevel( + targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = { + new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel) + } + +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala new file mode 100644 index 0000000000..08405629bc --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark._ +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ +import org.apache.spark.storage.StorageLevel + +import org.apache.spark.graphx._ + +class VertexRDDImpl[VD] private[graphx] ( + val partitionsRDD: RDD[ShippableVertexPartition[VD]], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + (implicit override protected val vdTag: ClassTag[VD]) + extends VertexRDD[VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + + require(partitionsRDD.partitioner.isDefined) + + override def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex())) + + override val partitioner = partitionsRDD.partitioner + + override protected def getPreferredLocations(s: Partition): Seq[String] = + partitionsRDD.preferredLocations(s) + + override def setName(_name: String): this.type = { + if (partitionsRDD.name != null) { + partitionsRDD.setName(partitionsRDD.name + ", " + _name) + } else { + partitionsRDD.setName(_name) + } + this + } + setName("VertexRDD") + + /** + * Persists the vertex partitions at the specified storage level, ignoring any existing target + * storage level. + */ + override def persist(newLevel: StorageLevel): this.type = { + partitionsRDD.persist(newLevel) + this + } + + override def unpersist(blocking: Boolean = true): this.type = { + partitionsRDD.unpersist(blocking) + this + } + + /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + + /** The number of vertices in the RDD. */ + override def count(): Long = { + partitionsRDD.map(_.size).reduce(_ + _) + } + + override private[graphx] def mapVertexPartitions[VD2: ClassTag]( + f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) + : VertexRDD[VD2] = { + val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) + this.withPartitionsRDD(newPartitionsRDD) + } + + override def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = + this.mapVertexPartitions(_.map((vid, attr) => f(attr))) + + override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = + this.mapVertexPartitions(_.map(f)) + + override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.diff(otherPart)) + } + this.withPartitionsRDD(newPartitionsRDD) + } + + override def leftZipJoin[VD2: ClassTag, VD3: ClassTag] + (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.leftJoin(otherPart)(f)) + } + this.withPartitionsRDD(newPartitionsRDD) + } + + override def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: RDD[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3) + : VertexRDD[VD3] = { + // Test if the other vertex is a VertexRDD to choose the optimal join strategy. 
+ // If the other set is a VertexRDD then we use the much more efficient leftZipJoin + other match { + case other: VertexRDD[_] => + leftZipJoin(other)(f) + case _ => + this.withPartitionsRDD[VD3]( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) { + (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f)) + } + ) + } + } + + override def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + other.partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.innerJoin(otherPart)(f)) + } + this.withPartitionsRDD(newPartitionsRDD) + } + + override def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { + // Test if the other vertex is a VertexRDD to choose the optimal join strategy. + // If the other set is a VertexRDD then we use the much more efficient innerZipJoin + other match { + case other: VertexRDD[_] => + innerZipJoin(other)(f) + case _ => + this.withPartitionsRDD( + partitionsRDD.zipPartitions( + other.partitionBy(this.partitioner.get), preservesPartitioning = true) { + (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f)) + } + ) + } + } + + override def aggregateUsingIndex[VD2: ClassTag]( + messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { + val shuffled = messages.partitionBy(this.partitioner.get) + val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => + thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc)) + } + this.withPartitionsRDD[VD2](parts) + } + + override def reverseRoutingTables(): VertexRDD[VD] = + this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse)) + + override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = { + val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get) + val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) { + (partIter, routingTableIter) => + val routingTable = + if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty + partIter.map(_.withRoutingTable(routingTable)) + } + this.withPartitionsRDD(vertexPartitions) + } + + override private[graphx] def withPartitionsRDD[VD2: ClassTag]( + partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = { + new VertexRDDImpl(partitionsRDD, this.targetStorageLevel) + } + + override private[graphx] def withTargetStorageLevel( + targetStorageLevel: StorageLevel): VertexRDD[VD] = { + new VertexRDDImpl(this.partitionsRDD, targetStorageLevel) + } + + override private[graphx] def shipVertexAttributes( + shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = { + partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst))) + } + + override private[graphx] def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = { + partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds())) + } + +} -- cgit v1.2.3
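Editor's note: the sketch below is not part of the patch. It illustrates the usage pattern this change encourages: user code obtains VertexRDD and EdgeRDD through the companion-object factory methods (VertexRDD.apply is visible in the diff above; EdgeRDD.fromEdges is assumed to be the existing factory in the EdgeRDD companion object of this era), rather than by calling the constructors directly, since the concrete VertexRDDImpl / EdgeRDDImpl constructors are now package-private. The sample data, the local master setting, and the object name VertexEdgeRDDExample are made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object VertexEdgeRDDExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup for the example only.
    val sc = new SparkContext(
      new SparkConf().setAppName("VertexEdgeRDDExample").setMaster("local[*]"))

    val rawVertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val rawEdges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

    // The factories return the abstract VertexRDD / EdgeRDD types introduced by this
    // patch; VertexRDDImpl and EdgeRDDImpl remain an implementation detail.
    // (EdgeRDD.fromEdges is assumed from the companion object; it is not shown in the diff.)
    val vertices: VertexRDD[String] = VertexRDD(rawVertices)
    val edges: EdgeRDD[Int, String] = EdgeRDD.fromEdges[Int, String](rawEdges)

    // Operations declared on the abstract classes keep their refined return types,
    // so existing call sites continue to see VertexRDD / EdgeRDD rather than plain RDDs.
    val upper: VertexRDD[String] = vertices.mapValues(_.toUpperCase)
    val reversed: EdgeRDD[Int, String] = edges.reverse

    println(s"vertices=${upper.count()}, edges=${reversed.count()}")
    sc.stop()
  }
}

As the commit notes, the public types are abstract classes rather than traits so that adding methods later does not break binary compatibility, and getPartitions / compute are defined on the abstract classes to satisfy MIMA checks.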