author    Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 20:47:07 -0700
committer Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 20:47:07 -0700
commit    9d78779257b156bec335af4ab2a66bb3cac30ca6 (patch)
tree      738420e4f3653510e2803b6cb6764c261feeb8c8 /examples
parent    4e4c41026c33d9f8fe75137f32266de75a0aa30e (diff)
parent    ac7e066383a6878beb0618597c2be6fa9eb1982e (diff)
Merge branch 'mos-shuffle-tracked' into mos-bt

Conflicts:
	core/src/main/scala/spark/Broadcast.scala
Diffstat (limited to 'examples')
 examples/src/main/scala/spark/examples/GroupByTest.scala             | 37
 examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 51
 examples/src/main/scala/spark/examples/SkewedGroupByTest.scala       | 41
 3 files changed, 129 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000000..48c02a52c6
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -0,0 +1,37 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object GroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    val numMappers = if (args.length > 1) args(1).toInt else 2
+    val numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    val valSize = if (args.length > 3) args(3).toInt else 1000
+    val numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      val arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Force everything to be computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
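The count before the groupByKey is what forces the randomly generated pairs to be computed and cached, so the shuffle that follows reads from the cache instead of regenerating the data. A minimal standalone sketch of that same pattern, assuming a "local[2]" master URL and toy sizes that are not part of this patch:

package spark.examples

import spark.SparkContext
import spark.SparkContext._

// Illustrative sketch only: "local[2]" and the sizes here are assumptions.
object CachePatternSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "Cache Pattern Sketch")
    val pairs = sc.parallelize(1 to 100, 4).map(i => (i % 10, i)).cache
    pairs.count                         // materializes and caches the RDD
    println(pairs.groupByKey(4).count)  // shuffle reads the cached pairs
  }
}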
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
new file mode 100644
index 0000000000..c8edb7d8b4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SimpleSkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SimpleSkewedGroupByTest <host> " +
+        "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+      System.exit(1)
+    }
+
+    val numMappers = if (args.length > 1) args(1).toInt else 2
+    val numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    val valSize = if (args.length > 3) args(3).toInt else 1000
+    val numReducers = if (args.length > 4) args(4).toInt else numMappers
+    val ratio = if (args.length > 5) args(5).toDouble else 5.0
+
+    val sc = new SparkContext(args(0), "Simple Skewed GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      val result = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        val offset = ranGen.nextInt(1000) * numReducers
+        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+          // give a ratio times higher chance of generating key 0 (for reducer 0)
+          result(i) = (offset, byteArr)
+        } else {
+          // generate a key for one of the other reducers
+          val key = 1 + ranGen.nextInt(numReducers - 1) + offset
+          result(i) = (key, byteArr)
+        }
+      }
+      result
+    }.cache
+    // Force everything to be computed and cached
+    pairs1.count
+
+    println("RESULT: " + pairs1.groupByKey(numReducers).count)
+    // Print how many keys each reducer got (for debugging)
+    // println("RESULT: " + pairs1.groupByKey(numReducers)
+    //   .map { case (k, v) => (k, v.size) }
+    //   .collectAsMap)
+  }
+}
+
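The branch probability ratio / (numReducers + ratio - 1) gives key 0 exactly ratio times the weight of each other key: the else branch fires with probability (numReducers - 1) / (numReducers + ratio - 1) and spreads uniformly over numReducers - 1 keys, so each gets 1 / (numReducers + ratio - 1). A standalone check of that arithmetic, using numReducers = 4 and the default ratio = 5.0 as illustrative values:

// Not part of the patch; simulates the key-skew branch above.
object SkewRatioCheck {
  def main(args: Array[String]) {
    val numReducers = 4
    val ratio = 5.0
    val ranGen = new java.util.Random
    val counts = new Array[Int](numReducers)
    for (_ <- 0 until 1000000) {
      val r =
        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) 0
        else 1 + ranGen.nextInt(numReducers - 1)
      counts(r) += 1
    }
    // Expect roughly 625000, 125000, 125000, 125000: a 5x skew toward reducer 0
    println(counts.mkString(", "))
  }
}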
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
new file mode 100644
index 0000000000..e6dec44bed
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -0,0 +1,41 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SkewedGroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    val numMappers = if (args.length > 1) args(1).toInt else 2
+    val numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    val valSize = if (args.length > 3) args(3).toInt else 1000
+    val numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "Skewed GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+
+      // map output sizes increase linearly from the first mapper to the last
+      val numLocalKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
+
+      val arr1 = new Array[(Int, Array[Byte])](numLocalKVPairs)
+      for (i <- 0 until numLocalKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Force everything to be computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
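With the linear skew above, mapper p emits (p + 1) / numMappers of the configured numKVPairs, so the per-mapper totals form an arithmetic series summing to numKVPairs * (numMappers + 1) / 2. A quick standalone check of that formula, with numMappers = 4 and numKVPairs = 1000 as illustrative values:

// Not part of the patch; prints the per-mapper sizes the skew formula yields.
object LinearSkewSizes {
  def main(args: Array[String]) {
    val numMappers = 4
    val numKVPairs = 1000
    val sizes = (0 until numMappers).map { p =>
      (1.0 * (p + 1) / numMappers * numKVPairs).toInt
    }
    println(sizes.mkString(", "))  // 250, 500, 750, 1000
    println(sizes.sum)             // 2500 = 1000 * (4 + 1) / 2
  }
}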