author    Josh Rosen <joshrosen@databricks.com>    2015-07-23 00:43:26 -0700
committer Reynold Xin <rxin@databricks.com>        2015-07-23 00:43:26 -0700
commit    ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106 (patch)
tree      02610a8718668dca76c259548b3520b414ed6bda /core
parent    b983d493b490ca8bafe7eb988b62a250987ae353 (diff)
[SPARK-9266] Prevent "managed memory leak detected" exception from masking original exception
When a task fails with an exception and also fails to properly clean up its managed memory, the `spark.unsafe.exceptionOnMemoryLeak` memory leak detection mechanism's exceptions will mask the original exception that caused the task to fail. We should throw the memory leak exception only if no other exception occurred.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7603 from JoshRosen/SPARK-9266 and squashes the following commits:

c268cb5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-9266
c1f0167 [Josh Rosen] Fix the error masking problem
448eae8 [Josh Rosen] Add regression test
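The masking itself is just how JVM try/finally semantics work: if the finally block throws while another exception is already propagating, the new exception replaces the original one. A minimal, self-contained Scala sketch of the pre-fix behaviour (the names and messages here are illustrative stand-ins, not Spark's actual classes):

    object MaskingSketch {
      def main(args: Array[String]): Unit = {
        try {
          runTask()
        } catch {
          // Only the finally-block exception is visible; the real task failure is lost.
          case e: Exception => println(s"caught: ${e.getMessage}")  // prints "managed memory leak detected"
        }
      }

      // Stand-in for a task body that both fails and leaks managed memory.
      def runTask(): Unit = {
        try {
          throw new RuntimeException("intentional task failure")  // the original cause
        } finally {
          // Pre-SPARK-9266 behaviour: the leak check throws unconditionally,
          // discarding the exception that is already propagating.
          throw new RuntimeException("managed memory leak detected")
        }
      }
    }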
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  |  7 +++++--
-rw-r--r--  core/src/test/scala/org/apache/spark/FailureSuite.scala       | 25 +++++++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 581b40003c..e76664f1bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -209,16 +209,19 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
+        var threwException = true
         val (value, accumUpdates) = try {
-          task.run(
+          val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = attemptNumber,
             metricsSystem = env.metricsSystem)
+          threwException = false
+          res
         } finally {
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
-            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
               throw new SparkException(errMsg)
             } else {
               logError(errMsg)
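The guard added above is a common flag pattern: record whether the body completed normally, and only rethrow from the finally block in that case, so a genuine task failure is never overwritten. A standalone sketch of the same idea, where runWithLeakCheck is a hypothetical helper rather than Spark's Executor API:

    object LeakGuardSketch {
      // Hypothetical helper: run `body`, then raise a leak error only if the body
      // itself completed without throwing; otherwise just log the leak.
      def runWithLeakCheck[T](body: => T)(leakedBytes: => Long): T = {
        var threwException = true
        try {
          val result = body
          threwException = false  // reached only when body did not throw
          result
        } finally {
          val leaked = leakedBytes
          if (leaked > 0) {
            val msg = s"Managed memory leak detected; size = $leaked bytes"
            if (!threwException) {
              throw new RuntimeException(msg)  // safe: there is no original failure to mask
            } else {
              System.err.println(msg)  // body already failed; keep its exception, just log the leak
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        try {
          // Body fails and "leaks" 128 bytes; the original failure still surfaces.
          runWithLeakCheck { throw new RuntimeException("intentional task failure") } { 128L }
        } catch {
          case e: Exception => println(s"caught: ${e.getMessage}")  // prints "intentional task failure"
        }
      }
    }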
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index b099cd3fb7..69cb4b44cf 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("managed memory leak error should not mask other failures (SPARK-9266)") {
+    val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    sc = new SparkContext("local[1,1]", "test", conf)
+
+    // If a task leaks memory but fails due to some other cause, then make sure that the original
+    // cause is preserved
+    val thrownDueToTaskFailure = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        throw new Exception("intentional task failure")
+        iter
+      }.count()
+    }
+    assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure"))
+
+    // If the task succeeded but memory was leaked, then the task should fail due to that leak
+    val thrownDueToMemoryLeak = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        iter
+      }.count()
+    }
+    assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }