Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala               7
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala                19
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala    14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala            23
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala         2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala 31
6 files changed, 52 insertions, 44 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index fe03ae4a62..799a9dd1ee 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -66,7 +66,8 @@ class EdgeRDD[@specialized ED: ClassTag](
this
}
- private[graphx] def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
+ private[graphx] def mapEdgePartitions[ED2: ClassTag](
+ f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
: EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
@@ -97,8 +98,8 @@ class EdgeRDD[@specialized ED: ClassTag](
*
* @param other the EdgeRDD to join with
* @param f the join function applied to corresponding values of `this` and `other`
- * @return a new EdgeRDD containing only edges that appear in both `this` and `other`, with values
- * supplied by `f`
+ * @return a new EdgeRDD containing only edges that appear in both `this` and `other`,
+ * with values supplied by `f`
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
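Aside: a minimal usage sketch of the innerJoin documented above, assuming a running SparkContext `sc` (not part of this commit). Deriving the second EdgeRDD from the first keeps the two edge sets identically partitioned, so the join can proceed index-to-index:

    import org.apache.spark.graphx._

    // Build a small graph; vertices default to attribute 0.
    val graph = Graph.fromEdges(
      sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0))), defaultValue = 0)

    // A co-partitioned EdgeRDD with doubled weights.
    val doubled = graph.edges.mapValues(e => e.attr * 2.0)

    // Keeps only edges present in both; values come from the join function:
    // Edge(1,2,3.0) and Edge(2,3,6.0).
    val joined: EdgeRDD[Double] =
      graph.edges.innerJoin(doubled) { (src, dst, a, b) => a + b }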
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index eea95d38d5..65a1a8c68f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -171,8 +171,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
: Graph[VD, ED2]
/**
- * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
- * as well. If adjacent vertex values are not required, consider using `mapEdges` instead.
+ * Transforms each edge attribute using the map function, passing it the adjacent vertex
+ * attributes as well. If adjacent vertex values are not required, consider using `mapEdges`
+ * instead.
*
* @note This does not change the structure of the
* graph or modify the values of this graph. As a consequence
@@ -280,13 +281,13 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* be commutative and associative and is used to combine the output
* of the map phase
*
- * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
- * when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on edges with
- * destination in the active set. If the direction is `Out`, `mapFunc` will only be run on edges
- * originating from vertices in the active set. If the direction is `Either`, `mapFunc` will be
- * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc` will
- * be run on edges with *both* vertices in the active set. The active set must have the same index
- * as the graph's vertices.
+ * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to
+ * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on
+ * edges with destination in the active set. If the direction is `Out`, `mapFunc` will only be
+ * run on edges originating from vertices in the active set. If the direction is `Either`,
+ * `mapFunc` will be run on edges with *either* vertex in the active set. If the direction is
+ * `Both`, `mapFunc` will be run on edges with *both* vertices in the active set. The active
+ * set must have the same index as the graph's vertices.
*
* @example We can use this function to compute the in-degree of each
* vertex
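Aside: the method whose @param block is rewrapped above is GraphX's mapReduceTriplets, and the @example it refers to is the in-degree computation. A hedged sketch of that example, assuming some `graph: Graph[VD, ED]`:

    // Send one message per edge to its destination vertex, then sum the
    // messages; the reduce function is commutative and associative as required.
    val inDegrees: VertexRDD[Int] = graph.mapReduceTriplets[Int](
      edge => Iterator((edge.dstId, 1)),
      (a, b) => a + b)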
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 929915362c..0470d74cf9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -57,8 +57,9 @@ object PartitionStrategy {
* </pre>
*
* The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
- * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
- * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
+ * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks.
+ * Notice that edges adjacent to `v11` can only be in the first column of blocks
+ * `(P0, P3, P6)` or the last
* row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
* replicated to at most `2 * sqrt(numParts)` machines.
*
@@ -66,11 +67,12 @@ object PartitionStrategy {
* balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
* vertex locations.
*
- * One of the limitations of this approach is that the number of machines must either be a perfect
- * square. We partially address this limitation by computing the machine assignment to the next
+ * One of the limitations of this approach is that the number of machines must be a perfect
+ * square. We partially address this limitation by computing the machine assignment to the
+ * next
* largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
- * is used.
+ * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
+ * square is used.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
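Aside: the grid scheme described above comes down to a few lines. A sketch under stated assumptions (the mixing prime is the one GraphX uses; the real getPartition may differ in detail):

    // Hash src to a column and dst to a row of a ceil(sqrt(numParts)) grid,
    // mixing ids with a large prime first, then fold the square grid back
    // down onto the actual number of partitions.
    def edgePartition2D(src: Long, dst: Long, numParts: Int): Int = {
      val ceilSqrt = math.ceil(math.sqrt(numParts)).toInt
      val mixingPrime = 1125899906842597L
      // Non-negative modulo, robust to Long overflow in the multiplication.
      def mod(v: Long, m: Int): Int = (((v % m) + m) % m).toInt
      val col = mod(src * mixingPrime, ceilSqrt)
      val row = mod(dst * mixingPrime, ceilSqrt)
      (col * ceilSqrt + row) % numParts
    }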
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index edd59bcf32..d6788d4d4b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -59,7 +59,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
- * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD.
+ * VertexRDD will be based on a different index and can no longer be quickly joined with this
+ * RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
@@ -101,7 +102,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/
- private[graphx] def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
+ private[graphx] def mapVertexPartitions[VD2: ClassTag](
+ f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
@@ -159,8 +161,9 @@ class VertexRDD[@specialized VD: ClassTag](
}
/**
- * Left joins this RDD with another VertexRDD with the same index. This function will fail if both
- * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each
+ * Left joins this RDD with another VertexRDD with the same index. This function will fail if
+ * the two VertexRDDs do not share the same index. The resulting vertex set contains an entry
+ * for each
* vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @tparam VD2 the attribute type of the other VertexRDD
@@ -187,8 +190,8 @@ class VertexRDD[@specialized VD: ClassTag](
* Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
* backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is
* used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is
- * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex
- * is picked arbitrarily.
+ * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates,
+ * the vertex is picked arbitrarily.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
@@ -238,14 +241,14 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is
- * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is
- * used.
+ * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation
+ * is used.
*
* @param other an RDD containing vertices to join. If there are multiple entries for the same
* vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries.
* @param f the join function applied to corresponding values of `this` and `other`
- * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this`
- * and `other`, with values supplied by `f`
+ * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both
+ * `this` and `other`, with values supplied by `f`
*/
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = {
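Aside: a minimal usage sketch of the VertexRDD.innerJoin documented above, assuming a running SparkContext `sc` (not part of this commit):

    import org.apache.spark.graphx._

    val verts = VertexRDD(sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30))))
    val updates = sc.parallelize(Seq((2L, 5), (3L, 7), (99L, 1)))  // 99L is absent from verts

    // Co-indexed with `verts`, keeping only vertices present in both:
    // (2L, 25) and (3L, 37).
    val joined: VertexRDD[Int] = verts.innerJoin(updates)((vid, v, u) => v + u)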
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index f914e0565c..24699dfdd3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -82,7 +82,7 @@ object Analytics extends Logging {
val pr = graph.pageRank(tol).vertices.cache()
- println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 7677641bfe..f841846c0e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -37,11 +37,7 @@ object GraphGenerators {
val RMATa = 0.45
val RMATb = 0.15
- val RMATc = 0.15
val RMATd = 0.25
-
- // Right now it just generates a bunch of edges where
- // the edge data is the weight (default 1)
/**
* Generate a graph whose vertex out degree is log normal.
*/
@@ -59,15 +55,20 @@ object GraphGenerators {
Graph(vertices, edges, 0)
}
+ val RMATc = 0.15
+
+ // Right now it just generates a bunch of edges where
+ // the edge data is the weight (default 1)
def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
val rand = new Random()
Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
/**
- * Randomly samples from a log normal distribution whose corresponding normal distribution has the
- * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`, `s` are
- * the mean, standard deviation of the lognormal distribution and `Z ~ N(0, 1)`. In this function,
+ * Randomly samples from a log normal distribution whose corresponding normal distribution has
+ * the given mean and standard deviation. It uses the formula `X = exp(m + s * Z)` where `m`
+ * and `s` are the mean and standard deviation of the lognormal distribution and
+ * `Z ~ N(0, 1)`. In this function,
* `m = e^(mu+sigma^2/2)` and `s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]`.
*
* @param mu the mean of the normal distribution
@@ -76,7 +77,7 @@ object GraphGenerators {
*/
private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
val rand = new Random()
- val m = math.exp(mu+(sigma*sigma)/2.0)
+ val m = math.exp(mu + (sigma * sigma) / 2.0)
val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma))
// Z ~ N(0, 1)
var X: Double = maxVal
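Aside: the retry loop this hunk touches implements the formula in the rewrapped comment above. A self-contained sketch of that sampling scheme (hedged; the names mirror the code shown):

    import scala.util.Random

    def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
      val rand = new Random()
      val m = math.exp(mu + (sigma * sigma) / 2.0)
      val s = math.sqrt((math.exp(sigma * sigma) - 1) * math.exp(2 * mu + sigma * sigma))
      var x: Double = maxVal
      // Rejection loop: redraw until the sample falls below maxVal.
      while (x >= maxVal) {
        val z = rand.nextGaussian()  // Z ~ N(0, 1)
        x = math.exp(m + s * z)
      }
      math.round(x).toInt
    }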
@@ -169,9 +170,9 @@ object GraphGenerators {
val newT = math.round(t.toFloat/2.0).toInt
pickQuadrant(RMATa, RMATb, RMATc, RMATd) match {
case 0 => chooseCell(x, y, newT)
- case 1 => chooseCell(x+newT, y, newT)
- case 2 => chooseCell(x, y+newT, newT)
- case 3 => chooseCell(x+newT, y+newT, newT)
+ case 1 => chooseCell(x + newT, y, newT)
+ case 2 => chooseCell(x, y + newT, newT)
+ case 3 => chooseCell(x + newT, y + newT, newT)
}
}
}
@@ -179,8 +180,8 @@ object GraphGenerators {
// TODO(crankshaw) turn result into an enum (or case class for pattern matching}
private def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
if (a + b + c + d != 1.0) {
- throw new IllegalArgumentException(
- "R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0")
+ throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a + b + c + d)
+ + ", should sum to 1.0")
}
val rand = new Random()
val result = rand.nextDouble()
@@ -212,8 +213,8 @@ object GraphGenerators {
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
- (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++
- (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty })
+ (if (r + 1 < rows) { Seq( (sub2ind(r, c), sub2ind(r + 1, c))) } else { Seq.empty }) ++
+ (if (c + 1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c + 1))) } else { Seq.empty })
}.map{ case (src, dst) => Edge(src, dst, 1.0) }
Graph(vertices, edges)
} // end of gridGraph
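Aside: pickQuadrant, reformatted above, drives the chooseCell recursion. A sketch of the cumulative-probability selection, matching the four cases shown (hedged; the real code matches on rand.nextDouble(), as the hunk shows):

    import scala.util.Random

    def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = {
      require(a + b + c + d == 1.0, "R-MAT probability parameters should sum to 1.0")
      val r = new Random().nextDouble()
      if (r < a) 0               // chooseCell(x, y, newT)
      else if (r < a + b) 1      // chooseCell(x + newT, y, newT)
      else if (r < a + b + c) 2  // chooseCell(x, y + newT, newT)
      else 3                     // chooseCell(x + newT, y + newT, newT)
    }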