author    | Larry Xiao <xiaodi@sjtu.edu.cn> | 2014-09-02 18:29:08 -0700
committer | Ankur Dave <ankurdave@gmail.com> | 2014-09-02 18:29:08 -0700
commit    | 7c92b49d6b62f88fcde883aacb60c5e32ae54b30 (patch)
tree      | 09e4569245c93b3f8cd80d874292ee01c293adfa /graphx
parent    | 644e31524a6a9a22c671a368aeb3b4eaeb61cf29 (diff)
download  | spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.gz spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.tar.bz2 spark-7c92b49d6b62f88fcde883aacb60c5e32ae54b30.zip
[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
to support ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256
Author: Larry Xiao <xiaodi@sjtu.edu.cn>
Closes #1766 from larryxiao/1986 and squashes the following commits:
bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
Diffstat (limited to 'graphx')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala | 161
1 file changed, 0 insertions, 161 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
deleted file mode 100644
index c1513a0045..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.lib
-
-import scala.collection.mutable
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.graphx._
-import org.apache.spark.graphx.PartitionStrategy._
-
-/**
- * Driver program for running graph algorithms.
- */
-object Analytics extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
-      System.exit(1)
-    }
-
-    val taskType = args(0)
-    val fname = args(1)
-    val optionsList = args.drop(2).map { arg =>
-      arg.dropWhile(_ == '-').split('=') match {
-        case Array(opt, v) => (opt -> v)
-        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
-      }
-    }
-    val options = mutable.Map(optionsList: _*)
-
-    def pickPartitioner(v: String): PartitionStrategy = {
-      // TODO: Use reflection rather than listing all the partitioning strategies here.
-      v match {
-        case "RandomVertexCut" => RandomVertexCut
-        case "EdgePartition1D" => EdgePartition1D
-        case "EdgePartition2D" => EdgePartition2D
-        case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
-        case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
-      }
-    }
-
-    val conf = new SparkConf()
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
-      .set("spark.locality.wait", "100000")
-
-    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
-      println("Set the number of edge partitions using --numEPart.")
-      sys.exit(1)
-    }
-    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
-      .map(pickPartitioner(_))
-    val edgeStorageLevel = options.remove("edgeStorageLevel")
-      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
-    val vertexStorageLevel = options.remove("vertexStorageLevel")
-      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
-
-    taskType match {
-      case "pagerank" =>
-        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
-        val outFname = options.remove("output").getOrElse("")
-        val numIterOpt = options.remove("numIter").map(_.toInt)
-
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|             PageRank               |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
-
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel).cache()
-        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
-        println("GRAPHX: Number of vertices " + graph.vertices.count)
-        println("GRAPHX: Number of edges " + graph.edges.count)
-
-        val pr = (numIterOpt match {
-          case Some(numIter) => PageRank.run(graph, numIter)
-          case None => PageRank.runUntilConvergence(graph, tol)
-        }).vertices.cache()
-
-        println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
-
-        if (!outFname.isEmpty) {
-          logWarning("Saving pageranks of pages to " + outFname)
-          pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
-        }
-
-        sc.stop()
-
-      case "cc" =>
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|      Connected Components          |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel).cache()
-        val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
-        val cc = ConnectedComponents.run(graph)
-        println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
-        sc.stop()
-
-      case "triangles" =>
-        options.foreach {
-          case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-        }
-
-        println("======================================")
-        println("|      Triangle Count                |")
-        println("======================================")
-
-        val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname,
-          canonicalOrientation = true,
-          minEdgePartitions = numEPart,
-          edgeStorageLevel = edgeStorageLevel,
-          vertexStorageLevel = vertexStorageLevel)
-          // TriangleCount requires the graph to be partitioned
-          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
-        val triangles = TriangleCount.run(graph)
-        println("Triangles: " + triangles.vertices.map {
-          case (vid, data) => data.toLong
-        }.reduce(_ + _) / 3)
-        sc.stop()
-
-      case _ =>
-        println("Invalid task type.")
-    }
-  }
-}
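For orientation, below is a minimal, self-contained sketch of the driver pattern the deleted Analytics.scala implements: load an edge list with GraphLoader, apply a PartitionStrategy, and run a GraphX algorithm (triangle counting, matching the run-example command in the commit message). It is not the relocated file itself; the object name, input path, and partition count are illustrative, while the GraphX calls (GraphLoader.edgeListFile, Graph.partitionBy, TriangleCount.run) are the ones used in the code removed above.

// A minimal sketch (not the relocated file) of the same driver pattern:
// load an edge list, partition the graph, and count triangles.
// The object name, input path, and partition count are illustrative only.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.lib.TriangleCount

object AnalyticsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("AnalyticsSketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Load the edge list as the deleted driver's "triangles" branch does:
    // canonical orientation plus an explicit minimum number of edge partitions.
    val graph = GraphLoader.edgeListFile(sc, "/soc-LiveJournal1.txt",
        canonicalOrientation = true, minEdgePartitions = 256)
      // TriangleCount requires the graph to be partitioned.
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .cache()

    // Each triangle is reported once at each of its three vertices, hence the division by 3.
    val numTriangles = TriangleCount.run(graph)
      .vertices
      .map { case (_, count) => count.toLong }
      .reduce(_ + _) / 3

    println("Triangles: " + numTriangles)
    sc.stop()
  }
}

Whether such a driver is launched through bin/run-example (as in the commit message) or spark-submit depends on where the moved file actually lands under org.apache.spark.examples, which this graphx-only diffstat does not show.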