author     Matei Zaharia <matei@eecs.berkeley.edu>   2010-11-06 10:53:57 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2010-11-06 10:53:57 -0700
commit     d9ea6d69a5171d397bb9d79c9dc4517f292f6a44 (patch)
tree       d8f91417b6cf05bb79b6eadecd5825820f6822a7 /src
parent     16ff4dc0be5dedf6e0334a6fab9f729d98e8c233 (diff)
Create output files one by one instead of at the same time in the map
phase of DfsShuffle.
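
The motivation for the change: the old map phase opened all numOutputSplits
ObjectOutputStreams up front and kept them open while draining a single
combiner map, so each mapper held every output file handle (and its write
buffer) at once. The new code partitions records into per-split buckets
first, then opens, writes, and closes each output file in turn. A minimal
sketch of the two patterns, with a hypothetical openStream helper standing
in for the real fs.create calls:

    import java.io.ObjectOutputStream

    // Old pattern (sketch): all n streams open at the same time.
    def writeAllAtOnce(n: Int, openStream: Int => ObjectOutputStream,
                       bucketOf: AnyRef => Int,
                       records: Iterator[AnyRef]): Unit = {
      val streams = (0 until n).map(openStream).toArray  // n open handles
      for (r <- records) streams(bucketOf(r)).writeObject(r)
      streams.foreach(_.close())
    }

    // New pattern (sketch): records are bucketed in memory first, so only
    // one stream needs to be open at any moment.
    def writeOneByOne(n: Int, openStream: Int => ObjectOutputStream,
                      buckets: Array[Seq[AnyRef]]): Unit = {
      for (i <- 0 until n) {
        val out = openStream(i)
        buckets(i).foreach(r => out.writeObject(r))
        out.close()
      }
    }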
Diffstat (limited to 'src')
-rw-r--r--  src/scala/spark/DfsShuffle.scala  |  23 +++++++++++------------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala
index 256bf4ea9c..2ef0321a63 100644
--- a/src/scala/spark/DfsShuffle.scala
+++ b/src/scala/spark/DfsShuffle.scala
@@ -38,26 +38,25 @@ extends Logging
     numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
       val myIndex = pair._1
       val myIterator = pair._2
-      val combiners = new HashMap[K, C]
+      val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
       for ((k, v) <- myIterator) {
-        combiners(k) = combiners.get(k) match {
+        var bucketId = k.hashCode % numOutputSplits
+        if (bucketId < 0) { // Fix bucket ID if hash code was negative
+          bucketId += numOutputSplits
+        }
+        val bucket = buckets(bucketId)
+        bucket(k) = bucket.get(k) match {
           case Some(c) => mergeValue(c, v)
           case None => createCombiner(v)
         }
       }
       val fs = DfsShuffle.getFileSystem()
-      val outputStreams = (0 until numOutputSplits).map(i => {
+      for (i <- 0 until numOutputSplits) {
         val path = new Path(dir, "%d-to-%d".format(myIndex, i))
-        new ObjectOutputStream(fs.create(path, true))
-      }).toArray
-      for ((k, c) <- combiners) {
-        var bucket = k.hashCode % numOutputSplits
-        if (bucket < 0) {
-          bucket += numOutputSplits
-        }
-        outputStreams(bucket).writeObject((k, c))
+        val out = new ObjectOutputStream(fs.create(path, true))
+        buckets(i).foreach(pair => out.writeObject(pair))
+        out.close()
       }
-      outputStreams.foreach(_.close())
     })
     // Return an RDD that does each of the merges for a given partition
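
One detail the patch carries into the new bucketing loop: in Scala, as in
Java, % takes the sign of its left operand, so k.hashCode % numOutputSplits
can be negative, which is why the code adds numOutputSplits back to land in
[0, numOutputSplits). A standalone sketch of that adjustment with a worked
example (values are illustrative only):

    // Sketch: derive a non-negative bucket id from a possibly negative hash.
    def bucketIdFor(key: AnyRef, numOutputSplits: Int): Int = {
      var bucketId = key.hashCode % numOutputSplits
      if (bucketId < 0) {     // % keeps the dividend's sign in Scala/Java
        bucketId += numOutputSplits
      }
      bucketId
    }

    // Example: a key with hashCode -7 and 4 output splits:
    // -7 % 4 == -3, adjusted to -3 + 4 == 1, which is inside 0 until 4.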