-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                                 |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskEndReason.scala                                     |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala                    |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala                      | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                            |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala                 | 48
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala                                |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala                   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                                 |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                                 |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala | 68
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala            | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                            |  2
-rw-r--r--  project/MimaExcludes.scala                                                                    | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala     |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala             |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala                 |  2
17 files changed, 174 insertions, 69 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ae5926dd53..ac6eaab20d 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -104,8 +104,7 @@ class SparkHadoopWriter(jobConf: JobConf)
}
def commit() {
- SparkHadoopMapRedUtil.commitTask(
- getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
}
def commitJob() {
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 2ae878b3e6..7137246bc3 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -193,9 +193,12 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+ jobID: Int,
+ partitionID: Int,
+ attemptNumber: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
- s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
/**
* If a task failed because its attempt to commit was denied, do not count this failure
* towards failing the stage. This is intended to prevent spurious stage failures in cases
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef511..7d84889a2d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
msg: String,
jobID: Int,
splitID: Int,
- attemptID: Int)
+ attemptNumber: Int)
extends Exception(msg) {
- def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+ def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f405b732e4..f7298e8d5c 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -91,8 +91,7 @@ object SparkHadoopMapRedUtil extends Logging {
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
- splitId: Int,
- attemptId: Int): Unit = {
+ splitId: Int): Unit = {
val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
@@ -122,7 +121,8 @@ object SparkHadoopMapRedUtil extends Logging {
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+ val taskAttemptNumber = TaskContext.get().attemptNumber()
+ val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
if (canCommit) {
performCommit()
@@ -132,7 +132,7 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, jobId, splitId, attemptId)
+ throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -143,16 +143,4 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}
-
- def commitTask(
- committer: MapReduceOutputCommitter,
- mrTaskContext: MapReduceTaskAttemptContext,
- sparkTaskContext: TaskContext): Unit = {
- commitTask(
- committer,
- mrTaskContext,
- sparkTaskContext.stageId(),
- sparkTaskContext.partitionId(),
- sparkTaskContext.attemptNumber())
- }
}
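
The hunk above is the heart of the change: the task now asks the driver-side coordinator for permission using its attempt number, read from TaskContext, instead of a caller-supplied attempt ID. A simplified, standalone sketch of that decision path (not Spark's actual implementation; canCommit, performCommit and abortTask are illustrative stand-ins supplied by the caller):

    // Simplified, standalone sketch of the commit decision path shown above.
    object CommitPathSketch {
      final case class CommitDeniedSketch(msg: String) extends Exception(msg)

      def commitTask(
          jobId: Int,
          splitId: Int,
          attemptNumber: Int,                     // in Spark: TaskContext.get().attemptNumber()
          canCommit: (Int, Int, Int) => Boolean,  // stand-in for the driver-side coordinator check
          performCommit: () => Unit,
          abortTask: () => Unit): Unit = {
        if (canCommit(jobId, splitId, attemptNumber)) {
          performCommit()
        } else {
          // Abort so the driver can reschedule a new attempt if necessary, then fail the task.
          abortTask()
          throw CommitDeniedSketch(
            s"commit denied: job=$jobId, split=$splitId, attempt=$attemptNumber")
        }
      }
    }

The important property is that only one attempt per partition ever takes the performCommit() branch; every other attempt aborts and surfaces a commit-denied failure that the scheduler does not count toward stage failure.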
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b4f90e8347..3c9a66e504 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1128,8 +1128,11 @@ class DAGScheduler(
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
- outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
- event.taskInfo.attempt, event.reason)
+ outputCommitCoordinator.taskCompleted(
+ stageId,
+ task.partitionId,
+ event.taskInfo.attemptNumber, // this is a task attempt number
+ event.reason)
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 5d926377ce..add0dedc03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
private sealed trait OutputCommitCoordinationMessage extends Serializable
private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
/**
* Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
var coordinatorRef: Option[RpcEndpointRef] = None
private type StageId = Int
- private type PartitionId = Long
- private type TaskAttemptId = Long
+ private type PartitionId = Int
+ private type TaskAttemptNumber = Int
/**
* Map from active stage's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
* Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
*/
private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
- private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+ private type CommittersByStageMap =
+ mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
/**
* Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
*
* @param stage the stage number
* @param partition the partition number
- * @param attempt a unique identifier for this task attempt
+ * @param attemptNumber how many times this task has been attempted
+ * (see [[TaskContext.attemptNumber()]])
* @return true if this task is authorized to commit, false otherwise
*/
def canCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = {
- val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+ attemptNumber: TaskAttemptNumber): Boolean = {
+ val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
coordinatorRef match {
case Some(endpointRef) =>
endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
// Called by DAGScheduler
private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
- authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+ authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
}
// Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def taskCompleted(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId,
+ attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit = synchronized {
val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
case Success =>
// The task output has been committed successfully
case denied: TaskCommitDenied =>
- logInfo(
- s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+ logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+ s"attempt: $attemptNumber")
case otherReason =>
- if (authorizedCommitters.get(partition).exists(_ == attempt)) {
- logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
- s" clearing lock")
+ if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+ logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+ s"partition=$partition) failed; clearing lock")
authorizedCommitters.remove(partition)
}
}
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
private[scheduler] def handleAskPermissionToCommit(
stage: StageId,
partition: PartitionId,
- attempt: TaskAttemptId): Boolean = synchronized {
+ attemptNumber: TaskAttemptNumber): Boolean = synchronized {
authorizedCommittersByStage.get(stage) match {
case Some(authorizedCommitters) =>
authorizedCommitters.get(partition) match {
case Some(existingCommitter) =>
- logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
- s"existingCommitter = $existingCommitter")
+ logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition; existingCommitter = $existingCommitter")
false
case None =>
- logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
- authorizedCommitters(partition) = attempt
+ logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+ s"partition=$partition")
+ authorizedCommitters(partition) = attemptNumber
true
}
case None =>
- logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+ logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+ s"partition $partition to commit")
false
}
}
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+ case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
context.reply(
- outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+ outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
}
}
}
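
For reference, a minimal self-contained sketch of the "first committer wins" bookkeeping that handleAskPermissionToCommit implements; plain mutable maps stand in for the synchronized, RPC-fronted state of the real OutputCommitCoordinator, and stage ids, partition ids and attempt numbers are all Ints, as in the patched coordinator:

    import scala.collection.mutable

    // Standalone sketch of first-committer-wins bookkeeping (illustrative only).
    class FirstCommitterWinsSketch {
      // stage id -> (partition id -> attempt number holding the commit lock)
      private val committersByStage = mutable.Map[Int, mutable.Map[Int, Int]]()

      def stageStart(stage: Int): Unit = {
        committersByStage(stage) = mutable.HashMap[Int, Int]()
      }

      def stageEnd(stage: Int): Unit = {
        committersByStage.remove(stage)
      }

      def askPermissionToCommit(stage: Int, partition: Int, attemptNumber: Int): Boolean =
        committersByStage.get(stage) match {
          case Some(byPartition) => byPartition.get(partition) match {
            case Some(_) => false                                 // another attempt holds the lock
            case None    => byPartition(partition) = attemptNumber; true
          }
          case None => false                                      // stage already finished
        }
    }

With this sketch, askPermissionToCommit(1, 2, 0) returns true for the first asker and false for every later attempt number until the stage ends; the branch that clears the lock when the authorized committer itself fails (taskCompleted above) is omitted here.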
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ced77..f113c2b1b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
- val attempt: Int,
+ val attemptNumber: Int,
val launchTime: Long,
val executorId: String,
val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
}
}
- def id: String = s"$index.$attempt"
+ @deprecated("Use attemptNumber", "1.6.0")
+ def attempt: Int = attemptNumber
+
+ def id: String = s"$index.$attemptNumber"
def duration: Long = {
if (!finished) {
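
The deprecated attempt alias above keeps existing callers compiling while steering them toward attemptNumber. A standalone illustration of the same renaming pattern (not the actual TaskInfo class):

    // Illustrative only: rename a public member but keep a deprecated alias.
    class TaskInfoLike(val attemptNumber: Int) {
      @deprecated("Use attemptNumber", "1.6.0")
      def attempt: Int = attemptNumber
    }

Code compiled against the old name keeps working but emits a deprecation warning; new code reads attemptNumber directly.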
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136df7..24a0b52206 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
new TaskData(
taskId = uiData.taskInfo.taskId,
index = uiData.taskInfo.index,
- attempt = uiData.taskInfo.attempt,
+ attempt = uiData.taskInfo.attemptNumber,
launchTime = new Date(uiData.taskInfo.launchTime),
executorId = uiData.taskInfo.executorId,
host = uiData.taskInfo.host,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2b71f55b7b..712782d27b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -621,7 +621,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
serializationTimeProportionPos + serializationTimeProportion
val index = taskInfo.index
- val attempt = taskInfo.attempt
+ val attempt = taskInfo.attemptNumber
val svgTag =
if (totalExecutionTime == 0) {
@@ -967,7 +967,7 @@ private[ui] class TaskDataSource(
new TaskTableRowData(
info.index,
info.taskId,
- info.attempt,
+ info.attemptNumber,
info.speculative,
info.status,
info.taskLocality.toString,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 24f78744ad..99614a786b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -266,7 +266,7 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
- ("Attempt" -> taskInfo.attempt) ~
+ ("Attempt" -> taskInfo.attemptNumber) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000000..1ae5b030f0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+ extends SparkFunSuite
+ with LocalSparkContext
+ with Timeouts {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val conf = new SparkConf()
+ .set("master", "local[2,4]")
+ .set("spark.speculation", "true")
+ .set("spark.hadoop.mapred.output.committer.class",
+ classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+ sc = new SparkContext("local[2, 4]", "test", conf)
+ }
+
+ test("exception thrown in OutputCommitter.commitTask()") {
+ // Regression test for SPARK-10381
+ failAfter(Span(60, Seconds)) {
+ val tempDir = Utils.createTempDir()
+ try {
+ sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
+ }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
+ override def commitTask(context: TaskAttemptContext): Unit = {
+ val ctx = TaskContext.get()
+ if (ctx.attemptNumber < 1) {
+ throw new java.io.FileNotFoundException("Intentional exception")
+ }
+ super.commitTask(context)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index e5ecd4b7c2..6d08d7c5b7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
* was not in SparkHadoopWriter, the tests would still pass because only one of the
* increments would be captured even though the commit in both tasks was executed
* erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
*/
class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
val stage: Int = 1
- val partition: Long = 2
- val authorizedCommitter: Long = 3
- val nonAuthorizedCommitter: Long = 100
+ val partition: Int = 2
+ val authorizedCommitter: Int = 3
+ val nonAuthorizedCommitter: Int = 100
outputCommitCoordinator.stageStart(stage)
- assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
- assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+ assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+ assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
// The non-authorized committer fails
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
// New tasks should still not be able to commit because the authorized committer has not failed
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(
- stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+ stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
// A new task should now be allowed to become the authorized committer
assert(
- outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+ outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(
- !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+ !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 47e548ef0d..143c1b901d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -499,7 +499,7 @@ class JsonProtocolSuite extends SparkFunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
- assert(info1.attempt === info2.attempt)
+ assert(info1.attemptNumber === info2.attemptNumber)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 46026c1e90..1c96b09585 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,7 @@ object MimaExcludes {
excludePackage("org.apache.spark.sql.execution")
) ++
MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
- MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
+ MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
Seq(
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.ml.classification.LogisticCostFun.this"),
@@ -53,6 +53,23 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticAggregator.add"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.ml.classification.LogisticAggregator.count")
+ ) ++ Seq(
+ // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+ // This class is marked as `private` but MiMa still seems to be confused by the change.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
)
case v if v.startsWith("1.5") =>
Seq(
@@ -213,6 +230,23 @@ object MimaExcludes {
// SPARK-9704 Made ProbabilisticClassifier, Identifiable, VectorUDT public APIs
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.mllib.linalg.VectorUDT.serialize")
+ ) ++ Seq(
+ // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+ // This class is marked as `private` but MiMa still seems to be confused by the change.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
)
case v if v.startsWith("1.4") =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f8ef674ed2..cfd64c1d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -198,8 +198,7 @@ private[sql] abstract class BaseWriterContainer(
}
def commitTask(): Unit = {
- SparkHadoopMapRedUtil.commitTask(
- outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+ SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
}
def abortTask(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2bbb41ca77..7a46c69a05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -54,9 +54,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
details = ""
)
- private def createTaskInfo(taskId: Int, attempt: Int): TaskInfo = new TaskInfo(
+ private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new TaskInfo(
taskId = taskId,
- attempt = attempt,
+ attemptNumber = attemptNumber,
// The following fields are not used in tests
index = 0,
launchTime = 0,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 4ca8042d22..c8d6b71804 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -121,7 +121,7 @@ private[hive] class SparkHiveWriterContainer(
}
protected def commit() {
- SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {