author     Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>    2013-09-17 22:42:12 -0700
committer  Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>    2013-09-17 22:42:12 -0700
commit     55696e258456798d73325655428899c5b4931730 (patch)
tree       498d74f2235fab6972b6ed4329b584372bbb6bf3
parent     8b59fb72c45a64b6b49d79080eaff0f675197086 (diff)
GraphX now builds with all merged changes.
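This commit moves the GraphX sources from the old spark.graph namespace into org.apache.spark.graph, updates imports to the relocated core classes (org.apache.spark.rdd.RDD, org.apache.spark.serializer.KryoRegistrator, and so on), and wires the graph project into the SBT build. For client code the visible effect is the import change; a minimal before/after sketch:

    // before this commit
    import spark.graph._
    import spark.RDD

    // after this commit
    import org.apache.spark.graph._
    import org.apache.spark.rdd.RDD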
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Analytics.scala (renamed from graph/src/main/scala/spark/graph/Analytics.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Edge.scala (renamed from graph/src/main/scala/spark/graph/Edge.scala) | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala (renamed from graph/src/main/scala/spark/graph/EdgeDirection.scala) | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala (renamed from graph/src/main/scala/spark/graph/EdgeTriplet.scala) | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala (renamed from graph/src/main/scala/spark/graph/Graph.scala) | 9
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala (renamed from graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala (renamed from graph/src/main/scala/spark/graph/GraphLab.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala (renamed from graph/src/main/scala/spark/graph/GraphLoader.scala) | 10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphOps.scala (renamed from graph/src/main/scala/spark/graph/GraphOps.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Pregel.scala (renamed from graph/src/main/scala/spark/graph/Pregel.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Vertex.scala (renamed from graph/src/main/scala/spark/graph/Vertex.scala) | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala (renamed from graph/src/main/scala/spark/graph/impl/EdgePartition.scala) | 4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala (renamed from graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala) | 18
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala (renamed from graph/src/main/scala/spark/graph/impl/GraphImpl.scala) | 14
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/package.scala (renamed from graph/src/main/scala/spark/graph/package.scala) | 2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala | 76
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala | 75
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala (renamed from graph/src/main/scala/spark/graph/util/BytecodeUtils.scala) | 5
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala (renamed from graph/src/main/scala/spark/graph/util/HashUtils.scala) | 2
-rw-r--r--  graph/src/main/scala/spark/graph/perf/BagelTest.scala | 72
-rw-r--r--  graph/src/main/scala/spark/graph/perf/SparkTest.scala | 72
-rw-r--r--  project/SparkBuild.scala | 16
22 files changed, 211 insertions, 192 deletions
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 601a0785e1..09cf81eeeb 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
-import spark._
+import org.apache.spark._
diff --git a/graph/src/main/scala/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
index cb057a467a..20539b8af0 100644
--- a/graph/src/main/scala/spark/graph/Edge.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
/**
diff --git a/graph/src/main/scala/spark/graph/EdgeDirection.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
index 38caac44d6..99af2d5458 100644
--- a/graph/src/main/scala/spark/graph/EdgeDirection.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
/**
diff --git a/graph/src/main/scala/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index 3ed8052794..4ade1d7333 100644
--- a/graph/src/main/scala/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
/**
 * An edge triplet represents two vertices and an edge along with their attributes.
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 594b3b5495..1fb22c56ff 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,6 +1,7 @@
-package spark.graph
+package org.apache.spark.graph
-import spark.RDD
+
+import org.apache.spark.rdd.RDD
@@ -366,8 +367,8 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
object Graph {
- import spark.graph.impl._
- import spark.SparkContext._
+ import org.apache.spark.graph.impl._
+ import org.apache.spark.SparkContext._
def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
// Reduce to unique edges.
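The companion object keeps its edge-list constructor across the rename. A hedged usage sketch under the new package (the local SparkContext and the literal edge list are illustrative assumptions; Vid is taken to be the Long-based vertex-id alias from the graph package object):

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    val sc = new SparkContext("local", "graph-example")              // assumed local test context
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L))) // assumed toy edge list
    val g: Graph[Int, Int] = Graph(rawEdges)                         // uniqueEdges = true by default, so duplicates collapse
    println(g.vertices.count())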
diff --git a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index e1cb77f114..13a22f9051 100644
--- a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -1,8 +1,8 @@
-package spark.graph
+package org.apache.spark.graph
import com.esotericsoftware.kryo.Kryo
-import spark.KryoRegistrator
+import org.apache.spark.serializer.KryoRegistrator
class GraphKryoRegistrator extends KryoRegistrator {
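Because KryoRegistrator now lives in org.apache.spark.serializer, an application would enable the graph registrator through the same system properties the perf tests in this commit use; a sketch (property names copied from those tests, registrator class from this file):

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")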
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index f89c2a39d7..1dba813e91 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -1,7 +1,7 @@
-package spark.graph
+package org.apache.spark.graph
import scala.collection.JavaConversions._
-import spark.RDD
+import org.apache.spark.rdd.RDD
/**
 * This object implements the GraphLab gather-apply-scatter API.
diff --git a/graph/src/main/scala/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 7e1a054413..4d7ca1268d 100644
--- a/graph/src/main/scala/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,9 +1,9 @@
-package spark.graph
+package org.apache.spark.graph
-import spark.RDD
-import spark.SparkContext
-import spark.SparkContext._
-import spark.graph.impl.GraphImpl
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph.impl.GraphImpl
object GraphLoader {
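GraphLoader's entry point is unchanged apart from the package; the perf tests in this commit build their graph with it. A sketch of that call under the new imports (the SparkContext sc and the edge-list path are assumptions):

    val g = GraphLoader.textFile(sc, "hdfs:///path/to/edges.tsv", a => 1.0F)
      .withPartitioner(4, 4)   // 4 vertex partitions, 4 edge partitions, as in the perf tests
      .cache()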
diff --git a/graph/src/main/scala/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index d98cd8d44c..8de96680b8 100644
--- a/graph/src/main/scala/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
-import spark.RDD
+import org.apache.spark.rdd.RDD
class GraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) {
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 0a564b8041..27b75a7988 100644
--- a/graph/src/main/scala/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -1,6 +1,6 @@
-package spark.graph
+package org.apache.spark.graph
-import spark.RDD
+import org.apache.spark.rdd.RDD
object Pregel {
diff --git a/graph/src/main/scala/spark/graph/Vertex.scala b/graph/src/main/scala/org/apache/spark/graph/Vertex.scala
index 32653571f7..c8671b7f13 100644
--- a/graph/src/main/scala/spark/graph/Vertex.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Vertex.scala
@@ -1,4 +1,4 @@
-package spark.graph
+package org.apache.spark.graph
/**
* A graph vertex consists of a vertex id and attribute.
diff --git a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index 4e0d5f41b9..3d218f27b1 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,10 +1,10 @@
-package spark.graph.impl
+package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import it.unimi.dsi.fastutil.ints.IntArrayList
-import spark.graph._
+import org.apache.spark.graph._
/**
diff --git a/graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
index f6de8e59af..18d5d2b5aa 100644
--- a/graph/src/main/scala/spark/graph/impl/EdgeTripletRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletRDD.scala
@@ -1,9 +1,15 @@
-package spark.graph.impl
-
-import spark.{Aggregator, HashPartitioner, Partition, RDD, SparkEnv, TaskContext}
-import spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import spark.SparkContext._
-import spark.graph._
+package org.apache.spark.graph.impl
+
+import org.apache.spark.Aggregator
+import org.apache.spark.Partition
+import org.apache.spark.SparkEnv
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Dependency
+import org.apache.spark.OneToOneDependency
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
private[graph]
diff --git a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 08fc016a43..68ac9f724c 100644
--- a/graph/src/main/scala/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,12 +1,16 @@
-package spark.graph.impl
+package org.apache.spark.graph.impl
import scala.collection.JavaConversions._
-import spark.{ClosureCleaner, Partitioner, HashPartitioner, RDD}
-import spark.SparkContext._
+import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner
+import org.apache.spark.HashPartitioner
+import org.apache.spark.util.ClosureCleaner
-import spark.graph._
-import spark.graph.impl.GraphImpl._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.graph._
+import org.apache.spark.graph.impl.GraphImpl._
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index d95dcdce08..474ace520f 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -1,4 +1,4 @@
-package spark
+package org.apache.spark
package object graph {
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
new file mode 100644
index 0000000000..eaff27a33e
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
@@ -0,0 +1,76 @@
+///// This file creates circular dependencies between the examples, bagel, and graph projects
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+
+// import org.apache.spark.examples.bagel
+// //import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object BagelTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "org.apache.spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
new file mode 100644
index 0000000000..01bd968550
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
@@ -0,0 +1,75 @@
+///// This file creates circular dependencies between the examples, bagel, and graph projects
+
+
+// package org.apache.spark.graph.perf
+
+// import org.apache.spark._
+// import org.apache.spark.SparkContext._
+// import org.apache.spark.bagel.Bagel
+// import org.apache.spark.bagel.examples._
+// import org.apache.spark.graph._
+
+
+// object SparkTest {
+
+// def main(args: Array[String]) {
+// val host = args(0)
+// val taskType = args(1)
+// val fname = args(2)
+// val options = args.drop(3).map { arg =>
+// arg.dropWhile(_ == '-').split('=') match {
+// case Array(opt, v) => (opt -> v)
+// case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+// }
+// }
+
+// System.setProperty("spark.serializer", "org.apache.spark.KryoSerializer")
+// //System.setProperty("spark.shuffle.compress", "false")
+// System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
+
+// var numIter = Int.MaxValue
+// var isDynamic = false
+// var tol:Float = 0.001F
+// var outFname = ""
+// var numVPart = 4
+// var numEPart = 4
+
+// options.foreach{
+// case ("numIter", v) => numIter = v.toInt
+// case ("dynamic", v) => isDynamic = v.toBoolean
+// case ("tol", v) => tol = v.toFloat
+// case ("output", v) => outFname = v
+// case ("numVPart", v) => numVPart = v.toInt
+// case ("numEPart", v) => numEPart = v.toInt
+// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+// }
+
+// val sc = new SparkContext(host, "PageRank(" + fname + ")")
+// val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
+// val startTime = System.currentTimeMillis
+
+// val numVertices = g.vertices.count()
+
+// val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
+// (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
+// }
+
+// // Do the computation
+// val epsilon = 0.01 / numVertices
+// val messages = sc.parallelize(Array[(String, PRMessage)]())
+// val utils = new PageRankUtils
+// val result =
+// Bagel.run(
+// sc, vertices, messages, combiner = new PRCombiner(),
+// numPartitions = numVPart)(
+// utils.computeWithCombiner(numVertices, epsilon, numIter))
+
+// println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
+// if (!outFname.isEmpty) {
+// println("Saving pageranks of pages to " + outFname)
+// result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
+// }
+// println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+// sc.stop()
+// }
+// }
diff --git a/graph/src/main/scala/spark/graph/util/BytecodeUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
index ac3a1fb957..5db13fe3bc 100644
--- a/graph/src/main/scala/spark/graph/util/BytecodeUtils.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
@@ -1,4 +1,4 @@
-package spark.graph.util
+package org.apache.spark.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@@ -7,10 +7,9 @@ import scala.collection.mutable.HashSet
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
import org.objectweb.asm.Opcodes._
-import spark.Utils
-private[graph] object BytecodeUtils {
+private[spark] object BytecodeUtils {
/**
* Test whether the given closure invokes the specified method in the specified class.
diff --git a/graph/src/main/scala/spark/graph/util/HashUtils.scala b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
index 0dfaef4c48..cb18ef3d26 100644
--- a/graph/src/main/scala/spark/graph/util/HashUtils.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
@@ -1,4 +1,4 @@
-package spark.graph.util
+package org.apache.spark.graph.util
object HashUtils {
diff --git a/graph/src/main/scala/spark/graph/perf/BagelTest.scala b/graph/src/main/scala/spark/graph/perf/BagelTest.scala
deleted file mode 100644
index 7547292500..0000000000
--- a/graph/src/main/scala/spark/graph/perf/BagelTest.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package spark.graph.perf
-
-import spark._
-import spark.SparkContext._
-import spark.bagel.Bagel
-import spark.bagel.examples._
-import spark.graph._
-
-
-object BagelTest {
-
- def main(args: Array[String]) {
- val host = args(0)
- val taskType = args(1)
- val fname = args(2)
- val options = args.drop(3).map { arg =>
- arg.dropWhile(_ == '-').split('=') match {
- case Array(opt, v) => (opt -> v)
- case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
- }
- }
-
- System.setProperty("spark.serializer", "spark.KryoSerializer")
- //System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
-
- var numIter = Int.MaxValue
- var isDynamic = false
- var tol:Float = 0.001F
- var outFname = ""
- var numVPart = 4
- var numEPart = 4
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("tol", v) => tol = v.toFloat
- case ("output", v) => outFname = v
- case ("numVPart", v) => numVPart = v.toInt
- case ("numEPart", v) => numEPart = v.toInt
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- val sc = new SparkContext(host, "PageRank(" + fname + ")")
- val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
- val startTime = System.currentTimeMillis
-
- val numVertices = g.vertices.count()
-
- val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
- (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
- }
-
- // Do the computation
- val epsilon = 0.01 / numVertices
- val messages = sc.parallelize(Array[(String, PRMessage)]())
- val utils = new PageRankUtils
- val result =
- Bagel.run(
- sc, vertices, messages, combiner = new PRCombiner(),
- numPartitions = numVPart)(
- utils.computeWithCombiner(numVertices, epsilon, numIter))
-
- println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
- if (!outFname.isEmpty) {
- println("Saving pageranks of pages to " + outFname)
- result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
- }
- println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
- sc.stop()
- }
-}
diff --git a/graph/src/main/scala/spark/graph/perf/SparkTest.scala b/graph/src/main/scala/spark/graph/perf/SparkTest.scala
deleted file mode 100644
index 85ebd14bcb..0000000000
--- a/graph/src/main/scala/spark/graph/perf/SparkTest.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package spark.graph.perf
-
-import spark._
-import spark.SparkContext._
-import spark.bagel.Bagel
-import spark.bagel.examples._
-import spark.graph._
-
-
-object SparkTest {
-
- def main(args: Array[String]) {
- val host = args(0)
- val taskType = args(1)
- val fname = args(2)
- val options = args.drop(3).map { arg =>
- arg.dropWhile(_ == '-').split('=') match {
- case Array(opt, v) => (opt -> v)
- case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
- }
- }
-
- System.setProperty("spark.serializer", "spark.KryoSerializer")
- //System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.kryo.registrator", "spark.bagel.examples.PRKryoRegistrator")
-
- var numIter = Int.MaxValue
- var isDynamic = false
- var tol:Float = 0.001F
- var outFname = ""
- var numVPart = 4
- var numEPart = 4
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("tol", v) => tol = v.toFloat
- case ("output", v) => outFname = v
- case ("numVPart", v) => numVPart = v.toInt
- case ("numEPart", v) => numEPart = v.toInt
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- val sc = new SparkContext(host, "PageRank(" + fname + ")")
- val g = GraphLoader.textFile(sc, fname, a => 1.0F).withPartitioner(numVPart, numEPart).cache()
- val startTime = System.currentTimeMillis
-
- val numVertices = g.vertices.count()
-
- val vertices = g.collectNeighborIds(EdgeDirection.Out).map { case (vid, neighbors) =>
- (vid.toString, new PRVertex(1.0, neighbors.map(_.toString)))
- }
-
- // Do the computation
- val epsilon = 0.01 / numVertices
- val messages = sc.parallelize(Array[(String, PRMessage)]())
- val utils = new PageRankUtils
- val result =
- Bagel.run(
- sc, vertices, messages, combiner = new PRCombiner(),
- numPartitions = numVPart)(
- utils.computeWithCombiner(numVertices, epsilon, numIter))
-
- println("Total rank: " + result.map{ case (id, r) => r.value }.reduce(_+_) )
- if (!outFname.isEmpty) {
- println("Saving pageranks of pages to " + outFname)
- result.map{ case (id, r) => id + "\t" + r.value }.saveAsTextFile(outFname)
- }
- println("Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
- sc.stop()
- }
-}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7598060cb9..7dc6c58401 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -42,16 +42,16 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings)
- .dependsOn(core, bagel, mllib)
+ .dependsOn(core, graph, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming)
+ .dependsOn(core, mllib, graph, bagel, streaming)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
- lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
+ lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
@@ -60,7 +60,7 @@ object SparkBuild extends Build {
lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
- .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
+ .dependsOn(core, graph, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
// A configuration to set an alternative publishLocalConfiguration
lazy val MavenCompile = config("m2r") extend(Compile)
@@ -77,7 +77,7 @@ object SparkBuild extends Build {
lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val allProjects = Seq[ProjectReference](
- core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
+ core, repl, examples, graph, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -254,6 +254,10 @@ object SparkBuild extends Build {
name := "spark-tools"
)
+ def graphSettings = sharedSettings ++ Seq(
+ name := "spark-graphx"
+ )
+
def bagelSettings = sharedSettings ++ Seq(
name := "spark-bagel"
)
@@ -265,8 +269,6 @@ object SparkBuild extends Build {
)
)
- def graphSettings = sharedSettings ++ Seq(name := "spark-graph")
-
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
resolvers ++= Seq(
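The SparkBuild changes above rename the module artifact from spark-graph to spark-graphx and add the graph project to the repl, examples, assembly, and allProjects dependency lists. A hedged sketch of how a further subproject in this build definition could pick the module up (the project name and directory are hypothetical):

    lazy val myTool = Project("my-tool", file("my-tool"), settings = sharedSettings)
      .dependsOn(core, graph)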