aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
authorLarry Xiao <xiaodi@sjtu.edu.cn>2014-09-02 18:29:08 -0700
committerAnkur Dave <ankurdave@gmail.com>2014-09-02 18:29:08 -0700
commit7c92b49d6b62f88fcde883aacb60c5e32ae54b30 (patch)
tree09e4569245c93b3f8cd80d874292ee01c293adfa /examples/src/main/scala
parent644e31524a6a9a22c671a368aeb3b4eaeb61cf29 (diff)
downloadspark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.gz
spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.bz2
spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.zip
[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
to support ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256 Author: Larry Xiao <xiaodi@sjtu.edu.cn> Closes #1766 from larryxiao/1986 and squashes the following commits: bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala162
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala2
2 files changed, 163 insertions, 1 deletion
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
new file mode 100644
index 0000000000..c4317a6aec
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import scala.collection.mutable
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.PartitionStrategy._
+
+/**
+ * Driver program for running graph algorithms.
+ */
+object Analytics extends Logging {
+
+ // Entry point. Usage: Analytics <taskType> <file> [--key=value ...].
+ // Recognized task types: "pagerank", "cc" (connected components), "triangles".
+ def main(args: Array[String]): Unit = {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
+ System.exit(1)
+ }
+
+ val taskType = args(0)
+ val fname = args(1)
+ // Parse each remaining "--key=value" argument into a (key, value) pair;
+ // dropWhile strips any number of leading '-' characters, so "-k=v" also parses.
+ val optionsList = args.drop(2).map { arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+ // Mutable on purpose: each task remove()s the options it consumes, and any
+ // entries left over afterwards are rejected as invalid for that task.
+ val options = mutable.Map(optionsList: _*)
+
+ // Maps a strategy name (as passed via --partStrategy) to the corresponding
+ // GraphX PartitionStrategy; unknown names are rejected.
+ def pickPartitioner(v: String): PartitionStrategy = {
+ // TODO: Use reflection rather than listing all the partitioning strategies here.
+ v match {
+ case "RandomVertexCut" => RandomVertexCut
+ case "EdgePartition1D" => EdgePartition1D
+ case "EdgePartition2D" => EdgePartition2D
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+ case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
+ }
+ }
+
+ // Configuration shared by every task: Kryo serialization with the GraphX
+ // registrator, and a long locality wait. The app name is set per task below.
+ val conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+ .set("spark.locality.wait", "100000")
+
+ // --numEPart is required: number of edge partitions to split the edge list into.
+ val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+ println("Set the number of edge partitions using --numEPart.")
+ sys.exit(1)
+ }
+ // Optional knobs common to all tasks; storage levels default to MEMORY_ONLY.
+ val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+ .map(pickPartitioner(_))
+ val edgeStorageLevel = options.remove("edgeStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+ val vertexStorageLevel = options.remove("vertexStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
+ taskType match {
+ case "pagerank" =>
+ // PageRank-specific options: --tol (convergence tolerance, default 0.001),
+ // --output (optional save path), --numIter (fixed iteration count; when
+ // present it takes precedence over tolerance-based convergence).
+ val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+ val outFname = options.remove("output").getOrElse("")
+ val numIterOpt = options.remove("numIter").map(_.toInt)
+
+ // Anything still left in the map was not consumed above, i.e. unrecognized.
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| PageRank |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
+
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
+ // foldLeft over the Option: repartition only when --partStrategy was given,
+ // otherwise keep the graph exactly as loaded.
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ println("GRAPHX: Number of vertices " + graph.vertices.count)
+ println("GRAPHX: Number of edges " + graph.edges.count)
+
+ // Fixed-iteration run when --numIter was supplied; otherwise iterate until
+ // per-vertex rank change drops below tol.
+ val pr = (numIterOpt match {
+ case Some(numIter) => PageRank.run(graph, numIter)
+ case None => PageRank.runUntilConvergence(graph, tol)
+ }).vertices.cache()
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
+
+ if (!outFname.isEmpty) {
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ }
+
+ sc.stop()
+
+ case "cc" =>
+ // Connected components accepts no task-specific options.
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| Connected Components |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ // NOTE(review): this concatenates the RDD's toString, not its contents —
+ // probably intended .distinct().count(); confirm before relying on the output.
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+
+ case "triangles" =>
+ // Triangle count accepts no task-specific options.
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| Triangle Count |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
+ // canonicalOrientation = true is required by TriangleCount (edges must
+ // satisfy srcId < dstId).
+ val graph = GraphLoader.edgeListFile(sc, fname,
+ canonicalOrientation = true,
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
+ // TriangleCount requires the graph to be partitioned
+ .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
+ val triangles = TriangleCount.run(graph)
+ // Summing the per-vertex counts tallies each triangle once per vertex,
+ // hence the division by 3.
+ println("Triangles: " + triangles.vertices.map {
+ case (vid, data) => data.toLong
+ }.reduce(_ + _) / 3)
+ sc.stop()
+
+ case _ =>
+ println("Invalid task type.")
+ }
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
index 6ef3b62dcb..bdc8fa7f99 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.graphx
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.graphx._
-import org.apache.spark.graphx.lib.Analytics
+import org.apache.spark.examples.graphx.Analytics
/**
* Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from