From 43e1bdc80c2b19533596df74fd7b97a2d7b84bb6 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 9 Jan 2014 13:59:48 -0800
Subject: Pid -> PartitionID

---
 .../src/main/scala/org/apache/spark/graph/EdgeRDD.scala  |  8 ++++----
 graph/src/main/scala/org/apache/spark/graph/Graph.scala  |  5 +++--
 .../scala/org/apache/spark/graph/PartitionStrategy.scala | 16 ++++++++--------
 .../scala/org/apache/spark/graph/impl/GraphImpl.scala    |  8 ++++----
 .../org/apache/spark/graph/impl/MessageToPartition.scala | 14 +++++++-------
 .../apache/spark/graph/impl/ReplicatedVertexView.scala   | 14 +++++++-------
 .../scala/org/apache/spark/graph/impl/RoutingTable.scala |  4 ++--
 .../src/main/scala/org/apache/spark/graph/package.scala  |  2 +-
 8 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index fd93359352..78821bf568 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -9,7 +9,7 @@ import org.apache.spark.storage.StorageLevel
 
 class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -17,7 +17,7 @@ class EdgeRDD[@specialized ED: ClassTag](
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
   /**
-   * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+   * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in
    * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
    * co-partitioning with partitionsRDD.
    */
@@ -25,7 +25,7 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassTag](
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): EdgeRDD[ED] = persist()
 
-  def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
+  def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
     : EdgeRDD[ED2] = {
     // iter => iter.map { case (pid, ep) => (pid, f(ep)) }
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index dd0799142e..86282e607e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -169,7 +169,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapEdges[ED2: ClassTag](map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+  def mapEdges[ED2: ClassTag](
+      map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
 
   /**
    * Construct a new graph where the value of each edge is
@@ -220,7 +221,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    *
    */
   def mapTriplets[ED2: ClassTag](
-      map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
+      map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
     Graph[VD, ED2]
 
   /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
index c01b4b9439..bc05fb812c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -2,7 +2,7 @@ package org.apache.spark.graph
 
 
 sealed trait PartitionStrategy extends Serializable {
-  def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid
+  def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
 
 
@@ -51,18 +51,18 @@ sealed trait PartitionStrategy extends Serializable {
  *
  */
 case object EdgePartition2D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
-    val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
     val mixingPrime: VertexID = 1125899906842597L
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
     (col * ceilSqrtNumParts + row) % numParts
   }
 }
 
 
 case object EdgePartition1D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
     val mixingPrime: VertexID = 1125899906842597L
     (math.abs(src) * mixingPrime).toInt % numParts
   }
@@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy {
  * random vertex cut.
  */
 case object RandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
     math.abs((src, dst).hashCode()) % numParts
   }
 }
@@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy {
  * will end up on the same partition.
 */
 case object CanonicalRandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    val lower = math.min(src, dst)
    val higher = math.max(src, dst)
    math.abs((lower, higher).hashCode()) % numParts
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 6eb401b3b5..8f42e7d592 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -69,7 +69,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     val numPartitions = edges.partitions.size
     val edTag = classTag[ED]
     val newEdges = new EdgeRDD(edges.map { e =>
-      val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
       new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -173,13 +173,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def mapEdges[ED2: ClassTag](
-      f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+      f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
     val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
     new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
   }
 
   override def mapTriplets[ED2: ClassTag](
-      f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+      f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
     // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
     // manifest from GraphImpl (which would require serializing GraphImpl).
     val vdTag = classTag[VD]
@@ -354,7 +354,7 @@ object GraphImpl {
   }
 
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
-      edgePartitions: RDD[(Pid, EdgePartition[ED])],
+      edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index 2d03f75a28..b2fa728482 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -3,15 +3,15 @@ package org.apache.spark.graph.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.Partitioner
-import org.apache.spark.graph.{Pid, VertexID}
+import org.apache.spark.graph.{PartitionID, VertexID}
 import org.apache.spark.rdd.{ShuffledRDD, RDD}
 
 
 class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
-    @transient var partition: Pid,
+    @transient var partition: PartitionID,
     var vid: VertexID,
     var data: T)
-  extends Product2[Pid, (VertexID, T)] with Serializable {
+  extends Product2[PartitionID, (VertexID, T)] with Serializable {
 
   override def _1 = partition
@@ -27,9 +27,9 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
  * @param data value to send
  */
 class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
-    @transient var partition: Pid,
+    @transient var partition: PartitionID,
     var data: T)
-  extends Product2[Pid, T] with Serializable {
+  extends Product2[PartitionID, T] with Serializable {
 
   override def _1 = partition
@@ -41,7 +41,7 @@ class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
   def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
-    val rdd = new ShuffledRDD[Pid, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+    val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
 
     // Set a custom serializer if the data is of int or double type.
     if (classTag[T] == ClassTag.Int) {
@@ -62,7 +62,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
   /**
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
-    new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+    new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
   }
 }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 9d2d242ffa..7d29861db1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -46,12 +46,12 @@ class ReplicatedVertexView[VD: ClassTag](
     }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
   }
 
-  private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
+  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
+  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
+  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
     (includeSrc, includeDst) match {
       case (true, true) => bothAttrs
       case (true, false) => srcAttrOnly
@@ -63,7 +63,7 @@ class ReplicatedVertexView[VD: ClassTag](
   def get(
       includeSrc: Boolean,
       includeDst: Boolean,
-      actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+      actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
     // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
     // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
     // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
@@ -81,7 +81,7 @@ class ReplicatedVertexView[VD: ClassTag](
   }
 
   private def create(includeSrc: Boolean, includeDst: Boolean)
-      : RDD[(Pid, VertexPartition[VD])] = {
+      : RDD[(PartitionID, VertexPartition[VD])] = {
     val vdTag = classTag[VD]
 
     // Ship vertex attributes to edge partitions according to vertexPlacement
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
index 9e6f78197e..96d9e9d7f8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -30,8 +30,8 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
   private def createPid2Vid(
       includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
     // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
-    val vid2pid: RDD[(VertexID, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
-      val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+    val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+      val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
       val numEdges = edgePartition.size
       val vSet = new VertexSet
       if (includeSrcAttr) {  // Add src vertices to the set.
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 823d47c359..b98a11b918 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -8,7 +8,7 @@ package object graph {
   type VertexID = Long
 
   // TODO: Consider using Char.
-  type Pid = Int
+  type PartitionID = Int
 
   type VertexSet = OpenHashSet[VertexID]
-- 
cgit v1.2.3
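
Note (illustration, not part of the commit): as the package.scala hunk shows, PartitionID is just a type alias for Int, so this patch is a pure rename with no runtime effect. The standalone sketch below replays the EdgePartition2D arithmetic from the PartitionStrategy.scala hunk under the new name; the object name, the method name edgePartition2D, and the main method are made up for the example, while the alias and the body mirror the patched code.

object PartitionIDExample {
  type VertexID = Long
  type PartitionID = Int  // the alias this commit renames from Pid

  // Mirrors EdgePartition2D.getPartition: place each edge in a cell of a
  // ceil(sqrt(numParts)) x ceil(sqrt(numParts)) grid, choosing the column
  // from the hashed source ID and the row from the hashed destination ID.
  def edgePartition2D(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexID = 1125899906842597L
    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

  def main(args: Array[String]): Unit = {
    // With 9 partitions the grid is 3 x 3: all edges with the same source
    // hash to one column and all edges with the same destination hash to one
    // row, bounding a vertex's replication to about 2 * sqrt(numParts) cells.
    println(edgePartition2D(1L, 2L, numParts = 9))  // deterministic per edge
  }
}

Because the alias erases to Int at compile time, call sites like the ones in the hunks above only change in how they read; renaming Pid to PartitionID trades brevity for self-documenting signatures.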