author    Matei Zaharia <matei@eecs.berkeley.edu>  2011-06-26 18:22:12 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2011-06-26 18:22:12 -0700
commit    c4dd68ae21f63c85b8c6e6f77bfdf19e14835e65 (patch)
tree      8f0b70868ef06644ef93ad16c3f87229dfcb61ca /examples
parent    b626562d5487908a65d78de71566ecd12d253df0 (diff)
parent    db7a2c4897aabf0b00a8df347dae1ad579e767f9 (diff)
Merge branch 'mos-bt'
This merge keeps only the broadcast work in mos-bt, because the structure of
shuffle has changed with the new RDD design. We still need some kind of
parallel shuffle, but that will be added later.

Conflicts:
	core/src/main/scala/spark/BitTorrentBroadcast.scala
	core/src/main/scala/spark/ChainedBroadcast.scala
	core/src/main/scala/spark/RDD.scala
	core/src/main/scala/spark/SparkContext.scala
	core/src/main/scala/spark/Utils.scala
	core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
	core/src/main/scala/spark/shuffle/DfsShuffle.scala
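For context, the broadcast API this merge keeps is the one exercised by the
examples below. A minimal sketch of the pattern, assuming only the 2011-era
calls visible in these diffs (SparkContext.broadcast and the .value accessor):

    import spark.SparkContext

    object BroadcastSketch {
      def main(args: Array[String]) {
        // "local" runs in-process; the examples take this as their <host> argument
        val spark = new SparkContext("local", "Broadcast Sketch")
        // Ship the array to workers once via the broadcast mechanism,
        // instead of serializing it into every task closure
        val arr = Array.tabulate(1000000)(i => i)
        val barr = spark.broadcast(arr)
        spark.parallelize(1 to 10, 2).foreach { i =>
          println(barr.value.size)   // workers read the broadcast value
        }
      }
    }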
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/spark/examples/BroadcastTest.scala           |  9
-rw-r--r--  examples/src/main/scala/spark/examples/GroupByTest.scala             | 37
-rw-r--r--  examples/src/main/scala/spark/examples/MultiBroadcastTest.scala      | 30
-rw-r--r--  examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 51
-rw-r--r--  examples/src/main/scala/spark/examples/SkewedGroupByTest.scala       | 41
5 files changed, 161 insertions, 7 deletions
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 2506de5ae5..f3a173b183 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -5,9 +5,10 @@ import spark.SparkContext
object BroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest <host> [<slices>]")
+ System.err.println("Usage: BroadcastTest <host> [<slices>] [numElem]")
System.exit(1)
}
+
val spark = new SparkContext(args(0), "Broadcast Test")
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
@@ -16,14 +17,8 @@ object BroadcastTest {
for (i <- 0 until arr1.length)
arr1(i) = i
-// var arr2 = new Array[Int](num * 2)
-// for (i <- 0 until arr2.length)
-// arr2(i) = i
-
val barr1 = spark.broadcast(arr1)
-// val barr2 = spark.broadcast(arr2)
spark.parallelize(1 to 10, slices).foreach {
-// i => println(barr1.value.size + barr2.value.size)
i => println(barr1.value.size)
}
}
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000000..48c02a52c6
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -0,0 +1,37 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object GroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+ val sc = new SparkContext(args(0), "GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println(pairs1.groupByKey(numReducers).count)
+ }
+}
+
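Since GroupByTest draws its keys uniformly from [0, Int.MaxValue), duplicate
keys are rare (a birthday-bound chance of roughly n^2 / 2^32, about 0.1% for
the default 2 x 1000 pairs), so the printed count is almost always
numMappers * numKVPairs. A quick local sanity check of that expectation
(a hypothetical snippet, not part of this commit):

    import java.util.Random

    object KeyCollisionCheck {
      def main(args: Array[String]) {
        val ranGen = new Random
        // 2 mappers x 1000 pairs, same key range as GroupByTest
        val keys = Array.fill(2 * 1000)(ranGen.nextInt(Int.MaxValue))
        println(keys.toSet.size)   // almost always 2000
      }
    }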
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
new file mode 100644
index 0000000000..c5c63ec4cb
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -0,0 +1,30 @@
+package spark.examples
+
+import spark.SparkContext
+
+object MultiBroadcastTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: MultiBroadcastTest <host> [<slices>] [numElem]")
+ System.exit(1)
+ }
+
+ val spark = new SparkContext(args(0), "Multi-Broadcast Test")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val num = if (args.length > 2) args(2).toInt else 1000000
+
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length)
+ arr1(i) = i
+
+ var arr2 = new Array[Int](num)
+ for (i <- 0 until arr2.length)
+ arr2(i) = i
+
+ val barr1 = spark.broadcast(arr1)
+ val barr2 = spark.broadcast(arr2)
+ spark.parallelize(1 to 10, slices).foreach {
+ i => println(barr1.value.size + barr2.value.size)
+ }
+ }
+}
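For contrast, the same loop without broadcast (a hypothetical variant, not in
this commit) would capture arr1 and arr2 directly, so both arrays would be
serialized into each of the 10 task closures instead of being shipped once
per node:

    // Anti-pattern sketch: closure capture instead of broadcast
    spark.parallelize(1 to 10, slices).foreach { i =>
      println(arr1.size + arr2.size)   // arr1 and arr2 travel with every task
    }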
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
new file mode 100644
index 0000000000..c8edb7d8b4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SimpleSkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SimpleSkewedGroupByTest <host> " +
+ "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+ var ratio = if (args.length > 5) args(5).toDouble else 5.0
+
+ val sc = new SparkContext(args(0), "Simple Skewed GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var result = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ val offset = ranGen.nextInt(1000) * numReducers
+ if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+ // give the hot key a ratio-times-higher chance; offset is a multiple of numReducers, so it hashes to reducer 0
+ result(i) = (offset, byteArr)
+ } else {
+ // generate a key for one of the other reducers
+ val key = 1 + ranGen.nextInt(numReducers-1) + offset
+ result(i) = (key, byteArr)
+ }
+ }
+ result
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println("RESULT: " + pairs1.groupByKey(numReducers).count)
+ // Print how many keys each reducer got (for debugging)
+ //println("RESULT: " + pairs1.groupByKey(numReducers)
+ // .map{case (k,v) => (k, v.size)}
+ // .collectAsMap)
+ }
+}
+
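The skew arithmetic works out as follows: a key lands on reducer 0 with
probability ratio / (numReducers + ratio - 1), while each of the other
numReducers - 1 reducers receives keys with probability
1 / (numReducers + ratio - 1), so reducer 0 gets exactly ratio times the
share of any other reducer. With the defaults (numReducers = 2, ratio = 5)
that is 5/6 versus 1/6. A quick simulation of the hot-key branch
(a hypothetical check, not part of this commit):

    import java.util.Random

    object SkewRatioCheck {
      def main(args: Array[String]) {
        val numReducers = 2
        val ratio = 5.0
        val ranGen = new Random
        var hot = 0
        for (i <- 0 until 100000) {
          // same test as SimpleSkewedGroupByTest's hot-key branch
          if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) hot += 1
        }
        println(hot)   // expect about 5/6 of 100000, i.e. ~83333
      }
    }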
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
new file mode 100644
index 0000000000..e6dec44bed
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -0,0 +1,41 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SkewedGroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+ val sc = new SparkContext(args(0), "Skewed GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+
+ // map output sizes linearly increase from the 1st to the last
+ numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
+
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println(pairs1.groupByKey(numReducers).count)
+ }
+}
+
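Here the skew comes from the per-mapper resizing: mapper p emits
((p + 1) / numMappers * numKVPairs).toInt pairs, so output sizes grow
linearly from the first mapper to the last. For example, with numMappers = 4
and the default numKVPairs = 1000, the four mappers emit 250, 500, 750, and
1000 pairs. A one-liner to see the sizes (hypothetical, not part of this
commit):

    val numMappers = 4
    val numKVPairs = 1000
    val sizes = (0 until numMappers).map(p => (1.0 * (p + 1) / numMappers * numKVPairs).toInt)
    println(sizes)   // Vector(250, 500, 750, 1000)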