aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-08-10 00:57:33 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-08-10 00:57:33 -0700
commit3c94e5c1880004bd77beb619fa7e3509dc7c732d (patch)
treeb06a4d0caa7dfb0474f6e9204a93fc200e881322
parente463e7a333577b4e4b693268fba7f4df9f362426 (diff)
parent1803cce6929cb681ffe0a015c52171453003e025 (diff)
downloadspark-3c94e5c1880004bd77beb619fa7e3509dc7c732d.tar.gz
spark-3c94e5c1880004bd77beb619fa7e3509dc7c732d.tar.bz2
spark-3c94e5c1880004bd77beb619fa7e3509dc7c732d.zip
Merge pull request #168 from shivaram/dev
Use JavaConversion to get a scala iterator
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala15
1 file changed, 4 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index c1676aea95..f78e0e5fb2 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -5,6 +5,7 @@ import java.util.HashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -124,18 +125,10 @@ class ShuffleMapTask(
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
- val iter = buckets(i).entrySet().iterator()
- // Create an iterator to avoid a copy. blockManager will traverse this
- // to create an block
- val entryIter = new Iterator[Any] {
- override def hasNext() = iter.hasNext()
- override def next() = {
- val entry = iter.next()
- (entry.getKey(), entry.getValue())
- }
- }
+ // Get a scala iterator from java map
+ val iter: Iterator[(Any, Any)] = buckets(i).iterator
// TODO: This should probably be DISK_ONLY
- blockManager.put(blockId, entryIter, StorageLevel.MEMORY_ONLY, false)
+ blockManager.put(blockId, iter, StorageLevel.MEMORY_ONLY, false)
}
return SparkEnv.get.blockManager.blockManagerId
}