author     Ankur Dave <ankurdave@gmail.com>  2014-01-09 13:52:07 -0800
committer  Ankur Dave <ankurdave@gmail.com>  2014-01-09 14:00:16 -0800
commit     da83038234de1a16de38a24633c73fd950d4a85f (patch)
tree       2520adf02fc0bc1d60b2933fef2beb9f069fe3ff
parent     ec12c63409c2db85c27a87813a8d0505ea8f6c21 (diff)
Vid -> VertexID
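
The rename is purely mechanical: every signature that used the `Vid` alias now spells it out as `VertexID`. A minimal sketch of what the alias change presumably looks like in the package object (the `package.scala` entry in the stat below; the surrounding definitions are not shown in this diff):

    package org.apache.spark

    package object graph {
      // Before this commit the alias was (presumably) `type Vid = Long`.
      type VertexID = Long
    }
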
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Edge.scala | 10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala | 18
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala | 24
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphOps.scala | 30
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala | 14
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Pregel.scala | 8
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala | 42
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala | 6
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala | 12
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala | 16
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala | 10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 32
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala | 12
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala | 29
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala | 16
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala | 10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala | 47
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/package.scala | 6
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala | 12
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala | 18
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala | 31
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala | 10
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala | 18
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala | 2
31 files changed, 234 insertions(+), 221 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
index 5ac77839eb..19c28bea68 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -11,11 +11,11 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
/**
* The vertex id of the source vertex
*/
- var srcId: Vid = 0,
+ var srcId: VertexID = 0,
/**
* The vertex id of the target vertex.
*/
- var dstId: Vid = 0,
+ var dstId: VertexID = 0,
/**
* The attribute associated with the edge.
*/
@@ -27,7 +27,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @param vid the id one of the two vertices on the edge.
* @return the id of the other vertex on the edge.
*/
- def otherVertexId(vid: Vid): Vid =
+ def otherVertexId(vid: VertexID): VertexID =
if (srcId == vid) dstId else { assert(dstId == vid); srcId }
/**
@@ -38,13 +38,13 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @return the relative direction of the edge to the corresponding
* vertex.
*/
- def relativeDirection(vid: Vid): EdgeDirection =
+ def relativeDirection(vid: VertexID): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
}
object Edge {
def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
override def compare(a: Edge[ED], b: Edge[ED]): Int =
- Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
+ Ordering[(VertexID, VertexID)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
}
}
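
A quick usage sketch of the renamed Edge API, with made-up ids (illustration only, not part of this commit; `VertexID` is an alias for `Long`, so plain `Long` literals still work):

    import org.apache.spark.graph._

    val e = Edge(1L, 2L, "follows")               // srcId = 1, dstId = 2, attr = "follows"
    val other: VertexID = e.otherVertexId(1L)     // 2L, the opposite endpoint
    val dir = e.relativeDirection(2L)             // EdgeDirection.In, since 2 is the destination
    Edge.lexicographicOrdering[String]
      .compare(Edge(1L, 2L, "a"), Edge(1L, 3L, "b"))  // negative: (1, 2) sorts before (1, 3)
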
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 230202d6b0..fd93359352 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag](
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
- (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -66,7 +66,7 @@ class EdgeRDD[@specialized ED: ClassTag](
})
}
- def collectVids(): RDD[Vid] = {
+ def collectVertexIDs(): RDD[VertexID] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index 5a384a5f84..a5103ed3cb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -47,7 +47,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id one of the two vertices on the edge.
* @return the attribute for the other vertex on the edge.
*/
- def otherVertexAttr(vid: Vid): VD =
+ def otherVertexAttr(vid: VertexID): VD =
if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
/**
@@ -56,7 +56,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id of one of the two vertices on the edge
* @return the attr for the vertex with that id.
*/
- def vertexAttr(vid: Vid): VD =
+ def vertexAttr(vid: VertexID): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 420d01b426..dd0799142e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -125,7 +125,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* }}}
*
*/
- def mapVertices[VD2: ClassTag](map: (Vid, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
/**
* Construct a new graph where the value of each edge is
@@ -253,7 +253,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* satisfy the predicates.
*/
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
- vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+ vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
/**
* Subgraph of this graph with only vertices and edges from the other graph.
@@ -302,7 +302,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(Vid, Int)] =
+ * val inDeg: RDD[(VertexID, Int)] =
* mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _)
* }}}
*
@@ -314,7 +314,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
*/
def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]
@@ -341,15 +341,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
- * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees()
+ * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.outerJoinVertices(outDeg) {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
*
*/
- def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(Vid, U)])
- (mapFunc: (Vid, VD, Option[U]) => VD2)
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+ (mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Save a copy of the GraphOps object so there is always one unique GraphOps object
@@ -377,7 +377,7 @@ object Graph {
* (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
- rawEdges: RDD[(Vid, Vid)],
+ rawEdges: RDD[(VertexID, VertexID)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
@@ -419,7 +419,7 @@ object Graph {
* partitioning the edges.
*/
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(Vid, VD)],
+ vertices: RDD[(VertexID, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index b8c1b5b0f0..296f3848f1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -14,7 +14,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[(Vid, Object)])
+ kryo.register(classOf[(VertexID, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index c1ce5cd9cc..22f4854019 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -42,11 +42,12 @@ object GraphLab extends Logging {
(graph: Graph[VD, ED], numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out)
- (gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
+ (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
- applyFunc: (Vid, VD, Option[A]) => VD,
- scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
- startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
+ applyFunc: (VertexID, VD, Option[A]) => VD,
+ scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
+ : Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
@@ -56,7 +57,7 @@ object GraphLab extends Logging {
// The gather function wrapper strips the active attribute and
// only invokes the gather function on active vertices
- def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
+ def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.set(e)
@@ -70,7 +71,7 @@ object GraphLab extends Logging {
// The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices
- def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
+ def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
val (active, vData) = data
if (active) (true, applyFunc(vid, vData, accum))
else (false, vData)
@@ -78,8 +79,8 @@ object GraphLab extends Logging {
// The scatter function wrapper strips the vertex of the active attribute
// and only invokes the scatter function on active vertices
- def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
- val vid = e.otherVertexId(rawVid)
+ def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
+ val vid = e.otherVertexId(rawVertexID)
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.set(e)
@@ -92,7 +93,8 @@ object GraphLab extends Logging {
}
// Used to set the active status of vertices for the next round
- def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
+ def applyActive(
+ vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
val (prevActive, vData) = data
(newActiveOpt.getOrElse(false), vData)
}
@@ -103,7 +105,7 @@ object GraphLab extends Logging {
while (i < numIter && numActive > 0) {
// Gather
- val gathered: RDD[(Vid, A)] =
+ val gathered: RDD[(VertexID, A)] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
@@ -113,7 +115,7 @@ object GraphLab extends Logging {
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
// activeGraph: Graph[(Boolean, VD), ED]
- val scattered: RDD[(Vid, Boolean)] =
+ val scattered: RDD[(VertexID, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 11c6120beb..e41287c1ed 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -112,7 +112,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
*/
def aggregateNeighbors[A: ClassTag](
- mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
dir: EdgeDirection)
: VertexRDD[A] = {
@@ -151,25 +151,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
- VertexRDD[Array[Vid]] = {
+ VertexRDD[Array[VertexID]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Both) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _
)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
reduceFunc = _ ++ _)
} else if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _)
} else {
throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
}
- graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[VertexID])
+ }
} // end of collectNeighborIds
@@ -187,14 +189,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
- VertexRDD[ Array[(Vid, VD)] ] = {
- val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
+ VertexRDD[ Array[(VertexID, VD)] ] = {
+ val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
(a, b) => a ++ b,
edgeDirection)
- graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+ }
} // end of collectNeighbor
@@ -228,9 +232,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* }}}
*
*/
- def joinVertices[U: ClassTag](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
+ def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
: Graph[VD, ED] = {
- val uf = (id: Vid, data: VD, o: Option[U]) => {
+ val uf = (id: VertexID, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
@@ -259,7 +263,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* val degrees: VertexSetRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
- * vpred = (vid: Vid, deg:Int) => deg > 0
+ * vpred = (vid: VertexID, deg:Int) => deg > 0
* )
* }}}
*
@@ -267,7 +271,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
- vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = {
+ vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
} // end of GraphOps
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
index 293a9d588a..c01b4b9439 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -2,7 +2,7 @@ package org.apache.spark.graph
sealed trait PartitionStrategy extends Serializable {
- def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid
+ def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid
}
@@ -51,9 +51,9 @@ sealed trait PartitionStrategy extends Serializable {
*
*/
case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
- val mixingPrime: Vid = 1125899906842597L
+ val mixingPrime: VertexID = 1125899906842597L
val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
@@ -62,8 +62,8 @@ case object EdgePartition2D extends PartitionStrategy {
case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
- val mixingPrime: Vid = 1125899906842597L
+ override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+ val mixingPrime: VertexID = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
}
@@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy {
* random vertex cut.
*/
case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
math.abs((src, dst).hashCode()) % numParts
}
}
@@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy {
* will end up on the same partition.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
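
All of these strategies implement the same renamed `getPartition(src: VertexID, dst: VertexID, numParts: Pid)` method. A small sketch of calling one directly, with made-up ids (illustration only, not part of this commit):

    import org.apache.spark.graph._

    val numParts: Pid = 8
    // CanonicalRandomVertexCut orders the endpoints first, so both directions
    // of the same vertex pair land on the same partition.
    val p1 = CanonicalRandomVertexCut.getPartition(3L, 7L, numParts)
    val p2 = CanonicalRandomVertexCut.getPartition(7L, 3L, numParts)
    assert(p1 == p2)
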
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 4664091b57..3b84e2e5e4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -25,9 +25,9 @@ import scala.reflect.ClassTag
* // Set the vertex attributes to the initial pagerank values
* .mapVertices( (id, attr) => 1.0 )
*
- * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+ * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] =
* Some(edge.srcAttr * edge.attr)
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
@@ -88,8 +88,8 @@ object Pregel {
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
- vprog: (Vid, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
+ vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
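
For reference, a minimal sketch of calling Pregel with the renamed types, assuming an existing `graph: Graph[Long, Int]` and propagating the largest vertex attribute along edges (illustration only, not part of this commit):

    import org.apache.spark.graph._

    // Assumes `graph: Graph[Long, Int]` is already built.
    val maxAttr = Pregel(graph, Long.MinValue)(
      (id: VertexID, attr: Long, msg: Long) => math.max(attr, msg),
      (et: EdgeTriplet[Long, Int]) =>
        if (et.srcAttr > et.dstAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,
      (a: Long, b: Long) => math.max(a, b))
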
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
index c5fb4aeca7..25b0aed85a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -29,7 +29,7 @@ import org.apache.spark.graph.impl.VertexPartition
/**
- * A `VertexRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is
+ * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
* only one entry for each vertex and by pre-indexing the entries for fast,
* efficient joins.
*
@@ -40,12 +40,12 @@ import org.apache.spark.graph.impl.VertexPartition
* @example Construct a `VertexRDD` from a plain RDD
* {{{
* // Construct an intial vertex set
- * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
* val vset = VertexRDD(someData)
* // If there were redundant values in someData we would use a reduceFunc
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
* val vset3 = VertexRDD(otherData, vset.index)
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -54,7 +54,7 @@ import org.apache.spark.graph.impl.VertexPartition
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[VertexPartition[VD]])
- extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+ extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -104,9 +104,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Provide the `RDD[(Vid, VD)]` equivalent output.
+ * Provide the `RDD[(VertexID, VD)]` equivalent output.
*/
- override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = {
+ override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
@@ -125,14 +125,14 @@ class VertexRDD[@specialized VD: ClassTag](
* given predicate.
*
* @param pred the user defined predicate, which takes a tuple to conform to
- * the RDD[(Vid, VD)] interface
+ * the RDD[(VertexID, VD)] interface
*
* @note The vertex set preserves the original index structure
* which means that the returned RDD can be easily joined with
* the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexRDD[VD] =
+ override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
/**
@@ -160,7 +160,7 @@ class VertexRDD[@specialized VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassTag](f: (Vid, VD) => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
@@ -197,7 +197,7 @@ class VertexRDD[@specialized VD: ClassTag](
*
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -228,8 +228,8 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD with the attribute emitted by f.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: RDD[(Vid, VD2)])
- (f: (Vid, VD, Option[VD2]) => VD3)
+ (other: RDD[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] =
{
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
@@ -254,7 +254,7 @@ class VertexRDD[@specialized VD: ClassTag](
* must have the same index.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
- (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -269,8 +269,8 @@ class VertexRDD[@specialized VD: ClassTag](
* Replace vertices with corresponding vertices in `other`, and drop vertices without a
* corresponding vertex in `other`.
*/
- def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(Vid, U)])
- (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
@@ -293,7 +293,7 @@ class VertexRDD[@specialized VD: ClassTag](
* co-indexed with this one.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
+ messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
@@ -319,8 +319,8 @@ object VertexRDD {
*
* @param rdd the collection of vertex-attribute pairs
*/
- def apply[VD: ClassTag](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = {
- val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -339,9 +339,9 @@ object VertexRDD {
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassTag](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
{
- val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -351,7 +351,7 @@ object VertexRDD {
new VertexRDD(vertexPartitions)
}
- def apply[VD: ClassTag](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
+ def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
: VertexRDD[VD] =
{
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
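
The scaladoc example above translates directly to the renamed types; a small sketch with made-up data, assuming an existing SparkContext `sc` (illustration only, not part of this commit):

    import org.apache.spark.graph._

    val pairs = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))   // RDD[(VertexID, String)]
    val vset: VertexRDD[String] = VertexRDD(pairs)
    val upper = vset.mapValues((id: VertexID, name: String) => name.toUpperCase)
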
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
index 7cd947d2ba..2a6b8c0999 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
@@ -16,10 +16,10 @@ object ConnectedComponents {
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[Vid, ED] = {
+ def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
+ def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
index f77dffd7b4..26b8dc5ab6 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
@@ -65,7 +65,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
@@ -129,7 +129,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala
index c324c984d7..8031aa10ce 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala
@@ -15,7 +15,7 @@ object StronglyConnectedComponents {
*
* @return a graph with vertex attributes containing the smallest vertex id in each SCC
*/
- def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[Vid, ED] = {
+ def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
// the graph we update with final SCC ids, and the graph we return at the end
var sccGraph = graph.mapVertices { case (vid, _) => vid }
@@ -52,7 +52,7 @@ object StronglyConnectedComponents {
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
- sccWorkGraph = GraphLab[(Vid, Boolean), ED, Vid](sccWorkGraph, Integer.MAX_VALUE)(
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
(vid, e) => e.otherVertexAttr(vid)._1,
(vid1, vid2) => math.min(vid1, vid2),
(vid, scc, optScc) =>
@@ -62,7 +62,7 @@ object StronglyConnectedComponents {
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
- sccWorkGraph = GraphLab[(Vid, Boolean), ED, Boolean](
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
sccWorkGraph,
Integer.MAX_VALUE,
EdgeDirection.Out,
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index 18395bdc5f..85fa23d309 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -51,12 +51,12 @@ object Svdpp {
// calculate initial bias and norm
var t0 = g.mapReduceTriplets(et =>
Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
- g = g.outerJoinVertices(t0) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(Vid, (RealVector, RealVector, Double))] = {
+ : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -73,19 +73,19 @@ object Svdpp {
for (i <- 0 until conf.maxIters) {
// phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
- g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
// phase 2, update p for user nodes and q, y for item nodes
val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
(g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
- g = g.outerJoinVertices(t2) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
+ g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
(vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
}
}
// calculate error on training set
- def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(Vid, Double)] = {
+ def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -95,7 +95,7 @@ object Svdpp {
Iterator((et.dstId, err))
}
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
- g = g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
(g, u)
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
index a6384320ba..81774d52e4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
@@ -46,7 +46,7 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = {
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index 7ae4d7df43..b4311fa9f8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -16,10 +16,10 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
* @tparam ED the edge attribute type.
*/
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
- val srcIds: Array[Vid],
- val dstIds: Array[Vid],
+ val srcIds: Array[VertexID],
+ val dstIds: Array[VertexID],
val data: Array[ED],
- val index: PrimitiveKeyOpenHashMap[Vid, Int]) extends Serializable {
+ val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
/**
* Reverse all the edges in this partition.
@@ -101,8 +101,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
val builder = new EdgePartitionBuilder[ED]
var firstIter: Boolean = true
- var currSrcId: Vid = nullValue[Vid]
- var currDstId: Vid = nullValue[Vid]
+ var currSrcId: VertexID = nullValue[VertexID]
+ var currDstId: VertexID = nullValue[VertexID]
var currAttr: ED = nullValue[ED]
var i = 0
while (i < size) {
@@ -136,7 +136,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2])
- (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = {
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
val builder = new EdgePartitionBuilder[ED3]
var i = 0
var j = 0
@@ -193,14 +193,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
*/
- def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] =
+ def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
*/
- private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] {
+ private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
index ae3f3a6d03..56624ef60a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -13,22 +13,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
- def add(src: Vid, dst: Vid, d: ED) {
+ def add(src: VertexID, dst: VertexID, d: ED) {
edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
- val srcIds = new Array[Vid](edgeArray.size)
- val dstIds = new Array[Vid](edgeArray.size)
+ val srcIds = new Array[VertexID](edgeArray.size)
+ val dstIds = new Array[VertexID](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new PrimitiveKeyOpenHashMap[Vid, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
index.update(srcIds(0), 0)
- var currSrcId: Vid = srcIds(0)
+ var currSrcId: VertexID = srcIds(0)
var i = 0
while (i < edgeArray.size) {
srcIds(i) = edgeArray(i).srcId
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
index 4d5eb240a9..e95d79e3d6 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
@@ -25,7 +25,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]
- private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+ private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 2ce5404e94..6eb401b3b5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -89,7 +89,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def statistics: Map[String, Any] = {
// Get the total number of vertices after replication, used to compute the replication ratio.
- def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
+ def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
}
@@ -157,7 +157,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- override def mapVertices[VD2: ClassTag](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f))
@@ -208,7 +208,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
- vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
@@ -250,7 +250,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
//////////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -280,14 +280,14 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
}
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
@@ -318,7 +318,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
- (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+ (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
@@ -360,7 +360,7 @@ object GraphImpl {
}
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(Vid, VD)],
+ vertices: RDD[(VertexID, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
@@ -369,7 +369,7 @@ object GraphImpl {
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner)
+ val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
@@ -381,7 +381,7 @@ object GraphImpl {
/**
* Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
- * data structure (RDD[(Vid, Vid, ED)]).
+ * data structure (RDD[(VertexID, VertexID, ED)]).
*
* The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
* pair: the key is the partition id, and the value is an EdgePartition object containing all the
@@ -404,19 +404,19 @@ object GraphImpl {
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache()
// Get the set of all vids
- val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
new GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
- private def collectVidsFromEdges(
+ private def collectVertexIDsFromEdges(
edges: EdgeRDD[_],
- partitioner: Partitioner): RDD[(Vid, Int)] = {
+ partitioner: Partitioner): RDD[(VertexID, Int)] = {
// TODO: Consider doing map side distinct before shuffle.
- new ShuffledRDD[Vid, Int, (Vid, Int)](
- edges.collectVids.map(vid => (vid, 0)), partitioner)
- .setSerializer(classOf[VidMsgSerializer].getName)
+ new ShuffledRDD[VertexID, Int, (VertexID, Int)](
+ edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VertexIDMsgSerializer].getName)
}
} // end of object GraphImpl
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index bf033945de..2d03f75a28 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -3,15 +3,15 @@ package org.apache.spark.graph.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner
-import org.apache.spark.graph.{Pid, Vid}
+import org.apache.spark.graph.{Pid, VertexID}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: Pid,
- var vid: Vid,
+ var vid: VertexID,
var data: T)
- extends Product2[Pid, (Vid, T)] with Serializable {
+ extends Product2[Pid, (VertexID, T)] with Serializable {
override def _1 = partition
@@ -41,7 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[Pid, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -77,8 +77,8 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- def partitionForAggregation[T: ClassTag](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
- val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner)
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 970acfed27..9d2d242ffa 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -31,9 +31,9 @@ class ReplicatedVertexView[VD: ClassTag](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
case Some(prevView) =>
- prevView.localVidMap
+ prevView.localVertexIDMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -43,7 +43,7 @@ class ReplicatedVertexView[VD: ClassTag](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
}
private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
@@ -104,8 +104,8 @@ class ReplicatedVertexView[VD: ClassTag](
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
- // locations specified in localVidMap
- localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ // locations specified in localVertexIDMap
+ localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
@@ -128,15 +128,15 @@ class ReplicatedVertexView[VD: ClassTag](
object ReplicatedVertexView {
protected def buildBuffer[VD: ClassTag](
- pid2vidIter: Iterator[Array[Array[Vid]]],
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
- val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val vids = new PrimitiveVector[Vid](pid2vid(pid).size)
+ val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
var i = 0
while (i < size) {
@@ -152,16 +152,16 @@ object ReplicatedVertexView {
}
protected def buildActiveBuffer(
- pid2vidIter: Iterator[Array[Array[Vid]]],
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
activePartIter: Iterator[VertexPartition[_]])
- : Iterator[(Int, Array[Vid])] = {
- val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ : Iterator[(Int, Array[VertexID])] = {
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val actives = new PrimitiveVector[Vid](vidsCandidate.size)
+ val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
var i = 0
while (i < size) {
val vid = vidsCandidate(i)
@@ -175,7 +175,8 @@ object ReplicatedVertexView {
}
}
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[Vid], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
extends Serializable {
- def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+ def iterator: Iterator[(VertexID, VD)] =
+ (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
index b6cd048b33..9e6f78197e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -14,12 +14,12 @@ import org.apache.spark.util.collection.PrimitiveVector
*/
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
- val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false)
+ val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -28,9 +28,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
}
private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
+ val vid2pid: RDD[(VertexID, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
@@ -53,7 +53,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val numPartitions = vertices.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
index dcf619fa85..a3b0ea7689 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
@@ -7,12 +7,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.graph._
import org.apache.spark.serializer._
-class VidMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, _)]
+ val msg = t.asInstanceOf[(VertexID, _)]
writeVarLong(msg._1, optimizePositive = false)
this
}
@@ -101,7 +101,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Int)]
+ val msg = t.asInstanceOf[(VertexID, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
this
@@ -124,7 +124,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Long)]
+ val msg = t.asInstanceOf[(VertexID, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
this
@@ -147,7 +147,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Double)]
+ val msg = t.asInstanceOf[(VertexID, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
this
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 7048a40f42..91244daa54 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -10,18 +10,18 @@ import org.apache.spark.graph._
private[graph] object VertexPartition {
- def apply[VD: ClassTag](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = {
- val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
iter.foreach { case (k, v) =>
map(k) = v
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
- def apply[VD: ClassTag](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
- val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc)
}
@@ -44,15 +44,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
def size: Int = mask.cardinality()
/** Return the vertex attribute for the given vertex ID. */
- def apply(vid: Vid): VD = values(index.getPos(vid))
+ def apply(vid: VertexID): VD = values(index.getPos(vid))
- def isDefined(vid: Vid): Boolean = {
+ def isDefined(vid: VertexID): Boolean = {
val pos = index.getPos(vid)
pos >= 0 && mask.get(pos)
}
/** Look up vid in activeSet, throwing an exception if it is None. */
- def isActive(vid: Vid): Boolean = {
+ def isActive(vid: VertexID): Boolean = {
activeSet.get.contains(vid)
}
@@ -72,7 +72,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
- def map[VD2: ClassTag](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
+ def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0)
@@ -92,7 +92,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- def filter(pred: (Vid, VD) => Boolean): VertexPartition[VD] = {
+ def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
@@ -130,7 +130,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2])
- (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
@@ -149,14 +149,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: Iterator[(Vid, VD2)])
- (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (other: Iterator[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
- (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
@@ -176,15 +176,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Inner join an iterator of messages.
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
- (iter: Iterator[Product2[Vid, U]])
- (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ (iter: Iterator[Product2[VertexID, U]])
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
- def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[Vid, VD2]])
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -202,7 +202,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
- def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = {
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length)
@@ -217,8 +217,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
}
def aggregateUsingIndex[VD2: ClassTag](
- iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
- {
+ iter: Iterator[Product2[VertexID, VD2]],
+ reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
iter.foreach { product =>
@@ -237,7 +237,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition[VD2](index, newValues, newMask)
}
- def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = {
+ def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet))
@@ -247,7 +247,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
- val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
@@ -255,7 +255,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
}
- def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+ def iterator: Iterator[(VertexID, VD)] =
+ mask.iterator.map(ind => (index.getValue(ind), values(ind)))
- def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind))
+ def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
}
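
The filter and innerJoinKeepLeft methods above rely on the bitmask trick described in the comments: the values array is never copied or shrunk, only the mask deciding which slots are visible changes. A toy model of that idea, not the Spark class, just a sketch with hypothetical names:

object MaskSketch {
  type VertexID = Long

  // Toy partition: parallel arrays plus a visibility mask.
  final case class ToyPartition[VD](ids: Array[VertexID], values: Array[VD], mask: Array[Boolean]) {
    // filter flips mask bits only; ids and values are shared with the original.
    def filter(pred: (VertexID, VD) => Boolean): ToyPartition[VD] =
      copy(mask = Array.tabulate(ids.length)(i => mask(i) && pred(ids(i), values(i))))

    def iterator: Iterator[(VertexID, VD)] =
      ids.indices.iterator.filter(i => mask(i)).map(i => (ids(i), values(i)))
  }
}
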
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 655ae53bf8..823d47c359 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -5,15 +5,15 @@ import org.apache.spark.util.collection.OpenHashSet
package object graph {
- type Vid = Long
+ type VertexID = Long
// TODO: Consider using Char.
type Pid = Int
- type VertexSet = OpenHashSet[Vid]
+ type VertexSet = OpenHashSet[VertexID]
// type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
- type VertexIdToIndexMap = OpenHashSet[Vid]
+ type VertexIdToIndexMap = OpenHashSet[VertexID]
/**
* Return the default null-like value for a data type T.
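
Since VertexID is still just a type alias for Long, code that passed Long literals keeps compiling after the rename; only the alias name appearing in signatures changes. A small sketch of how the alias reads at a call site, assuming a live SparkContext named sc and the package object shown above:

object AliasSketch {
  import org.apache.spark.SparkContext
  import org.apache.spark.graph._
  import org.apache.spark.rdd.RDD

  def exampleVertices(sc: SparkContext): RDD[(VertexID, String)] =
    sc.parallelize(Seq((1L: VertexID, "one"), (2L: VertexID, "two")))
}
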
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index d61f358bb0..51f45cb892 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -70,7 +70,7 @@ object GraphGenerators {
val sigma = 1.3
//val vertsAndEdges = (0 until numVertices).flatMap { src => {
- val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{
+ val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
}
@@ -92,11 +92,11 @@ object GraphGenerators {
}
- def generateRandomEdges(src: Int, numEdges: Int, maxVid: Int): Array[Edge[Int]] = {
+ def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
val rand = new Random()
var dsts: Set[Int] = Set()
while (dsts.size < numEdges) {
- val nextDst = rand.nextInt(maxVid)
+ val nextDst = rand.nextInt(maxVertexID)
if (nextDst != src) {
dsts += nextDst
}
@@ -251,9 +251,9 @@ object GraphGenerators {
*/
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert row column address into vertex ids (row major order)
- def sub2ind(r: Int, c: Int): Vid = r * cols + c
+ def sub2ind(r: Int, c: Int): VertexID = r * cols + c
- val vertices: RDD[(Vid, (Int,Int))] =
+ val vertices: RDD[(VertexID, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
@@ -273,7 +273,7 @@ object GraphGenerators {
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
- val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+ val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph.fromEdgeTuples(edges, 1)
} // end of starGraph
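
A usage sketch for the generators touched above, following the signatures visible in this diff (gridGraph(sc, rows, cols) and starGraph(sc, nverts)); it assumes a SparkContext named sc, e.g. in the Spark shell.

import org.apache.spark.graph.util.GraphGenerators

val grid = GraphGenerators.gridGraph(sc, rows = 10, cols = 10)   // Graph[(Int, Int), Double]
val star = GraphGenerators.starGraph(sc, nverts = 100)           // Graph[Int, Int]
println(s"grid vertices: ${grid.vertices.count()}, star edges: ${star.edges.count()}")
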
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
index 9e9213631f..132e6be24a 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
@@ -11,7 +11,8 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("aggregateNeighbors") {
withSpark { sc =>
val n = 3
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
val indegrees = star.aggregateNeighbors(
(vid, edge) => Some(1),
@@ -26,21 +27,22 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
assert(outdegrees.collect().toSet === Set((0, n)))
val noVertexValues = star.aggregateNeighbors[Int](
- (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
+ (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
(a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
EdgeDirection.In)
- assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+ assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
}
}
test("joinVertices") {
withSpark { sc =>
- val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ val vertices =
+ sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
val g: Graph[String, String] = Graph(vertices, edges)
- val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
- val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+ val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
val v = g1.vertices.collect().toSet
assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
@@ -67,7 +69,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test ("filter") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges)
val filteredGraph = graph.filter(
@@ -75,7 +77,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
},
- vpred = (vid: Vid, deg:Int) => deg > 0
+ vpred = (vid: VertexID, deg:Int) => deg > 0
)
val v = filteredGraph.vertices.collect().toSet
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index e6c19dbc40..41f3a8311d 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -9,7 +9,7 @@ import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
- Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
}
test("Graph.fromEdgeTuples") {
@@ -39,7 +39,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
- val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
@@ -56,7 +56,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
- (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
+ (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
}
}
@@ -92,7 +92,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
+ verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -118,10 +118,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
}
}
@@ -150,7 +150,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
}
}
@@ -173,7 +173,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mask") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges)
@@ -199,7 +199,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
val doubleStar = Graph.fromEdgeTuples(
- sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ sc.parallelize((1 to n).flatMap(x =>
+ List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
@@ -218,7 +219,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
@@ -229,11 +230,11 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
- .mapVertices((vid, attr) => vid).cache()
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ring = Graph.fromEdgeTuples(ringEdges, 0).mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
@@ -243,7 +244,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
}
}
@@ -258,7 +259,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
- assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+ assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
index 44182e85ee..de7e3872ca 100644
--- a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
@@ -10,7 +10,8 @@ class PregelSuite extends FunSuite with LocalSparkContext {
test("1 iteration") {
withSpark { sc =>
val n = 5
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
val result = Pregel(star, 0)(
(vid, attr, msg) => attr,
et => Iterator.empty,
@@ -23,11 +24,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
- sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
+ sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
0).cache()
- assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet)
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
- assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet)
+ assert(chainWithSeed.vertices.collect.toSet ===
+ Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
et => Iterator((et.dstId, et.srcAttr)),
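
Restating the Pregel call from the chain-propagation test with the parameter types written out, to show where VertexID now surfaces in user code. This assumes the chainWithSeed graph built in the test above; the hunk is truncated before the merge function, so the math.max shown here as the third argument is an assumption, not copied from the test.

val result = Pregel(chainWithSeed, 0)(
  (vid: VertexID, attr: Int, msg: Int) => math.max(msg, attr),   // vertex program
  et => Iterator((et.dstId, et.srcAttr)),                        // send seed value downstream
  (a: Int, b: Int) => math.max(a, b))                            // assumed merge function
// result.vertices holds the fixed point of propagating the seed down the chain.
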
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
index 4014cbe440..2864ffd1ca 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
@@ -82,7 +82,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 5)
+ val outMsg = (4: VertexID, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -90,8 +90,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Int) = inStrm.readObject()
- val inMsg2: (Vid, Int) = inStrm.readObject()
+ val inMsg1: (VertexID, Int) = inStrm.readObject()
+ val inMsg2: (VertexID, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -102,7 +102,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("LongAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 1L << 32)
+ val outMsg = (4: VertexID, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -110,8 +110,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Long) = inStrm.readObject()
- val inMsg2: (Vid, Long) = inStrm.readObject()
+ val inMsg1: (VertexID, Long) = inStrm.readObject()
+ val inMsg2: (VertexID, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -122,7 +122,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("DoubleAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 5.0)
+ val outMsg = (4: VertexID, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -130,8 +130,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Double) = inStrm.readObject()
- val inMsg2: (Vid, Double) = inStrm.readObject()
+ val inMsg1: (VertexID, Double) = inStrm.readObject()
+ val inMsg2: (VertexID, Double) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index f951fd7a82..fd0beee2f6 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -62,7 +62,7 @@ class EdgePartitionSuite extends FunSuite {
test("innerJoin") {
def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
builder.toEdgePartition
}
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))