aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-08-19 14:15:28 -0700
committerMichael Armbrust <michael@databricks.com>2015-08-19 14:15:28 -0700
commitf3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41 (patch)
tree040704e897ed4a9696a46d7a67779471baf6a778 /sql/hive
parent1f4c4fe6dfd8cc52b5fddfd67a31a77edbb1a036 (diff)
downloadspark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.tar.gz
spark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.tar.bz2
spark-f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41.zip
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss. Please see this [PR comment] [1] for more details. [1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385 Author: Cheng Lian <lian@databricks.com> Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala34
1 file changed, 34 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8d0d9218dd..5bbca14bad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
+
+ test("SPARK-9899 Disable customized output committer when speculation is on") {
+ val clonedConf = new Configuration(configuration)
+ val speculationEnabled =
+ sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
+ try {
+ withTempPath { dir =>
+ // Enables task speculation
+ sqlContext.sparkContext.conf.set("spark.speculation", "true")
+
+ // Uses a customized output committer which always fails
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+
+ // Code below shouldn't throw since customized output committer should be disabled.
+ val df = sqlContext.range(10).coalesce(1)
+ df.write.format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .load(dir.getCanonicalPath),
+ df)
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
+ }
+ }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when