aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorPatrick Woody <pwoody@palantir.com>2017-03-02 15:55:32 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-03-02 15:55:32 -0800
commit433d9eb6151a547af967cc1ac983a789bed60704 (patch)
tree1cdd8a5481d5394aaa3e935e20c0eaf3785e75d0 /core/src/main/scala
parent5ae3516bfb7716f1793eb76b4fdc720b31829d07 (diff)
downloadspark-433d9eb6151a547af967cc1ac983a789bed60704.tar.gz
spark-433d9eb6151a547af967cc1ac983a789bed60704.tar.bz2
spark-433d9eb6151a547af967cc1ac983a789bed60704.zip
[SPARK-19631][CORE] OutputCommitCoordinator should not allow commits for already failed tasks
## What changes were proposed in this pull request? Previously it was possible for there to be a race between a task failure and committing the output of a task. For example, the driver may mark a task attempt as failed due to an executor heartbeat timeout (possibly due to GC), but the task attempt actually ends up coordinating with the OutputCommitCoordinator once the executor recovers and committing its result. This will lead to any retry attempt failing because the task result has already been committed despite the original attempt failing. This ensures that any previously failed task attempts cannot enter the commit protocol. ## How was this patch tested? Added a unit test Author: Patrick Woody <pwoody@palantir.com> Closes #16959 from pwoody/pw/recordFailuresForCommitter.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala59
1 files changed, 35 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 08d220b40b..83d87b548a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type StageId = Int
private type PartitionId = Int
private type TaskAttemptNumber = Int
-
private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+ private case class StageState(numPartitions: Int) {
+ val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
+ val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+ }
/**
- * Map from active stages's id => partition id => task attempt with exclusive lock on committing
- * output for that partition.
+ * Map from active stages's id => authorized task attempts for each partition id, which hold an
+ * exclusive lock on committing task output for that partition, as well as any known failed
+ * attempts in the stage.
*
* Entries are added to the top-level map when stages start and are removed they finish
* (either successfully or unsuccessfully).
*
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
- private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()
+ private val stageStates = mutable.Map[StageId, StageState]()
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
*/
def isEmpty: Boolean = {
- authorizedCommittersByStage.isEmpty
+ stageStates.isEmpty
}
/**
@@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
- private[scheduler] def stageStart(
- stage: StageId,
- maxPartitionId: Int): Unit = {
- val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
- java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
- synchronized {
- authorizedCommittersByStage(stage) = arr
- }
+ private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+ stageStates(stage) = new StageState(maxPartitionId + 1)
}
// Called by DAGScheduler
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage.remove(stage)
+ stageStates.remove(stage)
}
// Called by DAGScheduler
@@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
partition: PartitionId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
- val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
+ val stageState = stageStates.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
return
})
@@ -137,10 +135,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters(partition) == attemptNumber) {
+ // Mark the attempt as failed to blacklist from future commit protocol
+ stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
+ if (stageState.authorizedCommitters(partition) == attemptNumber) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
- authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+ stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
}
}
}
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
if (isDriver) {
coordinatorRef.foreach(_ send StopCoordinator)
coordinatorRef = None
- authorizedCommittersByStage.clear()
+ stageStates.clear()
}
}
@@ -158,13 +158,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
stage: StageId,
partition: PartitionId,
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
- authorizedCommittersByStage.get(stage) match {
- case Some(authorizedCommitters) =>
- authorizedCommitters(partition) match {
+ stageStates.get(stage) match {
+ case Some(state) if attemptFailed(state, partition, attemptNumber) =>
+ logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
+ s" partition=$partition as task attempt $attemptNumber has already failed.")
+ false
+ case Some(state) =>
+ state.authorizedCommitters(partition) match {
case NO_AUTHORIZED_COMMITTER =>
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
- authorizedCommitters(partition) = attemptNumber
+ state.authorizedCommitters(partition) = attemptNumber
true
case existingCommitter =>
// Coordinator should be idempotent when receiving AskPermissionToCommit.
@@ -181,11 +185,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
}
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
- s"partition $partition to commit")
+ logDebug(s"Stage $stage has completed, so not allowing" +
+ s" attempt number $attemptNumber of partition $partition to commit")
false
}
}
+
+ private def attemptFailed(
+ stageState: StageState,
+ partition: PartitionId,
+ attempt: TaskAttemptNumber): Boolean = synchronized {
+ stageState.failures.get(partition).exists(_.contains(attempt))
+ }
}
private[spark] object OutputCommitCoordinator {