diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index a551bcc782..b9f0a0d6d0 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -22,6 +22,7 @@ object ShuffleMapTask { // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new JHashMap[Int, Array[Byte]] val fileSetCache = new JHashMap[Int, Array[Byte]] + val jarSetCache = new JHashMap[Int, Array[Byte]] def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { synchronized { @@ -42,8 +43,8 @@ object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. - def serializeFileSet(set : HashMap[String, Long]) : Array[Byte] = { - val old = fileSetCache.get(set.hashCode) + def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { + val old = cache.get(stageId) if (old != null) { return old } else { @@ -52,7 +53,7 @@ object ShuffleMapTask { objOut.writeObject(set.toArray) objOut.close() val bytes = out.toByteArray - fileSetCache.put(set.hashCode, bytes) + cache.put(stageId, bytes) return bytes } } @@ -84,6 +85,7 @@ object ShuffleMapTask { synchronized { serializedInfoCache.clear() fileSetCache.clear() + jarSetCache.clear() } } } @@ -112,10 +114,10 @@ class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) - val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet) + val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache) out.writeInt(fileSetBytes.length) out.write(fileSetBytes) - val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet) + val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache) out.writeInt(jarSetBytes.length) out.write(jarSetBytes) |