aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-04-07 17:49:39 -0700
committerAndrew Or <andrew@databricks.com>2016-04-07 17:49:39 -0700
commit3e29e372ff518827bae9dcd26087946fde476843 (patch)
tree3abd3e22678fb63e347832e1241bf94fd2a8e6b9 /core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
parent30e980ad8e6443dddd54f3c2d48b3904499545cf (diff)
downloadspark-3e29e372ff518827bae9dcd26087946fde476843.tar.gz
spark-3e29e372ff518827bae9dcd26087946fde476843.tar.bz2
spark-3e29e372ff518827bae9dcd26087946fde476843.zip
[SPARK-14468] Always enable OutputCommitCoordinator
## What changes were proposed in this pull request? `OutputCommitCoordinator` was introduced to deal with concurrent task attempts racing to write output, leading to data loss or corruption. For more detail, read the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468). Before: `OutputCommitCoordinator` is enabled only if speculation is enabled. After: `OutputCommitCoordinator` is always enabled. Users may still disable this through `spark.hadoop.outputCommitCoordination.enabled`, but they really shouldn't... ## How was this patch tested? `OutputCommitCoordinator*Suite` Author: Andrew Or <andrew@databricks.com> Closes #12244 from andrewor14/always-occ.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala16
1 files changed, 6 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 891facba33..607283a306 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
* details).
*
- * Output commit coordinator is only contacted when the following two configurations are both set
- * to `true`:
- *
- * - `spark.speculation`
- * - `spark.hadoop.outputCommitCoordination.enabled`
+ * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+ * is set to true (which is the default).
*/
def commitTask(
committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
if (committer.needsTaskCommit(mrTaskContext)) {
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
- // We only need to coordinate with the driver if there are multiple concurrent task
- // attempts, which should only occur if speculation is enabled
- val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
- // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
- sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+ // We only need to coordinate with the driver if there are concurrent task attempts.
+ // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+ // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+ sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
}
if (shouldCoordinateWithDriver) {