5 files changed, 7 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 0609b7b154..c46b7bd043 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,13 +47,12 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      val combiners =
-        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      combiners.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
       combiners.iterator
     }
   }
@@ -76,7 +75,7 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      combiners.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
       combiners.iterator
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ec075070d..08b592df71 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,9 +60,6 @@ class SparkEnv private[spark] (
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
-  // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
-  val bytesSpilledMap = mutable.HashMap[Long, Long]()
-
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5dae8902b..dd4aea0fa4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,7 +229,6 @@ private[spark] class Executor(
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
           m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
-          m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
         }
 
         val accumUpdates = Accumulators.values
@@ -285,7 +284,6 @@ private[spark] class Executor(
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
-        env.bytesSpilledMap.remove(taskId)
         runningTasks.remove(taskId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6df2b3a52d..34e834132e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           map.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
-      map.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = map.bytesSpilled
       new InterruptibleIterator(context, map.iterator)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0100083682..f4e53c4d3e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var spillCount = 0
 
   // Number of bytes spilled in total
-  private var bytesSpilled = 0L
+  private var _bytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -164,15 +164,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
     numPairsInMemory = 0
-    bytesSpilled += mapSize
+    _bytesSpilled += mapSize
   }
 
   /**
    * Register the total number of bytes spilled by this task
    */
-  def registerBytesSpilled(taskId: Long) {
-    SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
-  }
+  def bytesSpilled: Long = _bytesSpilled
 
   /**
    * Return an iterator that merges the in-memory map with the spilled maps.
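Taken together, the patch replaces the global SparkEnv.bytesSpilledMap (keyed by task ID and manually cleaned up in Executor's finally block) with a private counter on ExternalAppendOnlyMap that each caller copies into its own TaskMetrics. Below is a minimal, self-contained sketch of that ownership pattern, not Spark's actual API: SimpleTaskMetrics and SimpleSpillingMap are hypothetical stand-ins for TaskMetrics and ExternalAppendOnlyMap, and the fixed 64-byte charge per insert is a placeholder for real spill accounting.

// Per-task metrics carrier, mutated only by the task that owns it
// (hypothetical stand-in for Spark's TaskMetrics).
class SimpleTaskMetrics {
  var bytesSpilled: Long = 0L
}

// A collection that may spill to disk; it keeps a private running total
// and exposes it through a read-only getter, mirroring the diff's
// `_bytesSpilled` field and `def bytesSpilled` accessor.
class SimpleSpillingMap[K, V] {
  private val inMemory = scala.collection.mutable.HashMap[K, V]()
  private var _bytesSpilled = 0L

  def insert(k: K, v: V): Unit = {
    inMemory(k) = v
    // A real implementation would add the spilled map's size when memory
    // pressure forces a spill; here we pretend each insert spills 64 bytes.
    _bytesSpilled += 64L
  }

  def bytesSpilled: Long = _bytesSpilled

  def iterator: Iterator[(K, V)] = inMemory.iterator
}

object SpillAccountingSketch {
  def main(args: Array[String]): Unit = {
    val metrics = new SimpleTaskMetrics
    val map = new SimpleSpillingMap[String, Int]

    Seq("a" -> 1, "b" -> 2, "c" -> 3).foreach { case (k, v) => map.insert(k, v) }

    // The caller publishes the count into its own metrics object, instead
    // of writing into a global task-ID-keyed map as the removed
    // `registerBytesSpilled(taskId)` / `SparkEnv.bytesSpilledMap` did.
    metrics.bytesSpilled = map.bytesSpilled
    println(s"bytesSpilled = ${metrics.bytesSpilled}")
  }
}

Because the metrics object lives and dies with its task, no shared map, synchronized access, or explicit remove(taskId) cleanup is needed, which is exactly what the SparkEnv.scala and Executor.scala hunks delete.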