From e5d376801d57dffb0791980a1786a0a9b45bc491 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Wed, 3 Sep 2014 14:15:22 -0700 Subject: [SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720 PR #720 made multiple changes to GraphGenerator.logNormalGraph including: * Replacing the call to functions for generating random vertices and edges with in-line implementations with different equations. Based on reading the Pregel paper, I believe the in-line functions are incorrect. * Hard-coding of RNG seeds so that method now generates the same graph for a given number of vertices, edges, mu, and sigma -- user is not able to override seed or specify that seed should be randomly generated. * Backwards-incompatible change to logNormalGraph signature with introduction of new required parameter. * Failed to update scala docs and programming guide for API changes * Added a Synthetic Benchmark in the examples. This PR: * Removes the in-line calls and calls original vertex / edge generation functions again * Adds an optional seed parameter for deterministic behavior (when desired) * Keeps the number of partitions parameter that was added. * Keeps compatibility with the synthetic benchmark example * Maintains backwards-compatible API Author: RJ Nowling Author: Ankur Dave Closes #2168 from rnowling/graphgenrand and squashes the following commits: f1cd79f [Ankur Dave] Style fixes e11918e [RJ Nowling] Fix bad comparisons in unit tests 785ac70 [RJ Nowling] Fix style error c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed 41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed 799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal 43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges 2faf75f [RJ Nowling] Added unit test for logNormalGraph 82f22397 [RJ Nowling] Add unit test for sampleLogNormal b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for unit testing 6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges 1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to call count on RDD before printing dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set default to randomly generate a seed 1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead of numEdges was used to control number of edges to generate 98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters d40141a [RJ Nowling] Fix style error 684804d [RJ Nowling] revert PR #720 which introduce errors in logNormalGraph and messed up seeding of RNGs. Add user-defined optional seed for deterministic behavior c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that allows generating graphs randomly using optional seed. 015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make backward-incompatible change in commit 894ecde04 --- .../apache/spark/graphx/util/GraphGenerators.scala | 65 ++++++------ .../spark/graphx/util/GraphGeneratorsSuite.scala | 110 +++++++++++++++++++++ 2 files changed, 146 insertions(+), 29 deletions(-) create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 60149548ab..b8309289fe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -40,7 +40,7 @@ object GraphGenerators { val RMATd = 0.25 /** - * Generate a graph whose vertex out degree is log normal. + * Generate a graph whose vertex out degree distribution is log normal. * * The default values for mu and sigma are taken from the Pregel paper: * @@ -48,33 +48,36 @@ object GraphGenerators { * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010. * Pregel: a system for large-scale graph processing. SIGMOD '10. * - * @param sc - * @param numVertices - * @param mu - * @param sigma - * @return + * If the seed is -1 (default), a random seed is chosen. Otherwise, use + * the user-specified seed. + * + * @param sc Spark Context + * @param numVertices number of vertices in generated graph + * @param numEParts (optional) number of partitions + * @param mu (optional, default: 4.0) mean of out-degree distribution + * @param sigma (optional, default: 1.3) standard deviation of out-degree distribution + * @param seed (optional, default: -1) seed for RNGs, -1 causes a random seed to be chosen + * @return Graph object */ - def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int, - mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = { - val vertices = sc.parallelize(0 until numVertices, numEParts).map { src => - // Initialize the random number generator with the source vertex id - val rand = new Random(src) - val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong) - (src.toLong, degree) + def logNormalGraph( + sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0, + sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int] = { + + val evalNumEParts = if (numEParts == 0) sc.defaultParallelism else numEParts + + // Enable deterministic seeding + val seedRand = if (seed == -1) new Random() else new Random(seed) + val seed1 = seedRand.nextInt() + val seed2 = seedRand.nextInt() + + val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, evalNumEParts).map { + src => (src, sampleLogNormal(mu, sigma, numVertices, seed = (seed1 ^ src))) } + val edges = vertices.flatMap { case (src, degree) => - new Iterator[Edge[Int]] { - // Initialize the random number generator with the source vertex id - val rand = new Random(src) - var i = 0 - override def hasNext(): Boolean = { i < degree } - override def next(): Edge[Int] = { - val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i) - i += 1 - nextEdge - } - } + generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = (seed2 ^ src)) } + Graph(vertices, edges, 0) } @@ -82,9 +85,10 @@ object GraphGenerators { // the edge data is the weight (default 1) val RMATc = 0.15 - def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = { - val rand = new Random() - Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) } + def generateRandomEdges( + src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = { + val rand = if (seed == -1) new Random() else new Random(seed) + Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) } } /** @@ -97,9 +101,12 @@ object GraphGenerators { * @param mu the mean of the normal distribution * @param sigma the standard deviation of the normal distribution * @param maxVal exclusive upper bound on the value of the sample + * @param seed optional seed */ - private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = { - val rand = new Random() + private[spark] def sampleLogNormal( + mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = { + val rand = if (seed == -1) new Random() else new Random(seed) + val sigmaSq = sigma * sigma val m = math.exp(mu + sigmaSq / 2.0) // expm1 is exp(m)-1 with better accuracy for tiny m diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala new file mode 100644 index 0000000000..b346d4db2e --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.util + +import org.scalatest.FunSuite + +import org.apache.spark.graphx.LocalSparkContext + +class GraphGeneratorsSuite extends FunSuite with LocalSparkContext { + + test("GraphGenerators.generateRandomEdges") { + val src = 5 + val numEdges10 = 10 + val numEdges20 = 20 + val maxVertexId = 100 + + val edges10 = GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId) + assert(edges10.length == numEdges10) + + val correctSrc = edges10.forall(e => e.srcId == src) + assert(correctSrc) + + val correctWeight = edges10.forall(e => e.attr == 1) + assert(correctWeight) + + val correctRange = edges10.forall(e => e.dstId >= 0 && e.dstId <= maxVertexId) + assert(correctRange) + + val edges20 = GraphGenerators.generateRandomEdges(src, numEdges20, maxVertexId) + assert(edges20.length == numEdges20) + + val edges10_round1 = + GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345) + val edges10_round2 = + GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345) + assert(edges10_round1.zip(edges10_round2).forall { case (e1, e2) => + e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr + }) + + val edges10_round3 = + GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 3467) + assert(!edges10_round1.zip(edges10_round3).forall { case (e1, e2) => + e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr + }) + } + + test("GraphGenerators.sampleLogNormal") { + val mu = 4.0 + val sigma = 1.3 + val maxVal = 100 + + val dstId = GraphGenerators.sampleLogNormal(mu, sigma, maxVal) + assert(dstId < maxVal) + + val dstId_round1 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345) + val dstId_round2 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345) + assert(dstId_round1 == dstId_round2) + + val dstId_round3 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 789) + assert(dstId_round1 != dstId_round3) + } + + test("GraphGenerators.logNormalGraph") { + withSpark { sc => + val mu = 4.0 + val sigma = 1.3 + val numVertices100 = 100 + + val graph = GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma) + assert(graph.vertices.count() == numVertices100) + + val graph_round1 = + GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345) + val graph_round2 = + GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345) + + val graph_round1_edges = graph_round1.edges.collect() + val graph_round2_edges = graph_round2.edges.collect() + + assert(graph_round1_edges.zip(graph_round2_edges).forall { case (e1, e2) => + e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr + }) + + val graph_round3 = + GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 567) + + val graph_round3_edges = graph_round3.edges.collect() + + assert(!graph_round1_edges.zip(graph_round3_edges).forall { case (e1, e2) => + e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr + }) + } + } + +} -- cgit v1.2.3