aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala16
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala2
3 files changed, 8 insertions, 12 deletions
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 891facba33..607283a306 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
* details).
*
- * Output commit coordinator is only contacted when the following two configurations are both set
- * to `true`:
- *
- * - `spark.speculation`
- * - `spark.hadoop.outputCommitCoordination.enabled`
+ * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+ * is set to true (which is the default).
*/
def commitTask(
committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
if (committer.needsTaskCommit(mrTaskContext)) {
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
- // We only need to coordinate with the driver if there are multiple concurrent task
- // attempts, which should only occur if speculation is enabled
- val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
- // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
- sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+ // We only need to coordinate with the driver if there are concurrent task attempts.
+ // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+ // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+ sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
}
if (shouldCoordinateWithDriver) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 9f41aca8a1..601f1c378c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
- .set("spark.speculation", "true")
+ .set("spark.hadoop.outputCommitCoordination.enabled", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c461da65bd..8e509de767 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -77,7 +77,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
- .set("spark.speculation", "true")
+ .set("spark.hadoop.outputCommitCoordination.enabled", "true")
sc = new SparkContext(conf) {
override private[spark] def createSparkEnv(
conf: SparkConf,