aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala
diff options
context:
space:
mode:
author Josh Rosen <rosenville@gmail.com> 2012-07-18 17:32:31 -0700
committer Josh Rosen <rosenville@gmail.com> 2012-07-18 17:34:29 -0700
commit 01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120 (patch)
tree 89882e8acda5c8d3cbfecc9bd2aa892ebc496b10 /examples/src/main/scala
parent 628bb5ca7f563ab7f11c373572145df403de6fef (diff)
download spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.gz
spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.bz2
spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.zip
Add Java API
Add distinct() method to RDD. Fix bug in DoubleRDDFunctions.
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- examples/src/main/scala/spark/examples/SparkTC.scala 53
1 files changed, 53 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
new file mode 100644
index 0000000000..fa945b5082
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import spark._
+import SparkContext._
+import scala.util.Random
+import scala.collection.mutable
+
+object SparkTC {
+  val numEdges = 200
+  val numVertices = 100
+  val rand = new Random(42)
+
+  /** Draws `numEdges` distinct directed edges (from, to) with no self-loops. */
+  def generateGraph: Seq[(Int, Int)] = {
+    val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
+    while (edges.size < numEdges) {
+      val from = rand.nextInt(numVertices)
+      val to = rand.nextInt(numVertices)
+      if (from != to) edges += ((from, to))
+    }
+    edges.toSeq
+  }
+
+  /** Usage: SparkTC <host> [<slices>]; prints the transitive-closure size. */
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkTC <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkTC")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    var tc = spark.parallelize(generateGraph, slices)
+
+    // Linear transitive closure: each round grows paths by one edge,
+    // by joining the graph's edges with the already-discovered paths.
+    // e.g. join the path (y, z) from the TC with the edge (x, y) from
+    // the graph to obtain the path (x, z).
+    // Because join() joins on keys, the edges are stored in reversed order.
+    val edges = tc.map(x => (x._2, x._1))
+
+    // Iterate to a fixed point, reusing each round's count instead of recounting.
+    var oldCount = 0L
+    var nextCount = tc.count()
+    do {
+      oldCount = nextCount
+      // The join yields (y, (z, x)) pairs; project them to new (x, z) paths.
+      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
+      nextCount = tc.count()
+    } while (nextCount != oldCount)
+
+    println("TC has " + nextCount + " edges.")
+    System.exit(0)
+  }
+}