author    Matei Zaharia <matei@eecs.berkeley.edu>  2010-11-03 22:45:44 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2010-11-03 22:45:44 -0700
commit    44530c310b33cb750922d836d35e56b3979b1382 (patch)
tree      b1a73ec7e1e958862f1b9fbf175523da02ded26d
parent    820dac5afebc4fd604c02ba74d0bef7d948287c5 (diff)
Made DFS shuffle's "reduce tasks" fetch inputs in a random order so they
don't all hit the same nodes at the same time.
-rw-r--r--  src/scala/spark/DfsShuffle.scala |  8 ++++----
-rw-r--r--  src/scala/spark/Utils.scala      | 18 +++++++++++++++++-
2 files changed, 21 insertions(+), 5 deletions(-)
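
Why the random order helps: with a fixed "0 until numInputSplits" scan, every reduce task opens the files written by map task 0 first, then map task 1, and so on, so at any moment all reducers are reading from the datanodes holding the same map output; permuting the scan independently per reducer spreads those reads across the cluster. A minimal stand-alone sketch of the effect (split and reducer counts are made up for illustration; this is not Spark code):

import scala.util.Random

object FetchOrderDemo {
  def main(args: Array[String]) {
    val numInputSplits = 8
    // Each simulated reducer draws its own permutation, so their first
    // fetches land on different splits instead of all on split 0.
    for (reducer <- 0 until 4) {
      val order = new Random().shuffle((0 until numInputSplits).toList)
      println("reducer " + reducer + " fetch order: " + order.mkString(", "))
    }
  }
}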
diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala
index bc26afde33..a100ddf05b 100644
--- a/src/scala/spark/DfsShuffle.scala
+++ b/src/scala/spark/DfsShuffle.scala
@@ -80,8 +80,8 @@ extends Logging
       }
       val fs = DfsShuffle.getFileSystem()
       val outputStreams = (0 until numOutputSplits).map(i => {
-        val path = new Path(dir, "intermediate-%d-%d".format(myIndex, i))
-        new ObjectOutputStream(fs.create(path, 1.toShort))
+        val path = new Path(dir, "%d-to-%d".format(myIndex, i))
+        new ObjectOutputStream(fs.create(path, true))
       }).toArray
       for ((k, c) <- combiners) {
         val bucket = k.hashCode % numOutputSplits
@@ -96,8 +96,8 @@ extends Logging
         override def default(key: K) = createCombiner()
       }
       val fs = DfsShuffle.getFileSystem()
-      for (i <- 0 until numInputSplits) {
-        val path = new Path(dir, "intermediate-%d-%d".format(i, myIndex))
+      for (i <- Utils.shuffle(0 until numInputSplits)) {
+        val path = new Path(dir, "%d-to-%d".format(i, myIndex))
         val inputStream = new ObjectInputStream(fs.open(path))
         try {
           while (true) {
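
Aside from the file rename (intermediate-%d-%d to %d-to-%d), note the changed second argument to fs.create: Hadoop's FileSystem offers both create(path: Path, replication: Short) and create(path: Path, overwrite: Boolean), so the old call pinned each intermediate file's replication factor to 1, while the new call takes the filesystem's default replication and makes the overwrite behavior explicit. A small sketch spelling out the two overloads (the path is a made-up example, not from this commit):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CreateOverloads {
  def main(args: Array[String]) {
    val fs = FileSystem.get(new Configuration())
    val path = new Path("/tmp/shuffle-demo/0-to-1")
    // Overload taking a replication factor: keep one copy of each block.
    val rep1 = fs.create(path, 1.toShort)
    rep1.close()
    // Overload taking an overwrite flag: default replication, and it
    // clobbers the file just written above.
    val overwrite = fs.create(path, true)
    overwrite.close()
  }
}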
diff --git a/src/scala/spark/Utils.scala b/src/scala/spark/Utils.scala
index 9d300d229a..1b2fe50c0e 100644
--- a/src/scala/spark/Utils.scala
+++ b/src/scala/spark/Utils.scala
@@ -4,13 +4,14 @@ import java.io._
 import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 /**
  * Various utility methods used by Spark.
  */
 object Utils {
   def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream
+    val bos = new ByteArrayOutputStream()
     val oos = new ObjectOutputStream(bos)
     oos.writeObject(o)
     oos.close
@@ -95,4 +96,19 @@ object Utils {
       out.close()
     }
   }
+
+  // Shuffle the elements of a collection into a random order, returning the
+  // result in a new collection. Unlike scala.util.Random.shuffle, this method
+  // uses a local random number generator, avoiding inter-thread contention.
+  def shuffle[T](seq: Seq[T]): Seq[T] = {
+    val buf = ArrayBuffer(seq: _*)
+    val rand = new Random()
+    for (i <- (buf.size - 1) to 1 by -1) {
+      val j = rand.nextInt(i)
+      val tmp = buf(j)
+      buf(j) = buf(i)
+      buf(i) = tmp
+    }
+    buf
+  }
 }
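
Two notes on the new Utils.shuffle. First, the contention its comment mentions is real: scala.util.Random.shuffle goes through a single shared generator, and java.util.Random advances its seed with a compare-and-swap, so many threads shuffling at once would fight over it; a throwaway Random per call sidesteps that (ThreadLocalRandom only arrived with Java 7). Second, because the swap partner is drawn with rand.nextInt(i) rather than rand.nextInt(i + 1), slot i can never keep its own element, which makes this Sattolo's variant of the algorithm: it samples uniformly from the cyclic permutations rather than from all n! orderings. That is harmless for decorrelating fetch order, but for reference, a textbook Fisher-Yates shuffle would look like this sketch (not part of the commit):

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ShuffleSketch {
  // nextInt(i + 1) allows j == i, i.e. buf(i) may stay in place, giving a
  // uniform draw over all permutations instead of only over n-cycles.
  def fisherYates[T](seq: Seq[T]): Seq[T] = {
    val buf = ArrayBuffer(seq: _*)
    val rand = new Random()
    for (i <- (buf.size - 1) to 1 by -1) {
      val j = rand.nextInt(i + 1)
      val tmp = buf(j)
      buf(j) = buf(i)
      buf(i) = tmp
    }
    buf
  }
}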