-rw-r--r--  docs/sql-programming-guide.md                                                                       33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala            18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala  88
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala     7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala     49
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala                   34
6 files changed, 5 insertions(+), 224 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 274a8edb0c..63310be22c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1467,37 +1467,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
</td>
</tr>
<tr>
- <td><code>spark.sql.parquet.output.committer.class</code></td>
- <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
- <td>
- <p>
- The output committer class used by Parquet. The specified class needs to be a subclass of
- <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
- subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
- </p>
- <p>
- <b>Note:</b>
- <ul>
- <li>
- This option is automatically ignored if <code>spark.speculation</code> is turned on.
- </li>
- <li>
- This option must be set via Hadoop <code>Configuration</code> rather than Spark
- <code>SQLConf</code>.
- </li>
- <li>
- This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
- </li>
- </ul>
- </p>
- <p>
- Spark SQL comes with a builtin
- <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
- efficient than the default Parquet output committer when writing data to S3.
- </p>
- </td>
-</tr>
-<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td><code>false</code></td>
<td>
@@ -2165,8 +2134,6 @@ options.
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
- The canonical name of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- - It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
- and thus this output committer will not be used when speculation is on, independent of configuration.
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
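
Note: the removed documentation described an option that had to be set on the Hadoop Configuration rather than through SQLConf. For context, a minimal sketch of how users typically enabled it before this patch; after this patch the setting has no effect:

    // Sketch only: this option is removed by this patch and is now ignored.
    val sc: org.apache.spark.SparkContext = ???  // an existing SparkContext
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")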
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aa..f6b7f0854b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
- if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
- // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+ if (outputCommitter.getClass.getName.contains("Direct")) {
+ // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
// attempts, the task will fail because the output file is created from a prior attempt.
// This often means the most visible error to the user is misleading. Augment the error
// to tell the user to look for the actual error.
throw new SparkException("The output file already exists but this could be due to a " +
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
- "the first error.\n File exists error: " + e)
+ "the first error.\n File exists error: " + e.getLocalizedMessage, e)
+ } else {
+ throw e
}
- throw e
}
}
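
Note: the hunk above now keeps the original exception as the cause when wrapping a FileAlreadyExistsException. A minimal standalone sketch of the same wrap-and-rethrow pattern; the helper name and parameters are illustrative, not part of the patch:

    import org.apache.hadoop.fs.FileAlreadyExistsException
    import org.apache.spark.SparkException

    def writeWithBetterError(write: () => Unit, directCommitter: Boolean): Unit = {
      try write() catch {
        // Only wrap when a non-idempotent (direct) committer is in use; any
        // exception not matching this case propagates unchanged.
        case e: FileAlreadyExistsException if directCommitter =>
          throw new SparkException(
            "The output file already exists but this could be due to a failure " +
            "from an earlier attempt. Look through the earlier logs or stage page " +
            "for the first error.\n File exists error: " + e.getLocalizedMessage, e)
      }
    }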
@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
- } else if (speculationEnabled) {
- // When speculation is enabled, it's not safe to use customized output committer classes,
- // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
- //
- // See SPARK-9899 for more details.
- logInfo(
- s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
- "because spark.speculation is configured to be true.")
- defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(
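
Note: with the speculation branch removed, committer resolution falls through to Configuration.getClass, which returns a default when the key is unset. A minimal sketch of that lookup; the key and default here are chosen for illustration:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.OutputCommitter
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

    val conf = new Configuration()
    // Resolves the configured committer class, falling back to
    // FileOutputCommitter when the key is not set.
    val committerClass: Class[_ <: OutputCommitter] = conf.getClass(
      "spark.sql.sources.outputCommitterClass",
      classOf[FileOutputCommitter],
      classOf[OutputCommitter])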
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index ecadb9e7c6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.parquet.Log
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
-import org.apache.parquet.hadoop.util.ContextUtil
-
-/**
- * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
- * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
- * destination folder. This can be useful for data stored in S3, where directory operations are
- * relatively expensive.
- *
- * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
- * property via Hadoop [[Configuration]]. Note that this property overrides
- * "spark.sql.sources.outputCommitterClass".
- *
- * *NOTE*
- *
- * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
- * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- * left empty).
- */
-private[datasources] class DirectParquetOutputCommitter(
- outputPath: Path, context: TaskAttemptContext)
- extends ParquetOutputCommitter(outputPath, context) {
- val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
- override def getWorkPath: Path = outputPath
- override def abortTask(taskContext: TaskAttemptContext): Unit = {}
- override def commitTask(taskContext: TaskAttemptContext): Unit = {}
- override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
- override def setupJob(jobContext: JobContext): Unit = {}
- override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
- override def commitJob(jobContext: JobContext) {
- val configuration = ContextUtil.getConfiguration(jobContext)
- val fileSystem = outputPath.getFileSystem(configuration)
-
- if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
- try {
- val outputStatus = fileSystem.getFileStatus(outputPath)
- val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
- try {
- ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
- } catch { case e: Exception =>
- LOG.warn("could not write summary file for " + outputPath, e)
- val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true)
- }
- }
- } catch {
- case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
- }
- }
-
- if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
- try {
- val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
- fileSystem.create(successPath).close()
- } catch {
- case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
- }
- }
- }
-}
-
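
Note: the deleted committer wrote task output straight to the destination directory, which is exactly why retries were unsafe: a second attempt of the same task finds the first attempt's file already in place. A toy sketch of the failure mode using plain filesystem calls, not the real committer machinery:

    import java.nio.file.{Files, Paths}

    // Toy illustration: both task attempts target the same final path, so the
    // retry fails even though it is the healthy attempt.
    val finalPath = Paths.get("/tmp/output/part-00000")
    Files.createDirectories(finalPath.getParent)
    Files.createFile(finalPath)    // attempt 1 writes its output, then the task dies
    // Files.createFile(finalPath) // attempt 2 would throw FileAlreadyExistsException here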
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index a2fd8da782..5ad95e4b9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -76,13 +76,6 @@ private[sql] class DefaultSource
val conf = ContextUtil.getConfiguration(job)
- // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
- val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
- if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
- conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
- classOf[DirectParquetOutputCommitter].getCanonicalName)
- }
-
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
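
Note: the removed lines implemented a one-entry class-name alias so that the committer's pre-rename qualified name kept working. A generic sketch of that backward-compatibility pattern; the names in the map are hypothetical:

    import org.apache.hadoop.conf.Configuration

    // Rewrites a configured class name to its post-rename equivalent, if any.
    def resolveAlias(conf: Configuration, key: String): Unit = {
      val aliases = Map(
        "com.example.OldCommitter" -> "com.example.NewCommitter")
      Option(conf.get(key)).flatMap(aliases.get).foreach(conf.set(key, _))
    }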
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a3017258d6..581095d3dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
- val clonedConf = new Configuration(hadoopConfiguration)
-
- // Write to a parquet file and let it fail.
- // _temporary should be missing if direct output committer works.
- try {
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- classOf[DirectParquetOutputCommitter].getCanonicalName)
- sqlContext.udf.register("div0", (x: Int) => x / 0)
- withTempPath { dir =>
- intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
- }
- val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(hadoopConfiguration)
- assert(!fs.exists(path))
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
- }
-
- testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
- val clonedConf = new Configuration(hadoopConfiguration)
-
- // Write to a parquet file and let it fail.
- // _temporary should be missing if direct output committer works.
- try {
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
- sqlContext.udf.register("div0", (x: Int) => x / 0)
- withTempPath { dir =>
- intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
- }
- val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(hadoopConfiguration)
- assert(!fs.exists(path))
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
- }
-
-
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
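
Note: both the deleted tests and the surviving SPARK-8121 test clone the Hadoop configuration up front and restore it in a finally block, because Hadoop 1 has no Configuration.unset. A minimal sketch of that save-and-restore pattern as a reusable helper; the helper itself is illustrative:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration

    def withRestoredConf(conf: Configuration)(body: => Unit): Unit = {
      val snapshot = new Configuration(conf)
      try body finally {
        // Restore by clearing and re-applying the snapshot, since
        // Configuration.unset is unavailable on Hadoop 1.
        conf.clear()
        snapshot.asScala.foreach(entry => conf.set(entry.getKey, entry.getValue))
      }
    }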
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index ea7e905742..10eeb30242 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -668,40 +668,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
-
- test("SPARK-9899 Disable customized output committer when speculation is on") {
- val clonedConf = new Configuration(hadoopConfiguration)
- val speculationEnabled =
- sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
-
- try {
- withTempPath { dir =>
- // Enables task speculation
- sqlContext.sparkContext.conf.set("spark.speculation", "true")
-
- // Uses a customized output committer which always fails
- hadoopConfiguration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- classOf[AlwaysFailOutputCommitter].getName)
-
- // Code below shouldn't throw since customized output committer should be disabled.
- val df = sqlContext.range(10).toDF().coalesce(1)
- df.write.format(dataSourceName).save(dir.getCanonicalPath)
- checkAnswer(
- sqlContext
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .load(dir.getCanonicalPath),
- df)
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
- }
- }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when
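
Note: the removed SPARK-9899 test configured a committer that fails every commit, so any code path still honoring the custom committer would be caught immediately. A minimal sketch of such a test-only committer, modeled on the AlwaysFailOutputCommitter referenced above:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

    // Fails unconditionally on job commit; useful only for verifying that a
    // given write path never reaches this committer.
    class AlwaysFailingCommitter(outputPath: Path, context: TaskAttemptContext)
      extends FileOutputCommitter(outputPath, context) {

      override def commitJob(context: JobContext): Unit = {
        sys.error("Intentional job commit failure")
      }
    }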