author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-09-17 22:42:12 -0700
committer | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2013-09-17 22:42:12 -0700
commit | 55696e258456798d73325655428899c5b4931730 (patch)
tree | 498d74f2235fab6972b6ed4329b584372bbb6bf3
parent | 8b59fb72c45a64b6b49d79080eaff0f675197086 (diff)
download | spark-55696e258456798d73325655428899c5b4931730.tar.gz, spark-55696e258456798d73325655428899c5b4931730.tar.bz2, spark-55696e258456798d73325655428899c5b4931730.zip
GraphX now builds with all merged changes.
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Analytics.scala (renamed from graph/src/main/scala/spark/graph/Analytics.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Edge.scala (renamed from graph/src/main/scala/spark/graph/Edge.scala) | 2
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala (renamed from graph/src/main/scala/spark/graph/EdgeDirection.scala) | 2
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala (renamed from graph/src/main/scala/spark/graph/EdgeTriplet.scala) | 2
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Graph.scala (renamed from graph/src/main/scala/spark/graph/Graph.scala) | 9
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala (renamed from graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/GraphLab.scala (renamed from graph/src/main/scala/spark/graph/GraphLab.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala (renamed from graph/src/main/scala/spark/graph/GraphLoader.scala) | 10
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/GraphOps.scala (renamed from graph/src/main/scala/spark/graph/GraphOps.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Pregel.scala (renamed from graph/src/main/scala/spark/graph/Pregel.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/Vertex.scala (renamed from graph/src/main/scala/spark/graph/Vertex.scala) | 2
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala (renamed from graph/src/main/scala/spark/graph/impl/EdgePartition.scala) | 4
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala (renamed from graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala) | 18
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala (renamed from graph/src/main/scala/spark/graph/impl/GraphImpl.scala) | 14
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/package.scala (renamed from graph/src/main/scala/spark/graph/package.scala) | 2
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala | 76
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala | 75
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala (renamed from graph/src/main/scala/spark/graph/util/BytecodeUtils.scala) | 5
-rw-r--r-- | graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala (renamed from graph/src/main/scala/spark/graph/util/HashUtils.scala) | 2
-rw-r--r-- | graph/src/main/scala/spark/graph/perf/BagelTest.scala | 72
-rw-r--r-- | graph/src/main/scala/spark/graph/perf/SparkTest.scala | 72
-rw-r--r-- | project/SparkBuild.scala | 16
22 files changed, 211 insertions, 192 deletions
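
Nearly everything below is a mechanical move of the spark.graph package into org.apache.spark.graph, with RDD now imported from org.apache.spark.rdd and KryoRegistrator from org.apache.spark.serializer. As an illustrative sketch of what the rename means for code built against the graph module (not part of the commit: the object name and input path are made up; GraphLoader.textFile, withPartitioner, and vertices.count are taken from the diff itself):

import org.apache.spark.SparkContext
import org.apache.spark.graph._

object MigrationSketch {
  def main(args: Array[String]) {
    // Before this commit the imports above would have been spark.SparkContext
    // and spark.graph._; the API calls themselves are unchanged.
    val sc = new SparkContext("local", "MigrationSketch")
    // Load an edge list with unit edge attributes, as the perf tests below do.
    val g = GraphLoader.textFile(sc, "edges.tsv", a => 1.0F)
      .withPartitioner(4, 4)
      .cache()
    println("Vertices: " + g.vertices.count())
    sc.stop()
  }
}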
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 601a0785e1..09cf81eeeb 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
 
-import spark._
+import org.apache.spark._
diff --git a/graph/src/main/scala/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
index cb057a467a..20539b8af0 100644
--- a/graph/src/main/scala/spark/graph/Edge.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
 
 /**
diff --git a/graph/src/main/scala/spark/graph/EdgeDirection.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
index 38caac44d6..99af2d5458 100644
--- a/graph/src/main/scala/spark/graph/EdgeDirection.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
 
 /**
diff --git a/graph/src/main/scala/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index 3ed8052794..4ade1d7333 100644
--- a/graph/src/main/scala/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
 
 /**
  * An edge triplet represents two vertices and edge along with their attributes.
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 594b3b5495..1fb22c56ff 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,6 +1,7 @@
-package spark.graph
+package org.apache.spark.graph
 
-import spark.RDD
+
+import org.apache.spark.rdd.RDD
@@ -366,8 +367,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
 object Graph {
 
-  import spark.graph.impl._
-  import spark.SparkContext._
+  import org.apache.spark.graph.impl._
+  import org.apache.spark.SparkContext._
 
   def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
     // Reduce to unique edges.
diff --git a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index e1cb77f114..13a22f9051 100644
--- a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -1,8 +1,8 @@
-package spark.graph
+package org.apache.spark.graph
 
 import com.esotericsoftware.kryo.Kryo
 
-import spark.KryoRegistrator
+import org.apache.spark.serializer.KryoRegistrator
 
 class GraphKryoRegistrator extends KryoRegistrator {
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index f89c2a39d7..1dba813e91 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -1,7 +1,7 @@
-package spark.graph
+package org.apache.spark.graph
 
 import scala.collection.JavaConversions._
-import spark.RDD
+import org.apache.spark.rdd.RDD
 
 /**
  * This object implement the graphlab gather-apply-scatter api.
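
The GraphKryoRegistrator hunk above tracks KryoRegistrator's move into org.apache.spark.serializer. As a hedged sketch of how an application would enable it after this commit: the property keys and the KryoSerializer class name are taken from the perf-test sources in this commit, while pointing spark.kryo.registrator at GraphKryoRegistrator is an assumption.

// Hypothetical application setup; see the hedging note above.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")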
diff --git a/graph/src/main/scala/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 7e1a054413..4d7ca1268d 100644
--- a/graph/src/main/scala/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,9 +1,9 @@
-package spark.graph
+package org.apache.spark.graph
 
-import spark.RDD
-import spark.SparkContext
-import spark.SparkContext._
-import spark.graph.impl.GraphImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph.impl.GraphImpl
 
 object GraphLoader {
diff --git a/graph/src/main/scala/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index d98cd8d44c..8de96680b8 100644
--- a/graph/src/main/scala/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
 
-import spark.RDD
+import org.apache.spark.rdd.RDD
 
 class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 0a564b8041..27b75a7988 100644
--- a/graph/src/main/scala/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
 
-import spark.RDD
+import org.apache.spark.rdd.RDD
 
 object Pregel {
diff --git a/graph/src/main/scala/spark/graph/Vertex.scala b/graph/src/main/scala/org/apache/spark/graph/Vertex.scala
index 32653571f7..c8671b7f13 100644
--- a/graph/src/main/scala/spark/graph/Vertex.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Vertex.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
 
 /**
  * A graph vertex consists of a vertex id and attribute.
diff --git a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index 4e0d5f41b9..3d218f27b1 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,10 +1,10 @@
-package spark.graph.impl
+package org.apache.spark.graph.impl
 
 import scala.collection.mutable.ArrayBuilder
 
 import it.unimi.dsi.fastutil.ints.IntArrayList
 
-import spark.graph._
+import org.apache.spark.graph._
 
 /**
diff --git a/graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
index f6de8e59af..18d5d2b5aa 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
@@ -1,9 +1,15 @@
-package spark.graph.impl
-
-import spark.{Aggregator, HashPartitioner, Partition, RDD, SparkEnv, TaskContext}
-import spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import spark.SparkContext._
-import spark.graph._
+package org.apache.spark.graph.impl
+
+import org.apache.spark.Aggregator
+import org.apache.spark.Partition
+import org.apache.spark.SparkEnv
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Dependency
+import org.apache.spark.OneToOneDependency
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
 
 private[graph]
diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 08fc016a43..68ac9f724c 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,12 +1,16 @@
-package spark.graph.impl
+package org.apache.spark.graph.impl
 
 import scala.collection.JavaConversions._
 
-import spark.{ClosureCleaner, Partitioner, HashPartitioner, RDD}
-import spark.SparkContext._
+import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner
+import org.apache.spark.HashPartitioner
+import org.apache.spark.util.ClosureCleaner
 
-import spark.graph._
-import spark.graph.impl.GraphImpl._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.graph._
+import org.apache.spark.graph.impl.GraphImpl._
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index d95dcdce08..474ace520f 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -1,4 +1,4 @@
-package spark
+package org.apache.spark
 
 package object graph {
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
new file mode 100644
index 0000000000..eaff27a33e
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
@@ -0,0 +1,76 @@
+///// This file creates circular dependencies between examples bagle and graph
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+
+// import org.apache.spark.examples.bagel
+// //import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object BagelTest {
+
+//   def main(args: Array[String]) {
+//     val host = args(0)
+//     val taskType = args(1)
+//     val fname = args(2)
+//     val options = args.drop(3).map { arg =>
+//       arg.dropWhile(_ == '-').split('=') match {
+//         case Array(opt, v) => (opt -> v)
+//         case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+//       }
+//     }
+
+//     System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+//     //System.setProperty("spark.shuffle.compress", "false")
+//     System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator")
+
+//     var numIter = Int.MaxValue
+//     var isDynamic = false
+//     var tol:Float = 0.001F
+//     var outFname = ""
+//     var numVPart = 4
+//     var numEPart = 4
+
+//     options.foreach{
+//       case ("numIter", v) => numIter = v.toInt
+//       case ("dynamic", v) => isDynamic = v.toBoolean
+//       case ("tol", v) => tol = v.toFloat
+//       case ("output", v) => outFname = v
+//       case ("numVPart", v) => numVPart = v.toInt
+//       case ("numEPart", v) => numEPart = v.toInt
+//       case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+//     }
+
+//     val sc = new SparkContext(host, "PageRank(" + fname + ")")
+//     val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+//     val startTime = System.currentTimeMillis
+
+//     val numVertices = g.vertices.count()
+
+//     val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+//       (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+//     }
+
+//     // Do the computation
+//     val epsilon = 0.01 / numVertices
+//     val messages = sc.parallelize(Array[(String, PRMessage)]())
+//     val utils = new PageRankUtils
+//     val result =
+//       Bagel.run(
+//         sc, vertices, messages, combiner = new PRCombiner(),
+//         numPartitions = numVPart)(
+//         utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+//     println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+//     if (!outFname.isEmpty) {
+//       println("Saving pageranks of pages to " + outFname)
+//       result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+//     }
+//     println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+//     sc.stop()
+//   }
+// }
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
new file mode 100644
index 0000000000..01bd968550
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
@@ -0,0 +1,75 @@
+///// This file creates circular dependencies between examples bagle and graph
+
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+// import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object SparkTest {
+
+//   def main(args: Array[String]) {
+//     val host = args(0)
+//     val taskType = args(1)
+//     val fname = args(2)
+//     val options = args.drop(3).map { arg =>
+//       arg.dropWhile(_ == '-').split('=') match {
+//         case Array(opt, v) => (opt -> v)
+//         case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+//       }
+//     }
+
+//     System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
+//     //System.setProperty("spark.shuffle.compress", "false")
+//     System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
+
+//     var numIter = Int.MaxValue
+//     var isDynamic = false
+//     var tol:Float = 0.001F
+//     var outFname = ""
+//     var numVPart = 4
+//     var numEPart = 4
+
+//     options.foreach{
+//       case ("numIter", v) => numIter = v.toInt
+//       case ("dynamic", v) => isDynamic = v.toBoolean
+//       case ("tol", v) => tol = v.toFloat
+//       case ("output", v) => outFname = v
+//       case ("numVPart", v) => numVPart = v.toInt
+//       case ("numEPart", v) => numEPart = v.toInt
+//       case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+//     }
+
+//     val sc = new SparkContext(host, "PageRank(" + fname + ")")
+//     val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+//     val startTime = System.currentTimeMillis
+
+//     val numVertices = g.vertices.count()
+
+//     val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+//       (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+//     }
+
+//     // Do the computation
+//     val epsilon = 0.01 / numVertices
+//     val messages = sc.parallelize(Array[(String, PRMessage)]())
+//     val utils = new PageRankUtils
+//     val result =
+//       Bagel.run(
+//         sc, vertices, messages, combiner = new PRCombiner(),
+//         numPartitions = numVPart)(
+//         utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+//     println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+//     if (!outFname.isEmpty) {
+//       println("Saving pageranks of pages to " + outFname)
+//       result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+//     }
+//     println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+//     sc.stop()
+//   }
+// }
diff --git a/graph/src/main/scala/spark/graph/util/BytecodeUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
index ac3a1fb957..5db13fe3bc 100644
--- a/graph/src/main/scala/spark/graph/util/BytecodeUtils.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
@@ -1,4 +1,4 @@
-package spark.graph.util
+package org.apache.spark.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@@ -7,10 +7,9 @@ import scala.collection.mutable.HashSet
 import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
 import org.objectweb.asm.Opcodes._
 
-import spark.Utils
 
-private[graph] object BytecodeUtils {
+private[spark] object BytecodeUtils {
 
   /**
    * Test whether the given closure invokes the specified method in the specified class.
diff --git a/graph/src/main/scala/spark/graph/util/HashUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
index 0dfaef4c48..cb18ef3d26 100644
--- a/graph/src/main/scala/spark/graph/util/HashUtils.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
@@ -1,4 +1,4 @@
-package spark.graph.util
+package org.apache.spark.graph.util
 
 
 object HashUtils {
diff --git a/graph/src/main/scala/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/spark/graph/perf/BagelTest.scala
deleted file mode 100644
index 7547292500..0000000000
--- a/graph/src/main/scala/spark/graph/perf/BagelTest.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package spark.graph.perf
-
-import spark._
-import spark.SparkContext._
-import spark.bagel.Bagel
-import spark.bagel.examples._
-import spark.graph._
-
-
-object BagelTest {
-
-  def main(args: Array[String]) {
-    val host = args(0)
-    val taskType = args(1)
-    val fname = args(2)
-    val options = args.drop(3).map { arg =>
-      arg.dropWhile(_ == '-').split('=') match {
-        case Array(opt, v) => (opt -> v)
-        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
-      }
-    }
-
-    System.setProperty("spark.serializer", "spark.KryoSerializer")
-    //System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
-
-    var numIter = Int.MaxValue
-    var isDynamic = false
-    var tol:Float = 0.001F
-    var outFname = ""
-    var numVPart = 4
-    var numEPart = 4
-
-    options.foreach{
-      case ("numIter", v) => numIter = v.toInt
-      case ("dynamic", v) => isDynamic = v.toBoolean
-      case ("tol", v) => tol = v.toFloat
-      case ("output", v) => outFname = v
-      case ("numVPart", v) => numVPart = v.toInt
-      case ("numEPart", v) => numEPart = v.toInt
-      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-    }
-
-    val sc = new SparkContext(host, "PageRank(" + fname + ")")
-    val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
-    val startTime = System.currentTimeMillis
-
-    val numVertices = g.vertices.count()
-
-    val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
-      (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
-    }
-
-    // Do the computation
-    val epsilon = 0.01 / numVertices
-    val messages = sc.parallelize(Array[(String, PRMessage)]())
-    val utils = new PageRankUtils
-    val result =
-      Bagel.run(
-        sc, vertices, messages, combiner = new PRCombiner(),
-        numPartitions = numVPart)(
-        utils.computeWithCombiner(numVertices, epsilon, numIter))
-
-    println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
-    if (!outFname.isEmpty) {
-      println("Saving pageranks of pages to " + outFname)
-      result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
-    }
-    println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-    sc.stop()
-  }
-}
diff --git a/graph/src/main/scala/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/spark/graph/perf/SparkTest.scala
deleted file mode 100644
index 85ebd14bcb..0000000000
--- a/graph/src/main/scala/spark/graph/perf/SparkTest.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package spark.graph.perf
-
-import spark._
-import spark.SparkContext._
-import spark.bagel.Bagel
-import spark.bagel.examples._
-import spark.graph._
-
-
-object SparkTest {
-
-  def main(args: Array[String]) {
-    val host = args(0)
-    val taskType = args(1)
-    val fname = args(2)
-    val options = args.drop(3).map { arg =>
-      arg.dropWhile(_ == '-').split('=') match {
-        case Array(opt, v) => (opt -> v)
-        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
-      }
-    }
-
-    System.setProperty("spark.serializer", "spark.KryoSerializer")
-    //System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
-
-    var numIter = Int.MaxValue
-    var isDynamic = false
-    var tol:Float = 0.001F
-    var outFname = ""
-    var numVPart = 4
-    var numEPart = 4
-
-    options.foreach{
-      case ("numIter", v) => numIter = v.toInt
-      case ("dynamic", v) => isDynamic = v.toBoolean
-      case ("tol", v) => tol = v.toFloat
-      case ("output", v) => outFname = v
-      case ("numVPart", v) => numVPart = v.toInt
-      case ("numEPart", v) => numEPart = v.toInt
-      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
-    }
-
-    val sc = new SparkContext(host, "PageRank(" + fname + ")")
-    val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
-    val startTime = System.currentTimeMillis
-
-    val numVertices = g.vertices.count()
-
-    val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
-      (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
-    }
-
-    // Do the computation
-    val epsilon = 0.01 / numVertices
-    val messages = sc.parallelize(Array[(String, PRMessage)]())
-    val utils = new PageRankUtils
-    val result =
-      Bagel.run(
-        sc, vertices, messages, combiner = new PRCombiner(),
-        numPartitions = numVPart)(
-        utils.computeWithCombiner(numVertices, epsilon, numIter))
-
-    println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
-    if (!outFname.isEmpty) {
-      println("Saving pageranks of pages to " + outFname)
-      result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
-    }
-    println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-    sc.stop()
-  }
-}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7598060cb9..7dc6c58401 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -42,16 +42,16 @@ object SparkBuild extends Build {
   lazy val core = Project("core", file("core"), settings = coreSettings)
 
   lazy val repl = Project("repl", file("repl"), settings = replSettings)
-    .dependsOn(core, bagel, mllib)
+    .dependsOn(core, graph, bagel, mllib)
 
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
-    .dependsOn(core, mllib, bagel, streaming)
+    .dependsOn(core, mllib, graph, bagel, streaming)
 
   lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
 
   lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
 
-  lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
+  lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
 
   lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
 
@@ -60,7 +60,7 @@ object SparkBuild extends Build {
   lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
 
   lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
-    .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
+    .dependsOn(core, graph, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
 
   // A configuration to set an alternative publishLocalConfiguration
   lazy val MavenCompile = config("m2r") extend(Compile)
@@ -77,7 +77,7 @@ object SparkBuild extends Build {
   lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
   lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
   lazy val allProjects = Seq[ProjectReference](
-    core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
+    core, repl, examples, graph, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
 
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.apache.spark",
@@ -254,6 +254,10 @@ object SparkBuild extends Build {
     name := "spark-tools"
   )
 
+  def graphSettings = sharedSettings ++ Seq(
+    name := "spark-graphx"
+  )
+
  def bagelSettings = sharedSettings ++ Seq(
     name := "spark-bagel"
   )
@@ -265,8 +269,6 @@ object SparkBuild extends Build {
     )
   )
 
-  def graphSettings = sharedSettings ++ Seq(name := "spark-graph")
-
  def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     resolvers ++= Seq(
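
The SparkBuild.scala hunks wire the graph project into the repl, examples, and assembly builds and rename its published artifact to spark-graphx. A minimal, self-contained sketch of that wiring pattern, assuming the same sbt Build-trait style the file uses; SketchBuild and the reduced sharedSettings are placeholders, while the graph project definition, the "spark-graphx" name, and the organization value come from the hunks above:

import sbt._
import Keys._

object SketchBuild extends Build {
  // Reduced stand-in for the real sharedSettings; only the organization
  // shown in the diff is kept.
  def sharedSettings = Defaults.defaultSettings ++ Seq(
    organization := "org.apache.spark"
  )

  // The renamed artifact added by this commit.
  def graphSettings = sharedSettings ++ Seq(
    name := "spark-graphx"
  )

  lazy val core = Project("core", file("core"), settings = sharedSettings)

  lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)

  // Any module that compiles against org.apache.spark.graph must list graph
  // in its dependsOn, exactly as repl, examples, and assembly do above.
  lazy val examples = Project("examples", file("examples"), settings = sharedSettings)
    .dependsOn(core, graph)
}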