aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-02-19 15:57:23 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-02-19 15:57:23 -0800
commit983fa2d62029e7334fb661cb65c8cadaa4b86d1c (patch)
tree821694b53512200930a84753a0bafb982e2e85e8 /core
parent091f6a7830bbee01fa580fbb0336b9f4fcac0dfa (diff)
downloadspark-983fa2d62029e7334fb661cb65c8cadaa4b86d1c.tar.gz
spark-983fa2d62029e7334fb661cb65c8cadaa4b86d1c.tar.bz2
spark-983fa2d62029e7334fb661cb65c8cadaa4b86d1c.zip
[SPARK-13407] Guard against garbage-collected accumulators in TaskMetrics.fromAccumulatorUpdates
`TaskMetrics.fromAccumulatorUpdates()` can fail if accumulators have been garbage-collected on the driver. To guard against this, this patch introduces `ListenerTaskMetrics`, a subclass of `TaskMetrics` which is used only in `TaskMetrics.fromAccumulatorUpdates()` and which eliminates the need to access the original accumulators on the driver. Author: Josh Rosen <joshrosen@databricks.com> Closes #11276 from JoshRosen/accum-updates-fix.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala55
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala10
2 files changed, 33 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8ff0620f83..9da9cb5940 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -364,6 +364,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
}
+/**
+ * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
+ * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
+ * which might have been garbage-collected. See SPARK-13407 for more details.
+ *
+ * Instances of this class should be considered read-only and users should not call `inc*()` or
+ * `set*()` methods. While we could override the setter methods to throw
+ * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
+ * out-of-date when new metrics are added.
+ */
+private[spark] class ListenerTaskMetrics(
+ initialAccums: Seq[Accumulator[_]],
+ accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+
+ override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
+
+ override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ throw new UnsupportedOperationException("This TaskMetrics is read-only")
+ }
+}
+
private[spark] object TaskMetrics extends Logging {
def empty: TaskMetrics = new TaskMetrics
@@ -397,33 +418,15 @@ private[spark] object TaskMetrics extends Logging {
// Initial accumulators are passed into the TaskMetrics constructor first because these
// are required to be uniquely named. The rest of the accumulators from this task are
// registered later because they need not satisfy this requirement.
- val (initialAccumInfos, otherAccumInfos) = accumUpdates
- .filter { info => info.update.isDefined }
- .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
- val initialAccums = initialAccumInfos.map { info =>
- val accum = InternalAccumulator.create(info.name.get)
- accum.setValueAny(info.update.get)
- accum
- }
- // We don't know the types of the rest of the accumulators, so we try to find the same ones
- // that were previously registered here on the driver and make copies of them. It is important
- // that we copy the accumulators here since they are used across many tasks and we want to
- // maintain a snapshot of their local task values when we post them to listeners downstream.
- val otherAccums = otherAccumInfos.flatMap { info =>
- val id = info.id
- val acc = Accumulators.get(id).map { a =>
- val newAcc = a.copy()
- newAcc.setValueAny(info.update.get)
- newAcc
+ val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
+ val initialAccums = definedAccumUpdates
+ .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+ .map { info =>
+ val accum = InternalAccumulator.create(info.name.get)
+ accum.setValueAny(info.update.get)
+ accum
}
- if (acc.isEmpty) {
- logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
- }
- acc
- }
- val metrics = new TaskMetrics(initialAccums)
- otherAccums.foreach(metrics.registerAccumulator)
- metrics
+ new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 3a1a67cdc0..d91f50f18f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -475,10 +475,9 @@ class TaskMetricsSuite extends SparkFunSuite {
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
- // Test this with additional accumulators. Only the ones registered with `Accumulators`
- // will show up in the reconstructed TaskMetrics. In practice, all accumulators created
+ // Test this with additional accumulators to ensure that we do not crash when handling
+ // updates from unregistered accumulators. In practice, all accumulators created
// on the driver, internal or not, should be registered with `Accumulators` at some point.
- // Here we show that reconstruction will succeed even if there are unregistered accumulators.
val param = IntAccumulatorParam
val registeredAccums = Seq(
new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
@@ -497,9 +496,8 @@ class TaskMetricsSuite extends SparkFunSuite {
val registeredAccumInfos = registeredAccums.map(makeInfo)
val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
- val metrics2 = TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
- // accumulators that were not registered with `Accumulators` will not show up
- assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
+ // Simply checking that this does not crash:
+ TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
}
}