aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Denny <dennybritz@gmail.com> 2012-09-11 16:10:45 -0700
committer Denny <dennybritz@gmail.com> 2012-09-11 16:10:45 -0700
commit 31c53e917d2cba54f485abec56e4f66726d2a732 (patch)
tree 35de1f2f894bea9cd7e983bd928142c3b619f1e9 /core
parent 4d3471dd077e9e9c9038707eb5ba3fb8539c05e0 (diff)
download spark-31c53e917d2cba54f485abec56e4f66726d2a732.tar.gz
spark-31c53e917d2cba54f485abec56e4f66726d2a732.tar.bz2
spark-31c53e917d2cba54f485abec56e4f66726d2a732.zip
Use stageId as index for fileSet caches.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/scheduler/ShuffleMapTask.scala 12
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index a551bcc782..b9f0a0d6d0 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -22,6 +22,7 @@ object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new JHashMap[Int, Array[Byte]]
val fileSetCache = new JHashMap[Int, Array[Byte]]
+ val jarSetCache = new JHashMap[Int, Array[Byte]]
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
synchronized {
@@ -42,8 +43,8 @@ object ShuffleMapTask {
}
// Since both the JarSet and FileSet have the same format this is used for both.
- def serializeFileSet(set : HashMap[String, Long]) : Array[Byte] = {
- val old = fileSetCache.get(set.hashCode)
+ def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
+ val old = cache.get(stageId)
if (old != null) {
return old
} else {
@@ -52,7 +53,7 @@ object ShuffleMapTask {
objOut.writeObject(set.toArray)
objOut.close()
val bytes = out.toByteArray
- fileSetCache.put(set.hashCode, bytes)
+ cache.put(stageId, bytes)
return bytes
}
}
@@ -84,6 +85,7 @@ object ShuffleMapTask {
synchronized {
serializedInfoCache.clear()
fileSetCache.clear()
+ jarSetCache.clear()
}
}
}
@@ -112,10 +114,10 @@ class ShuffleMapTask(
out.writeInt(bytes.length)
out.write(bytes)
- val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet)
+ val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
out.writeInt(fileSetBytes.length)
out.write(fileSetBytes)
- val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet)
+ val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
out.writeInt(jarSetBytes.length)
out.write(jarSetBytes)