about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Reynold Xin <rxin@databricks.com> 2015-08-11 18:08:49 -0700
committer: Yin Huai <yhuai@databricks.com> 2015-08-11 18:08:49 -0700
commit afa757c98c537965007cad4c61c436887f3ac6a6 (patch)
tree 5f93203f626af0f325538f7d78c850c27109fb1a
parent 5a5bbc29961630d649d4bd4acd5d19eb537b5fd0 (diff)
download spark-afa757c98c537965007cad4c61c436887f3ac6a6.tar.gz
download spark-afa757c98c537965007cad4c61c436887f3ac6a6.tar.bz2
download spark-afa757c98c537965007cad4c61c436887f3ac6a6.zip
[SPARK-9849] [SQL] DirectParquetOutputCommitter qualified name should be backward compatible
DirectParquetOutputCommitter was moved in SPARK-9763. However, users can explicitly set the class as a config option, so we must be able to resolve the old committer qualified name. Author: Reynold Xin <rxin@databricks.com> Closes #8114 from rxin/SPARK-9849.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 7
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 27
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 4086a139be..c71c69b6e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -209,6 +209,13 @@ private[sql] class ParquetRelation(
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
+ // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
+ val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+ if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+ conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
+ classOf[DirectParquetOutputCommitter].getCanonicalName)
+ }
+
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ee925afe08..cb166349fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -390,7 +390,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
}
}
- test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+ test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
+ val clonedConf = new Configuration(configuration)
+
+ // Write to a parquet file and let it fail.
+ // _temporary should be missing if direct output committer works.
+ try {
+ configuration.set("spark.sql.parquet.output.committer.class",
+ "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
+ sqlContext.udf.register("div0", (x: Int) => x / 0)
+ withTempPath { dir =>
+ intercept[org.apache.spark.SparkException] {
+ sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
+ }
+ val path = new Path(dir.getCanonicalPath, "_temporary")
+ val fs = path.getFileSystem(configuration)
+ assert(!fs.exists(path))
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
+ }
+
+
+ test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(configuration)