aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-03-31 16:18:39 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-03-31 16:18:39 -0700
commit37326079d818fdb140415a65653767d997613dac (patch)
tree6079788d400ba20b3ce05ae2d4c80d24594e85f9
parent0e00f12d33d28d064c166262b14e012a1aeaa7b0 (diff)
downloadspark-37326079d818fdb140415a65653767d997613dac.tar.gz
spark-37326079d818fdb140415a65653767d997613dac.tar.bz2
spark-37326079d818fdb140415a65653767d997613dac.zip
[SPARK-6614] OutputCommitCoordinator should clear authorized committer only after authorized committer fails, not after any failure
In OutputCommitCoordinator, there is some logic to clear the authorized committer's lock on committing in case that task fails. However, it looks like the current code also clears this lock if other non-authorized tasks fail, which is an obvious bug. In theory, it's possible that this could allow a new committer to start, run to completion, and commit output before the authorized committer finished, but it's unlikely that this race occurs often in practice due to the complex combination of failure and timing conditions that would be required to expose it. This patch addresses this issue and adds a regression test. Thanks to aarondav for spotting this issue. Author: Josh Rosen <joshrosen@databricks.com> Closes #5276 from JoshRosen/SPARK-6614 and squashes the following commits: d532ba7 [Josh Rosen] Check whether failed task was authorized committer cbb3784 [Josh Rosen] Add regression test for SPARK-6614
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala25
2 files changed, 30 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 17055e2f22..9e29fd1382 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -113,9 +113,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
case otherReason =>
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
- authorizedCommitters.remove(partition)
+ if (authorizedCommitters.get(partition).exists(_ == attempt)) {
+ logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
+ s" clearing lock")
+ authorizedCommitters.remove(partition)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c8c9578562..cf97707946 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -161,6 +161,31 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
}
assert(tempDir.list().size === 0)
}
+
+ test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
+ val stage: Int = 1
+ val partition: Long = 2
+ val authorizedCommitter: Long = 3
+ val nonAuthorizedCommitter: Long = 100
+ outputCommitCoordinator.stageStart(stage)
+ assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+ // The non-authorized committer fails
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ // New tasks should still not be able to commit because the authorized committer has not failed
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ // The authorized committer now fails, clearing the lock
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ // A new task should now be allowed to become the authorized committer
+ assert(
+ outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ // There can only be one authorized committer
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ }
}
/**