about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala27
2 files changed, 33 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 4086a139be..c71c69b6e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -209,6 +209,13 @@ private[sql] class ParquetRelation(
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
+    // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
+    val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+    if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+      conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+        classOf[DirectParquetOutputCommitter].getCanonicalName)
+    }
+
     val committerClass =
       conf.getClass(
         SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ee925afe08..cb166349fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -390,7 +390,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     }
   }
 
-  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+  test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
+    val clonedConf = new Configuration(configuration)
+
+    // Write to a parquet file and let it fail.
+    // _temporary should be missing if direct output committer works.
+    try {
+      configuration.set("spark.sql.parquet.output.committer.class",
+        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
+      sqlContext.udf.register("div0", (x: Int) => x / 0)
+      withTempPath { dir =>
+        intercept[org.apache.spark.SparkException] {
+          sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
+        }
+        val path = new Path(dir.getCanonicalPath, "_temporary")
+        val fs = path.getFileSystem(configuration)
+        assert(!fs.exists(path))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
+
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
     withTempPath { dir =>
       val clonedConf = new Configuration(configuration)