From f4d9019aa8c93e6f7539192ba5780a2f6c8ce19e Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 14 Jan 2014 22:17:18 -0800 Subject: VertexID -> VertexId --- .../main/scala/org/apache/spark/graphx/Edge.scala | 8 ++-- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 +- .../org/apache/spark/graphx/EdgeTriplet.scala | 4 +- .../main/scala/org/apache/spark/graphx/Graph.scala | 18 ++++----- .../apache/spark/graphx/GraphKryoRegistrator.scala | 2 +- .../scala/org/apache/spark/graphx/GraphOps.scala | 32 ++++++++-------- .../apache/spark/graphx/PartitionStrategy.scala | 14 +++---- .../scala/org/apache/spark/graphx/Pregel.scala | 8 ++-- .../scala/org/apache/spark/graphx/VertexRDD.scala | 42 ++++++++++----------- .../apache/spark/graphx/impl/EdgePartition.scala | 16 ++++---- .../spark/graphx/impl/EdgePartitionBuilder.scala | 10 ++--- .../spark/graphx/impl/EdgeTripletIterator.scala | 2 +- .../org/apache/spark/graphx/impl/GraphImpl.scala | 32 ++++++++-------- .../spark/graphx/impl/MessageToPartition.scala | 12 +++--- .../spark/graphx/impl/ReplicatedVertexView.scala | 30 +++++++-------- .../apache/spark/graphx/impl/RoutingTable.scala | 16 ++++---- .../org/apache/spark/graphx/impl/Serializers.scala | 10 ++--- .../apache/spark/graphx/impl/VertexPartition.scala | 44 +++++++++++----------- .../org/apache/spark/graphx/impl/package.scala | 2 +- .../spark/graphx/lib/ConnectedComponents.scala | 4 +- .../org/apache/spark/graphx/lib/PageRank.scala | 4 +- .../org/apache/spark/graphx/lib/SVDPlusPlus.scala | 12 +++--- .../graphx/lib/StronglyConnectedComponents.scala | 6 +-- .../apache/spark/graphx/lib/TriangleCount.scala | 2 +- .../scala/org/apache/spark/graphx/package.scala | 4 +- .../apache/spark/graphx/util/GraphGenerators.scala | 12 +++--- .../org/apache/spark/graphx/GraphOpsSuite.scala | 10 ++--- .../scala/org/apache/spark/graphx/GraphSuite.scala | 28 +++++++------- .../org/apache/spark/graphx/PregelSuite.scala | 8 ++-- .../org/apache/spark/graphx/SerializerSuite.scala | 18 ++++----- .../spark/graphx/impl/EdgePartitionSuite.scala | 2 +- .../graphx/lib/ConnectedComponentsSuite.scala | 2 +- 32 files changed, 209 insertions(+), 209 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 32f1602698..580faa0866 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -28,8 +28,8 @@ package org.apache.spark.graphx * @param attr The attribute associated with the edge */ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( - var srcId: VertexID = 0, - var dstId: VertexID = 0, + var srcId: VertexId = 0, + var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable { @@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @param vid the id of one of the two vertices on the edge. * @return the id of the other vertex on the edge. */ - def otherVertexId(vid: VertexID): VertexID = + def otherVertexId(vid: VertexId): VertexId = if (srcId == vid) dstId else { assert(dstId == vid); srcId } /** @@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @return the relative direction of the edge to the corresponding * vertex. 
*/ - def relativeDirection(vid: VertexId): EdgeDirection = + def relativeDirection(vid: VertexId): EdgeDirection = if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 6efef061d7..fe03ae4a62 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag]( */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) - (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = { + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { @@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag]( }) } - private[graphx] def collectVertexIDs(): RDD[VertexID] = { + private[graphx] def collectVertexIds(): RDD[VertexId] = { partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala index 2c659cb070..fea43c3b2b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id of one of the two vertices on the edge * @return the attribute for the other vertex on the edge */ - def otherVertexAttr(vid: VertexID): VD = + def otherVertexAttr(vid: VertexId): VD = if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr } /** @@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id of one of the two vertices on the edge * @return the attr for the vertex with that id */ - def vertexAttr(vid: VertexID): VD = + def vertexAttr(vid: VertexId): VD = if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 7f65244cd9..eea95d38d5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * }}} * */ - def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED] /** * Transforms each edge attribute in the graph using the map function. 
The map function is not @@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), - vpred: (VertexID, VD) => Boolean = ((v, d) => true)) + vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED] /** @@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * vertex * {{{ * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") - * val inDeg: RDD[(VertexID, Int)] = + * val inDeg: RDD[(VertexId, Int)] = * mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _) * }}} * @@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * */ def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) : VertexRDD[A] @@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * * {{{ * val rawGraph: Graph[_, _] = Graph.textFile("webgraph") - * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees() + * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees() * val graph = rawGraph.outerJoinVertices(outDeg) { * (vid, data, optDeg) => optDeg.getOrElse(0) * } * }}} */ - def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) - (mapFunc: (VertexID, VD, Option[U]) => VD2) + def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] /** @@ -364,7 +364,7 @@ object Graph { * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex. 
*/ def fromEdgeTuples[VD: ClassTag]( - rawEdges: RDD[(VertexID, VertexID)], + rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { @@ -405,7 +405,7 @@ object Graph { * mentioned in edges but not in vertices */ def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { GraphImpl(vertices, edges, defaultVertexAttr) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index 6db8a34937..dd380d8c18 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[Edge[Object]]) kryo.register(classOf[MessageToPartition[Object]]) kryo.register(classOf[VertexBroadcastMsg[Object]]) - kryo.register(classOf[(VertexID, Object)]) + kryo.register(classOf[(VertexId, Object)]) kryo.register(classOf[EdgePartition[Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 9b864c1290..0fc1e4df68 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @return the set of neighboring ids for each vertex */ - def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = { + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = { val nbrs = if (edgeDirection == EdgeDirection.Either) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), reduceFunc = _ ++ _ ) } else if (edgeDirection == EdgeDirection.Out) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId))), reduceFunc = _ ++ _) } else if (edgeDirection == EdgeDirection.In) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.dstId, Array(et.srcId))), reduceFunc = _ ++ _) } else { @@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali "direction. 
(EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)") } graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => - nbrsOpt.getOrElse(Array.empty[VertexID]) + nbrsOpt.getOrElse(Array.empty[VertexId]) } } // end of collectNeighborIds @@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @return the vertex set of neighboring vertex attributes for each vertex */ - def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { - val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = { + val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]]( edge => { val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr))) val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr))) @@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali (a, b) => a ++ b) graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => - nbrsOpt.getOrElse(Array.empty[(VertexID, VD)]) + nbrsOpt.getOrElse(Array.empty[(VertexId, VD)]) } } // end of collectNeighbor @@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * }}} * */ - def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD) + def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = { - val uf = (id: VertexID, data: VD, o: Option[U]) => { + val uf = (id: VertexId, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data @@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * val degrees: VertexRDD[Int] = graph.outDegrees * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} * }, - * vpred = (vid: VertexID, deg:Int) => deg > 0 + * vpred = (vid: VertexId, deg:Int) => deg > 0 * ) * }}} * @@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def filter[VD2: ClassTag, ED2: ClassTag]( preprocess: Graph[VD, ED] => Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, - vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = { + vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = { graph.mask(preprocess(graph).subgraph(epred, vpred)) } @@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)( - vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) @@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(): Graph[VertexID, ED] = { + def connectedComponents(): Graph[VertexId, ED] = { ConnectedComponents.run(graph) } @@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]] */ - def stronglyConnectedComponents(numIter: 
Int): Graph[VertexID, ED] = { + def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = { StronglyConnectedComponents.run(graph, numIter) } } // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 8ba87976f1..929915362c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -23,7 +23,7 @@ package org.apache.spark.graphx */ trait PartitionStrategy extends Serializable { /** Returns the partition number for a given edge. */ - def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID + def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID } /** @@ -73,9 +73,9 @@ object PartitionStrategy { * is used. */ case object EdgePartition2D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt - val mixingPrime: VertexID = 1125899906842597L + val mixingPrime: VertexId = 1125899906842597L val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt (col * ceilSqrtNumParts + row) % numParts @@ -87,8 +87,8 @@ object PartitionStrategy { * source. */ case object EdgePartition1D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - val mixingPrime: VertexID = 1125899906842597L + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { + val mixingPrime: VertexId = 1125899906842597L (math.abs(src) * mixingPrime).toInt % numParts } } @@ -99,7 +99,7 @@ object PartitionStrategy { * random vertex cut that colocates all same-direction edges between two vertices. */ case object RandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { math.abs((src, dst).hashCode()) % numParts } } @@ -111,7 +111,7 @@ object PartitionStrategy { * regardless of direction. 
*/ case object CanonicalRandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val lower = math.min(src, dst) val higher = math.max(src, dst) math.abs((lower, higher).hashCode()) % numParts diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 0f6d413593..ac07a594a1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -40,9 +40,9 @@ import scala.reflect.ClassTag * // Set the vertex attributes to the initial pagerank values * .mapVertices((id, attr) => 1.0) * - * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = + * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = * Iterator((edge.dstId, edge.srcAttr * edge.attr)) * def messageCombiner(a: Double, b: Double): Double = a + b * val initialMessage = 0.0 @@ -113,8 +113,8 @@ object Pregel { initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + (vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 9a95364cb1..edd59bcf32 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions import org.apache.spark.graphx.impl.VertexPartition /** - * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by + * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by * pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be * joined efficiently. All operations except [[reindex]] preserve the index. To construct a * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]]. 
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition * @example Construct a `VertexRDD` from a plain RDD: * {{{ * // Construct an initial vertex set - * val someData: RDD[(VertexID, SomeType)] = loadData(someFile) + * val someData: RDD[(VertexId, SomeType)] = loadData(someFile) * val vset = VertexRDD(someData) * // If there were redundant values in someData we would use a reduceFunc * val vset2 = VertexRDD(someData, reduceFunc) * // Finally we can use the VertexRDD to index another dataset - * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile) + * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile) * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b } * // Now we can construct very fast joins between the two sets * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) @@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition */ class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[VertexPartition[VD]]) - extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) @@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Provides the `RDD[(VertexID, VD)]` equivalent output. + * Provides the `RDD[(VertexId, VD)]` equivalent output. */ - override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = { + override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = { firstParent[VertexPartition[VD]].iterator(part, context).next.iterator } @@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag]( * rather than allocating new memory. * * @param pred the user defined predicate, which takes a tuple to conform to the - * `RDD[(VertexID, VD)]` interface + * `RDD[(VertexId, VD)]` interface */ - override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = + override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] = this.mapVertexPartitions(_.filter(Function.untupled(pred))) /** @@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the * original VertexRDD. The resulting VertexRDD retains the same index. */ - def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = + def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) /** @@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a VertexRDD containing the results of `f` */ def leftZipJoin[VD2: ClassTag, VD3: ClassTag] - (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { + (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag]( * by `f`. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: RDD[(VertexID, VD2)]) - (f: (VertexID, VD, Option[VD2]) => VD3) + (other: RDD[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3) : VertexRDD[VD3] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. 
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin @@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag]( * [[innerJoin]] for the behavior of the join. */ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) - (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this` * and `other`, with values supplied by `f` */ - def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) - (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient innerZipJoin other match { @@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag]( * messages. */ def aggregateUsingIndex[VD2: ClassTag]( - messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { + messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => val vertexPartition: VertexPartition[VD] = thisIter.next() @@ -303,8 +303,8 @@ object VertexRDD { * * @param rdd the collection of vertex-attribute pairs */ - def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = { - val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = { + val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -323,8 +323,8 @@ object VertexRDD { * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { - val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { + val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -338,7 +338,7 @@ object VertexRDD { * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using * `defaultVal` otherwise. 
*/ - def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD) + def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD) : VertexRDD[VD] = { VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => value.getOrElse(default) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 6067ee8c7e..57fa5eefd5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap */ private[graphx] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( - val srcIds: Array[VertexID], - val dstIds: Array[VertexID], + val srcIds: Array[VertexId], + val dstIds: Array[VertexId], val data: Array[ED], - val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable { + val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable { /** * Reverse all the edges in this partition. @@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { val builder = new EdgePartitionBuilder[ED] - var currSrcId: VertexID = null.asInstanceOf[VertexID] - var currDstId: VertexID = null.asInstanceOf[VertexID] + var currSrcId: VertexId = null.asInstanceOf[VertexId] + var currDstId: VertexId = null.asInstanceOf[VertexId] var currAttr: ED = null.asInstanceOf[ED] var i = 0 while (i < size) { @@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgePartition[ED2]) - (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = { + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = { val builder = new EdgePartitionBuilder[ED3] var i = 0 var j = 0 @@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * iterator is generated using an index scan, so it is efficient at skipping edges that don't * match srcIdPred. */ - def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] = + def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] = index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator)) /** * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The * cluster must start at position `index`. */ - private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] { + private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { private[this] val edge = new Edge[ED] private[this] var pos = index diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 960eeaccf1..63ccccb056 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. 
*/ - def add(src: VertexID, dst: VertexID, d: ED) { + def add(src: VertexId, dst: VertexId, d: ED) { edges += Edge(src, dst, d) } def toEdgePartition: EdgePartition[ED] = { val edgeArray = edges.trim().array Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) - val srcIds = new Array[VertexID](edgeArray.size) - val dstIds = new Array[VertexID](edgeArray.size) + val srcIds = new Array[VertexId](edgeArray.size) + val dstIds = new Array[VertexId](edgeArray.size) val data = new Array[ED](edgeArray.size) - val index = new PrimitiveKeyOpenHashMap[VertexID, Int] + val index = new PrimitiveKeyOpenHashMap[VertexId, Int] // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { index.update(srcIds(0), 0) - var currSrcId: VertexID = srcIds(0) + var currSrcId: VertexId = srcIds(0) var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 819e3ba93a..886c250d7c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( // allocating too many temporary Java objects. private val triplet = new EdgeTriplet[VD, ED] - private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray) + private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index eee2d58c3d..1d029bf009 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) } - override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = { + override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)).cache() @@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def subgraph( epred: EdgeTriplet[VD, ED] => Boolean = x => true, - vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { + vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { // Filter the vertices, reusing the partitioner and the index from this graph val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) @@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( ////////////////////////////////////////////////////////////////////////////////////////////////// override def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { @@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val edgeIter = activeDirectionOpt match { case Some(EdgeDirection.Both) => 
if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) .filter(e => vPart.isActive(e.dstId)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) @@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) case Some(EdgeDirection.Out) => if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) } @@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } // end of mapReduceTriplets override def outerJoinVertices[U: ClassTag, VD2: ClassTag] - (other: RDD[(VertexID, U)]) - (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = + (other: RDD[(VertexId, U)]) + (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // updateF preserves type, so we can use incremental replication @@ -312,7 +312,7 @@ object GraphImpl { } def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { @@ -321,7 +321,7 @@ object GraphImpl { // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner) + val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } @@ -355,7 +355,7 @@ object GraphImpl { /** * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges - * data structure (RDD[(VertexID, VertexID, ED)]). + * data structure (RDD[(VertexId, VertexId, ED)]). * * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value * pair: the key is the partition id, and the value is an EdgePartition object containing all the @@ -378,19 +378,19 @@ object GraphImpl { defaultVertexAttr: VD): GraphImpl[VD, ED] = { edges.cache() // Get the set of all vids - val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size)) + val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size)) // Create the VertexRDD. val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) GraphImpl(vertices, edges) } /** Collects all vids mentioned in edges and partitions them by partitioner. */ - private def collectVertexIDsFromEdges( + private def collectVertexIdsFromEdges( edges: EdgeRDD[_], - partitioner: Partitioner): RDD[(VertexID, Int)] = { + partitioner: Partitioner): RDD[(VertexId, Int)] = { // TODO: Consider doing map side distinct before shuffle. 
- new ShuffledRDD[VertexID, Int, (VertexID, Int)]( - edges.collectVertexIDs.map(vid => (vid, 0)), partitioner) - .setSerializer(classOf[VertexIDMsgSerializer].getName) + new ShuffledRDD[VertexId, Int, (VertexId, Int)]( + edges.collectVertexIds.map(vid => (vid, 0)), partitioner) + .setSerializer(classOf[VertexIdMsgSerializer].getName) } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index cea9d11ebe..e9ee09c361 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} import org.apache.spark.Partitioner -import org.apache.spark.graphx.{PartitionID, VertexID} +import org.apache.spark.graphx.{PartitionID, VertexId} import org.apache.spark.rdd.{ShuffledRDD, RDD} private[graphx] class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( @transient var partition: PartitionID, - var vid: VertexID, + var vid: VertexId, var data: T) - extends Product2[PartitionID, (VertexID, T)] with Serializable { + extends Product2[PartitionID, (VertexId, T)] with Serializable { override def _1 = partition @@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef private[graphx] class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { - val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) + val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner) // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { @@ -99,8 +99,8 @@ object MsgRDDFunctions { new VertexBroadcastMsgRDDFunctions(rdd) } - def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = { - val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner) + def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = { + val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner) // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 5bdc9339e9..a8154b63ce 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag]( * vids from both the source and destination of edges. It must always include both source and * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. 
*/ - private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { case Some(prevView) => - prevView.localVertexIDMap + prevView.localVertexIdMap case None => edges.partitionsRDD.mapPartitions(_.map { case (pid, epart) => @@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag]( vidToIndex.add(e.dstId) } (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap") + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap") } private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true) @@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag]( srcAttrOnly.unpersist(blocking) dstAttrOnly.unpersist(blocking) noAttrs.unpersist(blocking) - // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it + // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it // without modification this } @@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag]( case None => // Within each edge partition, place the shipped vertex attributes into the correct - // locations specified in localVertexIDMap - localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => + // locations specified in localVertexIdMap + localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map @@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag]( private object ReplicatedVertexView { protected def buildBuffer[VD: ClassTag]( - pid2vidIter: Iterator[Array[Array[VertexID]]], + pid2vidIter: Iterator[Array[Array[VertexId]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { - val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() val vertexPart: VertexPartition[VD] = vertexPartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val vids = new PrimitiveVector[VertexID](pid2vid(pid).size) + val vids = new PrimitiveVector[VertexId](pid2vid(pid).size) val attrs = new PrimitiveVector[VD](pid2vid(pid).size) var i = 0 while (i < size) { @@ -181,16 +181,16 @@ private object ReplicatedVertexView { } protected def buildActiveBuffer( - pid2vidIter: Iterator[Array[Array[VertexID]]], + pid2vidIter: Iterator[Array[Array[VertexId]]], activePartIter: Iterator[VertexPartition[_]]) - : Iterator[(Int, Array[VertexID])] = { - val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + : Iterator[(Int, Array[VertexId])] = { + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() val activePart: VertexPartition[_] = activePartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val actives = new PrimitiveVector[VertexID](vidsCandidate.size) + val actives = new PrimitiveVector[VertexId](vidsCandidate.size) var i = 0 while (i < size) { val vid = vidsCandidate(i) @@ -205,8 +205,8 @@ private object ReplicatedVertexView { } private[graphx] -class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) +class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD]) extends Serializable { - def iterator: Iterator[(VertexID, VD)] = + def iterator: Iterator[(VertexId, VD)] = (0 until 
vids.size).iterator.map { i => (vids(i), attrs(i)) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index b365d4914e..fe44e1ee0c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -32,12 +32,12 @@ import org.apache.spark.util.collection.PrimitiveVector private[impl] class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { - val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) - val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false) - val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true) - val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false) + val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true) + val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false) + val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true) + val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false) - def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = + def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = (includeSrcAttr, includeDstAttr) match { case (true, true) => bothAttrs case (true, false) => srcAttrOnly @@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { } private def createPid2Vid( - includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = { + includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. 
- val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => + val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() val numEdges = edgePartition.size val vSet = new VertexSet @@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { val numPartitions = vertices.partitions.size vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID]) + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index bcad1fbc58..c74d487e20 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -25,12 +25,12 @@ import org.apache.spark.graphx._ import org.apache.spark.serializer._ private[graphx] -class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { +class VertexIdMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, _)] + val msg = t.asInstanceOf[(VertexId, _)] writeVarLong(msg._1, optimizePositive = false) this } @@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Int)] + val msg = t.asInstanceOf[(VertexId, Int)] writeVarLong(msg._1, optimizePositive = false) writeUnsignedVarInt(msg._2) this @@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Long)] + val msg = t.asInstanceOf[(VertexId, Long)] writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._2, optimizePositive = true) this @@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Double)] + val msg = t.asInstanceOf[(VertexId, Double)] writeVarLong(msg._1, optimizePositive = false) writeDouble(msg._2) this diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala index f13bdded75..7a54b413dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala @@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet private[graphx] object VertexPartition { - def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = { + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] iter.foreach { case (k, v) => map(k) = v } new VertexPartition(map.keySet, map._values, 
map.keySet.getBitSet) } - def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD) + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD) : VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] iter.foreach { case (k, v) => map.setMerge(k, v, mergeFunc) } @@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( def size: Int = mask.cardinality() /** Return the vertex attribute for the given vertex ID. */ - def apply(vid: VertexID): VD = values(index.getPos(vid)) + def apply(vid: VertexId): VD = values(index.getPos(vid)) - def isDefined(vid: VertexID): Boolean = { + def isDefined(vid: VertexId): Boolean = { val pos = index.getPos(vid) pos >= 0 && mask.get(pos) } /** Look up vid in activeSet, throwing an exception if it is None. */ - def isActive(vid: VertexID): Boolean = { + def isActive(vid: VertexId): Boolean = { activeSet.get.contains(vid) } @@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * each of the entries in the original VertexRDD. The resulting * VertexPartition retains the same index. */ - def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = { + def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = { // Construct a view of the map transformation val newValues = new Array[VD2](capacity) var i = mask.nextSetBit(0) @@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * RDD can be easily joined with the original vertex-set. Furthermore, the filter only * modifies the bitmap index and so no new values are allocated. */ - def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = { + def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = { // Allocate the array to store the results into val newMask = new BitSet(capacity) // Iterate over the active bits in the old mask and evaluate the predicate @@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another VertexPartition. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexPartition[VD2]) - (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") leftJoin(createUsingIndex(other.iterator))(f) @@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another iterator of messages. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: Iterator[(VertexID, VD2)]) - (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (other: Iterator[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { leftJoin(createUsingIndex(other))(f) } /** Inner join another VertexPartition. */ def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) - (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") innerJoin(createUsingIndex(other.iterator))(f) @@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Inner join an iterator of messages. 
*/ def innerJoin[U: ClassTag, VD2: ClassTag] - (iter: Iterator[Product2[VertexID, U]]) - (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + (iter: Iterator[Product2[VertexId, U]]) + (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { innerJoin(createUsingIndex(iter))(f) } /** * Similar effect as aggregateUsingIndex((a, b) => a) */ - def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]]) + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]]) : VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in * the partition, hidden by the bitmask. */ - def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = { + def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = { val newMask = new BitSet(capacity) val newValues = new Array[VD](capacity) System.arraycopy(values, 0, newValues, 0, newValues.length) @@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( } def aggregateUsingIndex[VD2: ClassTag]( - iter: Iterator[Product2[VertexID, VD2]], + iter: Iterator[Product2[VertexId, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition[VD2](index, newValues, newMask) } - def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = { + def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = { val newActiveSet = new VertexSet iter.foreach(newActiveSet.add(_)) new VertexPartition(index, values, mask, Some(newActiveSet)) @@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Construct a new VertexPartition whose index contains only the vertices in the mask. 
*/ def reindex(): VertexPartition[VD] = { - val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD] + val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD] val arbitraryMerge = (a: VD, b: VD) => a for ((k, v) <- this.iterator) { hashMap.setMerge(k, v, arbitraryMerge) @@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) } - def iterator: Iterator[(VertexID, VD)] = + def iterator: Iterator[(VertexId, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind))) - def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind)) + def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind)) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala index f493d2dd01..79549fe060 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala @@ -20,5 +20,5 @@ package org.apache.spark.graphx import org.apache.spark.util.collection.OpenHashSet package object impl { - private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID] + private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId] } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 2a6c0aa6b5..e2f6cc1389 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -35,9 +35,9 @@ object ConnectedComponents { * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + def sendMessage(edge: EdgeTriplet[VertexId, ED]) = { if (edge.srcAttr < edge.dstAttr) { Iterator((edge.dstId, edge.srcAttr)) } else if (edge.srcAttr > edge.dstAttr) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 2bdd8c9f98..614555a054 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -92,7 +92,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = resetProb + (1.0 - resetProb) * msgSum def sendMessage(edge: EdgeTriplet[Double, Double]) = Iterator((edge.dstId, edge.srcAttr * edge.attr)) @@ -137,7 +137,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { + def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { val (oldPR, lastDelta) = attr val newPR = oldPR + (1.0 - resetProb) * msgSum (newPR, newPR - oldPR) diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 9c7a212c5a..c327ce7935 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -79,13 +79,13 @@ object SVDPlusPlus { (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) g = g.outerJoinVertices(t0) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) } def mapTrainF(conf: Conf, u: Double) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(VertexID, (RealVector, RealVector, Double))] = { + : Iterator[(VertexId, (RealVector, RealVector, Double))] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) @@ -112,7 +112,7 @@ object SVDPlusPlus { et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) g = g.outerJoinVertices(t1) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd } @@ -123,7 +123,7 @@ object SVDPlusPlus { (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) g = g.outerJoinVertices(t2) { - (vid: VertexID, + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) @@ -133,7 +133,7 @@ object SVDPlusPlus { // calculate error on training set def mapTestF(conf: Conf, u: Double) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(VertexID, Double)] = + : Iterator[(VertexId, Double)] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) @@ -146,7 +146,7 @@ object SVDPlusPlus { g.cache() val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) g = g.outerJoinVertices(t3) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala index ed84f72156..46da38eeb7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala @@ -35,7 +35,7 @@ object StronglyConnectedComponents { * * @return a graph with vertex attributes containing the smallest vertex id in each SCC */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = { // the graph we update with final SCC ids, and the graph we return at the end var sccGraph = graph.mapVertices { case (vid, _) => vid } @@ 
-71,7 +71,7 @@ object StronglyConnectedComponents { // collect min of all my neighbor's scc values, update if it's smaller than mine // then notify any neighbors with scc values larger than mine - sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID]( + sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId]( sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), e => { @@ -85,7 +85,7 @@ object StronglyConnectedComponents { // start at root of SCCs. Traverse values in reverse, notify all my neighbors // do not propagate if colors do not match! - sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean]( + sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean]( sccWorkGraph, false, activeDirection = EdgeDirection.In)( // vertex is final if it is the root of a color // or it has the same color as a neighbor that is final diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index a124c892dc..7c396e6e66 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -61,7 +61,7 @@ object TriangleCount { (vid, _, optSet) => optSet.getOrElse(null) } // Edge function computes intersection of smaller vertex with larger vertex - def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = { assert(et.srcAttr != null) assert(et.dstAttr != null) val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala index e1ff3ea0d1..425a5164ca 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala @@ -25,11 +25,11 @@ package object graphx { * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need * to follow any ordering or any constraints other than uniqueness. */ - type VertexID = Long + type VertexId = Long /** Integer identifer of a graph partition. */ // TODO: Consider using Char. 
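Because VertexId is only a type alias for Long, the rename in this package object cannot change runtime behavior; a self-contained check (hypothetical object name, local copy of the alias):

object VertexIdAliasCheck {
  type VertexId = Long  // mirrors the alias declared in org.apache.spark.graphx

  def main(args: Array[String]): Unit = {
    val vid: VertexId = 42L
    val asLong: Long = vid  // aliases erase to Long: no wrapper, no boxing change
    println(vid == asLong)  // true
  }
}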
type PartitionID = Int - private[graphx] type VertexSet = OpenHashSet[VertexID] + private[graphx] type VertexSet = OpenHashSet[VertexId] } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 9805eb3285..7677641bfe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -50,7 +50,7 @@ object GraphGenerators { val mu = 4 val sigma = 1.3 - val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{ + val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{ src => (src, sampleLogNormal(mu, sigma, numVertices)) } val edges = vertices.flatMap { v => @@ -59,9 +59,9 @@ object GraphGenerators { Graph(vertices, edges, 0) } - def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = { + def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = { val rand = new Random() - Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) } + Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) } } /** @@ -206,9 +206,9 @@ object GraphGenerators { */ def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = { // Convert row column address into vertex ids (row major order) - def sub2ind(r: Int, c: Int): VertexID = r * cols + c + def sub2ind(r: Int, c: Int): VertexId = r * cols + c - val vertices: RDD[(VertexID, (Int,Int))] = + val vertices: RDD[(VertexId, (Int,Int))] = sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) val edges: RDD[Edge[Double]] = vertices.flatMap{ case (vid, (r,c)) => @@ -228,7 +228,7 @@ object GraphGenerators { * being the center vertex. 
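The sub2ind helper in the gridGraph hunk above is plain row-major indexing; a standalone sketch of the mapping and its inverse (illustrative names, no Spark dependency):

object GridIndexSketch {
  type VertexId = Long

  // Row-major encoding used by gridGraph: (r, c) -> r * cols + c.
  def sub2ind(r: Int, c: Int, cols: Int): VertexId = r.toLong * cols + c

  // Inverse decoding, useful for sanity-checking the edge construction.
  def ind2sub(id: VertexId, cols: Int): (Int, Int) =
    ((id / cols).toInt, (id % cols).toInt)

  def main(args: Array[String]): Unit = {
    val cols = 4
    val id = sub2ind(2, 3, cols)      // 2 * 4 + 3 = 11
    println((id, ind2sub(id, cols)))  // (11,(2,3))
  }
}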
*/ def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { - val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) + val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) Graph.fromEdgeTuples(edges, 1) } // end of starGraph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 4a792c0dab..bc2ad5677f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test("joinVertices") { withSpark { sc => val vertices = - sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2) + sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2) val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) val g: Graph[String, String] = Graph(vertices, edges) - val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20))) - val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u } + val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20))) + val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u } val v = g1.vertices.collect().toSet assert(v === Set((1, "one10"), (2, "two20"), (3, "three"))) @@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test ("filter") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges).cache() val filteredGraph = graph.filter( @@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { val degrees: VertexRDD[Int] = graph.outDegrees graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} }, - vpred = (vid: VertexID, deg:Int) => deg > 0 + vpred = (vid: VertexId, deg:Int) => deg > 0 ).cache() val v = filteredGraph.vertices.collect().toSet diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index b18bc98e6d..28d34dd9a1 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = { - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v") } test("Graph.fromEdgeTuples") { @@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } - val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) + val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) val graph = Graph(vertices, edges, false) assert( graph.edges.count() === rawEdges.size ) // Vertices not explicitly provided but referenced by edges should be created automatically @@ -74,7 +74,7 @@ 
class GraphSuite extends FunSuite with LocalSparkContext { val n = 5 val star = starGraph(sc, n) assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === - (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet) + (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet) } } @@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val p = 100 val verts = 1 to n val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => - verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0) + verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0) assert(graph.edges.partitions.length === p) val partitionedGraph = graph.partitionBy(EdgePartition2D) assert(graph.edges.partitions.length === p) @@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { val star = starGraph(sc, n) // mapVertices preserving type val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2") - assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet) + assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet) // mapVertices changing type val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length) - assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet) + assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet) } } @@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val star = starGraph(sc, n) - assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet) + assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet) } } @@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mask") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges).cache() @@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val star = starGraph(sc, n) val doubleStar = Graph.fromEdgeTuples( sc.parallelize((1 to n).flatMap(x => - List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v") + List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v") val star2 = doubleStar.groupEdges { (a, b) => a} assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) @@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) // activeSetOpt - val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID) + val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId) val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) val vids = complete.mapVertices((vid, attr) => vid).cache() val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } @@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.srcId, 1)) }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet - assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet) + assert(numEvenNeighbors === (1 to 
n).map(x => (x: VertexId, n / 2)).toSet) // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) - val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3) + val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3) val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache() val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } @@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.dstId, 1)) }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet - assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet) + assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet) } } @@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), (a: Int, b: Int) => a + b).collect.toSet - assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0))) + assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0))) // outerJoinVertices preserving type val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString } val newReverseStar = diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala index 936e5c9c86..490b94429e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext { test("1 iteration") { withSpark { sc => val n = 5 - val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID)) + val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId)) val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache() val result = Pregel(star, 0)( (vid, attr, msg) => attr, @@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val chain = Graph.fromEdgeTuples( - sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), + sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3), 0).cache() - assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet) + assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet) val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache() assert(chainWithSeed.vertices.collect.toSet === - Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) + Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet) val result = Pregel(chainWithSeed, 0)( (vid, attr, msg) => math.max(msg, attr), et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty, diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala index 0c756400f4..e5a582b47b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala @@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { 
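Each serializer test below follows the same write, flush, read-back pattern for (VertexId, value) messages. A simplified standalone sketch of that round-trip contract, using fixed-width java.io primitives (the real Agg-message serializers use more compact encodings; names here are illustrative):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object RoundTripSketch {
  type VertexId = Long

  def main(args: Array[String]): Unit = {
    // Write one (VertexId, Int) message as raw primitives.
    val bout = new ByteArrayOutputStream()
    val out = new DataOutputStream(bout)
    out.writeLong(4L)  // vertex id
    out.writeInt(5)    // payload
    out.flush()

    // Read it back and compare, as the assertions in these tests do.
    val in = new DataInputStream(new ByteArrayInputStream(bout.toByteArray))
    val msg: (VertexId, Int) = (in.readLong(), in.readInt())
    val expected = (4L, 5)
    println(msg == expected)  // true
  }
}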
test("IntAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 5) + val outMsg = (4: VertexId, 5) val bout = new ByteArrayOutputStream val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Int) = inStrm.readObject() - val inMsg2: (VertexID, Int) = inStrm.readObject() + val inMsg1: (VertexId, Int) = inStrm.readObject() + val inMsg2: (VertexId, Int) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("LongAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 1L << 32) + val outMsg = (4: VertexId, 1L << 32) val bout = new ByteArrayOutputStream val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Long) = inStrm.readObject() - val inMsg2: (VertexID, Long) = inStrm.readObject() + val inMsg1: (VertexId, Long) = inStrm.readObject() + val inMsg2: (VertexId, Long) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("DoubleAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 5.0) + val outMsg = (4: VertexId, 5.0) val bout = new ByteArrayOutputStream val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Double) = inStrm.readObject() - val inMsg2: (VertexID, Double) = inStrm.readObject() + val inMsg1: (VertexId, Double) = inStrm.readObject() + val inMsg2: (VertexId, Double) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 1195beba58..e135d1d7ad 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite { test("innerJoin") { def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { val builder = new EdgePartitionBuilder[A] - for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) } + for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) } builder.toEdgePartition } val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index eba8d7b716..3915be15b3 100644 --- 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { test("Connected Components on a Toy Connected Graph") { withSpark { sc => // Create an RDD for the vertices - val users: RDD[(VertexID, (String, String))] = + val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student"))))
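For context on the behavior this last suite pins down, here is a minimal sequential sketch of the same smallest-id label propagation that ConnectedComponents.run expresses with Pregel (standalone Scala; the object name and toy graph are illustrative):

object MinIdCC {
  type VertexId = Long

  // Propagate the smallest reachable vertex id across undirected edges until
  // a fixed point; each vertex ends up labeled with its component's minimum id.
  def run(vertices: Set[VertexId],
          edges: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] = {
    var label = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((s, d) <- edges) {
        val m = math.min(label(s), label(d))
        if (label(s) != m || label(d) != m) {
          label = label.updated(s, m).updated(d, m)
          changed = true
        }
      }
    }
    label
  }

  def main(args: Array[String]): Unit = {
    val vs: Set[VertexId] = Set(2L, 3L, 4L, 5L, 7L)
    val es = Seq((3L, 7L), (5L, 7L), (2L, 4L))  // two components
    println(run(vs, es))  // 3, 5, 7 labeled 3; 2, 4 labeled 2
  }
}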