aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/FailureSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/FailureSuite.scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/FailureSuite.scala | 25
1 file changed, 25 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index b099cd3fb7..69cb4b44cf 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
FailureSuiteState.clear()
}
+ test("managed memory leak error should not mask other failures (SPARK-9266)") {
+ val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ sc = new SparkContext("local[1,1]", "test", conf)
+
+ // If a task leaks memory but fails due to some other cause, then make sure that the original
+ // cause is preserved
+ val thrownDueToTaskFailure = intercept[SparkException] {
+ sc.parallelize(Seq(0)).mapPartitions { iter =>
+ TaskContext.get().taskMemoryManager().allocate(128)
+ throw new Exception("intentional task failure")
+ iter
+ }.count()
+ }
+ assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure"))
+
+ // If the task succeeded but memory was leaked, then the task should fail due to that leak
+ val thrownDueToMemoryLeak = intercept[SparkException] {
+ sc.parallelize(Seq(0)).mapPartitions { iter =>
+ TaskContext.get().taskMemoryManager().allocate(128)
+ iter
+ }.count()
+ }
+ assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
+ }
+
// TODO: Need to add tests with shuffle fetch failures.
}