diff options
author | Larry Xiao <xiaodi@sjtu.edu.cn> | 2014-09-02 18:29:08 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-09-02 18:29:08 -0700 |
commit | 7c92b49d6b62f88fcde883aacb60c5e32ae54b30 (patch) | |
tree | 09e4569245c93b3f8cd80d874292ee01c293adfa /examples | |
parent | 644e31524a6a9a22c671a368aeb3b4eaeb61cf29 (diff) | |
download | spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.gz spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.bz2 spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.zip |
[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
to support ~/spark/bin/run-example GraphXAnalytics triangles
/soc-LiveJournal1.txt --numEPart=256
Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Closes #1766 from larryxiao/1986 and squashes the following commits:
bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala | 162 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala | 2 |
2 files changed, 163 insertions, 1 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala new file mode 100644 index 0000000000..c4317a6aec --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.graphx + +import scala.collection.mutable +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.PartitionStrategy._ + +/** + * Driver program for running graph algorithms. + */ +object Analytics extends Logging { + + def main(args: Array[String]): Unit = { + if (args.length < 2) { + System.err.println( + "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]") + System.exit(1) + } + + val taskType = args(0) + val fname = args(1) + val optionsList = args.drop(2).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + val options = mutable.Map(optionsList: _*) + + def pickPartitioner(v: String): PartitionStrategy = { + // TODO: Use reflection rather than listing all the partitioning strategies here. + v match { + case "RandomVertexCut" => RandomVertexCut + case "EdgePartition1D" => EdgePartition1D + case "EdgePartition2D" => EdgePartition2D + case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut + case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v) + } + } + + val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + .set("spark.locality.wait", "100000") + + val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) + } + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(pickPartitioner(_)) + val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + + taskType match { + case "pagerank" => + val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) + val outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) + + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| PageRank |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) + + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + println("GRAPHX: Number of vertices " + graph.vertices.count) + println("GRAPHX: Number of edges " + graph.edges.count) + + val pr = (numIterOpt match { + case Some(numIter) => PageRank.run(graph, numIter) + case None => PageRank.runUntilConvergence(graph, tol) + }).vertices.cache() + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) + + if (!outFname.isEmpty) { + logWarning("Saving pageranks of pages to " + outFname) + pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + } + + sc.stop() + + case "cc" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| Connected Components |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + sc.stop() + + case "triangles" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| Triangle Count |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) + val graph = GraphLoader.edgeListFile(sc, fname, + canonicalOrientation = true, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) + // TriangleCount requires the graph to be partitioned + .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() + val triangles = TriangleCount.run(graph) + println("Triangles: " + triangles.vertices.map { + case (vid, data) => data.toLong + }.reduce(_ + _) / 3) + sc.stop() + + case _ => + println("Invalid task type.") + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index 6ef3b62dcb..bdc8fa7f99 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -20,7 +20,7 @@ package org.apache.spark.examples.graphx import org.apache.spark.SparkContext._ import org.apache.spark._ import org.apache.spark.graphx._ -import org.apache.spark.graphx.lib.Analytics +import org.apache.spark.examples.graphx.Analytics /** * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from |