diff options
author | Cheng Lian <lian@databricks.com> | 2015-08-19 14:15:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-08-19 14:26:11 -0700 |
commit | b32a31d64588f2fd3926f09482e2bfe968370fc0 (patch) | |
tree | e37401a69af259b17a0182bffb4a64cf4602ae90 /sql | |
parent | d9dfd43d463cd5e3c9e72197850382ad8897b8ac (diff) | |
download | spark-b32a31d64588f2fd3926f09482e2bfe968370fc0.tar.gz spark-b32a31d64588f2fd3926f09482e2bfe968370fc0.tar.bz2 spark-b32a31d64588f2fd3926f09482e2bfe968370fc0.zip |
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss.
Please see this [PR comment] [1] for more details.
[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385
Author: Cheng Lian <lian@databricks.com>
Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
(cherry picked from commit f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Conflicts:
sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 16 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 34 |
2 files changed, 49 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 427c399f87..11c97e507f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer( // This is only used on driver side. @transient private val jobContext: JobContext = job + private val speculationEnabled: Boolean = + relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + // The following fields are initialized and used on both driver and executor side. @transient protected var outputCommitter: OutputCommitter = _ @transient private var jobId: JobID = _ @@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer( // associated with the file output format since it is not safe to use a custom // committer for appending. For example, in S3, direct parquet output committer may // leave partial data in the destination dir when the the appending job fails. + // + // See SPARK-8578 for more details logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + "for appending.") defaultOutputCommitter + } else if (speculationEnabled) { + // When speculation is enabled, it's not safe to use customized output committer classes, + // especially direct output committers (e.g. `DirectParquetOutputCommitter`). + // + // See SPARK-9899 for more details. + logInfo( + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + + "because spark.speculation is configured to be true.") + defaultOutputCommitter } else { val committerClass = context.getConfiguration.getClass( SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index af445626fb..33d8730cad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -553,6 +553,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } + + test("SPARK-9899 Disable customized output committer when speculation is on") { + val clonedConf = new Configuration(configuration) + val speculationEnabled = + sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + + try { + withTempPath { dir => + // Enables task speculation + sqlContext.sparkContext.conf.set("spark.speculation", "true") + + // Uses a customized output committer which always fails + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[AlwaysFailOutputCommitter].getName) + + // Code below shouldn't throw since customized output committer should be disabled. + val df = sqlContext.range(10).coalesce(1) + df.write.format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .load(dir.getCanonicalPath), + df) + } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when |