aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-09-15 17:11:21 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-09-15 17:11:21 -0700
commit38700ea40cb1dd0805cc926a9e629f93c99527ad (patch)
treea39eecaab229b50fed9a5c69ea7c7f75c43ff5ea /sql
parent99ecfa5945aedaa71765ecf5cce59964ae52eebe (diff)
downloadspark-38700ea40cb1dd0805cc926a9e629f93c99527ad.tar.gz
spark-38700ea40cb1dd0805cc926a9e629f93c99527ad.tar.bz2
spark-38700ea40cb1dd0805cc926a9e629f93c99527ad.zip
[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator
When speculative execution is enabled, consider a scenario where the authorized committer of a particular output partition fails during the OutputCommitter.commitTask() call. In this case, the OutputCommitCoordinator is supposed to release that committer's exclusive lock on committing once that task fails. However, due to a unit mismatch (we used task attempt number in one place and task attempt id in another) the lock will not be released, causing Spark to go into an infinite retry loop. This bug was masked by the fact that the OutputCommitCoordinator does not have enough end-to-end tests (the current tests use many mocks). Other factors contributing to this bug are the fact that we have many similarly-named identifiers that have different semantics but the same data types (e.g. attemptNumber and taskAttemptId, with inconsistent variable naming which makes them difficult to distinguish). This patch adds a regression test and fixes this bug by always using task attempt numbers throughout this code. Author: Josh Rosen <joshrosen@databricks.com> Closes #8544 from JoshRosen/SPARK-10381.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala2
3 files changed, 4 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f8ef674ed2..cfd64c1d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -198,8 +198,7 @@ private[sql] abstract class BaseWriterContainer(
}
def commitTask(): Unit = {
- SparkHadoopMapRedUtil.commitTask(
- outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+ SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
}
def abortTask(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2bbb41ca77..7a46c69a05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -54,9 +54,9 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
details = ""
)
- private def createTaskInfo(taskId: Int, attempt: Int): TaskInfo = new TaskInfo(
+ private def createTaskInfo(taskId: Int, attemptNumber: Int): TaskInfo = new TaskInfo(
taskId = taskId,
- attempt = attempt,
+ attemptNumber = attemptNumber,
// The following fields are not used in tests
index = 0,
launchTime = 0,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 4ca8042d22..c8d6b71804 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -121,7 +121,7 @@ private[hive] class SparkHiveWriterContainer(
}
protected def commit() {
- SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
+ SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {