diff options
author | Cheng Lian <lian@databricks.com> | 2015-08-19 14:15:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-08-19 14:15:28 -0700 |
commit | f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41 (patch) | |
tree | 040704e897ed4a9696a46d7a67779471baf6a778 /sql | |
parent | 1f4c4fe6dfd8cc52b5fddfd67a31a77edbb1a036 (diff) | |
download | spark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.tar.gz spark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.tar.bz2 spark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.zip |
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss.
Please see this [PR comment][1] for more details.
[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385
Author: Cheng Lian <lian@databricks.com>
Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 16 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 34 |
2 files changed, 49 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index e0147079e6..78f48a5cd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer( // This is only used on driver side. @transient private val jobContext: JobContext = job + private val speculationEnabled: Boolean = + relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + // The following fields are initialized and used on both driver and executor side. @transient protected var outputCommitter: OutputCommitter = _ @transient private var jobId: JobID = _ @@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer( // associated with the file output format since it is not safe to use a custom // committer for appending. For example, in S3, direct parquet output committer may // leave partial data in the destination dir when the appending job fails. + // + // See SPARK-8578 for more details logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + "for appending.") defaultOutputCommitter + } else if (speculationEnabled) { + // When speculation is enabled, it's not safe to use customized output committer classes, + // especially direct output committers (e.g. `DirectParquetOutputCommitter`). + // + // See SPARK-9899 for more details. 
+ logInfo( + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + + "because spark.speculation is configured to be true.") + defaultOutputCommitter } else { val committerClass = context.getConfiguration.getClass( SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 8d0d9218dd..5bbca14bad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t") } } + + test("SPARK-9899 Disable customized output committer when speculation is on") { + val clonedConf = new Configuration(configuration) + val speculationEnabled = + sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + + try { + withTempPath { dir => + // Enables task speculation + sqlContext.sparkContext.conf.set("spark.speculation", "true") + + // Uses a customized output committer which always fails + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[AlwaysFailOutputCommitter].getName) + + // Code below shouldn't throw since customized output committer should be disabled. 
+ val df = sqlContext.range(10).coalesce(1) + df.write.format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .load(dir.getCanonicalPath), + df) + } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when |