-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala  128
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala  9
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala  42
-rw-r--r--  project/MimaExcludes.scala  4
4 files changed, 172 insertions(+), 11 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
new file mode 100644
index 0000000000..551c339b19
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import java.io.{FileOutputStream, PrintWriter}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.graphx.util.GraphGenerators
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs. The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+ /**
+ * To run this program, use the following:
+ *
+ * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+ *
+ * Options:
+ *   -app "pagerank" or "cc" for PageRank or connected components (Default: pagerank)
+ *   -niter the number of iterations to run (Default: 10)
+ *   -nverts the number of vertices in the graph (Default: 100000)
+ *   -numEPart the number of edge partitions in the graph (Default: number of cores)
+ *   -partStrategy the graph partitioning strategy to use (Default: none)
+ *   -mu the mean parameter for the log-normal degree distribution (Default: 4.0)
+ *   -sigma the standard deviation parameter for the log-normal degree distribution (Default: 1.3)
+ *   -degFile the local file to save the degree information to (Default: none)
+ */
+ def main(args: Array[String]) {
+ val options = args.map {
+ arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+
+ var app = "pagerank"
+ var niter = 10
+ var numVertices = 100000
+ var numEPart: Option[Int] = None
+ var partitionStrategy: Option[PartitionStrategy] = None
+ var mu: Double = 4.0
+ var sigma: Double = 1.3
+ var degFile: String = ""
+
+ options.foreach {
+ case ("app", v) => app = v
+ case ("niter", v) => niter = v.toInt
+ case ("nverts", v) => numVertices = v.toInt
+ case ("numEPart", v) => numEPart = Some(v.toInt)
+ case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+ case ("mu", v) => mu = v.toDouble
+ case ("sigma", v) => sigma = v.toDouble
+ case ("degFile", v) => degFile = v
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ val conf = new SparkConf()
+ .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ val sc = new SparkContext(conf)
+
+ // Create the graph
+    println("Creating graph...")
+ val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+ numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+ // Repartition the graph
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+
+ var startTime = System.currentTimeMillis()
+ val numEdges = graph.edges.count()
+ println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+ val loadTime = System.currentTimeMillis() - startTime
+
+ // Collect the degree distribution (if desired)
+ if (!degFile.isEmpty) {
+ val fos = new FileOutputStream(degFile)
+ val pos = new PrintWriter(fos)
+ val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+ .map(p => p._2).countByValue()
+      hist.foreach { case (deg, count) => pos.println(s"$deg\t$count") }
+      pos.flush()
+      pos.close()
+ }
+
+ // Run PageRank
+ startTime = System.currentTimeMillis()
+ if (app == "pagerank") {
+ println("Running PageRank")
+ val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+ println(s"Total PageRank = $totalPR")
+ } else if (app == "cc") {
+ println("Running Connected Components")
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
+ println(s"Number of components = $numComponents")
+ }
+ val runTime = System.currentTimeMillis() - startTime
+
+ println(s"Num Vertices = $numVertices")
+ println(s"Num Edges = $numEdges")
+ println(s"Creation time = ${loadTime/1000.0} seconds")
+ println(s"Run time = ${runTime/1000.0} seconds")
+
+ sc.stop()
+ }
+}
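As a rough sanity check of the new example, here is a minimal standalone sketch that mirrors what SynthBenchmark does with its defaults: build a log-normal graph with GraphGenerators and run static PageRank on it. The object name SynthBenchmarkSketch and the local[4] master are illustrative, not part of this patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // implicit conversions for sum() on RDD[Double]
    import org.apache.spark.graphx.util.GraphGenerators

    object SynthBenchmarkSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[4]").setAppName("SynthBenchmarkSketch"))
        // 10,000 vertices on 4 edge partitions with the Pregel-paper defaults mu = 4.0, sigma = 1.3
        val graph = GraphGenerators.logNormalGraph(sc, 10000, 4, 4.0, 1.3).cache()
        println(s"Num Edges = ${graph.edges.count()}")
        // Ten iterations of static PageRank, as the default "pagerank" app does
        val totalPR = graph.staticPageRank(10).vertices.map(_._2).sum()
        println(s"Total PageRank = $totalPR")
        sc.stop()
      }
    }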
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 1526ccef06..ef412cfd4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -119,4 +119,13 @@ object PartitionStrategy {
math.abs((lower, higher).hashCode()) % numParts
}
}
+
+ /** Returns the PartitionStrategy with the specified name. */
+ def fromString(s: String): PartitionStrategy = s match {
+ case "RandomVertexCut" => RandomVertexCut
+ case "EdgePartition1D" => EdgePartition1D
+ case "EdgePartition2D" => EdgePartition2D
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+ case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
+ }
}
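A minimal sketch of how fromString is meant to be used, e.g. by SynthBenchmark's -partStrategy flag. The graph value below is assumed to be an existing Graph[_, _]; it is not defined by this patch.

    import org.apache.spark.graphx.PartitionStrategy

    // e.g. the value parsed from -partStrategy=EdgePartition2D
    val strategy = PartitionStrategy.fromString("EdgePartition2D")
    // Unknown names fail fast with an IllegalArgumentException
    val partitioned = graph.partitionBy(strategy)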
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index a3c8de3f90..635514f09e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -38,19 +38,43 @@ object GraphGenerators {
val RMATa = 0.45
val RMATb = 0.15
val RMATd = 0.25
+
/**
* Generate a graph whose vertex out degree is log normal.
+ *
+ * The default values for mu and sigma are taken from the Pregel paper:
+ *
+ * Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert,
+ * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
+ * Pregel: a system for large-scale graph processing. SIGMOD '10.
+ *
+ * @param sc the Spark context
+ * @param numVertices the number of vertices in the graph
+ * @param numEParts the number of edge partitions
+ * @param mu the mean of the log-normal out-degree distribution (default: 4.0)
+ * @param sigma the standard deviation of the log-normal out-degree distribution (default: 1.3)
+ * @return a graph where each vertex attribute is its sampled out degree
*/
- def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
- // based on Pregel settings
- val mu = 4
- val sigma = 1.3
-
- val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
- src => (src, sampleLogNormal(mu, sigma, numVertices))
+ def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
+ mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
+ val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
+ // Initialize the random number generator with the source vertex id
+ val rand = new Random(src)
+ val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
+ (src.toLong, degree)
}
- val edges = vertices.flatMap { v =>
- generateRandomEdges(v._1.toInt, v._2, numVertices)
+ val edges = vertices.flatMap { case (src, degree) =>
+ new Iterator[Edge[Int]] {
+ // Initialize the random number generator with the source vertex id
+ val rand = new Random(src)
+ var i = 0
+        override def hasNext: Boolean = i < degree
+ override def next(): Edge[Int] = {
+ val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
+ i += 1
+ nextEdge
+ }
+ }
}
Graph(vertices, edges, 0)
}
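A small usage sketch of the new signature, assuming an existing SparkContext named sc (the sizes below are illustrative). Because each vertex seeds its Random with its own vertex id, repeated calls with the same arguments are deterministic, and since every vertex emits exactly `degree` edges, the edge count equals the sum of the vertex attributes.

    import org.apache.spark.SparkContext._ // implicit conversions for sum() on RDD[Double]
    import org.apache.spark.graphx.util.GraphGenerators

    val g = GraphGenerators.logNormalGraph(sc, numVertices = 1000, numEParts = 4, mu = 4.0, sigma = 1.3)
    // Each vertex attribute holds its sampled out degree
    println(s"vertices = ${g.vertices.count()}, edges = ${g.edges.count()}")
    val meanDegree = g.vertices.map(_._2.toDouble).sum() / g.vertices.count()
    // Should be roughly exp(mu + sigma^2 / 2) ~ 127, slightly lower due to truncation at numVertices
    println(s"mean out degree = $meanDegree")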
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ecb389de55..fc9cbeaec6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,8 @@ object MimaExcludes {
val excludes =
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.1") =>
- Seq()
+ Seq(
+ MimaBuild.excludeSparkPackage("graphx"))
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),
@@ -58,4 +59,3 @@ object MimaExcludes {
case _ => Seq()
}
}
-