author     Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2012-08-08 14:10:23 -0700
committer  Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2012-08-08 14:10:23 -0700
commit     674fcf56bfd9d561f5b16ec20af7df9d5a5bbc13 (patch)
tree       2e29fb3b806763d6fc8245b01a8ec65c807def76 /core
parent     f4aaec7a48450a523ab921b691404bc4b71dea09 (diff)
Use JavaConversions to get a Scala iterator
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 15 ++++-----------
1 file changed, 4 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index c1676aea95..3e965cf817 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -5,6 +5,7 @@ import java.util.HashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -124,18 +125,10 @@ class ShuffleMapTask(
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
- val iter = buckets(i).entrySet().iterator()
- // Create an iterator to avoid a copy. blockManager will traverse this
- // to create an block
- val entryIter = new Iterator[Any] {
- override def hasNext() = iter.hasNext()
- override def next() = {
- val entry = iter.next()
- (entry.getKey(), entry.getValue())
- }
- }
+ // Get a Scala iterator from the Java map
+ val iter = JavaConversions.mapAsScalaMap(buckets(i)).iterator
// TODO: This should probably be DISK_ONLY
- blockManager.put(blockId, entryIter, StorageLevel.MEMORY_ONLY, false)
+ blockManager.put(blockId, iter, StorageLevel.MEMORY_ONLY, false)
}
return SparkEnv.get.blockManager.blockManagerId
}
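
For reference, here is a minimal, standalone sketch of the conversion this commit adopts (the map contents and printed output are illustrative, not from Spark): JavaConversions.mapAsScalaMap wraps the Java HashMap in place rather than copying it, and the resulting Scala map's iterator yields (key, value) tuples, which is exactly what the hand-rolled Iterator[Any] over entrySet() produced before this change.

import java.util.HashMap
import scala.collection.JavaConversions

object MapAsScalaSketch {
  def main(args: Array[String]) {
    // Illustrative bucket; in ShuffleMapTask this is buckets(i)
    val bucket = new HashMap[String, Int]
    bucket.put("a", 1)
    bucket.put("b", 2)

    // Wraps the Java map in place (no copy); iterating the wrapper
    // yields (key, value) tuples, like the removed Iterator[Any] did.
    val iter: Iterator[(String, Int)] =
      JavaConversions.mapAsScalaMap(bucket).iterator

    while (iter.hasNext) {
      val (k, v) = iter.next()
      println(k + " -> " + v)
    }
  }
}

Note that later Scala releases deprecate JavaConversions in favor of explicit converters: scala.collection.JavaConverters (bucket.asScala.iterator) on 2.12, and scala.jdk.CollectionConverters on 2.13 and later.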