author    Andrew Or <andrewor14@gmail.com>    2014-01-12 22:34:33 -0800
committer Andrew Or <andrewor14@gmail.com>    2014-01-12 22:34:33 -0800
commit    8d40e7222f2a0a421349621105dc4c69bd7f1bb8 (patch)
tree      d41309fcaff1c35e01a395f616bf50b6fc35a8a8
parent    bb8098f203e61111faddf2e1a04b03d62037e6c7 (diff)
Get rid of spill map in SparkEnv
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                            | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                              | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                     | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                      | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 8
5 files changed, 7 insertions(+), 15 deletions(-)
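
In short: before this change, ExternalAppendOnlyMap reported its spill size by calling registerBytesSpilled(taskId), which wrote into a global SparkEnv.bytesSpilledMap keyed by task ID that the Executor later had to read and clean up. After this change, the map simply exposes the private _bytesSpilled counter through a read-only bytesSpilled accessor, and the call sites (Aggregator, CoGroupedRDD) assign it directly to context.taskMetrics.bytesSpilled, so the SparkEnv-level map disappears. A minimal, self-contained sketch of that pattern follows; the names SpillingMap, TaskMetrics, and runTask are illustrative placeholders, not Spark's actual classes.

    object SpillExample {
      // Collection that spills to disk and tracks how many bytes it has spilled.
      class SpillingMap {
        private var _bytesSpilled = 0L              // accumulated on every spill
        def bytesSpilled: Long = _bytesSpilled      // read-only accessor; replaces registerBytesSpilled(taskId)

        def spill(mapSize: Long): Unit = {
          // ... write the in-memory map to disk here ...
          _bytesSpilled += mapSize
        }
      }

      // Per-task metrics object, analogous to context.taskMetrics in the diff below.
      class TaskMetrics {
        var bytesSpilled: Long = 0L
      }

      def runTask(metrics: TaskMetrics): Unit = {
        val map = new SpillingMap
        map.spill(1024L)
        // The call site records the value directly on the task's own metrics,
        // so no global map keyed by task ID (and no cleanup of it) is needed.
        metrics.bytesSpilled = map.bytesSpilled
      }
    }

Keeping the counter on the map and copying it into per-task metrics at the call site also removes the book-keeping the Executor previously did to remove the task's entry from the global map when the task finished.
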
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 0609b7b154..c46b7bd043 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,13 +47,12 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
- val combiners =
- new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
@@ -76,7 +75,7 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
- combiners.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = combiners.bytesSpilled
combiners.iterator
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ec075070d..08b592df71 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,9 +60,6 @@ class SparkEnv private[spark] (
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
- // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
- val bytesSpilledMap = mutable.HashMap[Long, Long]()
-
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5dae8902b..dd4aea0fa4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,7 +229,6 @@ private[spark] class Executor(
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
- m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
}
val accumUpdates = Accumulators.values
@@ -285,7 +284,6 @@ private[spark] class Executor(
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
- env.bytesSpilledMap.remove(taskId)
runningTasks.remove(taskId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6df2b3a52d..34e834132e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
map.insert(kv._1, new CoGroupValue(kv._2, depNum))
}
}
- map.registerBytesSpilled(context.attemptId)
+ context.taskMetrics.bytesSpilled = map.bytesSpilled
new InterruptibleIterator(context, map.iterator)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0100083682..f4e53c4d3e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var spillCount = 0
// Number of bytes spilled in total
- private var bytesSpilled = 0L
+ private var _bytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -164,15 +164,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
shuffleMemoryMap(Thread.currentThread().getId) = 0
}
numPairsInMemory = 0
- bytesSpilled += mapSize
+ _bytesSpilled += mapSize
}
/**
* Register the total number of bytes spilled by this task
*/
- def registerBytesSpilled(taskId: Long) {
- SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
- }
+ def bytesSpilled: Long = _bytesSpilled
/**
* Return an iterator that merges the in-memory map with the spilled maps.