1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mapred
import java.io.IOException
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
object SparkHadoopMapRedUtil extends Logging {
  import scala.util.control.NonFatal

  /**
   * Commits a task output. Before committing the task output, we need to know whether some other
   * task attempt might be racing to commit the same output partition. Therefore, coordinate with
   * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
   * details).
   *
   * Whether the output commit coordinator is contacted is controlled by
   * `spark.hadoop.outputCommitCoordination.enabled`. When that setting is absent, it falls back
   * to the value of `spark.speculation` (which itself defaults to `false`). Consequently:
   *
   *  - coordination setting explicitly `true`: the coordinator is contacted, whether or not
   *    speculation is enabled
   *  - coordination setting explicitly `false`: the coordinator is never contacted
   *  - coordination setting absent: the coordinator is contacted only if `spark.speculation`
   *    is `true`
   *
   * If the task cannot be committed (the commit throws, or the driver denies it), the task is
   * aborted before the exception is thrown. A failure of the abort itself is logged and attached
   * to that exception as a suppressed exception, so that it never hides the original reason.
   *
   * @param committer the output committer used to commit or abort the task
   * @param mrTaskContext the Hadoop task attempt context handed to the committer
   * @param jobId first identifier passed to the output commit coordinator and reported in
   *              [[CommitDeniedException]]
   * @param splitId second identifier passed to the output commit coordinator and reported in
   *                [[CommitDeniedException]]
   * @throws java.io.IOException if `committer.commitTask` fails; the task is aborted first
   * @throws org.apache.spark.executor.CommitDeniedException if the driver did not authorize
   *         this attempt to commit; the task is aborted first
   */
  def commitTask(
      committer: MapReduceOutputCommitter,
      mrTaskContext: MapReduceTaskAttemptContext,
      jobId: Int,
      splitId: Int): Unit = {

    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID

    // Aborts the task on behalf of `primary`, the exception that is about to be thrown.
    // If the abort itself fails, that failure must not replace `primary`: it is logged and
    // recorded as a suppressed exception instead.
    def abortTaskPreserving(primary: Throwable): Unit = {
      try {
        committer.abortTask(mrTaskContext)
      } catch {
        case NonFatal(abortFailure) =>
          logError(s"Error aborting the output of task: $mrTaskAttemptID", abortFailure)
          // Throwable.addSuppressed rejects self-suppression, so guard against it.
          if (abortFailure ne primary) {
            primary.addSuppressed(abortFailure)
          }
      }
    }

    // Called after we have decided to commit
    def performCommit(): Unit = {
      try {
        committer.commitTask(mrTaskContext)
        logInfo(s"$mrTaskAttemptID: Committed")
      } catch {
        case cause: IOException =>
          logError(s"Error committing the output of task: $mrTaskAttemptID", cause)
          abortTaskPreserving(cause)
          throw cause
      }
    }

    // First, check whether the task's output has already been committed by some other attempt
    if (committer.needsTaskCommit(mrTaskContext)) {
      val shouldCoordinateWithDriver: Boolean = {
        val sparkConf = SparkEnv.get.conf
        // We only need to coordinate with the driver if there are multiple concurrent task
        // attempts, which should only occur if speculation is enabled
        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
        // Speculation only supplies its default; an explicit value always wins.
        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
      }

      if (shouldCoordinateWithDriver) {
        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
        val taskAttemptNumber = TaskContext.get().attemptNumber()
        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)

        if (canCommit) {
          performCommit()
        } else {
          val message =
            s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
          logInfo(message)
          val denied = new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
          // We need to abort the task so that the driver can reschedule new attempts, if necessary
          abortTaskPreserving(denied)
          throw denied
        }
      } else {
        // Commit coordination is not in effect: either it was explicitly disabled, or it was left
        // unset while speculation is disabled
        performCommit()
      }
    } else {
      // Some other attempt committed the output, so we do nothing and signal success
      logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
    }
  }
}
|