From f4d9019aa8c93e6f7539192ba5780a2f6c8ce19e Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 14 Jan 2014 22:17:18 -0800 Subject: VertexID -> VertexId --- docs/graphx-programming-guide.md | 70 +++++++++++----------- .../main/scala/org/apache/spark/graphx/Edge.scala | 8 +-- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 +- .../org/apache/spark/graphx/EdgeTriplet.scala | 4 +- .../main/scala/org/apache/spark/graphx/Graph.scala | 18 +++--- .../apache/spark/graphx/GraphKryoRegistrator.scala | 2 +- .../scala/org/apache/spark/graphx/GraphOps.scala | 32 +++++----- .../apache/spark/graphx/PartitionStrategy.scala | 14 ++--- .../scala/org/apache/spark/graphx/Pregel.scala | 8 +-- .../scala/org/apache/spark/graphx/VertexRDD.scala | 42 ++++++------- .../apache/spark/graphx/impl/EdgePartition.scala | 16 ++--- .../spark/graphx/impl/EdgePartitionBuilder.scala | 10 ++-- .../spark/graphx/impl/EdgeTripletIterator.scala | 2 +- .../org/apache/spark/graphx/impl/GraphImpl.scala | 32 +++++----- .../spark/graphx/impl/MessageToPartition.scala | 12 ++-- .../spark/graphx/impl/ReplicatedVertexView.scala | 30 +++++----- .../apache/spark/graphx/impl/RoutingTable.scala | 16 ++--- .../org/apache/spark/graphx/impl/Serializers.scala | 10 ++-- .../apache/spark/graphx/impl/VertexPartition.scala | 44 +++++++------- .../org/apache/spark/graphx/impl/package.scala | 2 +- .../spark/graphx/lib/ConnectedComponents.scala | 4 +- .../org/apache/spark/graphx/lib/PageRank.scala | 4 +- .../org/apache/spark/graphx/lib/SVDPlusPlus.scala | 12 ++-- .../graphx/lib/StronglyConnectedComponents.scala | 6 +- .../apache/spark/graphx/lib/TriangleCount.scala | 2 +- .../scala/org/apache/spark/graphx/package.scala | 4 +- .../apache/spark/graphx/util/GraphGenerators.scala | 12 ++-- .../org/apache/spark/graphx/GraphOpsSuite.scala | 10 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 28 ++++----- .../org/apache/spark/graphx/PregelSuite.scala | 8 +-- .../org/apache/spark/graphx/SerializerSuite.scala | 18 +++--- .../spark/graphx/impl/EdgePartitionSuite.scala | 2 +- .../graphx/lib/ConnectedComponentsSuite.scala | 2 +- 33 files changed, 244 insertions(+), 244 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 03940d836b..375bf20a4f 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -186,7 +186,7 @@ code constructs a graph from a collection of RDDs: // Assume the SparkContext has already been constructed val sc: SparkContext // Create an RDD for the vertices -val users: RDD[(VertexID, (String, String))] = +val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges @@ -291,7 +291,7 @@ graph contains the following: {% highlight scala %} class Graph[VD, ED] { - def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] } @@ -313,7 +313,7 @@ val newGraph = Graph(newVertices, graph.edges) val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr)) {% endhighlight %} -[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED] +[Graph.mapVertices]: 
api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED] These operators are often used to initialize the graph for a particular computation or project away unnecessary properties. For example, given a graph with the out-degrees as the vertex properties @@ -339,7 +339,7 @@ add more in the future. The following is a list of the basic structural operato class Graph[VD, ED] { def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, - vpred: (VertexID, VD) => Boolean): Graph[VD, ED] + vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] } @@ -358,11 +358,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica operator can be used in number of situations to restrict the graph to the vertices and edges of interest or eliminate broken links. For example in the following code we remove broken links: -[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED] +[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED] {% highlight scala %} // Create an RDD for the vertices -val users: RDD[(VertexID, (String, String))] = +val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student")))) @@ -425,9 +425,9 @@ using the *join* operators. Below we list the key join operators: {% highlight scala %} class Graph[VD, ED] { - def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD) + def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED] - def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2) + def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] } {% endhighlight %} @@ -437,7 +437,7 @@ returns a new graph with the vertex properties obtained by applying the user def to the result of the joined vertices. Vertices without a matching value in the RDD retain their original value. -[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED] +[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED] > Note that if the RDD contains more than one value for a given vertex only one will be used. It > is therefore recommended that the input RDD be first made unique using the following which will @@ -456,7 +456,7 @@ property type. Because not all vertices may have a matching value in the input function takes an `Option` type. For example, we can setup a graph for PageRank by initializing vertex properties with their `outDegree`. 
-[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED] +[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED] {% highlight scala %} @@ -490,7 +490,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id). ### Map Reduce Triplets (mapReduceTriplets) -[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A] +[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A] The core (heavily optimized) aggregation primitive in GraphX is the [`mapReduceTriplets`][Graph.mapReduceTriplets] operator: @@ -498,7 +498,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the {% highlight scala %} class Graph[VD, ED] { def mapReduceTriplets[A]( - map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduce: (A, A) => A) : VertexRDD[A] } @@ -580,13 +580,13 @@ compute the max in, out, and total degrees: {% highlight scala %} // Define a reduce operation to compute the highest degree vertex -def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = { +def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b } // Compute the max degrees -val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max) -val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max) -val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max) +val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) +val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) +val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) {% endhighlight %} ### Collecting Neighbors @@ -596,14 +596,14 @@ attributes at each vertex. This can be easily accomplished using the [`collectNeighborIds`][GraphOps.collectNeighborIds] and the [`collectNeighbors`][GraphOps.collectNeighbors] operators. 
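As a minimal usage sketch (assuming a previously constructed `Graph` value named `graph`, which is not part of this patch), the in-neighbor ids of each vertex could be collected as follows:

{% highlight scala %}
// Collect the vertex ids of the in-neighbors of every vertex
val inNbrIds: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.In)
{% endhighlight %}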
-[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]] -[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]] +[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]] +[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]] {% highlight scala %} class GraphOps[VD, ED] { - def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] - def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ] + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ] } {% endhighlight %} @@ -647,7 +647,7 @@ messages remaining. The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* of its implementation (note calls to graph.cache have been removed): -[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED] +[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED] {% highlight scala %} class GraphOps[VD, ED] { @@ -655,8 +655,8 @@ class GraphOps[VD, ED] { (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out) - (vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + (vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Receive the initial message at each vertex @@ -701,7 +701,7 @@ import org.apache.spark.graphx.util.GraphGenerators // A graph with edge attributes containing distances val graph: Graph[Int, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) -val sourceId: VertexID = 42 // The ultimate source +val sourceId: VertexId = 42 // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity. 
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( @@ -748,7 +748,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic {% highlight scala %} object Graph { def apply[VD, ED]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null) : Graph[VD, ED] @@ -758,7 +758,7 @@ object Graph { defaultValue: VD): Graph[VD, ED] def fromEdgeTuples[VD]( - rawEdges: RDD[(VertexID, VertexID)], + rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] @@ -774,8 +774,8 @@ object Graph { [PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$ [GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int] -[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] -[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int] +[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] +[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int] [Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] # Vertex and Edge RDDs @@ -799,17 +799,17 @@ following additional functionality: {% highlight scala %} class VertexRDD[VD] { // Filter the vertex set but preserves the internal index - def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] + def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] // Transform the values without changing the ids (preserves the internal index) def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] - def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2] + def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2] // Remove vertices from this set that appear in the other set def diff(other: VertexRDD[VD]): VertexRDD[VD] // Join operators that take advantage of the internal indexing to accelerate joins (substantially) - def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] - def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] + def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] + def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. - def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] + def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] } {% endhighlight %} @@ -828,7 +828,7 @@ RDD. 
For example: {% highlight scala %} val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1))) -val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) +val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) // There should be 200 entries in rddB rddB.count val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) @@ -854,7 +854,7 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2] // Revere the edges reusing both attributes and structure def reverse: EdgeRDD[ED] // Join two `EdgeRDD`s partitioned using the same partitioning strategy. -def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] +def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] {% endhighlight %} In most applications we have found that operations on the `EdgeRDD` are accomplished through the diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 32f1602698..580faa0866 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -28,8 +28,8 @@ package org.apache.spark.graphx * @param attr The attribute associated with the edge */ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( - var srcId: VertexID = 0, - var dstId: VertexID = 0, + var srcId: VertexId = 0, + var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable { @@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @param vid the id one of the two vertices on the edge. * @return the id of the other vertex on the edge. */ - def otherVertexId(vid: VertexID): VertexID = + def otherVertexId(vid: VertexId): VertexId = if (srcId == vid) dstId else { assert(dstId == vid); srcId } /** @@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @return the relative direction of the edge to the corresponding * vertex. 
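   * @example (illustrative sketch, not part of this patch) For an edge from vertex 1L to
   *          vertex 2L, `relativeDirection(1L)` is `EdgeDirection.Out` and
   *          `relativeDirection(2L)` is `EdgeDirection.In`.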
*/ - def relativeDirection(vid: VertexID): EdgeDirection = + def relativeDirection(vid: VertexId): EdgeDirection = if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 6efef061d7..fe03ae4a62 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag]( */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) - (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = { + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { @@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag]( }) } - private[graphx] def collectVertexIDs(): RDD[VertexID] = { + private[graphx] def collectVertexIds(): RDD[VertexId] = { partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala index 2c659cb070..fea43c3b2b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id one of the two vertices on the edge * @return the attribute for the other vertex on the edge */ - def otherVertexAttr(vid: VertexID): VD = + def otherVertexAttr(vid: VertexId): VD = if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr } /** @@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id of one of the two vertices on the edge * @return the attr for the vertex with that id */ - def vertexAttr(vid: VertexID): VD = + def vertexAttr(vid: VertexId): VD = if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 7f65244cd9..eea95d38d5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * }}} * */ - def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED] /** * Transforms each edge attribute in the graph using the map function. 
The map function is not @@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab */ def subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), - vpred: (VertexID, VD) => Boolean = ((v, d) => true)) + vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED] /** @@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * vertex * {{{ * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") - * val inDeg: RDD[(VertexID, Int)] = + * val inDeg: RDD[(VertexId, Int)] = * mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _) * }}} * @@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * */ def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) : VertexRDD[A] @@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * * {{{ * val rawGraph: Graph[_, _] = Graph.textFile("webgraph") - * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees() + * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees() * val graph = rawGraph.outerJoinVertices(outDeg) { * (vid, data, optDeg) => optDeg.getOrElse(0) * } * }}} */ - def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) - (mapFunc: (VertexID, VD, Option[U]) => VD2) + def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] /** @@ -364,7 +364,7 @@ object Graph { * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex. 
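   * @example A hypothetical sketch (the RDD `raw` and the SparkContext `sc` are assumed for
   *          illustration and are not part of this patch):
   * {{{
   * val raw: RDD[(VertexId, VertexId)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
   * val g: Graph[Int, Int] = Graph.fromEdgeTuples(raw, defaultValue = 1)
   * }}}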
*/ def fromEdgeTuples[VD: ClassTag]( - rawEdges: RDD[(VertexID, VertexID)], + rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { @@ -405,7 +405,7 @@ object Graph { * mentioned in edges but not in vertices */ def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { GraphImpl(vertices, edges, defaultVertexAttr) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index 6db8a34937..dd380d8c18 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[Edge[Object]]) kryo.register(classOf[MessageToPartition[Object]]) kryo.register(classOf[VertexBroadcastMsg[Object]]) - kryo.register(classOf[(VertexID, Object)]) + kryo.register(classOf[(VertexId, Object)]) kryo.register(classOf[EdgePartition[Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 9b864c1290..0fc1e4df68 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @return the set of neighboring ids for each vertex */ - def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = { + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = { val nbrs = if (edgeDirection == EdgeDirection.Either) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), reduceFunc = _ ++ _ ) } else if (edgeDirection == EdgeDirection.Out) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId))), reduceFunc = _ ++ _) } else if (edgeDirection == EdgeDirection.In) { - graph.mapReduceTriplets[Array[VertexID]]( + graph.mapReduceTriplets[Array[VertexId]]( mapFunc = et => Iterator((et.dstId, Array(et.srcId))), reduceFunc = _ ++ _) } else { @@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali "direction. 
(EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)") } graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => - nbrsOpt.getOrElse(Array.empty[VertexID]) + nbrsOpt.getOrElse(Array.empty[VertexId]) } } // end of collectNeighborIds @@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @return the vertex set of neighboring vertex attributes for each vertex */ - def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = { - val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]]( + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = { + val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]]( edge => { val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr))) val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr))) @@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali (a, b) => a ++ b) graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => - nbrsOpt.getOrElse(Array.empty[(VertexID, VD)]) + nbrsOpt.getOrElse(Array.empty[(VertexId, VD)]) } } // end of collectNeighbor @@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * }}} * */ - def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD) + def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = { - val uf = (id: VertexID, data: VD, o: Option[U]) => { + val uf = (id: VertexId, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data @@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * val degrees: VertexRDD[Int] = graph.outDegrees * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} * }, - * vpred = (vid: VertexID, deg:Int) => deg > 0 + * vpred = (vid: VertexId, deg:Int) => deg > 0 * ) * }}} * @@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali def filter[VD2: ClassTag, ED2: ClassTag]( preprocess: Graph[VD, ED] => Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, - vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = { + vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = { graph.mask(preprocess(graph).subgraph(epred, vpred)) } @@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)( - vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) @@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]] */ - def connectedComponents(): Graph[VertexID, ED] = { + def connectedComponents(): Graph[VertexId, ED] = { ConnectedComponents.run(graph) } @@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * * @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]] */ - def stronglyConnectedComponents(numIter: 
Int): Graph[VertexID, ED] = { + def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = { StronglyConnectedComponents.run(graph, numIter) } } // end of GraphOps diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 8ba87976f1..929915362c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -23,7 +23,7 @@ package org.apache.spark.graphx */ trait PartitionStrategy extends Serializable { /** Returns the partition number for a given edge. */ - def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID + def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID } /** @@ -73,9 +73,9 @@ object PartitionStrategy { * is used. */ case object EdgePartition2D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt - val mixingPrime: VertexID = 1125899906842597L + val mixingPrime: VertexId = 1125899906842597L val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt (col * ceilSqrtNumParts + row) % numParts @@ -87,8 +87,8 @@ object PartitionStrategy { * source. */ case object EdgePartition1D extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { - val mixingPrime: VertexID = 1125899906842597L + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { + val mixingPrime: VertexId = 1125899906842597L (math.abs(src) * mixingPrime).toInt % numParts } } @@ -99,7 +99,7 @@ object PartitionStrategy { * random vertex cut that colocates all same-direction edges between two vertices. */ case object RandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { math.abs((src, dst).hashCode()) % numParts } } @@ -111,7 +111,7 @@ object PartitionStrategy { * regardless of direction. 
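   * @example (illustrative, not part of this patch) Because the lower of the two vertex ids is
   *          always hashed first, `getPartition(3L, 7L, numParts)` and
   *          `getPartition(7L, 3L, numParts)` map to the same partition.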
*/ case object CanonicalRandomVertexCut extends PartitionStrategy { - override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = { + override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val lower = math.min(src, dst) val higher = math.max(src, dst) math.abs((lower, higher).hashCode()) % numParts diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index 0f6d413593..ac07a594a1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -40,9 +40,9 @@ import scala.reflect.ClassTag * // Set the vertex attributes to the initial pagerank values * .mapVertices((id, attr) => 1.0) * - * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = + * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = * Iterator((edge.dstId, edge.srcAttr * edge.attr)) * def messageCombiner(a: Double, b: Double): Double = a + b * val initialMessage = 0.0 @@ -113,8 +113,8 @@ object Pregel { initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) - (vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + (vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 9a95364cb1..edd59bcf32 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions import org.apache.spark.graphx.impl.VertexPartition /** - * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by + * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by * pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be * joined efficiently. All operations except [[reindex]] preserve the index. To construct a * `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]]. 
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition * @example Construct a `VertexRDD` from a plain RDD: * {{{ * // Construct an initial vertex set - * val someData: RDD[(VertexID, SomeType)] = loadData(someFile) + * val someData: RDD[(VertexId, SomeType)] = loadData(someFile) * val vset = VertexRDD(someData) * // If there were redundant values in someData we would use a reduceFunc * val vset2 = VertexRDD(someData, reduceFunc) * // Finally we can use the VertexRDD to index another dataset - * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile) + * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile) * val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b } * // Now we can construct very fast joins between the two sets * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) @@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition */ class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[VertexPartition[VD]]) - extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) @@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Provides the `RDD[(VertexID, VD)]` equivalent output. + * Provides the `RDD[(VertexId, VD)]` equivalent output. */ - override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = { + override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = { firstParent[VertexPartition[VD]].iterator(part, context).next.iterator } @@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag]( * rather than allocating new memory. * * @param pred the user defined predicate, which takes a tuple to conform to the - * `RDD[(VertexID, VD)]` interface + * `RDD[(VertexId, VD)]` interface */ - override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = + override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] = this.mapVertexPartitions(_.filter(Function.untupled(pred))) /** @@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a new VertexRDD with values obtained by applying `f` to each of the entries in the * original VertexRDD. The resulting VertexRDD retains the same index. */ - def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = + def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) /** @@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a VertexRDD containing the results of `f` */ def leftZipJoin[VD2: ClassTag, VD3: ClassTag] - (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { + (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag]( * by `f`. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: RDD[(VertexID, VD2)]) - (f: (VertexID, VD, Option[VD2]) => VD3) + (other: RDD[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3) : VertexRDD[VD3] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. 
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin @@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag]( * [[innerJoin]] for the behavior of the join. */ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) - (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag]( * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this` * and `other`, with values supplied by `f` */ - def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) - (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { + def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient innerZipJoin other match { @@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag]( * messages. */ def aggregateUsingIndex[VD2: ClassTag]( - messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { + messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => val vertexPartition: VertexPartition[VD] = thisIter.next() @@ -303,8 +303,8 @@ object VertexRDD { * * @param rdd the collection of vertex-attribute pairs */ - def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = { - val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = { + val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -323,8 +323,8 @@ object VertexRDD { * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { - val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { + def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { + val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -338,7 +338,7 @@ object VertexRDD { * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using * `defaultVal` otherwise. 
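   * @example A hypothetical sketch (the names `sc`, `vids` and `attrs` are assumed for
   *          illustration and are not part of this patch):
   * {{{
   * val vids: RDD[VertexId] = sc.parallelize(0L until 5L)
   * val attrs: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 10), (3L, 30)))
   * // vertices 0, 2 and 4 fall back to the default value 0
   * val vset: VertexRDD[Int] = VertexRDD(vids, attrs, 0)
   * }}}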
*/ - def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD) + def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD) : VertexRDD[VD] = { VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => value.getOrElse(default) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 6067ee8c7e..57fa5eefd5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap */ private[graphx] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( - val srcIds: Array[VertexID], - val dstIds: Array[VertexID], + val srcIds: Array[VertexId], + val dstIds: Array[VertexId], val data: Array[ED], - val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable { + val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable { /** * Reverse all the edges in this partition. @@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { val builder = new EdgePartitionBuilder[ED] - var currSrcId: VertexID = null.asInstanceOf[VertexID] - var currDstId: VertexID = null.asInstanceOf[VertexID] + var currSrcId: VertexId = null.asInstanceOf[VertexId] + var currDstId: VertexId = null.asInstanceOf[VertexId] var currAttr: ED = null.asInstanceOf[ED] var i = 0 while (i < size) { @@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgePartition[ED2]) - (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = { + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = { val builder = new EdgePartitionBuilder[ED3] var i = 0 var j = 0 @@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * iterator is generated using an index scan, so it is efficient at skipping edges that don't * match srcIdPred. */ - def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] = + def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] = index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator)) /** * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The * cluster must start at position `index`. */ - private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] { + private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { private[this] val edge = new Edge[ED] private[this] var pos = index diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 960eeaccf1..63ccccb056 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. 
*/ - def add(src: VertexID, dst: VertexID, d: ED) { + def add(src: VertexId, dst: VertexId, d: ED) { edges += Edge(src, dst, d) } def toEdgePartition: EdgePartition[ED] = { val edgeArray = edges.trim().array Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) - val srcIds = new Array[VertexID](edgeArray.size) - val dstIds = new Array[VertexID](edgeArray.size) + val srcIds = new Array[VertexId](edgeArray.size) + val dstIds = new Array[VertexId](edgeArray.size) val data = new Array[ED](edgeArray.size) - val index = new PrimitiveKeyOpenHashMap[VertexID, Int] + val index = new PrimitiveKeyOpenHashMap[VertexId, Int] // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { index.update(srcIds(0), 0) - var currSrcId: VertexID = srcIds(0) + var currSrcId: VertexId = srcIds(0) var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 819e3ba93a..886c250d7c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( // allocating too many temporary Java objects. private val triplet = new EdgeTriplet[VD, ED] - private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray) + private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index eee2d58c3d..1d029bf009 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) } - override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = { + override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)).cache() @@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def subgraph( epred: EdgeTriplet[VD, ED] => Boolean = x => true, - vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { + vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { // Filter the vertices, reusing the partitioner and the index from this graph val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) @@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( ////////////////////////////////////////////////////////////////////////////////////////////////// override def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { @@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val edgeIter = activeDirectionOpt match { case Some(EdgeDirection.Both) => 
if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) .filter(e => vPart.isActive(e.dstId)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) @@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) case Some(EdgeDirection.Out) => if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) + edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) } @@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } // end of mapReduceTriplets override def outerJoinVertices[U: ClassTag, VD2: ClassTag] - (other: RDD[(VertexID, U)]) - (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = + (other: RDD[(VertexId, U)]) + (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // updateF preserves type, so we can use incremental replication @@ -312,7 +312,7 @@ object GraphImpl { } def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { @@ -321,7 +321,7 @@ object GraphImpl { // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner) + val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } @@ -355,7 +355,7 @@ object GraphImpl { /** * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges - * data structure (RDD[(VertexID, VertexID, ED)]). + * data structure (RDD[(VertexId, VertexId, ED)]). * * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value * pair: the key is the partition id, and the value is an EdgePartition object containing all the @@ -378,19 +378,19 @@ object GraphImpl { defaultVertexAttr: VD): GraphImpl[VD, ED] = { edges.cache() // Get the set of all vids - val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size)) + val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size)) // Create the VertexRDD. val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) GraphImpl(vertices, edges) } /** Collects all vids mentioned in edges and partitions them by partitioner. */ - private def collectVertexIDsFromEdges( + private def collectVertexIdsFromEdges( edges: EdgeRDD[_], - partitioner: Partitioner): RDD[(VertexID, Int)] = { + partitioner: Partitioner): RDD[(VertexId, Int)] = { // TODO: Consider doing map side distinct before shuffle. 
- new ShuffledRDD[VertexID, Int, (VertexID, Int)]( - edges.collectVertexIDs.map(vid => (vid, 0)), partitioner) - .setSerializer(classOf[VertexIDMsgSerializer].getName) + new ShuffledRDD[VertexId, Int, (VertexId, Int)]( + edges.collectVertexIds.map(vid => (vid, 0)), partitioner) + .setSerializer(classOf[VertexIdMsgSerializer].getName) } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index cea9d11ebe..e9ee09c361 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} import org.apache.spark.Partitioner -import org.apache.spark.graphx.{PartitionID, VertexID} +import org.apache.spark.graphx.{PartitionID, VertexId} import org.apache.spark.rdd.{ShuffledRDD, RDD} private[graphx] class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( @transient var partition: PartitionID, - var vid: VertexID, + var vid: VertexId, var data: T) - extends Product2[PartitionID, (VertexID, T)] with Serializable { + extends Product2[PartitionID, (VertexId, T)] with Serializable { override def _1 = partition @@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef private[graphx] class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { - val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) + val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner) // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { @@ -99,8 +99,8 @@ object MsgRDDFunctions { new VertexBroadcastMsgRDDFunctions(rdd) } - def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = { - val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner) + def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = { + val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner) // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 5bdc9339e9..a8154b63ce 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag]( * vids from both the source and destination of edges. It must always include both source and * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. 
*/ - private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { case Some(prevView) => - prevView.localVertexIDMap + prevView.localVertexIdMap case None => edges.partitionsRDD.mapPartitions(_.map { case (pid, epart) => @@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag]( vidToIndex.add(e.dstId) } (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap") + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap") } private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true) @@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag]( srcAttrOnly.unpersist(blocking) dstAttrOnly.unpersist(blocking) noAttrs.unpersist(blocking) - // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it + // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it // without modification this } @@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag]( case None => // Within each edge partition, place the shipped vertex attributes into the correct - // locations specified in localVertexIDMap - localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => + // locations specified in localVertexIdMap + localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map @@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag]( private object ReplicatedVertexView { protected def buildBuffer[VD: ClassTag]( - pid2vidIter: Iterator[Array[Array[VertexID]]], + pid2vidIter: Iterator[Array[Array[VertexId]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { - val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() val vertexPart: VertexPartition[VD] = vertexPartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val vids = new PrimitiveVector[VertexID](pid2vid(pid).size) + val vids = new PrimitiveVector[VertexId](pid2vid(pid).size) val attrs = new PrimitiveVector[VD](pid2vid(pid).size) var i = 0 while (i < size) { @@ -181,16 +181,16 @@ private object ReplicatedVertexView { } protected def buildActiveBuffer( - pid2vidIter: Iterator[Array[Array[VertexID]]], + pid2vidIter: Iterator[Array[Array[VertexId]]], activePartIter: Iterator[VertexPartition[_]]) - : Iterator[(Int, Array[VertexID])] = { - val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() + : Iterator[(Int, Array[VertexId])] = { + val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() val activePart: VertexPartition[_] = activePartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val actives = new PrimitiveVector[VertexID](vidsCandidate.size) + val actives = new PrimitiveVector[VertexId](vidsCandidate.size) var i = 0 while (i < size) { val vid = vidsCandidate(i) @@ -205,8 +205,8 @@ private object ReplicatedVertexView { } private[graphx] -class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) +class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD]) extends Serializable { - def iterator: Iterator[(VertexID, VD)] = + def iterator: Iterator[(VertexId, VD)] = (0 until 
vids.size).iterator.map { i => (vids(i), attrs(i)) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index b365d4914e..fe44e1ee0c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -32,12 +32,12 @@ import org.apache.spark.util.collection.PrimitiveVector private[impl] class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { - val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) - val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false) - val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true) - val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false) + val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true) + val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false) + val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true) + val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false) - def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = + def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = (includeSrcAttr, includeDstAttr) match { case (true, true) => bothAttrs case (true, false) => srcAttrOnly @@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { } private def createPid2Vid( - includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = { + includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. 
- val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => + val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() val numEdges = edgePartition.size val vSet = new VertexSet @@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { val numPartitions = vertices.partitions.size vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID]) + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index bcad1fbc58..c74d487e20 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -25,12 +25,12 @@ import org.apache.spark.graphx._ import org.apache.spark.serializer._ private[graphx] -class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { +class VertexIdMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, _)] + val msg = t.asInstanceOf[(VertexId, _)] writeVarLong(msg._1, optimizePositive = false) this } @@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Int)] + val msg = t.asInstanceOf[(VertexId, Int)] writeVarLong(msg._1, optimizePositive = false) writeUnsignedVarInt(msg._2) this @@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Long)] + val msg = t.asInstanceOf[(VertexId, Long)] writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._2, optimizePositive = true) this @@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(VertexID, Double)] + val msg = t.asInstanceOf[(VertexId, Double)] writeVarLong(msg._1, optimizePositive = false) writeDouble(msg._2) this diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala index f13bdded75..7a54b413dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala @@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet private[graphx] object VertexPartition { - def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = { + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] iter.foreach { case (k, v) => map(k) = v } new VertexPartition(map.keySet, map._values, 
map.keySet.getBitSet) } - def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD) + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD) : VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[VertexID, VD] + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] iter.foreach { case (k, v) => map.setMerge(k, v, mergeFunc) } @@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( def size: Int = mask.cardinality() /** Return the vertex attribute for the given vertex ID. */ - def apply(vid: VertexID): VD = values(index.getPos(vid)) + def apply(vid: VertexId): VD = values(index.getPos(vid)) - def isDefined(vid: VertexID): Boolean = { + def isDefined(vid: VertexId): Boolean = { val pos = index.getPos(vid) pos >= 0 && mask.get(pos) } /** Look up vid in activeSet, throwing an exception if it is None. */ - def isActive(vid: VertexID): Boolean = { + def isActive(vid: VertexId): Boolean = { activeSet.get.contains(vid) } @@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * each of the entries in the original VertexRDD. The resulting * VertexPartition retains the same index. */ - def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = { + def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = { // Construct a view of the map transformation val newValues = new Array[VD2](capacity) var i = mask.nextSetBit(0) @@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * RDD can be easily joined with the original vertex-set. Furthermore, the filter only * modifies the bitmap index and so no new values are allocated. */ - def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = { + def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = { // Allocate the array to store the results into val newMask = new BitSet(capacity) // Iterate over the active bits in the old mask and evaluate the predicate @@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another VertexPartition. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexPartition[VD2]) - (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") leftJoin(createUsingIndex(other.iterator))(f) @@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another iterator of messages. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: Iterator[(VertexID, VD2)]) - (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (other: Iterator[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { leftJoin(createUsingIndex(other))(f) } /** Inner join another VertexPartition. */ def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) - (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") innerJoin(createUsingIndex(other.iterator))(f) @@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Inner join an iterator of messages. 
*/ def innerJoin[U: ClassTag, VD2: ClassTag] - (iter: Iterator[Product2[VertexID, U]]) - (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { + (iter: Iterator[Product2[VertexId, U]]) + (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { innerJoin(createUsingIndex(iter))(f) } /** * Similar effect as aggregateUsingIndex((a, b) => a) */ - def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]]) + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]]) : VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in * the partition, hidden by the bitmask. */ - def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = { + def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = { val newMask = new BitSet(capacity) val newValues = new Array[VD](capacity) System.arraycopy(values, 0, newValues, 0, newValues.length) @@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( } def aggregateUsingIndex[VD2: ClassTag]( - iter: Iterator[Product2[VertexID, VD2]], + iter: Iterator[Product2[VertexId, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition[VD2](index, newValues, newMask) } - def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = { + def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = { val newActiveSet = new VertexSet iter.foreach(newActiveSet.add(_)) new VertexPartition(index, values, mask, Some(newActiveSet)) @@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Construct a new VertexPartition whose index contains only the vertices in the mask. 
*/ def reindex(): VertexPartition[VD] = { - val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD] + val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD] val arbitraryMerge = (a: VD, b: VD) => a for ((k, v) <- this.iterator) { hashMap.setMerge(k, v, arbitraryMerge) @@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) } - def iterator: Iterator[(VertexID, VD)] = + def iterator: Iterator[(VertexId, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind))) - def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind)) + def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind)) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala index f493d2dd01..79549fe060 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala @@ -20,5 +20,5 @@ package org.apache.spark.graphx import org.apache.spark.util.collection.OpenHashSet package object impl { - private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID] + private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId] } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 2a6c0aa6b5..e2f6cc1389 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -35,9 +35,9 @@ object ConnectedComponents { * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + def sendMessage(edge: EdgeTriplet[VertexId, ED]) = { if (edge.srcAttr < edge.dstAttr) { Iterator((edge.dstId, edge.srcAttr)) } else if (edge.srcAttr > edge.dstAttr) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 2bdd8c9f98..614555a054 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -92,7 +92,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = + def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double = resetProb + (1.0 - resetProb) * msgSum def sendMessage(edge: EdgeTriplet[Double, Double]) = Iterator((edge.dstId, edge.srcAttr * edge.attr)) @@ -137,7 +137,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { + def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { val (oldPR, lastDelta) = attr val newPR = oldPR + (1.0 - resetProb) * msgSum (newPR, newPR - oldPR) diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 9c7a212c5a..c327ce7935 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -79,13 +79,13 @@ object SVDPlusPlus { (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) g = g.outerJoinVertices(t0) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) } def mapTrainF(conf: Conf, u: Double) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(VertexID, (RealVector, RealVector, Double))] = { + : Iterator[(VertexId, (RealVector, RealVector, Double))] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) @@ -112,7 +112,7 @@ object SVDPlusPlus { et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) g = g.outerJoinVertices(t1) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd } @@ -123,7 +123,7 @@ object SVDPlusPlus { (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) g = g.outerJoinVertices(t2) { - (vid: VertexID, + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) @@ -133,7 +133,7 @@ object SVDPlusPlus { // calculate error on training set def mapTestF(conf: Conf, u: Double) (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(VertexID, Double)] = + : Iterator[(VertexId, Double)] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) @@ -146,7 +146,7 @@ object SVDPlusPlus { g.cache() val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) g = g.outerJoinVertices(t3) { - (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => + (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala index ed84f72156..46da38eeb7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala @@ -35,7 +35,7 @@ object StronglyConnectedComponents { * * @return a graph with vertex attributes containing the smallest vertex id in each SCC */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = { // the graph we update with final SCC ids, and the graph we return at the end var sccGraph = graph.mapVertices { case (vid, _) => vid } @@ 
-71,7 +71,7 @@ object StronglyConnectedComponents { // collect min of all my neighbor's scc values, update if it's smaller than mine // then notify any neighbors with scc values larger than mine - sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID]( + sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId]( sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), e => { @@ -85,7 +85,7 @@ object StronglyConnectedComponents { // start at root of SCCs. Traverse values in reverse, notify all my neighbors // do not propagate if colors do not match! - sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean]( + sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean]( sccWorkGraph, false, activeDirection = EdgeDirection.In)( // vertex is final if it is the root of a color // or it has the same color as a neighbor that is final diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index a124c892dc..7c396e6e66 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -61,7 +61,7 @@ object TriangleCount { (vid, _, optSet) => optSet.getOrElse(null) } // Edge function computes intersection of smaller vertex with larger vertex - def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = { assert(et.srcAttr != null) assert(et.dstAttr != null) val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala index e1ff3ea0d1..425a5164ca 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala @@ -25,11 +25,11 @@ package object graphx { * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need * to follow any ordering or any constraints other than uniqueness. */ - type VertexID = Long + type VertexId = Long /** Integer identifer of a graph partition. */ // TODO: Consider using Char. 
type PartitionID = Int - private[graphx] type VertexSet = OpenHashSet[VertexID] + private[graphx] type VertexSet = OpenHashSet[VertexId] } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 9805eb3285..7677641bfe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -50,7 +50,7 @@ object GraphGenerators { val mu = 4 val sigma = 1.3 - val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{ + val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{ src => (src, sampleLogNormal(mu, sigma, numVertices)) } val edges = vertices.flatMap { v => @@ -59,9 +59,9 @@ object GraphGenerators { Graph(vertices, edges, 0) } - def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = { + def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = { val rand = new Random() - Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) } + Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) } } /** @@ -206,9 +206,9 @@ object GraphGenerators { */ def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = { // Convert row column address into vertex ids (row major order) - def sub2ind(r: Int, c: Int): VertexID = r * cols + c + def sub2ind(r: Int, c: Int): VertexId = r * cols + c - val vertices: RDD[(VertexID, (Int,Int))] = + val vertices: RDD[(VertexId, (Int,Int))] = sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) val edges: RDD[Edge[Double]] = vertices.flatMap{ case (vid, (r,c)) => @@ -228,7 +228,7 @@ object GraphGenerators { * being the center vertex. 
*/ def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { - val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) + val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) Graph.fromEdgeTuples(edges, 1) } // end of starGraph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index 4a792c0dab..bc2ad5677f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test("joinVertices") { withSpark { sc => val vertices = - sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2) + sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2) val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) val g: Graph[String, String] = Graph(vertices, edges) - val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20))) - val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u } + val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20))) + val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u } val v = g1.vertices.collect().toSet assert(v === Set((1, "one10"), (2, "two20"), (3, "three"))) @@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test ("filter") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges).cache() val filteredGraph = graph.filter( @@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { val degrees: VertexRDD[Int] = graph.outDegrees graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} }, - vpred = (vid: VertexID, deg:Int) => deg > 0 + vpred = (vid: VertexId, deg:Int) => deg > 0 ).cache() val v = filteredGraph.vertices.collect().toSet diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index b18bc98e6d..28d34dd9a1 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = { - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v") } test("Graph.fromEdgeTuples") { @@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } - val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) + val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) val graph = Graph(vertices, edges, false) assert( graph.edges.count() === rawEdges.size ) // Vertices not explicitly provided but referenced by edges should be created automatically @@ -74,7 +74,7 @@ 
class GraphSuite extends FunSuite with LocalSparkContext { val n = 5 val star = starGraph(sc, n) assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === - (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet) + (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet) } } @@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val p = 100 val verts = 1 to n val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => - verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0) + verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0) assert(graph.edges.partitions.length === p) val partitionedGraph = graph.partitionBy(EdgePartition2D) assert(graph.edges.partitions.length === p) @@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { val star = starGraph(sc, n) // mapVertices preserving type val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2") - assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet) + assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet) // mapVertices changing type val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length) - assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet) + assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet) } } @@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val star = starGraph(sc, n) - assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet) + assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet) } } @@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mask") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges).cache() @@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val star = starGraph(sc, n) val doubleStar = Graph.fromEdgeTuples( sc.parallelize((1 to n).flatMap(x => - List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v") + List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v") val star2 = doubleStar.groupEdges { (a, b) => a} assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) @@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) // activeSetOpt - val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID) + val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId) val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) val vids = complete.mapVertices((vid, attr) => vid).cache() val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } @@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.srcId, 1)) }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet - assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet) + assert(numEvenNeighbors === (1 to 
n).map(x => (x: VertexId, n / 2)).toSet) // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) - val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3) + val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3) val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache() val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } @@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.dstId, 1)) }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet - assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet) + assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet) } } @@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), (a: Int, b: Int) => a + b).collect.toSet - assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0))) + assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0))) // outerJoinVertices preserving type val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString } val newReverseStar = diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala index 936e5c9c86..490b94429e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala @@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext { test("1 iteration") { withSpark { sc => val n = 5 - val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID)) + val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId)) val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache() val result = Pregel(star, 0)( (vid, attr, msg) => attr, @@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val chain = Graph.fromEdgeTuples( - sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), + sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3), 0).cache() - assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet) + assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet) val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache() assert(chainWithSeed.vertices.collect.toSet === - Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) + Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet) val result = Pregel(chainWithSeed, 0)( (vid, attr, msg) => math.max(msg, attr), et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty, diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala index 0c756400f4..e5a582b47b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala @@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { 
test("IntAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 5) + val outMsg = (4: VertexId, 5) val bout = new ByteArrayOutputStream val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Int) = inStrm.readObject() - val inMsg2: (VertexID, Int) = inStrm.readObject() + val inMsg1: (VertexId, Int) = inStrm.readObject() + val inMsg2: (VertexId, Int) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("LongAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 1L << 32) + val outMsg = (4: VertexId, 1L << 32) val bout = new ByteArrayOutputStream val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Long) = inStrm.readObject() - val inMsg2: (VertexID, Long) = inStrm.readObject() + val inMsg1: (VertexId, Long) = inStrm.readObject() + val inMsg2: (VertexId, Long) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("DoubleAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: VertexID, 5.0) + val outMsg = (4: VertexId, 5.0) val bout = new ByteArrayOutputStream val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (VertexID, Double) = inStrm.readObject() - val inMsg2: (VertexID, Double) = inStrm.readObject() + val inMsg1: (VertexId, Double) = inStrm.readObject() + val inMsg2: (VertexId, Double) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 1195beba58..e135d1d7ad 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite { test("innerJoin") { def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { val builder = new EdgePartitionBuilder[A] - for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) } + for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) } builder.toEdgePartition } val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index eba8d7b716..3915be15b3 100644 --- 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { test("Connected Components on a Toy Connected Graph") { withSpark { sc => // Create an RDD for the vertices - val users: RDD[(VertexID, (String, String))] = + val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student")))) -- cgit v1.2.3
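A minimal sketch of what downstream code looks like after this rename, assuming the `sc` predefined by the Spark shell; the variable names and sample values below are illustrative only, not taken from the commit. Since `VertexId` remains an alias for `Long`, only the spelling of the type changes:

{% highlight scala %}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Vertex IDs are still plain Longs; the alias is now spelled VertexId.
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "one"), (2L, "two"), (3L, "three")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val graph: Graph[String, Int] = Graph(vertices, edges)

// mapVertices takes a (VertexId, VD) => VD2 function, now with the new spelling.
val labeled: Graph[String, Int] =
  graph.mapVertices((id: VertexId, attr: String) => id + ":" + attr)
labeled.vertices.collect().foreach(println)
{% endhighlight %}

Existing code that refers to `VertexID` needs only a mechanical rename, since the alias's underlying type is unchanged.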