aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/graphx-programming-guide.md70
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Edge.scala8
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala18
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala32
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala14
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala8
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala42
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala16
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala10
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala32
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala30
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala16
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala10
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala44
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/package.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala12
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala10
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala28
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala8
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala18
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala2
33 files changed, 244 insertions, 244 deletions
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 4bf4743457..3dfed7bea9 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -186,7 +186,7 @@ code constructs a graph from a collection of RDDs:
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
@@ -360,7 +360,7 @@ graph contains the following:
{% highlight scala %}
class Graph[VD, ED] {
- def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
@@ -382,7 +382,7 @@ val newGraph = Graph(newVertices, graph.edges)
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
{% endhighlight %}
-[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
+[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]
These operators are often used to initialize the graph for a particular computation or project away
unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
@@ -408,7 +408,7 @@ add more in the future. The following is a list of the basic structural operato
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
- vpred: (VertexID, VD) => Boolean): Graph[VD, ED]
+ vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
@@ -427,11 +427,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica
operator can be used in number of situations to restrict the graph to the vertices and edges of
interest or eliminate broken links. For example in the following code we remove broken links:
-[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
+[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED]
{% highlight scala %}
// Create an RDD for the vertices
-val users: RDD[(VertexID, (String, String))] =
+val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
@@ -494,9 +494,9 @@ using the *join* operators. Below we list the key join operators:
{% highlight scala %}
class Graph[VD, ED] {
- def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD)
+ def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
- def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2)
+ def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
{% endhighlight %}
@@ -506,7 +506,7 @@ returns a new graph with the vertex properties obtained by applying the user def
to the result of the joined vertices. Vertices without a matching value in the RDD retain their
original value.
-[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
+[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED]
> Note that if the RDD contains more than one value for a given vertex only one will be used. It
> is therefore recommended that the input RDD be first made unique using the following which will
@@ -525,7 +525,7 @@ property type. Because not all vertices may have a matching value in the input
function takes an `Option` type. For example, we can setup a graph for PageRank by initializing
vertex properties with their `outDegree`.
-[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
+[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]
{% highlight scala %}
@@ -559,7 +559,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id).
### Map Reduce Triplets (mapReduceTriplets)
<a name="mrTriplets"></a>
-[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
+[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=&gt;Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=&gt;A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A]
The core (heavily optimized) aggregation primitive in GraphX is the
[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:
@@ -567,7 +567,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the
{% highlight scala %}
class Graph[VD, ED] {
def mapReduceTriplets[A](
- map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A)
: VertexRDD[A]
}
@@ -649,13 +649,13 @@ compute the max in, out, and total degrees:
{% highlight scala %}
// Define a reduce operation to compute the highest degree vertex
-def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
+def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
-val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
-val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
-val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
+val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
{% endhighlight %}
### Collecting Neighbors
@@ -665,14 +665,14 @@ attributes at each vertex. This can be easily accomplished using the
[`collectNeighborIds`][GraphOps.collectNeighborIds] and the
[`collectNeighbors`][GraphOps.collectNeighbors] operators.
-[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]]
-[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]]
+[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]]
+[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]]
{% highlight scala %}
class GraphOps[VD, ED] {
- def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
- def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ]
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
{% endhighlight %}
@@ -716,7 +716,7 @@ messages remaining.
The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
of its implementation (note calls to graph.cache have been removed):
-[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
+[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED]
{% highlight scala %}
class GraphOps[VD, ED] {
@@ -724,8 +724,8 @@ class GraphOps[VD, ED] {
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
- (vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ (vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
@@ -770,7 +770,7 @@ import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
-val sourceId: VertexID = 42 // The ultimate source
+val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
@@ -817,7 +817,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic
{% highlight scala %}
object Graph {
def apply[VD, ED](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
@@ -827,7 +827,7 @@ object Graph {
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
- rawEdges: RDD[(VertexID, VertexID)],
+ rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
@@ -843,8 +843,8 @@ object Graph {
[PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$
[GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int]
-[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
-[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
+[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
+[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int]
[Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED]
# Vertex and Edge RDDs
@@ -868,17 +868,17 @@ additional functionality:
{% highlight scala %}
class VertexRDD[VD] extends RDD[(VertexID, VD)] {
// Filter the vertex set but preserves the internal index
- def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
+ def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
- def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2]
+ def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
- def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3]
- def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2]
+ def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
+ def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
- def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
+ def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
{% endhighlight %}
@@ -896,7 +896,7 @@ both aggregate and then subsequently index the `RDD[(VertexID, A)]`. For exampl
{% highlight scala %}
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
-val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
+val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
@@ -922,7 +922,7 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3]
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
{% endhighlight %}
In most applications we have found that operations on the `EdgeRDD` are accomplished through the
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 32f1602698..580faa0866 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -28,8 +28,8 @@ package org.apache.spark.graphx
* @param attr The attribute associated with the edge
*/
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
- var srcId: VertexID = 0,
- var dstId: VertexID = 0,
+ var srcId: VertexId = 0,
+ var dstId: VertexId = 0,
var attr: ED = null.asInstanceOf[ED])
extends Serializable {
@@ -39,7 +39,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @param vid the id one of the two vertices on the edge.
* @return the id of the other vertex on the edge.
*/
- def otherVertexId(vid: VertexID): VertexID =
+ def otherVertexId(vid: VertexId): VertexId =
if (srcId == vid) dstId else { assert(dstId == vid); srcId }
/**
@@ -50,7 +50,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @return the relative direction of the edge to the corresponding
* vertex.
*/
- def relativeDirection(vid: VertexID): EdgeDirection =
+ def relativeDirection(vid: VertexId): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 6efef061d7..fe03ae4a62 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -102,7 +102,7 @@ class EdgeRDD[@specialized ED: ClassTag](
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
- (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -113,7 +113,7 @@ class EdgeRDD[@specialized ED: ClassTag](
})
}
- private[graphx] def collectVertexIDs(): RDD[VertexID] = {
+ private[graphx] def collectVertexIds(): RDD[VertexId] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 2c659cb070..fea43c3b2b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -50,7 +50,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id one of the two vertices on the edge
* @return the attribute for the other vertex on the edge
*/
- def otherVertexAttr(vid: VertexID): VD =
+ def otherVertexAttr(vid: VertexId): VD =
if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
/**
@@ -59,7 +59,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id of one of the two vertices on the edge
* @return the attr for the vertex with that id
*/
- def vertexAttr(vid: VertexID): VD =
+ def vertexAttr(vid: VertexId): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 7f65244cd9..eea95d38d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -126,7 +126,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
- def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
/**
* Transforms each edge attribute in the graph using the map function. The map function is not
@@ -242,7 +242,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
- vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+ vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
/**
@@ -292,7 +292,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(VertexID, Int)] =
+ * val inDeg: RDD[(VertexId, Int)] =
* mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
* }}}
*
@@ -304,7 +304,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*
*/
def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]
@@ -328,14 +328,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*
* {{{
* val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
- * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
+ * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.outerJoinVertices(outDeg) {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
*/
- def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
- (mapFunc: (VertexID, VD, Option[U]) => VD2)
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+ (mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
/**
@@ -364,7 +364,7 @@ object Graph {
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
- rawEdges: RDD[(VertexID, VertexID)],
+ rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
{
@@ -405,7 +405,7 @@ object Graph {
* mentioned in edges but not in vertices
*/
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index 6db8a34937..dd380d8c18 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -33,7 +33,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[(VertexID, Object)])
+ kryo.register(classOf[(VertexId, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 9b864c1290..0fc1e4df68 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -80,19 +80,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @return the set of neighboring ids for each vertex
*/
- def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = {
+ def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Either) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _
)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
reduceFunc = _ ++ _)
} else if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets[Array[VertexID]](
+ graph.mapReduceTriplets[Array[VertexId]](
mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _)
} else {
@@ -100,7 +100,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
- nbrsOpt.getOrElse(Array.empty[VertexID])
+ nbrsOpt.getOrElse(Array.empty[VertexId])
}
} // end of collectNeighborIds
@@ -116,8 +116,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @return the vertex set of neighboring vertex attributes for each vertex
*/
- def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
- val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
+ def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
+ val nbrs = graph.mapReduceTriplets[Array[(VertexId,VD)]](
edge => {
val msgToSrc = (edge.srcId, Array((edge.dstId, edge.dstAttr)))
val msgToDst = (edge.dstId, Array((edge.srcId, edge.srcAttr)))
@@ -133,7 +133,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
(a, b) => a ++ b)
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
- nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+ nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}
} // end of collectNeighbor
@@ -164,9 +164,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* }}}
*
*/
- def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
+ def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
- val uf = (id: VertexID, data: VD, o: Option[U]) => {
+ val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
@@ -197,7 +197,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
* val degrees: VertexRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
- * vpred = (vid: VertexID, deg:Int) => deg > 0
+ * vpred = (vid: VertexId, deg:Int) => deg > 0
* )
* }}}
*
@@ -205,7 +205,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
- vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
+ vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
@@ -260,8 +260,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
- vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
+ vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
@@ -293,7 +293,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
- def connectedComponents(): Graph[VertexID, ED] = {
+ def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
}
@@ -312,7 +312,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
*
* @see [[org.apache.spark.graphx.lib.StronglyConnectedComponents$#run]]
*/
- def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+ def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] = {
StronglyConnectedComponents.run(graph, numIter)
}
} // end of GraphOps
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 8ba87976f1..929915362c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -23,7 +23,7 @@ package org.apache.spark.graphx
*/
trait PartitionStrategy extends Serializable {
/** Returns the partition number for a given edge. */
- def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
+ def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
/**
@@ -73,9 +73,9 @@ object PartitionStrategy {
* is used.
*/
case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
- val mixingPrime: VertexID = 1125899906842597L
+ val mixingPrime: VertexId = 1125899906842597L
val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
@@ -87,8 +87,8 @@ object PartitionStrategy {
* source.
*/
case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
- val mixingPrime: VertexID = 1125899906842597L
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+ val mixingPrime: VertexId = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
}
@@ -99,7 +99,7 @@ object PartitionStrategy {
* random vertex cut that colocates all same-direction edges between two vertices.
*/
case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
}
}
@@ -111,7 +111,7 @@ object PartitionStrategy {
* regardless of direction.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 0f6d413593..ac07a594a1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -40,9 +40,9 @@ import scala.reflect.ClassTag
* // Set the vertex attributes to the initial pagerank values
* .mapVertices((id, attr) => 1.0)
*
- * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
+ * def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
* Iterator((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
@@ -113,8 +113,8 @@ object Pregel {
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
- (vprog: (VertexID, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ (vprog: (VertexId, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 9a95364cb1..edd59bcf32 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -28,7 +28,7 @@ import org.apache.spark.graphx.impl.MsgRDDFunctions
import org.apache.spark.graphx.impl.VertexPartition
/**
- * Extends `RDD[(VertexID, VD)]` by ensuring that there is only one entry for each vertex and by
+ * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
* pre-indexing the entries for fast, efficient joins. Two VertexRDDs with the same index can be
* joined efficiently. All operations except [[reindex]] preserve the index. To construct a
* `VertexRDD`, use the [[org.apache.spark.graphx.VertexRDD$ VertexRDD object]].
@@ -36,12 +36,12 @@ import org.apache.spark.graphx.impl.VertexPartition
* @example Construct a `VertexRDD` from a plain RDD:
* {{{
* // Construct an initial vertex set
- * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexId, SomeType)] = loadData(someFile)
* val vset = VertexRDD(someData)
* // If there were redundant values in someData we would use a reduceFunc
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexId, OtherType)] = loadData(otherFile)
* val vset3 = vset2.innerJoin(otherData) { (vid, a, b) => b }
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -51,7 +51,7 @@ import org.apache.spark.graphx.impl.VertexPartition
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[VertexPartition[VD]])
- extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+ extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -92,9 +92,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Provides the `RDD[(VertexID, VD)]` equivalent output.
+ * Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
- override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
+ override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
@@ -114,9 +114,9 @@ class VertexRDD[@specialized VD: ClassTag](
* rather than allocating new memory.
*
* @param pred the user defined predicate, which takes a tuple to conform to the
- * `RDD[(VertexID, VD)]` interface
+ * `RDD[(VertexId, VD)]` interface
*/
- override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
+ override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
/**
@@ -140,7 +140,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a new VertexRDD with values obtained by applying `f` to each of the entries in the
* original VertexRDD. The resulting VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
@@ -172,7 +172,7 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a VertexRDD containing the results of `f`
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -200,8 +200,8 @@ class VertexRDD[@specialized VD: ClassTag](
* by `f`.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: RDD[(VertexID, VD2)])
- (f: (VertexID, VD, Option[VD2]) => VD3)
+ (other: RDD[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
@@ -225,7 +225,7 @@ class VertexRDD[@specialized VD: ClassTag](
* [[innerJoin]] for the behavior of the join.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
- (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -247,8 +247,8 @@ class VertexRDD[@specialized VD: ClassTag](
* @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
* and `other`, with values supplied by `f`
*/
- def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
- (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
+ (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
@@ -278,7 +278,7 @@ class VertexRDD[@specialized VD: ClassTag](
* messages.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
+ messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
val vertexPartition: VertexPartition[VD] = thisIter.next()
@@ -303,8 +303,8 @@ object VertexRDD {
*
* @param rdd the collection of vertex-attribute pairs
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
- val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -323,8 +323,8 @@ object VertexRDD {
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
- val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -338,7 +338,7 @@ object VertexRDD {
* Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using
* `defaultVal` otherwise.
*/
- def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
+ def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD)
: VertexRDD[VD] = {
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
value.getOrElse(default)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 6067ee8c7e..57fa5eefd5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
*/
private[graphx]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
- val srcIds: Array[VertexID],
- val dstIds: Array[VertexID],
+ val srcIds: Array[VertexId],
+ val dstIds: Array[VertexId],
val data: Array[ED],
- val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
+ val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
/**
* Reverse all the edges in this partition.
@@ -118,8 +118,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
val builder = new EdgePartitionBuilder[ED]
- var currSrcId: VertexID = null.asInstanceOf[VertexID]
- var currDstId: VertexID = null.asInstanceOf[VertexID]
+ var currSrcId: VertexId = null.asInstanceOf[VertexId]
+ var currDstId: VertexId = null.asInstanceOf[VertexId]
var currAttr: ED = null.asInstanceOf[ED]
var i = 0
while (i < size) {
@@ -153,7 +153,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2])
- (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
+ (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = {
val builder = new EdgePartitionBuilder[ED3]
var i = 0
var j = 0
@@ -210,14 +210,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
*/
- def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
+ def indexIterator(srcIdPred: VertexId => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
*/
- private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
+ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index 960eeaccf1..63ccccb056 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -29,22 +29,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
- def add(src: VertexID, dst: VertexID, d: ED) {
+ def add(src: VertexId, dst: VertexId, d: ED) {
edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
- val srcIds = new Array[VertexID](edgeArray.size)
- val dstIds = new Array[VertexID](edgeArray.size)
+ val srcIds = new Array[VertexId](edgeArray.size)
+ val dstIds = new Array[VertexId](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
index.update(srcIds(0), 0)
- var currSrcId: VertexID = srcIds(0)
+ var currSrcId: VertexId = srcIds(0)
var i = 0
while (i < edgeArray.size) {
srcIds(i) = edgeArray(i).srcId
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 819e3ba93a..886c250d7c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -41,7 +41,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]
- private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
+ private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index eee2d58c3d..1d029bf009 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -105,7 +105,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
+ override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
@@ -153,7 +153,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
- vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
@@ -195,7 +195,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
//////////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -225,7 +225,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
@@ -236,7 +236,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId))
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
+ edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
@@ -267,8 +267,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
- (other: RDD[(VertexID, U)])
- (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+ (other: RDD[(VertexId, U)])
+ (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
{
if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
@@ -312,7 +312,7 @@ object GraphImpl {
}
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(VertexID, VD)],
+ vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
@@ -321,7 +321,7 @@ object GraphImpl {
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
+ val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
@@ -355,7 +355,7 @@ object GraphImpl {
/**
* Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
- * data structure (RDD[(VertexID, VertexID, ED)]).
+ * data structure (RDD[(VertexId, VertexId, ED)]).
*
* The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
* pair: the key is the partition id, and the value is an EdgePartition object containing all the
@@ -378,19 +378,19 @@ object GraphImpl {
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache()
// Get the set of all vids
- val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
- private def collectVertexIDsFromEdges(
+ private def collectVertexIdsFromEdges(
edges: EdgeRDD[_],
- partitioner: Partitioner): RDD[(VertexID, Int)] = {
+ partitioner: Partitioner): RDD[(VertexId, Int)] = {
// TODO: Consider doing map side distinct before shuffle.
- new ShuffledRDD[VertexID, Int, (VertexID, Int)](
- edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
- .setSerializer(classOf[VertexIDMsgSerializer].getName)
+ new ShuffledRDD[VertexId, Int, (VertexId, Int)](
+ edges.collectVertexIds.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VertexIdMsgSerializer].getName)
}
} // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index cea9d11ebe..e9ee09c361 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -20,16 +20,16 @@ package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner
-import org.apache.spark.graphx.{PartitionID, VertexID}
+import org.apache.spark.graphx.{PartitionID, VertexId}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
private[graphx]
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: PartitionID,
- var vid: VertexID,
+ var vid: VertexId,
var data: T)
- extends Product2[PartitionID, (VertexID, T)] with Serializable {
+ extends Product2[PartitionID, (VertexId, T)] with Serializable {
override def _1 = partition
@@ -61,7 +61,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
private[graphx]
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[PartitionID, (VertexId, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -99,8 +99,8 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
- val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 5bdc9339e9..a8154b63ce 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -50,9 +50,9 @@ class ReplicatedVertexView[VD: ClassTag](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
case Some(prevView) =>
- prevView.localVertexIDMap
+ prevView.localVertexIdMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -62,7 +62,7 @@ class ReplicatedVertexView[VD: ClassTag](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap")
}
private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
@@ -75,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag](
srcAttrOnly.unpersist(blocking)
dstAttrOnly.unpersist(blocking)
noAttrs.unpersist(blocking)
- // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it
+ // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it
// without modification
this
}
@@ -133,8 +133,8 @@ class ReplicatedVertexView[VD: ClassTag](
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
- // locations specified in localVertexIDMap
- localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ // locations specified in localVertexIdMap
+ localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
@@ -157,15 +157,15 @@ class ReplicatedVertexView[VD: ClassTag](
private object ReplicatedVertexView {
protected def buildBuffer[VD: ClassTag](
- pid2vidIter: Iterator[Array[Array[VertexID]]],
+ pid2vidIter: Iterator[Array[Array[VertexId]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
- val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
+ val vids = new PrimitiveVector[VertexId](pid2vid(pid).size)
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
var i = 0
while (i < size) {
@@ -181,16 +181,16 @@ private object ReplicatedVertexView {
}
protected def buildActiveBuffer(
- pid2vidIter: Iterator[Array[Array[VertexID]]],
+ pid2vidIter: Iterator[Array[Array[VertexId]]],
activePartIter: Iterator[VertexPartition[_]])
- : Iterator[(Int, Array[VertexID])] = {
- val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
+ : Iterator[(Int, Array[VertexId])] = {
+ val pid2vid: Array[Array[VertexId]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
+ val actives = new PrimitiveVector[VertexId](vidsCandidate.size)
var i = 0
while (i < size) {
val vid = vidsCandidate(i)
@@ -205,8 +205,8 @@ private object ReplicatedVertexView {
}
private[graphx]
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
extends Serializable {
- def iterator: Iterator[(VertexID, VD)] =
+ def iterator: Iterator[(VertexId, VD)] =
(0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index b365d4914e..fe44e1ee0c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -32,12 +32,12 @@ import org.apache.spark.util.collection.PrimitiveVector
private[impl]
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
- val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
+ val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false)
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -46,9 +46,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
}
private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+ val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
@@ -71,7 +71,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val numPartitions = vertices.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index bcad1fbc58..c74d487e20 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -25,12 +25,12 @@ import org.apache.spark.graphx._
import org.apache.spark.serializer._
private[graphx]
-class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIdMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, _)]
+ val msg = t.asInstanceOf[(VertexId, _)]
writeVarLong(msg._1, optimizePositive = false)
this
}
@@ -123,7 +123,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Int)]
+ val msg = t.asInstanceOf[(VertexId, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
this
@@ -147,7 +147,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Long)]
+ val msg = t.asInstanceOf[(VertexId, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
this
@@ -171,7 +171,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(VertexID, Double)]
+ val msg = t.asInstanceOf[(VertexId, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
this
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index f13bdded75..7a54b413dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -26,18 +26,18 @@ import org.apache.spark.util.collection.BitSet
private[graphx] object VertexPartition {
- def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
- val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) =>
map(k) = v
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
- def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
+ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
- val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc)
}
@@ -60,15 +60,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
def size: Int = mask.cardinality()
/** Return the vertex attribute for the given vertex ID. */
- def apply(vid: VertexID): VD = values(index.getPos(vid))
+ def apply(vid: VertexId): VD = values(index.getPos(vid))
- def isDefined(vid: VertexID): Boolean = {
+ def isDefined(vid: VertexId): Boolean = {
val pos = index.getPos(vid)
pos >= 0 && mask.get(pos)
}
/** Look up vid in activeSet, throwing an exception if it is None. */
- def isActive(vid: VertexID): Boolean = {
+ def isActive(vid: VertexId): Boolean = {
activeSet.get.contains(vid)
}
@@ -88,7 +88,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
- def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
+ def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0)
@@ -108,7 +108,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
+ def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
@@ -146,7 +146,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2])
- (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
@@ -165,14 +165,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: Iterator[(VertexID, VD2)])
- (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (other: Iterator[(VertexId, VD2)])
+ (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
- (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
@@ -192,15 +192,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Inner join an iterator of messages.
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
- (iter: Iterator[Product2[VertexID, U]])
- (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
+ (iter: Iterator[Product2[VertexId, U]])
+ (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
- def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -218,7 +218,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
- def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length)
@@ -233,7 +233,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
}
def aggregateUsingIndex[VD2: ClassTag](
- iter: Iterator[Product2[VertexID, VD2]],
+ iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -253,7 +253,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition[VD2](index, newValues, newMask)
}
- def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
+ def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet))
@@ -263,7 +263,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
- val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
@@ -271,8 +271,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
}
- def iterator: Iterator[(VertexID, VD)] =
+ def iterator: Iterator[(VertexId, VD)] =
mask.iterator.map(ind => (index.getValue(ind), values(ind)))
- def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
+ def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind))
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
index f493d2dd01..79549fe060 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala
@@ -20,5 +20,5 @@ package org.apache.spark.graphx
import org.apache.spark.util.collection.OpenHashSet
package object impl {
- private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID]
+ private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexId]
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 2a6c0aa6b5..e2f6cc1389 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -35,9 +35,9 @@ object ConnectedComponents {
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
+ def sendMessage(edge: EdgeTriplet[VertexId, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 2bdd8c9f98..614555a054 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -92,7 +92,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
+ def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
@@ -137,7 +137,7 @@ object PageRank extends Logging {
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 9c7a212c5a..c327ce7935 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -79,13 +79,13 @@ object SVDPlusPlus {
(g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
g = g.outerJoinVertices(t0) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
def mapTrainF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
+ : Iterator[(VertexId, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -112,7 +112,7 @@ object SVDPlusPlus {
et => Iterator((et.srcId, et.dstAttr._2)),
(g1: RealVector, g2: RealVector) => g1.add(g2))
g = g.outerJoinVertices(t1) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
@@ -123,7 +123,7 @@ object SVDPlusPlus {
(g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
(g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) {
- (vid: VertexID,
+ (vid: VertexId,
vd: (RealVector, RealVector, Double, Double),
msg: Option[(RealVector, RealVector, Double)]) =>
(vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
@@ -133,7 +133,7 @@ object SVDPlusPlus {
// calculate error on training set
def mapTestF(conf: Conf, u: Double)
(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(VertexID, Double)] =
+ : Iterator[(VertexId, Double)] =
{
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
@@ -146,7 +146,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
g = g.outerJoinVertices(t3) {
- (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index ed84f72156..46da38eeb7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -35,7 +35,7 @@ object StronglyConnectedComponents {
*
* @return a graph with vertex attributes containing the smallest vertex id in each SCC
*/
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {
// the graph we update with final SCC ids, and the graph we return at the end
var sccGraph = graph.mapVertices { case (vid, _) => vid }
@@ -71,7 +71,7 @@ object StronglyConnectedComponents {
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
- sccWorkGraph = Pregel[(VertexID, Boolean), ED, VertexID](
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId](
sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)(
(vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2),
e => {
@@ -85,7 +85,7 @@ object StronglyConnectedComponents {
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
- sccWorkGraph = Pregel[(VertexID, Boolean), ED, Boolean](
+ sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean](
sccWorkGraph, false, activeDirection = EdgeDirection.In)(
// vertex is final if it is the root of a color
// or it has the same color as a neighbor that is final
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index a124c892dc..7c396e6e66 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -61,7 +61,7 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexId, Int)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index e1ff3ea0d1..425a5164ca 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -25,11 +25,11 @@ package object graphx {
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
* to follow any ordering or any constraints other than uniqueness.
*/
- type VertexID = Long
+ type VertexId = Long
/** Integer identifer of a graph partition. */
// TODO: Consider using Char.
type PartitionID = Int
- private[graphx] type VertexSet = OpenHashSet[VertexID]
+ private[graphx] type VertexSet = OpenHashSet[VertexId]
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 9805eb3285..7677641bfe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -50,7 +50,7 @@ object GraphGenerators {
val mu = 4
val sigma = 1.3
- val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
+ val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
}
val edges = vertices.flatMap { v =>
@@ -59,9 +59,9 @@ object GraphGenerators {
Graph(vertices, edges, 0)
}
- def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
+ def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
val rand = new Random()
- Array.fill(maxVertexID) { Edge[Int](src, rand.nextInt(maxVertexID), 1) }
+ Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
/**
@@ -206,9 +206,9 @@ object GraphGenerators {
*/
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert row column address into vertex ids (row major order)
- def sub2ind(r: Int, c: Int): VertexID = r * cols + c
+ def sub2ind(r: Int, c: Int): VertexId = r * cols + c
- val vertices: RDD[(VertexID, (Int,Int))] =
+ val vertices: RDD[(VertexId, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
@@ -228,7 +228,7 @@ object GraphGenerators {
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
- val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+ val edges: RDD[(VertexId, VertexId)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph.fromEdgeTuples(edges, 1)
} // end of starGraph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 4a792c0dab..bc2ad5677f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -28,12 +28,12 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("joinVertices") {
withSpark { sc =>
val vertices =
- sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
val g: Graph[String, String] = Graph(vertices, edges)
- val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
- val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
+ val tbl = sc.parallelize(Seq[(VertexId, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexId, attr: String, u: Int) => attr + u }
val v = g1.vertices.collect().toSet
assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
@@ -60,7 +60,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test ("filter") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
val filteredGraph = graph.filter(
@@ -68,7 +68,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
},
- vpred = (vid: VertexID, deg:Int) => deg > 0
+ vpred = (vid: VertexId, deg:Int) => deg > 0
).cache()
val v = filteredGraph.vertices.collect().toSet
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index b18bc98e6d..28d34dd9a1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
- Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexId, x: VertexId)), 3), "v")
}
test("Graph.fromEdgeTuples") {
@@ -57,7 +57,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
- val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val vertices: RDD[(VertexId, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
@@ -74,7 +74,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
- (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
+ (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
}
}
@@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
+ verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -136,10 +136,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -168,7 +168,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -191,7 +191,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mask") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexId, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
@@ -218,7 +218,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
val doubleStar = Graph.fromEdgeTuples(
sc.parallelize((1 to n).flatMap(x =>
- List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
+ List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
@@ -237,7 +237,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexId, y: VertexId)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
@@ -248,10 +248,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
@@ -262,7 +262,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -277,7 +277,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
- assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
+ assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
index 936e5c9c86..490b94429e 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -27,7 +27,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
test("1 iteration") {
withSpark { sc =>
val n = 5
- val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID))
+ val starEdges = (1 to n).map(x => (0: VertexId, x: VertexId))
val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache()
val result = Pregel(star, 0)(
(vid, attr, msg) => attr,
@@ -41,12 +41,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
- sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
+ sc.parallelize((1 until n).map(x => (x: VertexId, x + 1: VertexId)), 3),
0).cache()
- assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexId, 0)).toSet)
val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache()
assert(chainWithSeed.vertices.collect.toSet ===
- Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
+ Set((1: VertexId, 1)) ++ (2 to n).map(x => (x: VertexId, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
et => if (et.dstAttr != et.srcAttr) Iterator((et.dstId, et.srcAttr)) else Iterator.empty,
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 0c756400f4..e5a582b47b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -99,7 +99,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 5)
+ val outMsg = (4: VertexId, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -107,8 +107,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Int) = inStrm.readObject()
- val inMsg2: (VertexID, Int) = inStrm.readObject()
+ val inMsg1: (VertexId, Int) = inStrm.readObject()
+ val inMsg2: (VertexId, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -119,7 +119,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("LongAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 1L << 32)
+ val outMsg = (4: VertexId, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -127,8 +127,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Long) = inStrm.readObject()
- val inMsg2: (VertexID, Long) = inStrm.readObject()
+ val inMsg1: (VertexId, Long) = inStrm.readObject()
+ val inMsg2: (VertexId, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -139,7 +139,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("DoubleAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: VertexID, 5.0)
+ val outMsg = (4: VertexId, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -147,8 +147,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (VertexID, Double) = inStrm.readObject()
- val inMsg2: (VertexID, Double) = inStrm.readObject()
+ val inMsg1: (VertexId, Double) = inStrm.readObject()
+ val inMsg2: (VertexId, Double) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 1195beba58..e135d1d7ad 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -79,7 +79,7 @@ class EdgePartitionSuite extends FunSuite {
test("innerJoin") {
def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) }
builder.toEdgePartition
}
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index eba8d7b716..3915be15b3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -100,7 +100,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Connected Components on a Toy Connected Graph") {
withSpark { sc =>
// Create an RDD for the vertices
- val users: RDD[(VertexID, (String, String))] =
+ val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))