author     Ankur Dave <ankurdave@gmail.com>    2014-01-14 22:17:18 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2014-01-14 22:17:18 -0800
commit     f4d9019aa8c93e6f7539192ba5780a2f6c8ce19e (patch)
tree       b8d68f367df7f304b8e396d97e78e0069ae50c36 /graphx
parent     1210ec29454939087e5e8163aa05e853beba5d19 (diff)
VertexID -> VertexId
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Edge.scala  8
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala  18
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala  32
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala  14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala  8
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala  42
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala  16
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala  10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala  32
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala  12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala  30
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala  16
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala  10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala  44
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala  6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/package.scala  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala  12
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala  10
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala  28
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala  8
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala  18
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala  2
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala  2
32 files changed, 209 insertions(+), 209 deletions(-)
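
Note: the patch below appears to be a mechanical rename of the `VertexID` type alias to `VertexId` across GraphX (plus matching renames such as collectVertexIDs -> collectVertexIds); no behavior changes are visible. As a rough usage sketch, not part of this patch and assuming the alias in graphx/package.scala remains a plain Long underneath, user code with the renamed alias reads like this:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Build a tiny graph; vertex ids are now written as VertexId after the rename.
def tinyGraph(sc: SparkContext): Graph[String, Int] = {
  val vertices: RDD[(VertexId, String)] =
    sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
  val edges: RDD[Edge[Int]] =
    sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
  Graph(vertices, edges, defaultVertexAttr = "missing")
}
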
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 32f1602698..580faa0866 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -28,8 +28,8 @@ package org.apache.spark.graphx
* @param attr The attribute associated with the edge
*/
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
- var srcId: VertexID = 0,
- var dstId: VertexID = 0,
+ var srcId: VertexId = 0,
+ var dstId: VertexId = 0,
var attr: ED = null.asInstanceOf[ED])
extends Serializable {
@@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @param vid the id one of the two vertices on the edge.
* @return the id of the other vertex on the edge.
*/
- def otherVertexId(vid: VertexID): VertexID =
+ def otherVertexId(vid: VertexId): VertexId =
if (srcId == vid) dstId else { assert(dstId == vid); srcId }
/**
@@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @return the relative direction of the edge to the corresponding
* vertex.
*/
- def relativeDirection(vid: VertexID): EdgeDirection =
+ def relativeDirection(vid: VertexId): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 6efef061d7..fe03ae4a62 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag](
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
- (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag](
})
}
- private[graphx] def collectVertexIDs(): RDD[VertexID] = {
+ private[graphx] def collectVertexIds(): RDD[VertexId] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
}
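
Note: an illustrative call against the renamed innerJoin signature above; edgesA and edgesB are assumed, hypothetical EdgeRDD[Int] values, and this snippet is not from the patch.

// Combine attributes of edges present in both RDDs, keyed by (src, dst).
val combined: EdgeRDD[Int] =
  edgesA.innerJoin(edgesB) { (src: VertexId, dst: VertexId, a: Int, b: Int) => a + b }
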
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 2c659cb070..fea43c3b2b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id one of the two vertices on the edge
* @return the attribute for the other vertex on the edge
*/
- def otherVertexAttr(vid: VertexID): VD =
+ def otherVertexAttr(vid: VertexId): VD =
if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
/**
@@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id of one of the two vertices on the edge
* @return the attr for the vertex with that id
*/
- def vertexAttr(vid: VertexID): VD =
+ def vertexAttr(vid: VertexId): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 7f65244cd9..eea95d38d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
- def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
/**
* Transforms each edge attribute in the graph using the map function. The map function is not
@@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
- vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+ vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
/**
@@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(VertexID, Int)] =
+ * val inDeg: RDD[(VertexId, Int)] =
* mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
* }}}
*
@@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*
*/
def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]
@@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*
* {{{
* val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
- * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
+ * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.outerJoinVertices(outDeg) {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
*/
- def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
- (mapFunc: (VertexID, VD, Option[U]) => VD2)
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+ (mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
/**
@@ -364,7 +364,7 @@ object Graph {
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
- rawEdges: RDD[(VertexID, VertexID)],
+ rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
{
@@ -405,7 +405,7 @@ object Graph {
* mentioned in edges but not in vertices
*/
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index 6db8a34937..dd380d8c18 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[(VertexID, Object)])
+ kryo.register(classOf[(VertexId, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 9b864c1290..0fc1e4df68 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @return the set of neighboring ids for each vertex
*/
- def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = {
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Either) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _
)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
reduceFunc = _ ++ _)
} else if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _)
} else {
@@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
- nbrsOpt.getOrElse(Array.empty[VertexID])
+ nbrsOpt.getOrElse(Array.empty[VertexId])
}
} // end of collectNeighborIds
@@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @return the vertex set of neighboring vertex attributes for each vertex
*/
- def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
- val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
+ val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]](
edge => {
val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr)))
val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr)))
@@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
(a, b) => a ++ b)
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
- nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+ nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}
} // end of collectNeighbor
@@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* }}}
*
*/
- def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
+ def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
- val uf = (id: VertexID, data: VD, o: Option[U]) => {
+ val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
@@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* val degrees: VertexRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
- * vpred = (vid: VertexID, deg:Int) => deg > 0
+ * vpred = (vid: VertexId, deg:Int) => deg > 0
* )
* }}}
*
@@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
- vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
+ vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
@@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
- vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
@@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
- def connectedComponents(): Graph[VertexID, ED] = {
+ def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
}
@@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
*/
- def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+ def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
StronglyConnectedComponents.run(graph, numIter)
}
} // end of GraphOps
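
Note: a hedged sketch of how the renamed GraphOps signatures read from the caller's side; graph is an assumed, pre-existing Graph[String, Int], and nothing here is defined by the patch.

// Neighbor ids per vertex, and connected components labeled by the smallest VertexId.
val neighborIds: VertexRDD[Array[VertexId]] =
  graph.collectNeighborIds(EdgeDirection.Either)
val components: Graph[VertexId, Int] = graph.connectedComponents()
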
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 8ba87976f1..929915362c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -23,7 +23,7 @@ package org.apache.spark.graphx
*/
trait PartitionStrategy extends Serializable {
/** Returns the partition number for a given edge. */
- def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
+ def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
/**
@@ -73,9 +73,9 @@ object PartitionStrategy {
* is used.
*/
case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
- val mixingPrime: VertexID = 1125899906842597L
+ val mixingPrime: VertexId = 1125899906842597L
val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
@@ -87,8 +87,8 @@ object PartitionStrategy {
* source.
*/
case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- val mixingPrime: VertexID = 1125899906842597L
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+ val mixingPrime: VertexId = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
}
@@ -99,7 +99,7 @@ object PartitionStrategy {
* random vertex cut that colocates all same-direction edges between two vertices.
*/
case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
}
}
@@ -111,7 +111,7 @@ object PartitionStrategy {
* regardless of direction.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
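
Note: an illustrative custom strategy written against the renamed trait, a toy scheme not defined anywhere in this patch; PartitionID is the Int alias from the graphx package object.

import org.apache.spark.graphx.{PartitionID, PartitionStrategy, VertexId}

case object SourceModuloCut extends PartitionStrategy {
  // Route each edge purely by its source vertex id.
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    (math.abs(src) % numParts).toInt
}
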
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 0f6d413593..ac07a594a1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -40,9 +40,9 @@ import scala.reflect.ClassTag
* // Set the vertex attributes to the initial pagerank values
* .mapVertices((id, attr) => 1.0)
*
- * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
+ * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
@@ -113,8 +113,8 @@ object Pregel {
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
- (vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ (vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
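
Note: a sketch of a Pregel invocation after the rename, adapted loosely from the scaladoc example earlier in this file; pagerankGraph (a Graph[Double, Double]) and resetProb are assumed values, and this is only an illustration of the renamed types.

def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
val ranks: Graph[Double, Double] =
  Pregel(pagerankGraph, initialMsg = 0.0)(vertexProgram, sendMessage, messageCombiner)
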
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 9a95364cb1..edd59bcf32 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions
import org.apache.spark.graphx.impl.VertexPartition
/**
- * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by
+ * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
* pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be
* joined efficiently. All operations except [[reindex]] preserve the index. To construct a
* `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition
* @example Construct a `VertexRDD` from a plain RDD:
* {{{
* // Construct an initial vertex set
- * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
* val vset = VertexRDD(someData)
* // If there were redundant values in someData we would use a reduceFunc
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
* val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[VertexPartition[VD]])
- extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+ extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Provides the `RDD[(VertexID, VD)]` equivalent output.
+ * Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
- override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
+ override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
@@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag](
* rather than allocating new memory.
*
* @param pred the user defined predicate, which takes a tuple to conform to the
- * `RDD[(VertexID, VD)]` interface
+ * `RDD[(VertexId, VD)]` interface
*/
- override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
+ override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
/**
@@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD. The resulting VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
@@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag](
* by `f`.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: RDD[(VertexID, VD2)])
- (f: (VertexID, VD, Option[VD2]) => VD3)
+ (other: RDD[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
@@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag](
* [[innerJoin]] for the behavior of the join.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
- (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
* and `other`, with values supplied by `f`
*/
- def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
- (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
@@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag](
* messages.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+ messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -303,8 +303,8 @@ object VertexRDD {
*
* @param rdd the collection of vertex-attribute pairs
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
- val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -323,8 +323,8 @@ object VertexRDD {
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
- val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -338,7 +338,7 @@ object VertexRDD {
* Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
* `defaultVal` otherwise.
*/
- def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
+ def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
: VertexRDD[VD] = {
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
value.getOrElse(default)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 6067ee8c7e..57fa5eefd5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
*/
private[graphx]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
- val srcIds: Array[VertexID],
- val dstIds: Array[VertexID],
+ val srcIds: Array[VertexId],
+ val dstIds: Array[VertexId],
val data: Array[ED],
- val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
+ val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
/**
* Reverse all the edges in this partition.
@@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
val builder = new EdgePartitionBuilder[ED]
- var currSrcId: VertexID = null.asInstanceOf[VertexID]
- var currDstId: VertexID = null.asInstanceOf[VertexID]
+ var currSrcId: VertexId = null.asInstanceOf[VertexId]
+ var currDstId: VertexId = null.asInstanceOf[VertexId]
var currAttr: ED = null.asInstanceOf[ED]
var i = 0
while (i < size) {
@@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2])
- (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
val builder = new EdgePartitionBuilder[ED3]
var i = 0
var j = 0
@@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
*/
- def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
+ def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
*/
- private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
+ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 960eeaccf1..63ccccb056 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
- def add(src: VertexID, dst: VertexID, d: ED) {
+ def add(src: VertexId, dst: VertexId, d: ED) {
edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
- val srcIds = new Array[VertexID](edgeArray.size)
- val dstIds = new Array[VertexID](edgeArray.size)
+ val srcIds = new Array[VertexId](edgeArray.size)
+ val dstIds = new Array[VertexId](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
index.update(srcIds(0), 0)
- var currSrcId: VertexID = srcIds(0)
+ var currSrcId: VertexId = srcIds(0)
var i = 0
while (i < edgeArray.size) {
srcIds(i) = edgeArray(i).srcId
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 819e3ba93a..886c250d7c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]
- private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
+ private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index eee2d58c3d..1d029bf009 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
+ override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
@@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
- vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
@@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
//////////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
@@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
@@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
- (other: RDD[(VertexID, U)])
- (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+ (other: RDD[(VertexId, U)])
+ (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
{
if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
@@ -312,7 +312,7 @@ object GraphImpl {
}
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
@@ -321,7 +321,7 @@ object GraphImpl {
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
+ val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
@@ -355,7 +355,7 @@ object GraphImpl {
/**
* Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
- * data structure (RDD[(VertexID, VertexID, ED)]).
+ * data structure (RDD[(VertexId, VertexId, ED)]).
*
* The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
* pair: the key is the partition id, and the value is an EdgePartition object containing all the
@@ -378,19 +378,19 @@ object GraphImpl {
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache()
// Get the set of all vids
- val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
- private def collectVertexIDsFromEdges(
+ private def collectVertexIdsFromEdges(
edges: EdgeRDD[_],
- partitioner: Partitioner): RDD[(VertexID, Int)] = {
+ partitioner: Partitioner): RDD[(VertexId, Int)] = {
// TODO: Consider doing map side distinct before shuffle.
- new ShuffledRDD[VertexID, Int, (VertexID, Int)](
- edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
- .setSerializer(classOf[VertexIDMsgSerializer].getName)
+ new ShuffledRDD[VertexId, Int, (VertexId, Int)](
+ edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VertexIdMsgSerializer].getName)
}
} // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index cea9d11ebe..e9ee09c361 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner
-import org.apache.spark.graphx.{PartitionID, VertexID}
+import org.apache.spark.graphx.{PartitionID, VertexId}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
private[graphx]
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: PartitionID,
- var vid: VertexID,
+ var vid: VertexId,
var data: T)
- extends Product2[PartitionID, (VertexID, T)] with Serializable {
+ extends Product2[PartitionID, (VertexId, T)] with Serializable {
override def _1 = partition
@@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
private[graphx]
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -99,8 +99,8 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
- val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 5bdc9339e9..a8154b63ce 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
case Some(prevView) =>
- prevView.localVertexIDMap
+ prevView.localVertexIdMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
}
private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
@@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag](
srcAttrOnly.unpersist(blocking)
dstAttrOnly.unpersist(blocking)
noAttrs.unpersist(blocking)
- // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it
+ // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
// without modification
this
}
@@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag](
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
- // locations specified in localVertexIDMap
- localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ // locations specified in localVertexIdMap
+ localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
@@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag](
private object ReplicatedVertexView {
protected def buildBuffer[VD: ClassTag](
- pid2vidIter: Iterator[Array[Array[VertexID]]],
+ pid2vidIter: Iterator[Array[Array[VertexId]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
- val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
+ val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
var i = 0
while (i < size) {
@@ -181,16 +181,16 @@ private object ReplicatedVertexView {
}
protected def buildActiveBuffer(
- pid2vidIter: Iterator[Array[Array[VertexID]]],
+ pid2vidIter: Iterator[Array[Array[VertexId]]],
activePartIter: Iterator[VertexPartition[_]])
- : Iterator[(Int, Array[VertexID])] = {
- val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ : Iterator[(Int, Array[VertexId])] = {
+ val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
+ val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
var i = 0
while (i < size) {
val vid = vidsCandidate(i)
@@ -205,8 +205,8 @@ private object ReplicatedVertexView {
}
private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
extends Serializable {
- def iterator: Iterator[(VertexID, VD)] =
+ def iterator: Iterator[(VertexId, VD)] =
(0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index b365d4914e..fe44e1ee0c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -32,12 +32,12 @@ import org.apache.spark.util.collection.PrimitiveVector
private[impl]
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
- val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
+ val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
}
private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+ val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
@@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val numPartitions = vertices.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index bcad1fbc58..c74d487e20 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -25,12 +25,12 @@ import org.apache.spark.graphx._
import org.apache.spark.serializer._
private[graphx]
-class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, _)]
+ val msg = t.asInstanceOf[(VertexId, _)]
writeVarLong(msg._1, optimizePositive = false)
this
}
@@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Int)]
+ val msg = t.asInstanceOf[(VertexId, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
this
@@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Long)]
+ val msg = t.asInstanceOf[(VertexId, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
this
@@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Double)]
+ val msg = t.asInstanceOf[(VertexId, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
this
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index f13bdded75..7a54b413dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet
private[graphx] object VertexPartition {
- def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
- val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) =>
map(k) = v
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
- def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
- val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc)
}
@@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
def size: Int = mask.cardinality()
/** Return the vertex attribute for the given vertex ID. */
- def apply(vid: VertexID): VD = values(index.getPos(vid))
+ def apply(vid: VertexId): VD = values(index.getPos(vid))
- def isDefined(vid: VertexID): Boolean = {
+ def isDefined(vid: VertexId): Boolean = {
val pos = index.getPos(vid)
pos >= 0 && mask.get(pos)
}
/** Look up vid in activeSet, throwing an exception if it is None. */
- def isActive(vid: VertexID): Boolean = {
+ def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid)
}
@@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
- def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
+ def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0)
@@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
+ def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
@@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2])
- (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
@@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: Iterator[(VertexID, VD2)])
- (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (other: Iterator[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
- (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
@@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Inner join an iterator of messages.
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
- (iter: Iterator[Product2[VertexID, U]])
- (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ (iter: Iterator[Product2[VertexId, U]])
+ (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
- def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
- def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length)
@@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
}
def aggregateUsingIndex[VD2: ClassTag](
- iter: Iterator[Product2[VertexID, VD2]],
+ iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition[VD2](index, newValues, newMask)
}
- def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
+ def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet))
@@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
- val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
@@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
}
- def iterator: Iterator[(VertexID, VD)] =
+ def iterator: Iterator[(VertexId, VD)] =
mask.iterator.map(ind => (index.getValue(ind), values(ind)))
- def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
+ def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
index f493d2dd01..79549fe060 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
@@ -20,5 +20,5 @@ package org.apache.spark.graphx
import org.apache.spark.util.collection.OpenHashSet
package object impl {
- private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID]
+ private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId]
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 2a6c0aa6b5..e2f6cc1389 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -35,9 +35,9 @@ object ConnectedComponents {
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+ def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 2bdd8c9f98..614555a054 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -92,7 +92,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
@@ -137,7 +137,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
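Only the vertex-program parameter type changes here. As a hedged sketch of invoking static PageRank with the renamed alias (it assumes this version's run(graph, numIter, resetProb) entry point; the ClassTag bounds are only needed to keep the caller generic):

  import scala.reflect.ClassTag
  import org.apache.spark.graphx._
  import org.apache.spark.graphx.lib.PageRank

  // Run a fixed number of PageRank iterations; resetProb matches the
  // vertexProgram above (resetProb + (1 - resetProb) * msgSum).
  def ranks[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Double, Double] =
    PageRank.run(graph, 10, 0.15)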
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 9c7a212c5a..c327ce7935 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -79,13 +79,13 @@ object SVDPlusPlus {
(g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
g = g.outerJoinVertices(t0) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
def mapTrainF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
+ : Iterator[(VertexId, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -112,7 +112,7 @@ object SVDPlusPlus {
et => Iterator((et.srcId, et.dstAttr._2)),
(g1: RealVector, g2: RealVector) => g1.add(g2))
g = g.outerJoinVertices(t1) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
@@ -123,7 +123,7 @@ object SVDPlusPlus {
(g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
(g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) {
- (vid: VertexID,
+ (vid: VertexId,
vd: (RealVector, RealVector, Double, Double),
msg: Option[(RealVector, RealVector, Double)]) =>
(vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
@@ -133,7 +133,7 @@ object SVDPlusPlus {
// calculate error on training set
def mapTestF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(VertexID, Double)] =
+ : Iterator[(VertexId, Double)] =
{
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
@@ -146,7 +146,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
g = g.outerJoinVertices(t3) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index ed84f72156..46da38eeb7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -35,7 +35,7 @@ object StronglyConnectedComponents {
*
* @return a graph with vertex attributes containing the smallest vertex id in each SCC
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {
// the graph we update with final SCC ids, and the graph we return at the end
var sccGraph = graph.mapVertices { case (vid, _) => vid }
@@ -71,7 +71,7 @@ object StronglyConnectedComponents {
// collect the min of all my neighbors' SCC values; update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
- sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => {
@@ -85,7 +85,7 @@ object StronglyConnectedComponents {
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
- sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
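A companion sketch for the run(graph, numIter) entry point shown above, written with the renamed alias (the ClassTag bounds on the caller are an assumption so the wrapper compiles generically):

  import scala.reflect.ClassTag
  import org.apache.spark.graphx._
  import org.apache.spark.graphx.lib.StronglyConnectedComponents

  // Labels each vertex with the smallest VertexId in its strongly connected
  // component, bounded by numIter outer iterations.
  def sccIds[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] =
    StronglyConnectedComponents.run(graph, 10)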
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a124c892dc..7c396e6e66 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -61,7 +61,7 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
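For completeness, a hedged sketch of how edgeFunc's per-vertex counts surface to callers; it assumes this version exposes TriangleCount.run(graph): Graph[Int, ED] (the entry point is not shown in this hunk) and that the input graph already meets the algorithm's canonical-edge expectations:

  import scala.reflect.ClassTag
  import org.apache.spark.graphx._
  import org.apache.spark.graphx.lib.TriangleCount

  // Per-vertex triangle counts, assuming the run entry point described above.
  def triangles[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] =
    TriangleCount.run(graph)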
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index e1ff3ea0d1..425a5164ca 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -25,11 +25,11 @@ package object graphx {
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
* to follow any ordering or satisfy any constraints other than uniqueness.
*/
- type VertexID = Long
+ type VertexId = Long
/** Integer identifier of a graph partition. */
// TODO: Consider using Char.
type PartitionID = Int
- private[graphx] type VertexSet = OpenHashSet[VertexID]
+ private[graphx] type VertexSet = OpenHashSet[VertexId]
}
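This is the heart of the commit: VertexId is still a plain Long, so only code that names the type explicitly changes spelling. A minimal sketch of user code with the new alias (the SparkContext `sc` and the toy data are assumptions for illustration):

  import org.apache.spark.graphx._
  import org.apache.spark.rdd.RDD

  val users: RDD[(VertexId, String)] =
    sc.parallelize(Seq((3L, "rxin"), (7L, "jgonzal")))
  val follows: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(3L, 7L, 1)))
  val graph: Graph[String, Int] = Graph(users, follows)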
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 9805eb3285..7677641bfe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -50,7 +50,7 @@ object GraphGenerators {
val mu = 4
val sigma = 1.3
- val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
+ val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
}
val edges = vertices.flatMap { v =>
@@ -59,9 +59,9 @@ object GraphGenerators {
Graph(vertices, edges, 0)
}
- def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
+ def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
val rand = new Random()
- Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) }
+ Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
/**
@@ -206,9 +206,9 @@ object GraphGenerators {
*/
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert a (row, column) address into a vertex id (row-major order)
- def sub2ind(r: Int, c: Int): VertexID = r * cols + c
+ def sub2ind(r: Int, c: Int): VertexId = r * cols + c
- val vertices: RDD[(VertexID, (Int,Int))] =
+ val vertices: RDD[(VertexId, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
@@ -228,7 +228,7 @@ object GraphGenerators {
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
- val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+ val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph.fromEdgeTuples(edges, 1)
} // end of starGraph
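The generator signatures above keep their shapes; only the identifier type is respelled. A short sketch of calling them (assumes an existing SparkContext `sc`):

  import org.apache.spark.graphx._
  import org.apache.spark.graphx.util.GraphGenerators

  val star: Graph[Int, Int] = GraphGenerators.starGraph(sc, 100)          // 99 spokes into vertex 0
  val grid: Graph[(Int, Int), Double] = GraphGenerators.gridGraph(sc, 5, 5)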
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 4a792c0dab..bc2ad5677f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("joinVertices") {
withSpark { sc =>
val vertices =
- sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
val g: Graph[String, String] = Graph(vertices, edges)
- val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
- val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
+ val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u }
val v = g1.vertices.collect().toSet
assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
@@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test ("filter") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
val filteredGraph = graph.filter(
@@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
},
- vpred = (vid: VertexID, deg:Int) => deg > 0
+ vpred = (vid: VertexId, deg:Int) => deg > 0
).cache()
val v = filteredGraph.vertices.collect().toSet
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index b18bc98e6d..28d34dd9a1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
- Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v")
}
test("Graph.fromEdgeTuples") {
@@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
- val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
@@ -74,7 +74,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
- (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
+ (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
}
}
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
+ verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mask") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
@@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
val doubleStar = Graph.fromEdgeTuples(
sc.parallelize((1 to n).flatMap(x =>
- List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
+ List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
@@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
@@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
@@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
- assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
+ assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
index 936e5c9c86..490b94429e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
test("1 iteration") {
withSpark { sc =>
val n = 5
- val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID))
+ val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId))
val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache()
val result = Pregel(star, 0)(
(vid, attr, msg) => attr,
@@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
- sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
+ sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3),
0).cache()
- assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet)
val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache()
assert(chainWithSeed.vertices.collect.toSet ===
- Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
+ Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 0c756400f4..e5a582b47b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 5)
+ val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Int) = inStrm.readObject()
- val inMsg2: (VertexID, Int) = inStrm.readObject()
+ val inMsg1: (VertexId, Int) = inStrm.readObject()
+ val inMsg2: (VertexId, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("LongAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 1L << 32)
+ val outMsg = (4: VertexId, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Long) = inStrm.readObject()
- val inMsg2: (VertexID, Long) = inStrm.readObject()
+ val inMsg1: (VertexId, Long) = inStrm.readObject()
+ val inMsg2: (VertexId, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("DoubleAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 5.0)
+ val outMsg = (4: VertexId, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Double) = inStrm.readObject()
- val inMsg2: (VertexID, Double) = inStrm.readObject()
+ val inMsg1: (VertexId, Double) = inStrm.readObject()
+ val inMsg2: (VertexId, Double) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 1195beba58..e135d1d7ad 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite {
test("innerJoin") {
def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
builder.toEdgePartition
}
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index eba8d7b716..3915be15b3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Connected Components on a Toy Connected Graph") {
withSpark { sc =>
// Create an RDD for the vertices
- val users: RDD[(VertexID, (String, String))] =
+ val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))