commit f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41 (patch)
tree 040704e897ed4a9696a46d7a67779471baf6a778 /sql/hive
parent 1f4c4fe6dfd8cc52b5fddfd67a31a77edbb1a036 (diff)
author    Cheng Lian <lian@databricks.com>          2015-08-19 14:15:28 -0700
committer Michael Armbrust <michael@databricks.com> 2015-08-19 14:15:28 -0700
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation does not play well with direct output committers: there are multiple corner cases in which speculative duplicate task attempts may cause data corruption and/or data loss.
Please see this [PR comment][1] for more details.
[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385
Author: Cheng Lian <lian@databricks.com>
Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
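The behavior this patch introduces can be sketched as follows. This is a minimal, self-contained illustration, not Spark's actual internals: `CommitterSelection`, `resolveCommitterClass`, and the class-name strings are hypothetical stand-ins; the real logic lives in Spark's internal write path, keyed by `spark.speculation` and `SQLConf.OUTPUT_COMMITTER_CLASS` (`spark.sql.sources.outputCommitterClass`).

```scala
// Sketch of the committer-selection rule added by SPARK-9899 (hypothetical names).
object CommitterSelection {
  // Assumed config key for the customized output committer class.
  val OutputCommitterClassKey = "spark.sql.sources.outputCommitterClass"

  // Returns the output committer class to use. When speculation is enabled,
  // a customized (e.g. direct) committer is unsafe, because two speculative
  // attempts of the same task may both write output directly, corrupting or
  // losing data. In that case the user-configured class is ignored and the
  // default committer is used instead.
  def resolveCommitterClass(conf: Map[String, String], default: String): String = {
    val speculationEnabled = conf.getOrElse("spark.speculation", "false").toBoolean
    if (speculationEnabled) default
    else conf.getOrElse(OutputCommitterClassKey, default)
  }
}
```

This is also why the new test below can configure an always-failing committer (`AlwaysFailOutputCommitter`) and still expect the write to succeed: with speculation on, the customized committer is never used.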
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 34 ++++++++++++++++++++++++++++++++++
1 file changed, 34 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8d0d9218dd..5bbca14bad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
     }
   }
+
+  test("SPARK-9899 Disable customized output committer when speculation is on") {
+    val clonedConf = new Configuration(configuration)
+    val speculationEnabled =
+      sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
+    try {
+      withTempPath { dir =>
+        // Enables task speculation
+        sqlContext.sparkContext.conf.set("spark.speculation", "true")
+
+        // Uses a customized output committer which always fails
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+
+        // Code below shouldn't throw since customized output committer should be disabled.
+        val df = sqlContext.range(10).coalesce(1)
+        df.write.format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext
+            .read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df)
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when