aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala9
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala65
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala110
3 files changed, 152 insertions, 32 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index 551c339b19..5f35a58364 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -38,12 +38,13 @@ object SynthBenchmark {
* Options:
* -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
* -niters the number of iterations of pagerank to use (Default: 10)
- * -numVertices the number of vertices in the graph (Default: 1000000)
+ * -nverts the number of vertices in the graph (Default: 1000000)
* -numEPart the number of edge partitions in the graph (Default: number of cores)
* -partStrategy the graph partitioning strategy to use
* -mu the mean parameter for the log-normal graph (Default: 4.0)
* -sigma the stdev parameter for the log-normal graph (Default: 1.3)
* -degFile the local file to save the degree information (Default: Empty)
+ * -seed seed to use for RNGs (Default: -1, picks seed randomly)
*/
def main(args: Array[String]) {
val options = args.map {
@@ -62,6 +63,7 @@ object SynthBenchmark {
var mu: Double = 4.0
var sigma: Double = 1.3
var degFile: String = ""
+ var seed: Int = -1
options.foreach {
case ("app", v) => app = v
@@ -72,6 +74,7 @@ object SynthBenchmark {
case ("mu", v) => mu = v.toDouble
case ("sigma", v) => sigma = v.toDouble
case ("degFile", v) => degFile = v
+ case ("seed", v) => seed = v.toInt
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -85,7 +88,7 @@ object SynthBenchmark {
// Create the graph
println(s"Creating graph...")
val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
- numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+ numEPart.getOrElse(sc.defaultParallelism), mu, sigma, seed)
// Repartition the graph
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
@@ -113,7 +116,7 @@ object SynthBenchmark {
println(s"Total PageRank = $totalPR")
} else if (app == "cc") {
println("Running Connected Components")
- val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
+ val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
println(s"Number of components = $numComponents")
}
val runTime = System.currentTimeMillis() - startTime
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 60149548ab..b8309289fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -40,7 +40,7 @@ object GraphGenerators {
val RMATd = 0.25
/**
- * Generate a graph whose vertex out degree is log normal.
+ * Generate a graph whose vertex out degree distribution is log normal.
*
* The default values for mu and sigma are taken from the Pregel paper:
*
@@ -48,33 +48,36 @@ object GraphGenerators {
* Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
* Pregel: a system for large-scale graph processing. SIGMOD '10.
*
- * @param sc
- * @param numVertices
- * @param mu
- * @param sigma
- * @return
+ * If the seed is -1 (default), a random seed is chosen. Otherwise, use
+ * the user-specified seed.
+ *
+ * @param sc Spark Context
+ * @param numVertices number of vertices in generated graph
+ * @param numEParts (optional) number of partitions
+ * @param mu (optional, default: 4.0) mean of out-degree distribution
+ * @param sigma (optional, default: 1.3) standard deviation of out-degree distribution
+ * @param seed (optional, default: -1) seed for RNGs, -1 causes a random seed to be chosen
+ * @return Graph object
*/
- def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
- mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
- val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
- // Initialize the random number generator with the source vertex id
- val rand = new Random(src)
- val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
- (src.toLong, degree)
+ def logNormalGraph(
+ sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0,
+ sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int] = {
+
+ val evalNumEParts = if (numEParts == 0) sc.defaultParallelism else numEParts
+
+ // Enable deterministic seeding
+ val seedRand = if (seed == -1) new Random() else new Random(seed)
+ val seed1 = seedRand.nextInt()
+ val seed2 = seedRand.nextInt()
+
+ val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, evalNumEParts).map {
+ src => (src, sampleLogNormal(mu, sigma, numVertices, seed = (seed1 ^ src)))
}
+
val edges = vertices.flatMap { case (src, degree) =>
- new Iterator[Edge[Int]] {
- // Initialize the random number generator with the source vertex id
- val rand = new Random(src)
- var i = 0
- override def hasNext(): Boolean = { i < degree }
- override def next(): Edge[Int] = {
- val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
- i += 1
- nextEdge
- }
- }
+ generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = (seed2 ^ src))
}
+
Graph(vertices, edges, 0)
}
@@ -82,9 +85,10 @@ object GraphGenerators {
// the edge data is the weight (default 1)
val RMATc = 0.15
- def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
- val rand = new Random()
- Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
+ def generateRandomEdges(
+ src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = {
+ val rand = if (seed == -1) new Random() else new Random(seed)
+ Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
}
/**
@@ -97,9 +101,12 @@ object GraphGenerators {
* @param mu the mean of the normal distribution
* @param sigma the standard deviation of the normal distribution
* @param maxVal exclusive upper bound on the value of the sample
+ * @param seed optional seed
*/
- private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
- val rand = new Random()
+ private[spark] def sampleLogNormal(
+ mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = {
+ val rand = if (seed == -1) new Random() else new Random(seed)
+
val sigmaSq = sigma * sigma
val m = math.exp(mu + sigmaSq / 2.0)
// expm1 is exp(m)-1 with better accuracy for tiny m
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
new file mode 100644
index 0000000000..b346d4db2e
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.util
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx.LocalSparkContext
+
+class GraphGeneratorsSuite extends FunSuite with LocalSparkContext {
+
+ test("GraphGenerators.generateRandomEdges") {
+ val src = 5
+ val numEdges10 = 10
+ val numEdges20 = 20
+ val maxVertexId = 100
+
+ val edges10 = GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId)
+ assert(edges10.length == numEdges10)
+
+ val correctSrc = edges10.forall(e => e.srcId == src)
+ assert(correctSrc)
+
+ val correctWeight = edges10.forall(e => e.attr == 1)
+ assert(correctWeight)
+
+ val correctRange = edges10.forall(e => e.dstId >= 0 && e.dstId <= maxVertexId)
+ assert(correctRange)
+
+ val edges20 = GraphGenerators.generateRandomEdges(src, numEdges20, maxVertexId)
+ assert(edges20.length == numEdges20)
+
+ val edges10_round1 =
+ GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
+ val edges10_round2 =
+ GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
+ assert(edges10_round1.zip(edges10_round2).forall { case (e1, e2) =>
+ e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+ })
+
+ val edges10_round3 =
+ GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 3467)
+ assert(!edges10_round1.zip(edges10_round3).forall { case (e1, e2) =>
+ e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+ })
+ }
+
+ test("GraphGenerators.sampleLogNormal") {
+ val mu = 4.0
+ val sigma = 1.3
+ val maxVal = 100
+
+ val dstId = GraphGenerators.sampleLogNormal(mu, sigma, maxVal)
+ assert(dstId < maxVal)
+
+ val dstId_round1 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
+ val dstId_round2 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
+ assert(dstId_round1 == dstId_round2)
+
+ val dstId_round3 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 789)
+ assert(dstId_round1 != dstId_round3)
+ }
+
+ test("GraphGenerators.logNormalGraph") {
+ withSpark { sc =>
+ val mu = 4.0
+ val sigma = 1.3
+ val numVertices100 = 100
+
+ val graph = GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma)
+ assert(graph.vertices.count() == numVertices100)
+
+ val graph_round1 =
+ GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)
+ val graph_round2 =
+ GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)
+
+ val graph_round1_edges = graph_round1.edges.collect()
+ val graph_round2_edges = graph_round2.edges.collect()
+
+ assert(graph_round1_edges.zip(graph_round2_edges).forall { case (e1, e2) =>
+ e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+ })
+
+ val graph_round3 =
+ GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 567)
+
+ val graph_round3_edges = graph_round3.edges.collect()
+
+ assert(!graph_round1_edges.zip(graph_round3_edges).forall { case (e1, e2) =>
+ e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+ })
+ }
+ }
+
+}