Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala | 59
1 file changed, 35 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 08d220b40b..83d87b548a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -48,25 +48,29 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private type StageId = Int
private type PartitionId = Int
private type TaskAttemptNumber = Int
-
private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1
+ private case class StageState(numPartitions: Int) {
+ val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
+ val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
+ }
/**
- * Map from active stages's id => partition id => task attempt with exclusive lock on committing
- * output for that partition.
+ * Map from active stage's id => authorized task attempts for each partition id, which hold an
+ * exclusive lock on committing task output for that partition, as well as any known failed
+ * attempts in the stage.
*
* Entries are added to the top-level map when stages start and are removed when they finish
* (either successfully or unsuccessfully).
*
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
- private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]()
+ private val stageStates = mutable.Map[StageId, StageState]()
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
*/
def isEmpty: Boolean = {
- authorizedCommittersByStage.isEmpty
+ stageStates.isEmpty
}
/**
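
For context, a minimal standalone sketch of the per-stage state introduced above. The StageState shape mirrors the patch; the wrapper object and the usage in main are hypothetical:

    import scala.collection.mutable

    object StageStateSketch {
      type PartitionId = Int
      type TaskAttemptNumber = Int
      val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1

      // Mirrors the StageState added by this patch: one committer slot per
      // partition, plus the attempts known to have failed for each partition.
      case class StageState(numPartitions: Int) {
        val authorizedCommitters =
          Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
        val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
      }

      def main(args: Array[String]): Unit = {
        val state = StageState(numPartitions = 3)
        // No partition has an authorized committer until one is granted.
        assert(state.authorizedCommitters.forall(_ == NO_AUTHORIZED_COMMITTER))
        // Record that attempt 0 failed on partition 1.
        state.failures.getOrElseUpdate(1, mutable.Set()) += 0
        assert(state.failures(1).contains(0))
      }
    }
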
@@ -105,19 +109,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
* the maximum possible value of `context.partitionId`).
*/
- private[scheduler] def stageStart(
- stage: StageId,
- maxPartitionId: Int): Unit = {
- val arr = new Array[TaskAttemptNumber](maxPartitionId + 1)
- java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER)
- synchronized {
- authorizedCommittersByStage(stage) = arr
- }
+ private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized {
+ stageStates(stage) = new StageState(maxPartitionId + 1)
}
// Called by DAGScheduler
private[scheduler] def stageEnd(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage.remove(stage)
+ stageStates.remove(stage)
}
// Called by DAGScheduler
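
A hedged sketch of the stage lifecycle these two hooks manage. The sizing follows the doc comment above (partition ids run from 0 to maxPartitionId, hence maxPartitionId + 1 slots); everything outside stageStart/stageEnd is illustrative, not Spark's real scheduler:

    import scala.collection.mutable

    object StageLifecycleSketch {
      case class StageState(numPartitions: Int)
      private val stageStates = mutable.Map[Int, StageState]()

      // As in the patch: allocate state sized for every possible partition id.
      def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
        stageStates(stage) = StageState(maxPartitionId + 1)
      }

      // As in the patch: drop the entry when the stage finishes,
      // whether it succeeded or failed.
      def stageEnd(stage: Int): Unit = synchronized {
        stageStates.remove(stage)
      }

      def main(args: Array[String]): Unit = {
        stageStart(stage = 1, maxPartitionId = 9)
        assert(stageStates(1).numPartitions == 10)
        stageEnd(stage = 1)
        assert(stageStates.isEmpty)
      }
    }
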
@@ -126,7 +124,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
partition: PartitionId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
- val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
+ val stageState = stageStates.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
return
})
@@ -137,10 +135,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters(partition) == attemptNumber) {
+ // Mark the attempt as failed to blacklist it from future commits in this stage
+ stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
+ if (stageState.authorizedCommitters(partition) == attemptNumber) {
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
- authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
+ stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
}
}
}
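
A sketch of the failure branch just above: a failed attempt is recorded in the per-partition blacklist, and if that attempt held the commit lock, the lock is released so a later attempt can commit. The field names mirror the patch; the surrounding object is hypothetical:

    import scala.collection.mutable

    object TaskCompletedSketch {
      val NO_AUTHORIZED_COMMITTER = -1
      val authorizedCommitters = Array.fill(2)(NO_AUTHORIZED_COMMITTER)
      val failures = mutable.Map[Int, mutable.Set[Int]]()

      def onFailure(partition: Int, attemptNumber: Int): Unit = {
        // Blacklist the attempt for this partition.
        failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber
        // If the failed attempt was the authorized committer, clear the lock.
        if (authorizedCommitters(partition) == attemptNumber) {
          authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER
        }
      }

      def main(args: Array[String]): Unit = {
        authorizedCommitters(0) = 5       // attempt 5 holds the lock on partition 0
        onFailure(partition = 0, attemptNumber = 5)
        assert(failures(0).contains(5))   // blacklisted for this stage
        assert(authorizedCommitters(0) == NO_AUTHORIZED_COMMITTER) // lock cleared
      }
    }
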
@@ -149,7 +149,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
if (isDriver) {
coordinatorRef.foreach(_ send StopCoordinator)
coordinatorRef = None
- authorizedCommittersByStage.clear()
+ stageStates.clear()
}
}
@@ -158,13 +158,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
stage: StageId,
partition: PartitionId,
attemptNumber: TaskAttemptNumber): Boolean = synchronized {
- authorizedCommittersByStage.get(stage) match {
- case Some(authorizedCommitters) =>
- authorizedCommitters(partition) match {
+ stageStates.get(stage) match {
+ case Some(state) if attemptFailed(state, partition, attemptNumber) =>
+ logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," +
+ s" partition=$partition as task attempt $attemptNumber has already failed.")
+ false
+ case Some(state) =>
+ state.authorizedCommitters(partition) match {
case NO_AUTHORIZED_COMMITTER =>
logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
s"partition=$partition")
- authorizedCommitters(partition) = attemptNumber
+ state.authorizedCommitters(partition) = attemptNumber
true
case existingCommitter =>
// Coordinator should be idempotent when receiving AskPermissionToCommit.
@@ -181,11 +185,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
}
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
- s"partition $partition to commit")
+ logDebug(s"Stage $stage has completed, so not allowing" +
+ s" attempt number $attemptNumber of partition $partition to commit")
false
}
}
+
+ private def attemptFailed(
+ stageState: StageState,
+ partition: PartitionId,
+ attempt: TaskAttemptNumber): Boolean = synchronized {
+ stageState.failures.get(partition).exists(_.contains(attempt))
+ }
}
private[spark] object OutputCommitCoordinator {
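
Finally, a self-contained sketch of the canCommit decision order the patch establishes: a previously failed attempt is always denied, otherwise the first asker per partition takes the commit lock, repeated asks from the lock holder stay authorized, and unknown (already finished) stages deny everything. This is a simplified model under those assumptions, not Spark's actual class:

    import scala.collection.mutable

    object CanCommitSketch {
      val NO_AUTHORIZED_COMMITTER = -1
      case class StageState(numPartitions: Int) {
        val authorizedCommitters = Array.fill(numPartitions)(NO_AUTHORIZED_COMMITTER)
        val failures = mutable.Map[Int, mutable.Set[Int]]()
      }
      private val stageStates = mutable.Map[Int, StageState]()

      private def attemptFailed(s: StageState, partition: Int, attempt: Int): Boolean =
        s.failures.get(partition).exists(_.contains(attempt))

      def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
        stageStates.get(stage) match {
          case Some(state) if attemptFailed(state, partition, attempt) =>
            false                                 // failed attempts may never commit
          case Some(state) =>
            state.authorizedCommitters(partition) match {
              case NO_AUTHORIZED_COMMITTER =>
                state.authorizedCommitters(partition) = attempt
                true                              // first asker wins the lock
              case existing =>
                existing == attempt               // idempotent for the lock holder
            }
          case None =>
            false                                 // stage already finished
        }
      }

      def main(args: Array[String]): Unit = {
        stageStates(1) = StageState(4)
        stageStates(1).failures.getOrElseUpdate(2, mutable.Set()) += 0
        assert(!canCommit(1, 2, 0))  // attempt 0 failed earlier: denied
        assert(canCommit(1, 2, 1))   // attempt 1 takes the lock
        assert(canCommit(1, 2, 1))   // repeated ask: still authorized
        assert(!canCommit(1, 2, 2))  // a different attempt: denied
        assert(!canCommit(0, 0, 0))  // unknown/finished stage: denied
      }
    }
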