author    Larry Xiao <xiaodi@sjtu.edu.cn>    2014-09-02 18:29:08 -0700
committer Ankur Dave <ankurdave@gmail.com>   2014-09-02 18:29:08 -0700
commit    7c92b49d6b62f88fcde883aacb60c5e32ae54b30 (patch)
tree      09e4569245c93b3f8cd80d874292ee01c293adfa /graphx
parent    644e31524a6a9a22c671a368aeb3b4eaeb61cf29 (diff)
[SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
to support ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256

Author: Larry Xiao <xiaodi@sjtu.edu.cn>

Closes #1766 from larryxiao/1986 and squashes the following commits:

bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
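As background for the --numEPart=256 flag in the command above: the driver removed in this diff parses each "--opt=value" argument into a key/value pair and consumes options as it reads them. Below is a minimal, self-contained sketch of that parsing logic; the object name OptionParsingSketch and the sample flags are illustrative, not part of the commit.

    object OptionParsingSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical sample flags, in the same form the driver accepts.
        val sample = Array("--numEPart=256", "--partStrategy=EdgePartition2D")
        val parsed = sample.map { arg =>
          // Strip leading dashes, then split "opt=value" into a pair,
          // mirroring the parsing in Analytics.main shown in the diff below.
          arg.dropWhile(_ == '-').split('=') match {
            case Array(opt, v) => opt -> v
            case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
          }
        }
        val options = scala.collection.mutable.Map(parsed: _*)
        // Options are removed from the map as they are consumed.
        println(options.remove("numEPart").map(_.toInt)) // Some(256)
        println(options) // Map(partStrategy -> EdgePartition2D)
      }
    }

Because consumed options are removed from the map, each task type can reject whatever remains, which is why any unrecognized flag raises IllegalArgumentException in the code below.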
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala  161
1 file changed, 0 insertions, 161 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
deleted file mode 100644
index c1513a0045..0000000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.lib
-
-import scala.collection.mutable
-import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.graphx._
-import org.apache.spark.graphx.PartitionStrategy._
-
-/**
- * Driver program for running graph algorithms.
- */
-object Analytics extends Logging {
-
- def main(args: Array[String]): Unit = {
- if (args.length < 2) {
- System.err.println(
- "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
- System.exit(1)
- }
-
- val taskType = args(0)
- val fname = args(1)
- val optionsList = args.drop(2).map { arg =>
- arg.dropWhile(_ == '-').split('=') match {
- case Array(opt, v) => (opt -> v)
- case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
- }
- }
- val options = mutable.Map(optionsList: _*)
-
- def pickPartitioner(v: String): PartitionStrategy = {
- // TODO: Use reflection rather than listing all the partitioning strategies here.
- v match {
- case "RandomVertexCut" => RandomVertexCut
- case "EdgePartition1D" => EdgePartition1D
- case "EdgePartition2D" => EdgePartition2D
- case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
- case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
- }
- }
-
- val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
- .set("spark.locality.wait", "100000")
-
- val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
- println("Set the number of edge partitions using --numEPart.")
- sys.exit(1)
- }
- val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
- .map(pickPartitioner(_))
- val edgeStorageLevel = options.remove("edgeStorageLevel")
- .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
- val vertexStorageLevel = options.remove("vertexStorageLevel")
- .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
-
- taskType match {
- case "pagerank" =>
- val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
- val outFname = options.remove("output").getOrElse("")
- val numIterOpt = options.remove("numIter").map(_.toInt)
-
- options.foreach {
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- println("======================================")
- println("| PageRank |")
- println("======================================")
-
- val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
-
- val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart,
- edgeStorageLevel = edgeStorageLevel,
- vertexStorageLevel = vertexStorageLevel).cache()
- val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
- println("GRAPHX: Number of vertices " + graph.vertices.count)
- println("GRAPHX: Number of edges " + graph.edges.count)
-
- val pr = (numIterOpt match {
- case Some(numIter) => PageRank.run(graph, numIter)
- case None => PageRank.runUntilConvergence(graph, tol)
- }).vertices.cache()
-
- println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
-
- if (!outFname.isEmpty) {
- logWarning("Saving pageranks of pages to " + outFname)
- pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
- }
-
- sc.stop()
-
- case "cc" =>
- options.foreach {
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- println("======================================")
- println("| Connected Components |")
- println("======================================")
-
- val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
- val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart,
- edgeStorageLevel = edgeStorageLevel,
- vertexStorageLevel = vertexStorageLevel).cache()
- val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
- val cc = ConnectedComponents.run(graph)
- println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
- sc.stop()
-
- case "triangles" =>
- options.foreach {
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- println("======================================")
- println("| Triangle Count |")
- println("======================================")
-
- val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
- val graph = GraphLoader.edgeListFile(sc, fname,
- canonicalOrientation = true,
- minEdgePartitions = numEPart,
- edgeStorageLevel = edgeStorageLevel,
- vertexStorageLevel = vertexStorageLevel)
- // TriangleCount requires the graph to be partitioned
- .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
- val triangles = TriangleCount.run(graph)
- println("Triangles: " + triangles.vertices.map {
- case (vid, data) => data.toLong
- }.reduce(_ + _) / 3)
- sc.stop()
-
- case _ =>
- println("Invalid task type.")
- }
- }
-}
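One idiom in the removed driver worth noting: partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) applies an optional PartitionStrategy exactly once when it is Some(s) and returns the graph unchanged when it is None. Below is a minimal sketch of the idiom with plain strings standing in for the GraphX types; the names FoldLeftIdiomSketch and the partitionBy stand-in are illustrative assumptions, not Spark API.

    object FoldLeftIdiomSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for Graph#partitionBy; just records that partitioning ran.
        def partitionBy(graph: String, strategy: String): String =
          graph + " partitioned by " + strategy

        val unpartitioned = "unpartitionedGraph"

        // foldLeft over an Option runs the function zero or one times.
        println((None: Option[String]).foldLeft(unpartitioned)(partitionBy))
        // -> unpartitionedGraph
        println(Some("RandomVertexCut").foldLeft(unpartitioned)(partitionBy))
        // -> unpartitionedGraph partitioned by RandomVertexCut
      }
    }

This keeps the optional partitioning step a single expression instead of an explicit match on the Option.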