Diffstat (limited to 'core/src/main/scala/org/apache')
 core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                 |  3
 core/src/main/scala/org/apache/spark/TaskEndReason.scala                     |  7
 core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala    |  4
 core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala      | 20
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala            |  7
 core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala | 48
 core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala                |  7
 core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala   |  2
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                 |  4
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                 |  2
 10 files changed, 52 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ae5926dd53..ac6eaab20d 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
}
def commit() {
- SparkHadoopMapRedUtil.commitTask(
- getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}
def commitJob() {
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 2ae878b3e6..7137246bc3 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+ jobID: Int,
+ partitionID: Int,
+ attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
- s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
/**
* If a task failed because its attempt to commit was denied, do not count this failure
* towards failing the stage. This is intended to prevent spurious stage failures in cases
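For illustration, a minimal sketch of how the renamed field surfaces in the error string; the values below are hypothetical, not part of the patch:

    // Constructing the reason with the renamed third field.
    val reason = TaskCommitDenied(jobID = 2, partitionID = 7, attemptNumber = 1)
    // Prints: TaskCommitDenied (Driver denied task commit) for job: 2, partition: 7, attemptNumber: 1
    println(reason.toErrorString)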
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef511..7d84889a2d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
- attemptID: Int)
+ attemptNumber: Int)
extends Exception(msg) {
- def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+ def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
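A quick sketch of how the exception maps onto the end reason above; the values are hypothetical, and since the class is private[spark] this only compiles inside the spark package:

    // The renamed constructor argument flows straight through to TaskCommitDenied.
    val e = new CommitDeniedException("commit denied", jobID = 2, splitID = 7, attemptNumber = 1)
    val reason: TaskEndReason = e.toTaskEndReason  // TaskCommitDenied(2, 7, 1)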
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f405b732e4..f7298e8d5c 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
- splitId: Int,
- attemptId: Int): Unit = {
+ splitId: Int): Unit = {
val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+ val taskAttemptNumber = TaskContext.get().attemptNumber()
+ val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
if (canCommit) {
performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, jobId, splitId, attemptId)
+ throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}
-
- def commitTask(
- committer: MapReduceOutputCommitter,
- mrTaskContext: MapReduceTaskAttemptContext,
- sparkTaskContext: TaskContext): Unit = {
- commitTask(
- committer,
- mrTaskContext,
- sparkTaskContext.stageId(),
- sparkTaskContext.partitionId(),
- sparkTaskContext.attemptNumber())
- }
}
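The substantive change in this file: the attempt number is now read from the running TaskContext instead of being threaded through by callers, who previously could pass the globally unique task attempt ID by mistake. A sketch of the two TaskContext accessors the distinction rests on (standard Spark API; the usage is illustrative):

    val ctx = org.apache.spark.TaskContext.get()
    val uniqueId: Long = ctx.taskAttemptId()  // cluster-unique ID; no longer used for commit arbitration
    val attemptNo: Int = ctx.attemptNumber()  // 0-based retry count; what canCommit(jobId, splitId, ...) now receives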
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4f90e8347..3c9a66e504 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1128,8 +1128,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
- outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
- event.taskInfo.attempt, event.reason)
+ outputCommitCoordinator.taskCompleted(
+ stageId,
+ task.partitionId,
+ event.taskInfo.attemptNumber, // this is a task attempt number
+ event.reason)
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 5d926377ce..add0dedc03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
private sealed trait OutputCommitCoordinationMessage extends Serializable
private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
var coordinatorRef: Option[RpcEndpointRef] = None
private type StageId = Int
- private type PartitionId = Long
- private type TaskAttemptId = Long
+ private type PartitionId = Int
+ private type TaskAttemptNumber = Int
/**
* Map from active stage's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
- private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+ private type CommittersByStageMap =
+ mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
- * @param attempt a unique identifier for this task attempt
+ * @param attemptNumber how many times this task has been attempted
+ * (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = {
- val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+ attemptNumber: TaskAttemptNumber): Boolean = {
+ val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+ authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}
// Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId,
+ attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
- logInfo(
- s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+ logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+ s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters.get(partition).exists(_ == attempt)) {
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
+ if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+ logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+ s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = synchronized {
+ attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
- logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
- s"existingCommitter = $existingCommitter")
+ logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
- logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
- authorizedCommitters(partition) = attempt
+ logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition")
+ authorizedCommitters(partition) = attemptNumber
true
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+ logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+ s"partition $partition to commit")
false
}
}
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+ case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
context.reply(
- outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+ outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
}
}
}
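To make the first-committer-wins rule concrete, here is a self-contained model of the decision logic, stripped of the RPC plumbing and stage bookkeeping; names and types are simplified for illustration:

    import scala.collection.mutable

    // partition -> attempt number holding the commit lock, for a single stage
    val authorized = mutable.Map[Int, Int]()

    def askPermission(partition: Int, attemptNumber: Int): Boolean =
      authorized.get(partition) match {
        case Some(_) => false                               // lock already held; deny
        case None    => authorized(partition) = attemptNumber; true
      }

    askPermission(partition = 0, attemptNumber = 0)  // true: first asker wins
    askPermission(partition = 0, attemptNumber = 1)  // false: attempt 0 holds the lock

    // taskCompleted (above) removes the entry if the authorized attempt fails,
    // so a later attempt number can then be granted permission.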
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ced77..f113c2b1b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
- val attempt: Int,
+ val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}
- def id: String = s"$index.$attempt"
+ @deprecated("Use attemptNumber", "1.6.0")
+ def attempt: Int = attemptNumber
+
+ def id: String = s"$index.$attemptNumber"
def duration: Long = {
if (!finished) {
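Source compatibility is preserved through the deprecated alias; a usage sketch with a hypothetical TaskInfo instance `info`:

    val n = info.attemptNumber  // preferred name as of 1.6.0
    val m = info.attempt        // still compiles, now with a deprecation warning
    assert(n == m)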
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136df7..24a0b52206 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
new TaskData(
taskId = uiData.taskInfo.taskId,
index = uiData.taskInfo.index,
- attempt = uiData.taskInfo.attempt,
+ attempt = uiData.taskInfo.attemptNumber,
launchTime = new Date(uiData.taskInfo.launchTime),
executorId = uiData.taskInfo.executorId,
host = uiData.taskInfo.host,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2b71f55b7b..712782d27b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
serializationTimeProportionPos + serializationTimeProportion
val index = taskInfo.index
- val attempt = taskInfo.attempt
+ val attempt = taskInfo.attemptNumber
val svgTag =
if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
new TaskTableRowData(
info.index,
info.taskId,
- info.attempt,
+ info.attemptNumber,
info.speculative,
info.status,
info.taskLocality.toString,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 24f78744ad..99614a786b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
- ("Attempt" -> taskInfo.attempt) ~
+ ("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~