author    Nathan Kronenfeld <nkronenfeld@oculusinfo.com>    2014-12-09 23:53:17 -0800
committer Josh Rosen <joshrosen@databricks.com>             2014-12-09 23:53:17 -0800
commit    94b377f94487109a1cc3e07dd230b1df7a96e28d (patch)
tree      424f1bba8d6644c1e86b76b14c0bec04e2a12357 /core
parent    f79c1cfc997c1a7ddee480ca3d46f5341b69d3b7 (diff)
[SPARK-4772] Clear local copies of accumulators as soon as we're done with them
Accumulators keep thread-local copies of themselves. These copies were only cleared at the beginning of a task, which meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker.

This PR clears the thread-local copies of accumulators at the end of each task, in the task's finally block, to make sure they are cleaned up between tasks. It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up.

Author: Nathan Kronenfeld <nkronenfeld@oculusinfo.com>

Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits:

a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude to get around false positive in mima tests
b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version incompatibility should be irrelevant, as it will only surface if a master is talking to a worker running a different version of spark.
537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions for ThreadLocal localAccums, and keeping clear(), but also calling it in the tasks' finally block, rather than just at the beginning of the task.
39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with them
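
To make the pattern concrete before the diff, here is a minimal, self-contained sketch (hypothetical names such as ThreadLocalRegistryDemo; this is not Spark code) of the per-thread registry the patch adopts: overriding ThreadLocal.initialValue() gives each thread its own lazily created map, and clear() empties only the calling thread's copy.

import scala.collection.mutable

// Hypothetical demo object illustrating the pattern used for localAccums.
object ThreadLocalRegistryDemo {
  // initialValue() runs once per thread, on first access, so every thread
  // gets its own map without any shared Map[Thread, ...] bookkeeping.
  private val localValues = new ThreadLocal[mutable.Map[Long, Any]]() {
    override protected def initialValue(): mutable.Map[Long, Any] = mutable.Map[Long, Any]()
  }

  def put(id: Long, value: Any): Unit = localValues.get()(id) = value

  // Empties only the calling thread's map; other threads are untouched.
  def clear(): Unit = localValues.get.clear()

  def values: Map[Long, Any] = localValues.get.toMap
}

The old code keyed one shared map by Thread.currentThread, so entries for a dead thread lingered until something explicitly removed them; with a ThreadLocal, a thread's map becomes unreachable together with the thread itself.
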
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala      | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala |  3
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 000bbd6b53..5f31bfba3f 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
+import java.lang.ThreadLocal
import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -278,10 +279,12 @@ object AccumulatorParam {
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
-private object Accumulators {
+private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
- val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
+ val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+ override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+ }
var lastId: Long = 0
def newId(): Long = synchronized {
@@ -293,22 +296,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
- val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
- accums(a.id) = a
+ localAccums.get()(a.id) = a
}
}
// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
- localAccums.remove(Thread.currentThread)
+ localAccums.get.clear
}
}
// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
- for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
+ for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 835157fc52..52de6980ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -172,7 +172,6 @@ private[spark] class Executor(
val startGCTime = gcTime
try {
- Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -278,6 +277,8 @@ private[spark] class Executor(
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
+ // Release memory used by this thread for accumulators
+ Accumulators.clear()
runningTasks.remove(taskId)
}
}
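
As a closing illustration of the Executor change, here is a minimal hypothetical sketch (FinallyCleanupDemo is not Spark code) of why the cleanup moved from the top of the try block into finally: the thread-local scratch map is emptied even when the task body throws, so a failed task cannot leave its entries behind.

import scala.collection.mutable
import scala.util.Try

object FinallyCleanupDemo {
  // Per-thread scratch space, analogous to Accumulators.localAccums.
  private val scratch = new ThreadLocal[mutable.Map[Long, Any]]() {
    override protected def initialValue(): mutable.Map[Long, Any] = mutable.Map[Long, Any]()
  }

  def runTask(taskId: Long)(body: => Unit): Unit = {
    try {
      scratch.get()(taskId) = "in-flight state" // accumulated while the task runs
      body
    } finally {
      // Runs on success *and* failure, mirroring the Accumulators.clear()
      // call placed in the Executor's finally block above.
      scratch.get.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    Try(runTask(42L) { throw new RuntimeException("task failed") })
    println(scratch.get.isEmpty) // true: cleanup ran despite the failure
  }
}
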