author     Josh Rosen <joshrosen@databricks.com>  2015-02-28 22:51:01 -0800
committer  Josh Rosen <joshrosen@databricks.com>  2015-02-28 22:51:01 -0800
commit     2df5f1f00661cd31b9fc37e80345a3f5f856c95f (patch)
tree       710eea3dc03ece9c3d5c912aacc4b2de86c54610 /core
parent     643300a6e27dac3822f9a3ced0ad5fb3b4f2ad75 (diff)
[SPARK-6075] Fix a bug that caused lost accumulator updates: do not store WeakReferences in localAccums map
This fixes a non-deterministic bug introduced in #4021 that could cause tasks' accumulator updates to be lost. The problem is that `localAccums` should not hold weak references: after a task finishes running, there are no strong references left to its local accumulators, so they can be garbage-collected before the executor reads the `localAccums` map. We do not need weak references here anyway, since this map is cleared at the end of each task.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4835 from JoshRosen/SPARK-6075 and squashes the following commits:

4f4b5b2 [Josh Rosen] Remove defensive assertions that caused test failures in code unrelated to this change
120c7b0 [Josh Rosen] [SPARK-6075] Do not store WeakReferences in localAccums map
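The failure mode described above can be reproduced outside Spark. The following is a minimal standalone sketch (not Spark code; the object name WeakAccDemo and the use of AnyRef as a stand-in for Accumulable are illustrative) showing how a map that holds only scala.ref.WeakReference values can silently drop an entry once its referent has no other strong references and a GC cycle runs:

import scala.collection.mutable
import scala.ref.WeakReference

object WeakAccDemo {
  def main(args: Array[String]): Unit = {
    // A map that, like the pre-patch localAccums, holds only weak references.
    val localAccums = mutable.Map[Long, WeakReference[AnyRef]]()

    var acc: AnyRef = new AnyRef             // stands in for a task's local Accumulable
    localAccums(1L) = new WeakReference(acc)

    acc = null                               // task finished: no strong references remain
    System.gc()                              // a collection here may clear the weak ref

    // The executor's later read of the map can now come up empty,
    // losing the accumulator update entirely:
    println(localAccums(1L).get)             // may print None instead of Some(...)
  }
}

Because garbage collection is non-deterministic, the update is lost only when a GC happens to run in the window between task completion and the executor's read, which is exactly why the original bug surfaced intermittently.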
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala  40
1 file changed, 23 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 30f0ccd73c..bcf832467f 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -280,15 +280,24 @@ object AccumulatorParam {
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
- // Store a WeakReference instead of a StrongReference because this way accumulators can be
- // appropriately garbage collected during long-running jobs and release memory
- type WeakAcc = WeakReference[Accumulable[_, _]]
- val originals = Map[Long, WeakAcc]()
- val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
- override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+ /**
+ * This global map holds the original accumulator objects that are created on the driver.
+ * It keeps weak references to these objects so that accumulators can be garbage-collected
+ * once the RDDs and user-code that reference them are cleaned up.
+ */
+ val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+ /**
+ * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+ * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+ * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+ */
+ private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+ override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
- var lastId: Long = 0
+
+ private var lastId: Long = 0
def newId(): Long = synchronized {
lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {
def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
- originals(a.id) = new WeakAcc(a)
+ originals(a.id) = new WeakReference[Accumulable[_, _]](a)
} else {
- localAccums.get()(a.id) = new WeakAcc(a)
+ localAccums.get()(a.id) = a
}
}
// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
- localAccums.get.clear
+ localAccums.get.clear()
}
}
@@ -320,12 +329,7 @@ private[spark] object Accumulators {
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.get) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- ret(id) = accum.get match {
- case Some(values) => values.localValue
- case None => None
- }
+ ret(id) = accum.localValue
}
return ret
}
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
+ } else {
+ logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
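For contrast, a similar standalone sketch (again not Spark code; StrongAccDemo is a hypothetical name) shows why the patched approach is safe. Holding the accumulator directly, as the new localAccums does, pins it in memory for the short window between task completion and the executor's read, and the map is cleared explicitly afterwards, mirroring Accumulators.clear() in the patch:

import scala.collection.mutable

object StrongAccDemo {
  def main(args: Array[String]): Unit = {
    // A map that, like the patched localAccums, holds the accumulators directly.
    val localAccums = mutable.Map[Long, AnyRef]()

    var acc: AnyRef = new AnyRef
    localAccums(1L) = acc                     // the map itself now pins the object

    acc = null
    System.gc()

    // The entry survives GC; reading the updates back is deterministic.
    println(localAccums.get(1L).isDefined)    // always true

    // The map is short-lived anyway: it is emptied at the end of each task,
    // so the strong references do not leak across tasks.
    localAccums.clear()
  }
}

This is the design trade-off the commit message points at: weak references only make sense for the long-lived driver-side `originals` map, where accumulators must not outlive the RDDs and user code that reference them, while the per-task `localAccums` map is cleared after every task and so never needs GC's help to release memory.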