diff options
author | Denny <dennybritz@gmail.com> | 2012-09-11 16:10:45 -0700 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-09-11 16:10:45 -0700 |
commit | 31c53e917d2cba54f485abec56e4f66726d2a732 (patch) | |
tree | 35de1f2f894bea9cd7e983bd928142c3b619f1e9 /core | |
parent | 4d3471dd077e9e9c9038707eb5ba3fb8539c05e0 (diff) | |
download | spark-31c53e917d2cba54f485abec56e4f66726d2a732.tar.gz spark-31c53e917d2cba54f485abec56e4f66726d2a732.tar.bz2 spark-31c53e917d2cba54f485abec56e4f66726d2a732.zip |
Use stageId as index for fileSet caches.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index a551bcc782..b9f0a0d6d0 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -22,6 +22,7 @@ object ShuffleMapTask { // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new JHashMap[Int, Array[Byte]] val fileSetCache = new JHashMap[Int, Array[Byte]] + val jarSetCache = new JHashMap[Int, Array[Byte]] def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { synchronized { @@ -42,8 +43,8 @@ object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. - def serializeFileSet(set : HashMap[String, Long]) : Array[Byte] = { - val old = fileSetCache.get(set.hashCode) + def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { + val old = cache.get(stageId) if (old != null) { return old } else { @@ -52,7 +53,7 @@ object ShuffleMapTask { objOut.writeObject(set.toArray) objOut.close() val bytes = out.toByteArray - fileSetCache.put(set.hashCode, bytes) + cache.put(stageId, bytes) return bytes } } @@ -84,6 +85,7 @@ object ShuffleMapTask { synchronized { serializedInfoCache.clear() fileSetCache.clear() + jarSetCache.clear() } } } @@ -112,10 +114,10 @@ class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) - val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet) + val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache) out.writeInt(fileSetBytes.length) out.write(fileSetBytes) - val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet) + val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache) out.writeInt(jarSetBytes.length) out.write(jarSetBytes) |