author    Ankur Dave <ankurdave@gmail.com>  2014-01-09 13:59:48 -0800
committer Ankur Dave <ankurdave@gmail.com>  2014-01-09 14:00:17 -0800
commit    43e1bdc80c2b19533596df74fd7b97a2d7b84bb6
tree      d614745e95f3d241442dc75a9906c0f5a9a8fc88
parent    da83038234de1a16de38a24633c73fd950d4a85f
Pid -> PartitionID
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala                    |  8
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala                      |  5
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala          | 16
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala             |  8
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala    | 14
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala  | 14
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala          |  4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/package.scala                    |  2
8 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index fd93359352..78821bf568 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -9,7 +9,7 @@ import org.apache.spark.storage.StorageLevel
class EdgeRDD[@specialized ED: ClassTag](
- val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
partitionsRDD.setName("EdgeRDD")
@@ -17,7 +17,7 @@ class EdgeRDD[@specialized ED: ClassTag](
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
/**
- * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in
* partitionsRDD correspond to the actual partitions and create a new partitioner that allows
* co-partitioning with partitionsRDD.
*/
@@ -25,7 +25,7 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
+ firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
}
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassTag](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): EdgeRDD[ED] = persist()
- def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
+ def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
: EdgeRDD[ED2] = {
// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
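
For orientation on the EdgeRDD hunks above: each underlying Spark partition holds exactly one (PartitionID, EdgePartition) pair, and compute unwraps it into an iterator over the edges inside. A minimal Spark-free sketch of that shape (the object name, Edge, and EdgePartition here are illustrative stand-ins, not GraphX code; PartitionID is spelled out as Int):

    object EdgeRDDComputeSketch {
      type PartitionID = Int
      case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

      // Stand-in for GraphX's EdgePartition: a column-oriented edge store.
      class EdgePartition[ED](edges: Array[Edge[ED]]) {
        def iterator: Iterator[Edge[ED]] = edges.iterator
      }

      // Mirrors firstParent[(PartitionID, EdgePartition[ED])]
      //   .iterator(part, context).next._2.iterator
      def compute[ED](partIter: Iterator[(PartitionID, EdgePartition[ED])]): Iterator[Edge[ED]] =
        partIter.next()._2.iterator

      def main(args: Array[String]): Unit = {
        val part = (0, new EdgePartition(Array(Edge(1L, 2L, "a"), Edge(2L, 3L, "b"))))
        println(compute(Iterator(part)).toList) // List(Edge(1,2,a), Edge(2,3,b))
      }
    }
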
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index dd0799142e..86282e607e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -169,7 +169,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* @tparam ED2 the new edge data type
*
*/
- def mapEdges[ED2: ClassTag](map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+ def mapEdges[ED2: ClassTag](
+ map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
/**
* Construct a new graph where the value of each edge is
@@ -220,7 +221,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
*/
def mapTriplets[ED2: ClassTag](
- map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
+ map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
Graph[VD, ED2]
/**
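
A small, self-contained sketch of the shape of the updated mapEdges signature, applied to one partition's edges outside Spark (Edge and mapOnePartition are stand-ins for this sketch, not GraphX API; the user function receives the PartitionID alongside that partition's edge iterator):

    object MapEdgesShapeSketch {
      type PartitionID = Int
      case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

      // Apply a (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2] function to
      // one partition's edges, as GraphImpl.mapEdges does per partition.
      def mapOnePartition[ED, ED2](
          pid: PartitionID,
          edges: Iterator[Edge[ED]],
          f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Iterator[ED2] =
        f(pid, edges)

      def main(args: Array[String]): Unit = {
        val edges = Iterator(Edge(1L, 2L, 10), Edge(2L, 3L, 20))
        // Double every attribute; pid is available but unused here.
        val out = mapOnePartition[Int, Int](0, edges, (pid, it) => it.map(_.attr * 2))
        println(out.toList) // List(20, 40)
      }
    }
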
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
index c01b4b9439..bc05fb812c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -2,7 +2,7 @@ package org.apache.spark.graph
sealed trait PartitionStrategy extends Serializable {
- def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid
+ def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
@@ -51,18 +51,18 @@ sealed trait PartitionStrategy extends Serializable {
*
*/
case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
- val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: VertexID = 1125899906842597L
- val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
- val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
}
}
case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val mixingPrime: VertexID = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
@@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy {
* random vertex cut.
*/
case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
}
}
@@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy {
* will end up on the same partition.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
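
Because getPartition is pure arithmetic over a PartitionID (an Int), the EdgePartition2D logic above can be checked standalone. This sketch transcribes the body verbatim outside Spark (only the enclosing object and the aliases are added for self-containment):

    object EdgePartition2DSketch {
      type VertexID = Long
      type PartitionID = Int

      def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
        val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
        val mixingPrime: VertexID = 1125899906842597L
        val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
        val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
        (col * ceilSqrtNumParts + row) % numParts
      }

      def main(args: Array[String]): Unit = {
        // With numParts = 9 the grid is 3x3; src maps to column 1 and dst to
        // row 2, so the edge lands in cell (1*3 + 2) % 9.
        println(getPartition(1L, 2L, 9)) // 5
      }
    }
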
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 6eb401b3b5..8f42e7d592 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -69,7 +69,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val numPartitions = edges.partitions.size
val edTag = classTag[ED]
val newEdges = new EdgeRDD(edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+ val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -173,13 +173,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def mapEdges[ED2: ClassTag](
- f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
}
override def mapTriplets[ED2: ClassTag](
- f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdTag = classTag[VD]
@@ -354,7 +354,7 @@ object GraphImpl {
}
def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
- edgePartitions: RDD[(Pid, EdgePartition[ED])],
+ edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
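
The partitioning path above keys each edge by the PartitionID its strategy returns. A non-Spark sketch of that routing step, reusing the CanonicalRandomVertexCut arithmetic shown earlier (Edge and the object name are stand-ins; groupBy stands in for the shuffle):

    object EdgeRoutingSketch {
      type VertexID = Long
      type PartitionID = Int
      case class Edge(srcId: VertexID, dstId: VertexID, attr: Int)

      // Stand-in for PartitionStrategy.getPartition, using the
      // CanonicalRandomVertexCut body from the previous file.
      def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
        val lower = math.min(src, dst)
        val higher = math.max(src, dst)
        math.abs((lower, higher).hashCode()) % numParts
      }

      def main(args: Array[String]): Unit = {
        val edges = Seq(Edge(1L, 2L, 10), Edge(2L, 1L, 20), Edge(3L, 4L, 30))
        // Group edges by target partition, mirroring the map over `edges` in
        // GraphImpl; both directions of an edge land together.
        val byPart: Map[PartitionID, Seq[Edge]] =
          edges.groupBy(e => getPartition(e.srcId, e.dstId, numParts = 4))
        byPart.foreach { case (pid, es) => println(s"partition $pid -> $es") }
      }
    }
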
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index 2d03f75a28..b2fa728482 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -3,15 +3,15 @@ package org.apache.spark.graph.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner
-import org.apache.spark.graph.{Pid, VertexID}
+import org.apache.spark.graph.{PartitionID, VertexID}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
- @transient var partition: Pid,
+ @transient var partition: PartitionID,
var vid: VertexID,
var data: T)
- extends Product2[Pid, (VertexID, T)] with Serializable {
+ extends Product2[PartitionID, (VertexID, T)] with Serializable {
override def _1 = partition
@@ -27,9 +27,9 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
* @param data value to send
*/
class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
- @transient var partition: Pid,
+ @transient var partition: PartitionID,
var data: T)
- extends Product2[Pid, T] with Serializable {
+ extends Product2[PartitionID, T] with Serializable {
override def _1 = partition
@@ -41,7 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[Pid, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -62,7 +62,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
- new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+ new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
}
}
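
A minimal, Spark-free sketch of why these message classes extend Product2[PartitionID, T]: shuffle machinery keys on _1, so the destination partition travels as the key of the pair. The object name is illustrative; the class body mirrors MessageToPartition above:

    object MessageSketch {
      type PartitionID = Int

      class MessageToPartition[T](@transient var partition: PartitionID, var data: T)
          extends Product2[PartitionID, T] with Serializable {
        override def _1: PartitionID = partition
        override def _2: T = data
        override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
      }

      def main(args: Array[String]): Unit = {
        val msg = new MessageToPartition[String](3, "payload")
        // Anything consuming Product2 sees (key, value) = (partition, data).
        println((msg._1, msg._2)) // (3,payload)
      }
    }
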
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 9d2d242ffa..7d29861db1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -46,12 +46,12 @@ class ReplicatedVertexView[VD: ClassTag](
}, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
}
- private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
- private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
- private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
- private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+ private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
+ private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
+ private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
+ private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
- def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+ def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
(includeSrc, includeDst) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -63,7 +63,7 @@ class ReplicatedVertexView[VD: ClassTag](
def get(
includeSrc: Boolean,
includeDst: Boolean,
- actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+ actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
// Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
@@ -81,7 +81,7 @@ class ReplicatedVertexView[VD: ClassTag](
}
private def create(includeSrc: Boolean, includeDst: Boolean)
- : RDD[(Pid, VertexPartition[VD])] = {
+ : RDD[(PartitionID, VertexPartition[VD])] = {
val vdTag = classTag[VD]
// Ship vertex attributes to edge partitions according to vertexPlacement
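
A standalone sketch of the caching pattern above, assuming nothing beyond plain Scala: one lazy val per (includeSrc, includeDst) combination, selected by pattern match, so each replicated view is materialized at most once (strings stand in for the RDD[(PartitionID, VertexPartition[VD])] values):

    object ViewCacheSketch {
      private lazy val bothAttrs   = { println("building (true, true)");   "both" }
      private lazy val srcAttrOnly = { println("building (true, false)");  "src"  }
      private lazy val dstAttrOnly = { println("building (false, true)");  "dst"  }
      private lazy val noAttrs     = { println("building (false, false)"); "none" }

      def get(includeSrc: Boolean, includeDst: Boolean): String =
        (includeSrc, includeDst) match {
          case (true, true)   => bothAttrs
          case (true, false)  => srcAttrOnly
          case (false, true)  => dstAttrOnly
          case (false, false) => noAttrs
        }

      def main(args: Array[String]): Unit = {
        get(true, false)
        get(true, false) // "building" prints only once per combination
      }
    }
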
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
index 9e6f78197e..96d9e9d7f8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -30,8 +30,8 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(VertexID, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
- val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+ val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+ val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr) { // Add src vertices to the set.
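
A non-Spark sketch of the routing-table idea: the hunk above builds (vid, pid) pairs saying "edge partition pid references vertex vid"; this sketch assumes createPid2Vid ultimately inverts those pairs into a per-partition vertex list (the inversion step is not visible in this hunk, so the grouping below is an assumption, and all names are illustrative):

    object RoutingSketch {
      type VertexID = Long
      type PartitionID = Int

      def main(args: Array[String]): Unit = {
        val vid2pid: Seq[(VertexID, PartitionID)] =
          Seq((1L, 0), (2L, 0), (2L, 1), (3L, 1))
        // Invert to: which vertex IDs does each edge partition need?
        val pid2vid: Map[PartitionID, Seq[VertexID]] =
          vid2pid.groupBy(_._2).map { case (pid, pairs) => pid -> pairs.map(_._1) }
        println(pid2vid) // e.g. Map(0 -> List(1, 2), 1 -> List(2, 3))
      }
    }
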
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 823d47c359..b98a11b918 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -8,7 +8,7 @@ package object graph {
type VertexID = Long
// TODO: Consider using Char.
- type Pid = Int
+ type PartitionID = Int
type VertexSet = OpenHashSet[VertexID]
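
Since both the old and the new name alias Int, the rename is source-level only; nothing about partitioning behavior changes. A trivial sketch (illustrative names, not GraphX code) confirming the two aliases are interchangeable:

    object AliasSketch {
      type Pid = Int          // the old alias
      type PartitionID = Int  // the new alias

      // A function written against the old name...
      def oldStyle(p: Pid): Int = p + 1
      // ...is interchangeable with one written against the new name,
      // because both erase to Int.
      def newStyle(p: PartitionID): Int = oldStyle(p)

      def main(args: Array[String]): Unit = {
        println(newStyle(7)) // 8
      }
    }
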