author    Reynold Xin <rxin@databricks.com>  2016-04-07 00:51:45 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-07 00:51:45 -0700
commit    9ca0760d6769199f164a661655912f028234eb1c (patch)
tree      9077fffc4e74921b25bc15b2e41150f98ac7000c
parent    e11aa9ec5c3cdcd8ca08d2486a7208840ad77bf8 (diff)
[SPARK-10063][SQL] Remove DirectParquetOutputCommitter
## What changes were proposed in this pull request?

This patch removes DirectParquetOutputCommitter. The committer was initially created by Databricks as a faster way to write Parquet data to S3. However, given how the underlying S3 Hadoop implementation works, this committer is only correct when there are no failures: if there are multiple attempts of the same task (e.g. due to speculation, task failures, or node failures), the output data can be corrupted. This performance optimization does not outweigh the correctness issue.

## How was this patch tested?

The related tests were removed along with the committer.

Author: Reynold Xin <rxin@databricks.com>

Closes #12229 from rxin/SPARK-10063.
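For context, a minimal sketch of how users opted into this committer before the removal. The configuration key and the class's fully qualified name come from the docs and the file deleted below; the app name and master URL are illustrative. After this patch the key can still be set, but there is no longer a Direct committer class to load.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DirectCommitterOptIn {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("direct-committer-demo").setMaster("local[*]"))

    // Pre-removal opt-in: the committer was selected through the Hadoop
    // Configuration, not through Spark's SQLConf (see the removed docs below).
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")
  }
}
```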
-rw-r--r--  docs/sql-programming-guide.md                                                                   | 33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala        | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala | 88
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala               | 34
6 files changed, 5 insertions(+), 224 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 274a8edb0c..63310be22c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1467,37 +1467,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
</td>
</tr>
<tr>
- <td><code>spark.sql.parquet.output.committer.class</code></td>
- <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
- <td>
- <p>
- The output committer class used by Parquet. The specified class needs to be a subclass of
- <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
- subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
- </p>
- <p>
- <b>Note:</b>
- <ul>
- <li>
- This option is automatically ignored if <code>spark.speculation</code> is turned on.
- </li>
- <li>
- This option must be set via Hadoop <code>Configuration</code> rather than Spark
- <code>SQLConf</code>.
- </li>
- <li>
- This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
- </li>
- </ul>
- </p>
- <p>
- Spark SQL comes with a builtin
- <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
-      efficient than the default Parquet output committer when writing data to S3.
- </p>
- </td>
-</tr>
-<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td><code>false</code></td>
<td>
@@ -2165,8 +2134,6 @@ options.
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
 - The canonical names of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- - It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
- and thus this output committer will not be used when speculation is on, independent of configuration.
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
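The table entry removed above still documents the surviving mechanism: a custom committer must subclass org.apache.hadoop.mapreduce.OutputCommitter (typically org.apache.parquet.hadoop.ParquetOutputCommitter) and must be set via the Hadoop Configuration, not SQLConf. A hedged sketch, with a hypothetical committer that keeps the safe rename-based semantics and only adds logging:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Hypothetical example class: inherits the safe commit protocol wholesale.
class LoggingParquetCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    println(s"Committing Parquet job output to $outputPath")
    super.commitJob(jobContext)
  }
}

// Selected via the Hadoop Configuration, as the removed docs required:
//   sc.hadoopConfiguration.set(
//     "spark.sql.parquet.output.committer.class",
//     classOf[LoggingParquetCommitter].getName)
```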
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aa..f6b7f0854b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
- if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
- // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+ if (outputCommitter.getClass.getName.contains("Direct")) {
+ // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
// attempts, the task will fail because the output file is created from a prior attempt.
// This often means the most visible error to the user is misleading. Augment the error
// to tell the user to look for the actual error.
throw new SparkException("The output file already exists but this could be due to a " +
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
- "the first error.\n File exists error: " + e)
+ "the first error.\n File exists error: " + e.getLocalizedMessage, e)
+ } else {
+ throw e
}
- throw e
}
}
@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
- } else if (speculationEnabled) {
- // When speculation is enabled, it's not safe to use customized output committer classes,
- // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
- //
- // See SPARK-9899 for more details.
- logInfo(
- s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
- "because spark.speculation is configured to be true.")
- defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(
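The hunk above is truncated where the container resolves a user-configured committer; for reference, a standalone sketch of that resolution step. The key name is assumed from the `spark.sql.sources.outputCommitterClass` option mentioned in the removed docs, and the fallback default is an assumption:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

object CommitterResolutionSketch {
  // getClass resolves the configured class, falls back to FileOutputCommitter
  // when the key is unset, and checks assignability to OutputCommitter.
  def resolve(configuration: Configuration): Class[_ <: OutputCommitter] =
    configuration.getClass(
      "spark.sql.sources.outputCommitterClass",
      classOf[FileOutputCommitter],
      classOf[OutputCommitter])
}
```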
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index ecadb9e7c6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.parquet.Log
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
-import org.apache.parquet.hadoop.util.ContextUtil
-
-/**
- * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
- * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
- * destination folder. This can be useful for data stored in S3, where directory operations are
- * relatively expensive.
- *
- * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
- * property via Hadoop [[Configuration]]. Note that this property overrides
- * "spark.sql.sources.outputCommitterClass".
- *
- * *NOTE*
- *
- * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
- * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- * left empty).
- */
-private[datasources] class DirectParquetOutputCommitter(
- outputPath: Path, context: TaskAttemptContext)
- extends ParquetOutputCommitter(outputPath, context) {
- val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
- override def getWorkPath: Path = outputPath
- override def abortTask(taskContext: TaskAttemptContext): Unit = {}
- override def commitTask(taskContext: TaskAttemptContext): Unit = {}
- override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
- override def setupJob(jobContext: JobContext): Unit = {}
- override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
- override def commitJob(jobContext: JobContext) {
- val configuration = ContextUtil.getConfiguration(jobContext)
- val fileSystem = outputPath.getFileSystem(configuration)
-
- if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
- try {
- val outputStatus = fileSystem.getFileStatus(outputPath)
- val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
- try {
- ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
- } catch { case e: Exception =>
- LOG.warn("could not write summary file for " + outputPath, e)
- val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true)
- }
- }
- } catch {
- case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
- }
- }
-
- if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
- try {
- val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
- fileSystem.create(successPath).close()
- } catch {
- case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
- }
- }
- }
-}
-
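For contrast with the class deleted above, a simplified, self-contained sketch of the two-phase, rename-based protocol that the default FileOutputCommitter follows; the paths and steps are illustrative, not Hadoop's actual implementation:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameCommitSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val output = new Path("/tmp/job-output")            // hypothetical paths
    val attemptDir = new Path(output, "_temporary/attempt_0")
    val taskFile = new Path(attemptDir, "part-00000")

    // Task write phase: all bytes land under _temporary, invisible to readers.
    val out = fs.create(taskFile, true)
    out.writeBytes("row data")
    out.close()

    // commitTask: a single rename publishes the file (cheap and atomic on
    // HDFS; a copy on S3, which is the cost the direct committer avoided).
    fs.rename(taskFile, new Path(output, "part-00000"))

    // abortTask for a failed attempt just deletes _temporary, leaving the
    // destination untouched: the safety property the direct committer gave up.
    fs.delete(attemptDir, true)
  }
}
```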
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index a2fd8da782..5ad95e4b9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -76,13 +76,6 @@ private[sql] class DefaultSource
val conf = ContextUtil.getConfiguration(job)
- // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
- val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
- if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
- conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
- classOf[DirectParquetOutputCommitter].getCanonicalName)
- }
-
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
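The removed lines above were a small backward-compatibility shim: a legacy class name found in the configuration was rewritten to the relocated class before resolution. The same pattern, factored as a generic helper sketch (the names in the usage comment mirror the removed code):

```scala
import org.apache.hadoop.conf.Configuration

object ConfAlias {
  // Rewrite a legacy config value to its replacement before the value is used.
  def aliasConfValue(conf: Configuration, key: String,
                     legacy: String, replacement: String): Unit = {
    if (conf.get(key) == legacy) {
      conf.set(key, replacement)
    }
  }
}

// Usage mirroring the removed shim:
//   ConfAlias.aliasConfValue(conf, "spark.sql.parquet.output.committer.class",
//     "org.apache.spark.sql.parquet.DirectParquetOutputCommitter",
//     "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")
```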
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a3017258d6..581095d3dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
- val clonedConf = new Configuration(hadoopConfiguration)
-
- // Write to a parquet file and let it fail.
- // _temporary should be missing if direct output committer works.
- try {
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- classOf[DirectParquetOutputCommitter].getCanonicalName)
- sqlContext.udf.register("div0", (x: Int) => x / 0)
- withTempPath { dir =>
- intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
- }
- val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(hadoopConfiguration)
- assert(!fs.exists(path))
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
- }
-
- testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
- val clonedConf = new Configuration(hadoopConfiguration)
-
- // Write to a parquet file and let it fail.
- // _temporary should be missing if direct output committer works.
- try {
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
- sqlContext.udf.register("div0", (x: Int) => x / 0)
- withTempPath { dir =>
- intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
- }
- val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(hadoopConfiguration)
- assert(!fs.exists(path))
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
- }
-
-
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
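Both deleted tests wrap their body in the same clone-and-restore dance around the shared hadoopConfiguration, since Hadoop 1 had no Configuration.unset. Factored out as a standalone helper sketch (the suites inline this instead):

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

object ConfTestUtils {
  // Run `body` with `conf` temporarily mutated, then restore every original
  // entry; clear() + re-set stands in for the missing Configuration.unset.
  def withRestoredConf[T](conf: Configuration)(body: => T): T = {
    val snapshot = new Configuration(conf)
    try body
    finally {
      conf.clear()
      snapshot.asScala.foreach(entry => conf.set(entry.getKey, entry.getValue))
    }
  }
}
```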
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index ea7e905742..10eeb30242 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -668,40 +668,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
-
- test("SPARK-9899 Disable customized output committer when speculation is on") {
- val clonedConf = new Configuration(hadoopConfiguration)
- val speculationEnabled =
- sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
-
- try {
- withTempPath { dir =>
- // Enables task speculation
- sqlContext.sparkContext.conf.set("spark.speculation", "true")
-
- // Uses a customized output committer which always fails
- hadoopConfiguration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- classOf[AlwaysFailOutputCommitter].getName)
-
- // Code below shouldn't throw since customized output committer should be disabled.
- val df = sqlContext.range(10).toDF().coalesce(1)
- df.write.format(dataSourceName).save(dir.getCanonicalPath)
- checkAnswer(
- sqlContext
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .load(dir.getCanonicalPath),
- df)
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
- }
- }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when
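The truncated context line above references the AlwaysFailOutputCommitter used by the SPARK-8578 test. A hedged sketch of what such a test-only committer might look like (the actual class lives in this test file; this version is illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Test-only committer that always blows up at job commit time, letting a
// test assert whether a custom committer was actually picked up.
class AlwaysFailOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(context: JobContext): Unit = {
    sys.error("Intentional job commit failure for testing.")
  }
}
```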