aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-06-03 14:14:48 -0700
committerAnkur Dave <ankurdave@gmail.com>2014-06-03 14:14:48 -0700
commit894ecde04faa7e2054a40825a58b2e9cdaa93c70 (patch)
treecce968cd69ec52aaeb37a8930809d3757a3f975f /examples/src
parentaa41a522d821c989c65fa3f7f2a4d372e39bb958 (diff)
downloadspark-894ecde04faa7e2054a40825a58b2e9cdaa93c70.tar.gz
spark-894ecde04faa7e2054a40825a58b2e9cdaa93c70.tar.bz2
spark-894ecde04faa7e2054a40825a58b2e9cdaa93c70.zip
Synthetic GraphX Benchmark
This PR accomplishes two things: 1. It introduces a Synthetic Benchmark application that generates an arbitrarily large log-normal graph and executes either PageRank or connected components on the graph. This can be used to profile GraphX system on arbitrary clusters without access to large graph datasets 2. This PR improves the implementation of the log-normal graph generator. Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> Author: Ankur Dave <ankurdave@gmail.com> Closes #720 from jegonzal/graphx_synth_benchmark and squashes the following commits: e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 bccccad [Ankur Dave] Fix long lines 374678a [Ankur Dave] Bugfix and style changes 1bdf39a [Joseph E. Gonzalez] updating options d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples folder. f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script.
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala128
1 files changed, 128 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
new file mode 100644
index 0000000000..551c339b19
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs. The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+ /**
+ * To run this program use the following:
+ *
+ * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+ *
+ * Options:
+ * -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+ * -niters the number of iterations of pagerank to use (Default: 10)
+ * -numVertices the number of vertices in the graph (Default: 1000000)
+ * -numEPart the number of edge partitions in the graph (Default: number of cores)
+ * -partStrategy the graph partitioning strategy to use
+ * -mu the mean parameter for the log-normal graph (Default: 4.0)
+ * -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+ * -degFile the local file to save the degree information (Default: Empty)
+ */
+ def main(args: Array[String]) {
+ val options = args.map {
+ arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+
+ var app = "pagerank"
+ var niter = 10
+ var numVertices = 100000
+ var numEPart: Option[Int] = None
+ var partitionStrategy: Option[PartitionStrategy] = None
+ var mu: Double = 4.0
+ var sigma: Double = 1.3
+ var degFile: String = ""
+
+ options.foreach {
+ case ("app", v) => app = v
+ case ("niter", v) => niter = v.toInt
+ case ("nverts", v) => numVertices = v.toInt
+ case ("numEPart", v) => numEPart = Some(v.toInt)
+ case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+ case ("mu", v) => mu = v.toDouble
+ case ("sigma", v) => sigma = v.toDouble
+ case ("degFile", v) => degFile = v
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ val conf = new SparkConf()
+ .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ val sc = new SparkContext(conf)
+
+ // Create the graph
+ println(s"Creating graph...")
+ val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+ numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+ // Repartition the graph
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+
+ var startTime = System.currentTimeMillis()
+ val numEdges = graph.edges.count()
+ println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+ val loadTime = System.currentTimeMillis() - startTime
+
+ // Collect the degree distribution (if desired)
+ if (!degFile.isEmpty) {
+ val fos = new FileOutputStream(degFile)
+ val pos = new PrintWriter(fos)
+ val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+ .map(p => p._2).countByValue()
+ hist.foreach {
+ case (deg, count) => pos.println(s"$deg \t $count")
+ }
+ }
+
+ // Run PageRank
+ startTime = System.currentTimeMillis()
+ if (app == "pagerank") {
+ println("Running PageRank")
+ val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+ println(s"Total PageRank = $totalPR")
+ } else if (app == "cc") {
+ println("Running Connected Components")
+ val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
+ println(s"Number of components = $numComponents")
+ }
+ val runTime = System.currentTimeMillis() - startTime
+
+ println(s"Num Vertices = $numVertices")
+ println(s"Num Edges = $numEdges")
+ println(s"Creation time = ${loadTime/1000.0} seconds")
+ println(s"Run time = ${runTime/1000.0} seconds")
+
+ sc.stop()
+ }
+}