aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-09-15 17:11:21 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-09-15 17:11:21 -0700
commit38700ea40cb1dd0805cc926a9e629f93c99527ad (patch)
treea39eecaab229b50fed9a5c69ea7c7f75c43ff5ea
parent99ecfa5945aedaa71765ecf5cce59964ae52eebe (diff)
downloadspark-38700ea40cb1dd0805cc926a9e629f93c99527ad.tar.gz
spark-38700ea40cb1dd0805cc926a9e629f93c99527ad.tar.bz2
spark-38700ea40cb1dd0805cc926a9e629f93c99527ad.zip
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop. This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish). This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code. Author: Josh Rosen <joshrosen@databricks.com> Closes #8544 from JoshRosen/SPARK-10381.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala48
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala68
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala24
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala2
-rw-r--r--project/MimaExcludes.scala36
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala2
17 files changed, 174 insertions, 69 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ae5926dd53..ac6eaab20d 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
}
def commit() {
- SparkHadoopMapRedUtil.commitTask(
- getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}
def commitJob() {
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 2ae878b3e6..7137246bc3 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+ jobID: Int,
+ partitionID: Int,
+ attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
- s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
/**
* If a task failed because its attempt to commit was denied, do not count this failure
* towards failing the stage. This is intended to prevent spurious stage failures in cases
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef511..7d84889a2d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
- attemptID: Int)
+ attemptNumber: Int)
extends Exception(msg) {
- def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+ def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f405b732e4..f7298e8d5c 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
- splitId: Int,
- attemptId: Int): Unit = {
+ splitId: Int): Unit = {
val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+ val taskAttemptNumber = TaskContext.get().attemptNumber()
+ val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
if (canCommit) {
performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, jobId, splitId, attemptId)
+ throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}
-
- def commitTask(
- committer: MapReduceOutputCommitter,
- mrTaskContext: MapReduceTaskAttemptContext,
- sparkTaskContext: TaskContext): Unit = {
- commitTask(
- committer,
- mrTaskContext,
- sparkTaskContext.stageId(),
- sparkTaskContext.partitionId(),
- sparkTaskContext.attemptNumber())
- }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4f90e8347..3c9a66e504 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1128,8 +1128,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
- outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
- event.taskInfo.attempt, event.reason)
+ outputCommitCoordinator.taskCompleted(
+ stageId,
+ task.partitionId,
+ event.taskInfo.attemptNumber, // this is a task attempt number
+ event.reason)
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 5d926377ce..add0dedc03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
private sealed trait OutputCommitCoordinationMessage extends Serializable
private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
var coordinatorRef: Option[RpcEndpointRef] = None
private type StageId = Int
- private type PartitionId = Long
- private type TaskAttemptId = Long
+ private type PartitionId = Int
+ private type TaskAttemptNumber = Int
/**
* Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
- private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+ private type CommittersByStageMap =
+ mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
- * @param attempt a unique identifier for this task attempt
+ * @param attemptNumber how many times this task has been attempted
+ * (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = {
- val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+ attemptNumber: TaskAttemptNumber): Boolean = {
+ val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+ authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}
// Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId,
+ attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
- logInfo(
- s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+ logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+ s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters.get(partition).exists(_ == attempt)) {
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
+ if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+ logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+ s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = synchronized {
+ attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
- logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
- s"existingCommitter = $existingCommitter")
+ logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
- logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
- authorizedCommitters(partition) = attempt
+ logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition")
+ authorizedCommitters(partition) = attemptNumber
true
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+ logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+ s"partition $partition to commit")
false
}
}
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+ case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
context.reply(
- outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+ outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ced77..f113c2b1b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
- val attempt: Int,
+ val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}
- def id: String = s"$index.$attempt"
+ @deprecated("Use attemptNumber", "1.6.0")
+ def attempt: Int = attemptNumber
+
+ def id: String = s"$index.$attemptNumber"
def duration: Long = {
if (!finished) {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136df7..24a0b52206 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
new TaskData(
taskId = uiData.taskInfo.taskId,
index = uiData.taskInfo.index,
- attempt = uiData.taskInfo.attempt,
+ attempt = uiData.taskInfo.attemptNumber,
launchTime = new Date(uiData.taskInfo.launchTime),
executorId = uiData.taskInfo.executorId,
host = uiData.taskInfo.host,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2b71f55b7b..712782d27b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
serializationTimeProportionPos + serializationTimeProportion
val index = taskInfo.index
- val attempt = taskInfo.attempt
+ val attempt = taskInfo.attemptNumber
val svgTag =
if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
new TaskTableRowData(
info.index,
info.taskId,
- info.attempt,
+ info.attemptNumber,
info.speculative,
info.status,
info.taskLocality.toString,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 24f78744ad..99614a786b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
- ("Attempt" -> taskInfo.attempt) ~
+ ("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000000..1ae5b030f0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+ extends SparkFunSuite
+ with LocalSparkContext
+ with Timeouts {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val conf = new SparkConf()
+ .set("master", "local[2,4]")
+ .set("spark.speculation", "true")
+ .set("spark.hadoop.mapred.output.committer.class",
+ classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+ sc = new SparkContext("local[2, 4]", "test", conf)
+ }
+
+ test("exception thrown in OutputCommitter.commitTask()") {
+ // Regression test for SPARK-10381
+ failAfter(Span(60, Seconds)) {
+ val tempDir = Utils.createTempDir()
+ try {
+ sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+ }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
+ override def commitTask(context: TaskAttemptContext): Unit = {
+ val ctx = TaskContext.get()
+ if (ctx.attemptNumber < 1) {
+ throw new java.io.FileNotFoundException("Intentional exception")
+ }
+ super.commitTask(context)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e5ecd4b7c2..6d08d7c5b7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
- val partition: Long = 2
- val authorizedCommitter: Long = 3
- val nonAuthorizedCommitter: Long = 100
+ val partition: Int = 2
+ val authorizedCommitter: Int = 3
+ val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
- assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
- assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
- outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 47e548ef0d..143c1b901d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -499,7 +499,7 @@ class JsonProtocolSuite extends SparkFunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
- assert(info1.attempt === info2.attempt)
+ assert(info1.attemptNumber === info2.attemptNumber)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 46026c1e90..1c96b09585 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,7 @@ object MimaExcludes {
excludePackage("org.apache.spark.sql.execution")
) ++
MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
- MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
+ MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
Seq(
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.ml.classification.LogisticCostFun.this"),
@@ -53,6 +53,23 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticAggregator.add"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.ml.classification.LogisticAggregator.count")
+ ) ++ Seq(
+ // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+ // This class is marked as `private` but MiMa still seems to be confused by the change.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
)
case v if v.startsWith("1.5") =>
Seq(
@@ -213,6 +230,23 @@ object MimaExcludes {
// SPARK-9704 Made ProbabilisticClassifier, Identifiable, VectorUDT public APIs
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.mllib.linalg.VectorUDT.serialize")
+ ) ++ Seq(
+ // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+ // This class is marked as `private` but MiMa still seems to be confused by the change.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
)
case v if v.startsWith("1.4") =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f8ef674ed2..cfd64c1d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -198,8 +198,7 @@ private[sql] abstract class BaseWriterContainer(
}
def commitTask(): Unit = {
- SparkHadoopMapRedUtil.commitTask(
- outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+ SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
}
def abortTask(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2bbb41ca77..7a46c69a05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -54,9 +54,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
details = ""
)
- private def createTaskInfo(taskId: Int, attempt: Int): TaskInfo = new TaskInfo(
+ private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new TaskInfo(
taskId = taskId,
- attempt = attempt,
+ attemptNumber = attemptNumber,
// The following fields are not used in tests
index = 0,
launchTime = 0,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 4ca8042d22..c8d6b71804 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -121,7 +121,7 @@ private[hive] class SparkHiveWriterContainer(
}
protected def commit() {
- SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {