diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-11-03 22:45:44 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-11-03 22:45:44 -0700 |
commit | 44530c310b33cb750922d836d35e56b3979b1382 (patch) | |
tree | b1a73ec7e1e958862f1b9fbf175523da02ded26d | |
parent | 820dac5afebc4fd604c02ba74d0bef7d948287c5 (diff) | |
download | spark-44530c310b33cb750922d836d35e56b3979b1382.tar.gz spark-44530c310b33cb750922d836d35e56b3979b1382.tar.bz2 spark-44530c310b33cb750922d836d35e56b3979b1382.zip |
Made DFS shuffle's "reduce tasks" fetch inputs in a random order so they
don't all hit the same nodes at the same time.
-rw-r--r-- | src/scala/spark/DfsShuffle.scala | 8 | ||||
-rw-r--r-- | src/scala/spark/Utils.scala | 18 |
2 files changed, 21 insertions, 5 deletions
diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala index bc26afde33..a100ddf05b 100644 --- a/src/scala/spark/DfsShuffle.scala +++ b/src/scala/spark/DfsShuffle.scala @@ -80,8 +80,8 @@ extends Logging } val fs = DfsShuffle.getFileSystem() val outputStreams = (0 until numOutputSplits).map(i => { - val path = new Path(dir, "intermediate-%d-%d".format(myIndex, i)) - new ObjectOutputStream(fs.create(path, 1.toShort)) + val path = new Path(dir, "%d-to-%d".format(myIndex, i)) + new ObjectOutputStream(fs.create(path, true)) }).toArray for ((k, c) <- combiners) { val bucket = k.hashCode % numOutputSplits @@ -96,8 +96,8 @@ extends Logging override def default(key: K) = createCombiner() } val fs = DfsShuffle.getFileSystem() - for (i <- 0 until numInputSplits) { - val path = new Path(dir, "intermediate-%d-%d".format(i, myIndex)) + for (i <- Utils.shuffle(0 until numInputSplits)) { + val path = new Path(dir, "%d-to-%d".format(i, myIndex)) val inputStream = new ObjectInputStream(fs.open(path)) try { while (true) { diff --git a/src/scala/spark/Utils.scala b/src/scala/spark/Utils.scala index 9d300d229a..1b2fe50c0e 100644 --- a/src/scala/spark/Utils.scala +++ b/src/scala/spark/Utils.scala @@ -4,13 +4,14 @@ import java.io._ import java.util.UUID import scala.collection.mutable.ArrayBuffer +import scala.util.Random /** * Various utility methods used by Spark. */ object Utils { def serialize[T](o: T): Array[Byte] = { - val bos = new ByteArrayOutputStream + val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(o) oos.close @@ -95,4 +96,19 @@ object Utils { out.close() } } + + // Shuffle the elements of a collection into a random order, returning the + // result in a new collection. Unlike scala.util.Random.shuffle, this method + // uses a local random number generator, avoiding inter-thread contention. 
/**
 * Shuffle the elements of a collection into a random order, returning the
 * result in a new collection. Unlike scala.util.Random.shuffle, this method
 * uses a local random number generator, avoiding inter-thread contention.
 *
 * @param seq the input sequence (left unmodified)
 * @return a new sequence containing the same elements in random order
 */
def shuffle[T](seq: Seq[T]): Seq[T] = {
  val buf = ArrayBuffer(seq: _*)
  val rand = new Random()
  // Fisher-Yates: walk from the end, swapping position i with a uniformly
  // random position j in [0, i]. The bound must be i + 1 (nextInt's upper
  // bound is exclusive); the previous nextInt(i) could never pick j == i,
  // so an element could never stay in place -- a biased (Sattolo) shuffle.
  for (i <- (buf.size - 1) to 1 by -1) {
    val j = rand.nextInt(i + 1)
    val tmp = buf(j)
    buf(j) = buf(i)
    buf(i) = tmp
  }
  buf
}