+package org.apache.spark.examples.graphx
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs. The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+ /**
+ * To run this program use the following:
+ *
+ * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+ *
+ * Options:
+ * -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+ * -niters the number of iterations of pagerank to use (Default: 10)
+ * -numVertices the number of vertices in the graph (Default: 1000000)
+ * -numEPart the number of edge partitions in the graph (Default: number of cores)
+ * -partStrategy the graph partitioning strategy to use
+ * -mu the mean parameter for the log-normal graph (Default: 4.0)
+ * -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+ * -degFile the local file to save the degree information (Default: Empty)
+ */
+ def main(args: Array[String]) {
+ val options = args.map {
+ arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+ var app = "pagerank"
+ var niter = 10
+ var numVertices = 100000
+ var numEPart: Option[Int] = None
+ var partitionStrategy: Option[PartitionStrategy] = None
+ var mu: Double = 4.0
+ var sigma: Double = 1.3
+ var degFile: String = ""
+ options.foreach {
+ case ("app", v) => app = v
+ case ("niter", v) => niter = v.toInt
+ case ("nverts", v) => numVertices = v.toInt
+ case ("numEPart", v) => numEPart = Some(v.toInt)
+ case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+ case ("mu", v) => mu = v.toDouble
+ case ("sigma", v) => sigma = v.toDouble
+ case ("degFile", v) => degFile = v
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+ val conf = new SparkConf()
+ .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+ val sc = new SparkContext(conf)
+ // Create the graph
+ println(s"Creating graph...")
+ val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+ numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+ // Repartition the graph
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+ var startTime = System.currentTimeMillis()
+ val numEdges = graph.edges.count()
+ println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+ val loadTime = System.currentTimeMillis() - startTime
+ // Collect the degree distribution (if desired)
+ if (!degFile.isEmpty) {
+ val fos = new FileOutputStream(degFile)
+ val pos = new PrintWriter(fos)
+ val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+ .map(p => p._2).countByValue()
+ hist.foreach {
+ case (deg, count) => pos.println(s"$deg \t $count")
+ }
+ }
+ // Run PageRank
+ startTime = System.currentTimeMillis()
+ if (app == "pagerank") {
+ println("Running PageRank")
+ val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+ println(s"Total PageRank = $totalPR")
+ } else if (app == "cc") {
+ println("Running Connected Components")
+ val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
+ println(s"Number of components = $numComponents")
+ }
+ val runTime = System.currentTimeMillis() - startTime
+ println(s"Num Vertices = $numVertices")
+ println(s"Num Edges = $numEdges")
+ println(s"Creation time = ${loadTime/1000.0} seconds")
+ println(s"Run time = ${runTime/1000.0} seconds")
+ sc.stop()
+ }