author     Matei Zaharia <matei@eecs.berkeley.edu>  2011-06-26 18:22:12 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2011-06-26 18:22:12 -0700
commit     c4dd68ae21f63c85b8c6e6f77bfdf19e14835e65 (patch)
tree       8f0b70868ef06644ef93ad16c3f87229dfcb61ca /examples/src/main
parent     b626562d5487908a65d78de71566ecd12d253df0 (diff)
parent     db7a2c4897aabf0b00a8df347dae1ad579e767f9 (diff)
Merge branch 'mos-bt'
This merge keeps only the broadcast work in mos-bt, because the structure
of the shuffle code has changed with the new RDD design. We still need some
kind of parallel shuffle, but that will be added later. (A short sketch of
the broadcast usage kept here follows the conflict list below.)
Conflicts:
core/src/main/scala/spark/BitTorrentBroadcast.scala
core/src/main/scala/spark/ChainedBroadcast.scala
core/src/main/scala/spark/RDD.scala
core/src/main/scala/spark/SparkContext.scala
core/src/main/scala/spark/Utils.scala
core/src/main/scala/spark/shuffle/BasicLocalFileShuffle.scala
core/src/main/scala/spark/shuffle/DfsShuffle.scala
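
For context, "the broadcast work" kept by this merge is the spark.broadcast API exercised by the examples in the diff below. A minimal, hypothetical sketch of its use, modeled on BroadcastTest (the object name and the "local[2]" master URL are illustrative assumptions, not part of this commit):

```scala
import spark.SparkContext

// Hypothetical sketch of the broadcast usage this merge keeps, modeled on
// BroadcastTest below. The "local[2]" master and object name are assumptions.
object BroadcastSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "Broadcast Sketch")
    val arr = new Array[Int](1000000)   // large array to ship to workers
    val barr = sc.broadcast(arr)        // broadcast it once, instead of with every task
    sc.parallelize(1 to 10, 2).foreach { i =>
      println(barr.value.size)          // tasks read the broadcast value
    }
  }
}
```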
Diffstat (limited to 'examples/src/main')
 examples/src/main/scala/spark/examples/BroadcastTest.scala           |  9 +-
 examples/src/main/scala/spark/examples/GroupByTest.scala             | 37 ++++++
 examples/src/main/scala/spark/examples/MultiBroadcastTest.scala      | 30 +++++
 examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 51 ++++++++
 examples/src/main/scala/spark/examples/SkewedGroupByTest.scala       | 41 +++++++
5 files changed, 161 insertions(+), 7 deletions(-)
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 2506de5ae5..f3a173b183 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -5,9 +5,10 @@ import spark.SparkContext
 object BroadcastTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: BroadcastTest <host> [<slices>]")
+      System.err.println("Usage: BroadcastTest <host> [<slices>] [numElem]")
       System.exit(1)
     }
+
     val spark = new SparkContext(args(0), "Broadcast Test")
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000
@@ -16,14 +17,8 @@ object BroadcastTest {
     for (i <- 0 until arr1.length)
       arr1(i) = i
 
-//    var arr2 = new Array[Int](num * 2)
-//    for (i <- 0 until arr2.length)
-//      arr2(i) = i
-
     val barr1 = spark.broadcast(arr1)
-//    val barr2 = spark.broadcast(arr2)
     spark.parallelize(1 to 10, slices).foreach {
-//      i => println(barr1.value.size + barr2.value.size)
       i => println(barr1.value.size)
     }
   }
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000000..48c02a52c6
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -0,0 +1,37 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object GroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Ensure that everything has been computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
new file mode 100644
index 0000000000..c5c63ec4cb
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -0,0 +1,30 @@
+package spark.examples
+
+import spark.SparkContext
+
+object MultiBroadcastTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: MultiBroadcastTest <host> [<slices>] [numElem]")
+      System.exit(1)
+    }
+
+    val spark = new SparkContext(args(0), "Broadcast Test")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val num = if (args.length > 2) args(2).toInt else 1000000
+
+    var arr1 = new Array[Int](num)
+    for (i <- 0 until arr1.length)
+      arr1(i) = i
+
+    var arr2 = new Array[Int](num)
+    for (i <- 0 until arr2.length)
+      arr2(i) = i
+
+    val barr1 = spark.broadcast(arr1)
+    val barr2 = spark.broadcast(arr2)
+    spark.parallelize(1 to 10, slices).foreach {
+      i => println(barr1.value.size + barr2.value.size)
+    }
+  }
+}
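
The SimpleSkewedGroupByTest added next skews keys toward reducer 0: each generated pair is assigned to reducer 0 with probability ratio / (numReducers + ratio - 1), so reducer 0 sees roughly ratio times the load of any other reducer. A quick, hedged sketch of that arithmetic for the defaults in the diff (ratio = 5.0, numReducers = 2; values chosen only for illustration):

```scala
// Worked example of the skew probability used in SimpleSkewedGroupByTest.
val ratio = 5.0
val numReducers = 2
val pReducer0 = ratio / (numReducers + ratio - 1)
println(pReducer0)  // 5.0 / 6.0 ≈ 0.83, vs. ~0.17 left for the other reducer
```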
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
new file mode 100644
index 0000000000..c8edb7d8b4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SimpleSkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SimpleSkewedGroupByTest <host> " +
+        "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+    var ratio = if (args.length > 5) args(5).toDouble else 5.0
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      var result = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        val offset = ranGen.nextInt(1000) * numReducers
+        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+          // give ratio times higher chance of generating key 0 (for reducer 0)
+          result(i) = (offset, byteArr)
+        } else {
+          // generate a key for one of the other reducers
+          val key = 1 + ranGen.nextInt(numReducers - 1) + offset
+          result(i) = (key, byteArr)
+        }
+      }
+      result
+    }.cache
+    // Ensure that everything has been computed and cached
+    pairs1.count
+
+    println("RESULT: " + pairs1.groupByKey(numReducers).count)
+    // Print how many keys each reducer got (for debugging)
+    // println("RESULT: " + pairs1.groupByKey(numReducers)
+    //   .map{case (k,v) => (k, v.size)}
+    //   .collectAsMap)
+  }
+}
+
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
new file mode 100644
index 0000000000..e6dec44bed
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -0,0 +1,41 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SkewedGroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+
+      // map output sizes linearly increase from the first mapper to the last
+      numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
+
+      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Ensure that everything has been computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
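
In SkewedGroupByTest above, map output sizes grow linearly with the partition index: mapper p emits about (p + 1) / numMappers of the configured numKVPairs. A small sketch of the resulting per-mapper counts for the defaults in the diff (numMappers = 2, numKVPairs = 1000; illustrative values only):

```scala
// Worked example of the linear skew in SkewedGroupByTest: each mapper's
// output size scales with its partition index.
val numMappers = 2
val numKVPairs = 1000
val sizes = (0 until numMappers).map { p =>
  (1.0 * (p + 1) / numMappers * numKVPairs).toInt
}
println(sizes)  // Vector(500, 1000): the last mapper emits twice as many pairs
```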