From 31c53e917d2cba54f485abec56e4f66726d2a732 Mon Sep 17 00:00:00 2001 From: Denny Date: Tue, 11 Sep 2012 16:10:45 -0700 Subject: Use stageId as index for fileSet caches. --- core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index a551bcc782..b9f0a0d6d0 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -22,6 +22,7 @@ object ShuffleMapTask { // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new JHashMap[Int, Array[Byte]] val fileSetCache = new JHashMap[Int, Array[Byte]] + val jarSetCache = new JHashMap[Int, Array[Byte]] def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { synchronized { @@ -42,8 +43,8 @@ object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. - def serializeFileSet(set : HashMap[String, Long]) : Array[Byte] = { - val old = fileSetCache.get(set.hashCode) + def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = { + val old = cache.get(stageId) if (old != null) { return old } else { @@ -52,7 +53,7 @@ object ShuffleMapTask { objOut.writeObject(set.toArray) objOut.close() val bytes = out.toByteArray - fileSetCache.put(set.hashCode, bytes) + cache.put(stageId, bytes) return bytes } } @@ -84,6 +85,7 @@ object ShuffleMapTask { synchronized { serializedInfoCache.clear() fileSetCache.clear() + jarSetCache.clear() } } } @@ -112,10 +114,10 @@ class ShuffleMapTask( out.writeInt(bytes.length) out.write(bytes) - val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet) + val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache) out.writeInt(fileSetBytes.length) out.write(fileSetBytes) - val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet) + val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache) out.writeInt(jarSetBytes.length) out.write(jarSetBytes) -- cgit v1.2.3