aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala11
1 files changed, 11 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 0c362b881d..83ed127520 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -195,6 +195,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
0 until rdd.partitions.size)
}
+
+ test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
+ val stage: Int = 1
+ val partition: Int = 1
+ val failedAttempt: Int = 0
+ outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+ outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt,
+ reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt))
+ assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1))
+ }
}
/**