aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorShivaram Venkataraman <shivaram@eecs.berkeley.edu>2012-08-07 15:24:16 -0700
committerShivaram Venkataraman <shivaram@eecs.berkeley.edu>2012-08-08 00:47:02 -0700
commitf4aaec7a48450a523ab921b691404bc4b71dea09 (patch)
tree3e88a1031952d5fea02fc60c272b138c2e2c186d /core
parent88b016db2a077acc17e88926328ab99b427c8573 (diff)
downloadspark-f4aaec7a48450a523ab921b691404bc4b71dea09.tar.gz
spark-f4aaec7a48450a523ab921b691404bc4b71dea09.tar.bz2
spark-f4aaec7a48450a523ab921b691404bc4b71dea09.zip
Avoid a copy in ShuffleMapTask by creating an iterator that will be used by the
block manager.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala14
1 files changed, 9 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index db89db903e..c1676aea95 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -124,14 +124,18 @@ class ShuffleMapTask(
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
- val arr = new ArrayBuffer[Any]
val iter = buckets(i).entrySet().iterator()
- while (iter.hasNext()) {
- val entry = iter.next()
- arr += ((entry.getKey(), entry.getValue()))
+ // Create an iterator to avoid a copy. blockManager will traverse this
+ // to create a block
+ val entryIter = new Iterator[Any] {
+ override def hasNext() = iter.hasNext()
+ override def next() = {
+ val entry = iter.next()
+ (entry.getKey(), entry.getValue())
+ }
}
// TODO: This should probably be DISK_ONLY
- blockManager.put(blockId, arr.iterator, StorageLevel.MEMORY_ONLY, false)
+ blockManager.put(blockId, entryIter, StorageLevel.MEMORY_ONLY, false)
}
return SparkEnv.get.blockManager.blockManagerId
}