author    Ankur Dave <ankurdave@gmail.com>  2014-01-11 12:33:06 -0800
committer Ankur Dave <ankurdave@gmail.com>  2014-01-11 12:33:06 -0800
commit    64f73f73a012e99053069b624aaeeb2daa13ee22 (patch)
tree      f3d8ce5b89a6c6f7814383a2704cf8241d4308a9 /graphx/src
parent    732333d78e46ee23025d81ca9fbe6d1e13e9f253 (diff)
Fix indent and use SparkConf in Analytics
Diffstat (limited to 'graphx/src')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala | 245
1 file changed, 115 insertions(+), 130 deletions(-)
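
Note: the heart of this change is replacing JVM-wide System.setProperty calls
with a SparkConf that is passed explicitly to each SparkContext, so the Kryo
serializer settings are scoped to the application instead of mutating global
state. A minimal sketch of the pattern the new code adopts (the "local"
master and the app name are illustrative placeholders, not from the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Serializer settings travel with the conf object rather than
    // living in global JVM system properties.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")

    // The same conf is then reused for the SparkContext of each task type.
    val sc = new SparkContext("local", "Analytics", conf)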
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 191e2aa051..8c35f4206e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -19,139 +19,124 @@ object Analytics extends Logging {
}
}
- def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
- loggers.map{
- loggerName =>
- val logger = org.apache.log4j.Logger.getLogger(loggerName)
- val prevLevel = logger.getLevel()
- logger.setLevel(level)
- loggerName -> prevLevel
- }.toMap
- }
-
def pickPartitioner(v: String): PartitionStrategy = {
v match {
- case "RandomVertexCut" => RandomVertexCut
- case "EdgePartition1D" => EdgePartition1D
- case "EdgePartition2D" => EdgePartition2D
- case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
- case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
- }
+ case "RandomVertexCut" => RandomVertexCut
+ case "EdgePartition1D" => EdgePartition1D
+ case "EdgePartition2D" => EdgePartition2D
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+ case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
+ }
}
- val serializer = "org.apache.spark.serializer.KryoSerializer"
- System.setProperty("spark.serializer", serializer)
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
-
- taskType match {
- case "pagerank" => {
-
- var tol:Float = 0.001F
- var outFname = ""
- var numVPart = 4
- var numEPart = 4
- var partitionStrategy: Option[PartitionStrategy] = None
-
- options.foreach{
- case ("tol", v) => tol = v.toFloat
- case ("output", v) => outFname = v
- case ("numVPart", v) => numVPart = v.toInt
- case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- println("======================================")
- println("| PageRank |")
- println("======================================")
-
- val sc = new SparkContext(host, "PageRank(" + fname + ")")
-
- val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
- val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
- println("GRAPHX: Number of vertices " + graph.vertices.count)
- println("GRAPHX: Number of edges " + graph.edges.count)
-
- val pr = graph.pageRank(tol).vertices.cache()
-
- println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
-
- if (!outFname.isEmpty) {
- logWarning("Saving pageranks of pages to " + outFname)
- pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
- }
-
- sc.stop()
- }
-
- case "cc" => {
-
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: Option[PartitionStrategy] = None
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
- println("======================================")
- println("| Connected Components |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- println(" \tNumIter: " + numIter)
- println("======================================")
-
- val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
- val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
- val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
-
- val cc = ConnectedComponents.run(graph)
- println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
- sc.stop()
+
+ val conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ taskType match {
+ case "pagerank" =>
+ var tol: Float = 0.001F
+ var outFname = ""
+ var numVPart = 4
+ var numEPart = 4
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("tol", v) => tol = v.toFloat
+ case ("output", v) => outFname = v
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- case "triangles" => {
- var numVPart = 4
- var numEPart = 4
- // TriangleCount requires the graph to be partitioned
- var partitionStrategy: PartitionStrategy = RandomVertexCut
-
- options.foreach{
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
- println("======================================")
- println("| Triangle Count |")
- println("--------------------------------------")
- val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
- val triangles = TriangleCount.run(graph)
- println("Triangles: " + triangles.vertices.map {
- case (vid,data) => data.toLong
- }.reduce(_+_) / 3)
- sc.stop()
- }
-
- case _ => {
- println("Invalid task type.")
- }
- }
- }
+ println("======================================")
+ println("| PageRank |")
+ println("======================================")
+
+ val sc = new SparkContext(host, "PageRank(" + fname + ")", conf)
+
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ println("GRAPHX: Number of vertices " + graph.vertices.count)
+ println("GRAPHX: Number of edges " + graph.edges.count)
+
+ val pr = graph.pageRank(tol).vertices.cache()
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
+
+ if (!outFname.isEmpty) {
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
+ }
+
+ sc.stop()
+
+ case "cc" =>
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf)
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+
+ case "triangles" =>
+ var numVPart = 4
+ var numEPart = 4
+ // TriangleCount requires the graph to be partitioned
+ var partitionStrategy: PartitionStrategy = RandomVertexCut
+
+ options.foreach{
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+ println("======================================")
+ println("| Triangle Count |")
+ println("--------------------------------------")
+ val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
+ val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+ minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val triangles = TriangleCount.run(graph)
+ println("Triangles: " + triangles.vertices.map {
+ case (vid,data) => data.toLong
+ }.reduce(_ + _) / 3)
+ sc.stop()
+
+ case _ =>
+ println("Invalid task type.")
+ }
+ }
}
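
Note: the pagerank and cc cases apply an optional partition strategy with
partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)). Folding over
the Option returns the graph unchanged for None and calls partitionBy exactly
once for Some(strategy). A self-contained sketch of the same idiom (plain
Scala stand-ins, not the actual GraphX types):

    // Toy stand-in for a graph that records how it was partitioned.
    case class Graph(strategy: Option[String]) {
      def partitionBy(s: String): Graph = Graph(Some(s))
    }

    val g = Graph(None)

    // None leaves the graph untouched; Some(s) repartitions it once.
    def maybePartition(g: Graph, strategy: Option[String]): Graph =
      strategy.foldLeft(g)(_.partitionBy(_))

    assert(maybePartition(g, None) == g)
    assert(maybePartition(g, Some("RandomVertexCut")) == Graph(Some("RandomVertexCut")))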