diff options
author | Josh Rosen <rosenville@gmail.com> | 2012-07-24 09:43:02 -0700 |
---|---|---|
committer | Josh Rosen <rosenville@gmail.com> | 2012-07-24 09:47:00 -0700 |
commit | 6a78e88237cd2d2947cf5461e9da8359a16a0228 (patch) | |
tree | 079c8f49adbacb80babe2abc50dc293a23af5593 /examples/src | |
parent | 042dcbde33c878d165b777964f5abb646257ab94 (diff) | |
download | spark-6a78e88237cd2d2947cf5461e9da8359a16a0228.tar.gz spark-6a78e88237cd2d2947cf5461e9da8359a16a0228.tar.bz2 spark-6a78e88237cd2d2947cf5461e9da8359a16a0228.zip |
Minor cleanup and optimizations in Java API.
- Add override keywords.
- Cache RDDs and counts in TC example.
- Clean up JavaRDDLike's abstract methods.
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/java/spark/examples/JavaTC.java | 13 |
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkTC.scala | 10 |
2 files changed, 13 insertions, 10 deletions
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index d76bcbbe85..e6ca69ff97 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -45,14 +45,14 @@ public class JavaTC { JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC"); Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; - JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices); + JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); // Linear transitive closure: each round grows paths by one edge, // by joining the graph's edges with the already-discovered paths. // e.g. join the path (y, z) from the TC with the edge (x, y) from // the graph to obtain the path (x, z). - // Because join() joins on keys, the edges are stored in reversed order. + // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { @Override @@ -62,13 +62,14 @@ public class JavaTC { }); long oldCount = 0; + long nextCount = tc.count(); do { - oldCount = tc.count(); + oldCount = nextCount; // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. 
- - tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct(); - } while (tc.count() != oldCount); + tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache(); + nextCount = tc.count(); + } while (nextCount != oldCount); System.out.println("TC has " + tc.count() + " edges."); System.exit(0); diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index fa945b5082..a095476a23 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -28,7 +28,7 @@ object SparkTC { } val spark = new SparkContext(args(0), "SparkTC") val slices = if (args.length > 1) args(1).toInt else 2 - var tc = spark.parallelize(generateGraph, slices) + var tc = spark.parallelize(generateGraph, slices).cache() // Linear transitive closure: each round grows paths by one edge, // by joining the graph's edges with the already-discovered paths. @@ -40,12 +40,14 @@ object SparkTC { // This join is iterated until a fixed point is reached. var oldCount = 0L + var nextCount = tc.count() do { - oldCount = tc.count() + oldCount = nextCount // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. - tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct() - } while (tc.count() != oldCount) + tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache(); + nextCount = tc.count() + } while (nextCount != oldCount) println("TC has " + tc.count() + " edges.") System.exit(0) |