author     Cheng Lian <lian@databricks.com>          2015-08-19 14:15:28 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-08-19 14:15:28 -0700
commit     f3ff4c41d2e32bd0f2419d1c9c68fcd0c2593e41 (patch)
tree       040704e897ed4a9696a46d7a67779471baf6a778 /sql
parent     1f4c4fe6dfd8cc52b5fddfd67a31a77edbb1a036 (diff)
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation hates direct output committers, as there are multiple corner cases that may cause data corruption and/or data loss. Please see this [PR comment] [1] for more details.

[1]: https://github.com/apache/spark/pull/8191#issuecomment-131598385

Author: Cheng Lian <lian@databricks.com>

Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
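For context, this is the kind of conflicting configuration the patch guards against (a minimal sketch; the committer class name is illustrative, and `spark.sql.sources.outputCommitterClass` is the key behind `SQLConf.OUTPUT_COMMITTER_CLASS`):

import org.apache.spark.SparkConf

// Speculation launches duplicate attempts of slow tasks, so multiple attempts
// of the same task may write their output concurrently.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  // A direct output committer writes straight to the destination directory,
  // leaving no per-attempt staging area to roll back on failure.
  .set("spark.sql.sources.outputCommitterClass",
    "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")

// After this patch, the customized committer above is ignored whenever
// spark.speculation is true; the output format's default committer is used instead.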
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala  | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala         | 34
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index e0147079e6..78f48a5cd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer(
// This is only used on driver side.
@transient private val jobContext: JobContext = job
+ private val speculationEnabled: Boolean =
+ relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
// The following fields are initialized and used on both driver and executor side.
@transient protected var outputCommitter: OutputCommitter = _
@transient private var jobId: JobID = _
@@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer(
// associated with the file output format since it is not safe to use a custom
// committer for appending. For example, in S3, direct parquet output committer may
// leave partial data in the destination dir when the appending job fails.
+ //
+ // See SPARK-8578 for more details.
logInfo(
- s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
+ } else if (speculationEnabled) {
+ // When speculation is enabled, it's not safe to use customized output committer classes,
+ // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
+ //
+ // See SPARK-9899 for more details.
+ logInfo(
+ s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "because spark.speculation is configured to be true.")
+ defaultOutputCommitter
} else {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
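Taken together, the committer-selection logic after this patch can be paraphrased as the standalone sketch below (the helper name `chooseCommitter` is hypothetical; the real decision lives inline in `BaseWriterContainer`):

import org.apache.hadoop.mapreduce.OutputCommitter

// Hypothetical paraphrase of the decision order shown in the hunk above:
// appending and speculation both force the default committer; only otherwise
// is a user-configured committer honored.
def chooseCommitter(
    isAppend: Boolean,
    speculationEnabled: Boolean,
    customCommitter: Option[OutputCommitter],
    defaultCommitter: OutputCommitter): OutputCommitter = {
  if (isAppend) {
    // A custom committer may leave partial data behind when an appending
    // job fails (SPARK-8578).
    defaultCommitter
  } else if (speculationEnabled) {
    // Concurrent speculative attempts make customized committers unsafe (SPARK-9899).
    defaultCommitter
  } else {
    customCommitter.getOrElse(defaultCommitter)
  }
}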
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8d0d9218dd..5bbca14bad 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
+
+ test("SPARK-9899 Disable customized output committer when speculation is on") {
+ val clonedConf = new Configuration(configuration)
+ val speculationEnabled =
+ sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
+ try {
+ withTempPath { dir =>
+ // Enables task speculation
+ sqlContext.sparkContext.conf.set("spark.speculation", "true")
+
+ // Uses a customized output committer which always fails
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+
+ // Code below shouldn't throw since customized output committer should be disabled.
+ val df = sqlContext.range(10).coalesce(1)
+ df.write.format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .load(dir.getCanonicalPath),
+ df)
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
+ }
+ }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when
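The class definition itself is truncated here. A minimal sketch of such an always-failing committer, assuming it extends Hadoop's FileOutputCommitter (as implementations configured via SQLConf.OUTPUT_COMMITTER_CLASS typically do), might look like:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Sketch only: fails every task commit, so any job that actually picks this
// committer fails. The test above passes precisely because this committer is
// never selected while speculation is on.
class AlwaysFailOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    sys.error("Intentional task commit failure for testing")
  }
}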