aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala25
2 files changed, 30 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 17055e2f22..9e29fd1382 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -113,9 +113,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
case otherReason =>
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
- authorizedCommitters.remove(partition)
+ if (authorizedCommitters.get(partition).exists(_ == attempt)) {
+ logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
+ s" clearing lock")
+ authorizedCommitters.remove(partition)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c8c9578562..cf97707946 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -161,6 +161,31 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
}
assert(tempDir.list().size === 0)
}
+
+ test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
+ val stage: Int = 1
+ val partition: Long = 2
+ val authorizedCommitter: Long = 3
+ val nonAuthorizedCommitter: Long = 100
+ outputCommitCoordinator.stageStart(stage)
+ assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+ // The non-authorized committer fails
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ // New tasks should still not be able to commit because the authorized committer has not failed
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ // The authorized committer now fails, clearing the lock
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ // A new task should now be allowed to become the authorized committer
+ assert(
+ outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ // There can only be one authorized committer
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ }
}
/**