aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-03-31 16:18:39 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-03-31 16:21:27 -0700
commitc4c982a65041ce13a55c1f2bd59c9a85cf3edfc5 (patch)
tree5f37e2083115fecef10fc561c3457f212a69898e /core
parentd851646375616bb5dbcf2c3fe4f64702894d26e1 (diff)
downloadspark-c4c982a65041ce13a55c1f2bd59c9a85cf3edfc5.tar.gz
spark-c4c982a65041ce13a55c1f2bd59c9a85cf3edfc5.tar.bz2
spark-c4c982a65041ce13a55c1f2bd59c9a85cf3edfc5.zip
[SPARK-6614] OutputCommitCoordinator should clear authorized committer only after authorized committer fails, not after any failure
In OutputCommitCoordinator, there is some logic to clear the authorized committer's lock on committing in case that task fails. However, it looks like the current code also clears this lock if other non-authorized tasks fail, which is an obvious bug. In theory, it's possible that this could allow a new committer to start, run to completion, and commit output before the authorized committer finished, but it's unlikely that this race occurs often in practice due to the complex combination of failure and timing conditions that would be required to expose it. This patch addresses this issue and adds a regression test. Thanks to aarondav for spotting this issue. Author: Josh Rosen <joshrosen@databricks.com> Closes #5276 from JoshRosen/SPARK-6614 and squashes the following commits: d532ba7 [Josh Rosen] Check whether failed task was authorized committer cbb3784 [Josh Rosen] Add regression test for SPARK-6614
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala25
2 files changed, 30 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 759df023a6..25d90f03a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -118,9 +118,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
logInfo(
s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
case otherReason =>
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
- authorizedCommitters.remove(partition)
+ if (authorizedCommitters.get(partition).exists(_ == attempt)) {
+ logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
+ s" clearing lock")
+ authorizedCommitters.remove(partition)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 3cc860caa1..732d466848 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -161,6 +161,31 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
}
assert(tempDir.list().size === 0)
}
+
+ test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
+ val stage: Int = 1
+ val partition: Long = 2
+ val authorizedCommitter: Long = 3
+ val nonAuthorizedCommitter: Long = 100
+ outputCommitCoordinator.stageStart(stage)
+ assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+ // The non-authorized committer fails
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ // New tasks should still not be able to commit because the authorized committer has not failed
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ // The authorized committer now fails, clearing the lock
+ outputCommitCoordinator.taskCompleted(
+ stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ // A new task should now be allowed to become the authorized committer
+ assert(
+ outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ // There can only be one authorized committer
+ assert(
+ !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ }
}
/**