diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-07-23 00:43:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-23 00:43:26 -0700 |
commit | ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106 (patch) | |
tree | 02610a8718668dca76c259548b3520b414ed6bda /core | |
parent | b983d493b490ca8bafe7eb988b62a250987ae353 (diff) | |
download | spark-ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106.tar.gz spark-ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106.tar.bz2 spark-ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106.zip |
[SPARK-9266] Prevent "managed memory leak detected" exception from masking original exception
When a task fails with an exception and also fails to properly clean up its managed memory, the `spark.unsafe.exceptionOnMemoryLeak` memory leak detection mechanism's exceptions will mask the original exception that caused the task to fail. We should throw the memory leak exception only if no other exception occurred.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7603 from JoshRosen/SPARK-9266 and squashes the following commits:
c268cb5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-9266
c1f0167 [Josh Rosen] Fix the error masking problem
448eae8 [Josh Rosen] Add regression test
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 7 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/FailureSuite.scala | 25 |
2 files changed, 30 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 581b40003c..e76664f1bd 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -209,16 +209,19 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() + var threwException = true val (value, accumUpdates) = try { - task.run( + val res = task.run( taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) + threwException = false + res } finally { val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory > 0) { val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId" - if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) { + if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) { throw new SparkException(errMsg) } else { logError(errMsg) diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index b099cd3fb7..69cb4b44cf 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext { FailureSuiteState.clear() } + test("managed memory leak error should not mask other failures (SPARK-9266") { + val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true") + sc = new SparkContext("local[1,1]", "test", conf) + + // If a task leaks memory but fails due to some other cause, then make sure that the original + // cause is preserved + val thrownDueToTaskFailure = intercept[SparkException] { + sc.parallelize(Seq(0)).mapPartitions { iter => + TaskContext.get().taskMemoryManager().allocate(128) + throw new Exception("intentional task failure") + iter + }.count() + } + assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure")) + + // If the task succeeded but memory was leaked, then the task should fail due to that leak + val thrownDueToMemoryLeak = intercept[SparkException] { + sc.parallelize(Seq(0)).mapPartitions { iter => + TaskContext.get().taskMemoryManager().allocate(128) + iter + }.count() + } + assert(thrownDueToMemoryLeak.getMessage.contains("memory leak")) + } + // TODO: Need to add tests with shuffle fetch failures. } |