aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCarson Wang <carson.wang@intel.com>2015-08-14 13:38:25 -0700
committerAndrew Or <andrew@databricks.com>2015-08-14 13:38:25 -0700
commit33bae585d4cb25aed2ac32e0d1248f78cc65318b (patch)
tree99c3a3707eada2ee7ae4019c36cf2de0dd9fce9d /core
parentffa05c84fe75663fc33f3d954d1cb1e084ab3280 (diff)
downloadspark-33bae585d4cb25aed2ac32e0d1248f78cc65318b.tar.gz
spark-33bae585d4cb25aed2ac32e0d1248f78cc65318b.tar.bz2
spark-33bae585d4cb25aed2ac32e0d1248f78cc65318b.zip
[SPARK-9809] Task crashes because the internal accumulators are not properly initialized
When a stage failed and another stage was resubmitted with only part of partitions to compute, all the tasks failed with error message: java.util.NoSuchElementException: key not found: peakExecutionMemory. This is because the internal accumulators are not properly initialized for this stage while other codes assume the internal accumulators always exist. Author: Carson Wang <carson.wang@intel.com> Closes #8090 from carsonwang/SPARK-9809.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala3
1 files changed, 2 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf50a..f1c63d0876 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
}
}
+ // Create internal accumulators if the stage has no accumulators initialized.
// Reset internal accumulators only if this stage is not partially submitted
// Otherwise, we may override existing accumulator values from some tasks
- if (allPartitions == partitionsToCompute) {
+ if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
stage.resetInternalAccumulators()
}