From 8fda5a73dc165fda2229a27c5a9e148b43b91c3a Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 26 Apr 2016 22:02:28 -0700
Subject: [SPARK-14913][SQL] Simplify configuration API

## What changes were proposed in this pull request?

We currently expose both Hadoop configuration and Spark SQL configuration in RuntimeConfig. I think we can remove the Hadoop configuration part, and simply generate Hadoop Configuration on the fly by passing all the SQL configurations into it. This way, there is a single interface (in Java/Scala/Python/SQL) for end-users.

As part of this patch, I also removed some config options deprecated in Spark 1.x.

## How was this patch tested?

Updated relevant tests.

Author: Reynold Xin

Closes #12689 from rxin/SPARK-14913.

---
 .../org/apache/spark/sql/DataFrameWriter.scala      |   2 +-
 .../scala/org/apache/spark/sql/RuntimeConfig.scala  |  65 +-
 .../scala/org/apache/spark/sql/SparkSession.scala   |   6 +-
 .../spark/sql/execution/command/AnalyzeTable.scala  |   2 +-
 .../spark/sql/execution/command/SetCommand.scala    |  64 --
 .../spark/sql/execution/command/tables.scala        |   2 +-
 .../sql/execution/datasources/DataSource.scala      |   8 +-
 .../execution/datasources/FileSourceStrategy.scala  |   6 +-
 .../datasources/InsertIntoHadoopFsRelation.scala    |   6 +-
 .../datasources/fileSourceInterfaces.scala          |   2 +-
 .../execution/datasources/json/JSONRelation.scala   |   2 +-
 .../datasources/parquet/ParquetRelation.scala       |   2 +-
 .../sql/execution/streaming/FileStreamSink.scala    |   2 +-
 .../sql/execution/streaming/FileStreamSource.scala  |   2 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala   |   2 +-
 .../execution/streaming/StreamFileCatalog.scala     |   2 +-
 .../execution/streaming/state/StateStoreRDD.scala   |   2 +-
 .../spark/sql/internal/RuntimeConfigImpl.scala      |  73 --
 .../org/apache/spark/sql/internal/SQLConf.scala     |   9 +-
 .../apache/spark/sql/internal/SessionState.scala    |  17 +-
 .../org/apache/spark/sql/SQLContextSuite.scala      |  23 -
 .../datasources/FileSourceStrategySuite.scala       |  82 ++-
 .../sql/execution/datasources/csv/CSVSuite.scala    |  65 +-
 .../sql/execution/datasources/json/JsonSuite.scala  |  93 ++-
 .../parquet/ParquetCompatibilityTest.scala          |   5 +-
 .../datasources/parquet/ParquetIOSuite.scala        | 133 ++--
 .../sql/execution/datasources/text/TextSuite.scala  |  36 +-
 .../streaming/FileStreamSinkLogSuite.scala          |   2 +-
 .../execution/streaming/HDFSMetadataLogSuite.scala  |   2 +-
 .../spark/sql/internal/RuntimeConfigSuite.scala     |  31 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala    |  30 +-
 .../org/apache/spark/sql/test/TestSQLContext.scala  |   4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |   2 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala     |   4 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala   |   4 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |   2 +-
 .../sql/hive/execution/HiveCommandSuite.scala       |   8 -
 .../spark/sql/hive/execution/HiveDDLSuite.scala     |   2 +-
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala     |   2 +-
 .../apache/spark/sql/hive/orc/OrcQuerySuite.scala   |   2 +-
 .../spark/sql/sources/HadoopFsRelationTest.scala    | 722 ++++++++++++++++++++
 .../sql/sources/ParquetHadoopFsRelationSuite.scala  |   2 +-
 .../spark/sql/sources/hadoopFsRelationSuites.scala  | 733 ---------------------
 43 files changed, 981 insertions(+), 1284 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
 delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c0811f6a4f..6a600c1379 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { // If offsets have already been created, we trying to resume a query. val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.hadoopConf) + val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) if (fs.exists(checkpointPath)) { throw new AnalysisException( s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index e90a042431..bf97d728b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -17,33 +17,44 @@ package org.apache.spark.sql +import org.apache.spark.sql.internal.SQLConf + /** - * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`. + * Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]]. + * + * Options set here are automatically propagated to the Hadoop configuration during I/O. * * @since 2.0.0 */ -abstract class RuntimeConfig { +class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { /** * Sets the given Spark runtime configuration property. * * @since 2.0.0 */ - def set(key: String, value: String): RuntimeConfig + def set(key: String, value: String): RuntimeConfig = { + sqlConf.setConfString(key, value) + this + } /** * Sets the given Spark runtime configuration property. * * @since 2.0.0 */ - def set(key: String, value: Boolean): RuntimeConfig + def set(key: String, value: Boolean): RuntimeConfig = { + set(key, value.toString) + } /** * Sets the given Spark runtime configuration property. * * @since 2.0.0 */ - def set(key: String, value: Long): RuntimeConfig + def set(key: String, value: Long): RuntimeConfig = { + set(key, value.toString) + } /** * Returns the value of Spark runtime configuration property for the given key. @@ -52,49 +63,27 @@ abstract class RuntimeConfig { * @since 2.0.0 */ @throws[NoSuchElementException]("if the key is not set") - def get(key: String): String + def get(key: String): String = { + sqlConf.getConfString(key) + } /** * Returns the value of Spark runtime configuration property for the given key. * * @since 2.0.0 */ - def getOption(key: String): Option[String] + def getOption(key: String): Option[String] = { + try Option(get(key)) catch { + case _: NoSuchElementException => None + } + } /** * Resets the configuration property for the given key. * * @since 2.0.0 */ - def unset(key: String): Unit - - /** - * Sets the given Hadoop configuration property. This is passed directly to Hadoop during I/O. - * - * @since 2.0.0 - */ - def setHadoop(key: String, value: String): RuntimeConfig - - /** - * Returns the value of the Hadoop configuration property. - * - * @throws NoSuchElementException if the key is not set - * @since 2.0.0 - */ - @throws[NoSuchElementException]("if the key is not set") - def getHadoop(key: String): String - - /** - * Returns the value of the Hadoop configuration property. 
- * - * @since 2.0.0 - */ - def getHadoopOption(key: String): Option[String] - - /** - * Resets the Hadoop configuration property for the given key. - * - * @since 2.0.0 - */ - def unsetHadoop(key: String): Unit + def unset(key: String): Unit = { + sqlConf.unsetConf(key) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 6477f42680..f05546a567 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState} +import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager @@ -201,9 +201,7 @@ class SparkSession private( * @group config * @since 2.0.0 */ - @transient lazy val conf: RuntimeConfig = { - new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf) - } + @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf) /** * Set Spark SQL configuration properties. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala index b6f7808398..54ff5ae7d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala @@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand { catalogTable.storage.locationUri.map { p => val path = new Path(p) try { - val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) calculateTableSize(fs, path) } catch { case NonFatal(e) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index 952a0d676f..bbb2a2235f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -62,70 +62,6 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm } (keyValueOutput, runFunc) - case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " + - s"External sort will continue to be used.") - Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " + - s"will be ignored. 
${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " + - s"continue to be true.") - Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " + - s"will be ignored. Tungsten will continue to be used.") - Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " + - s"will be ignored. Codegen will continue to be used.") - Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " + - s"will be ignored. Unsafe mode will continue to be used.") - Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " + - s"will be ignored. Sort merge join will continue to be used.") - Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) => - val runFunc = (sparkSession: SparkSession) => { - logWarning( - s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " + - s"deprecated and will be ignored. Vectorized parquet reader will be used instead.") - Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true")) - } - (keyValueOutput, runFunc) - // Configures a single property. case Some((key, Some(value))) => val runFunc = (sparkSession: SparkSession) => { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 8d9feec9b8..f38e260bc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -210,7 +210,7 @@ case class LoadData( // Follow Hive's behavior: // If no schema or authority is provided with non-local inpath, // we will use hadoop configuration "fs.default.name". 
- val defaultFSConf = sparkSession.sessionState.hadoopConf.get("fs.default.name") + val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name") val defaultFS = if (defaultFSConf == null) { new URI("") } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 8e72e06b1f..2f3826f72b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -131,7 +131,7 @@ case class DataSource( val allPaths = caseInsensitiveOptions.get("path") val globbedPaths = allPaths.toSeq.flatMap { path => val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray @@ -226,7 +226,7 @@ case class DataSource( case Seq(singlePath) => try { val hdfsPath = new Path(singlePath) - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) val res = fs.exists(metadataPath) res @@ -284,7 +284,7 @@ case class DataSource( val allPaths = caseInsensitiveOptions.get("path") ++ paths val globbedPaths = allPaths.flatMap { path => val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified) @@ -374,7 +374,7 @@ case class DataSource( val path = new Path(caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") })) - val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index c26cae84d7..615906a52e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.spark.internal.Logging @@ -107,9 +106,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - val hadoopConf = new Configuration(files.sparkSession.sessionState.hadoopConf) - files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) } - val readFile = files.fileFormat.buildReader( sparkSession = files.sparkSession, dataSchema = files.dataSchema, @@ -117,7 +113,7 @@ private[sql] object 
FileSourceStrategy extends Strategy with Logging { requiredSchema = prunedDataSchema, filters = pushedDownFilters, options = files.options, - hadoopConf = hadoopConf) + hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options)) val plannedPartitions = files.bucketSpec match { case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index fa954975b8..4df7d0ce4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -78,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation( s"cannot save to file.") } - val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf) + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -106,10 +106,6 @@ private[sql] case class InsertIntoHadoopFsRelation( val job = Job.getInstance(hadoopConf) job.setOutputKeyClass(classOf[Void]) job.setOutputValueClass(classOf[InternalRow]) - - // Also set the options in Hadoop Configuration - options.foreach { case (k, v) => if (v ne null) job.getConfiguration.set(k, v) } - FileOutputFormat.setOutputPath(job, qualifiedOutputPath) val partitionSet = AttributeSet(partitionColumns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 3058e79201..25f88d9c39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -274,7 +274,7 @@ class HDFSFileCatalog( partitionSchema: Option[StructType]) extends FileCatalog with Logging { - private val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf) + private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index b6b3907e3e..62446583a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -128,7 +128,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { private def createBaseRdd( sparkSession: SparkSession, inputPaths: Seq[FileStatus]): RDD[String] = { - val job = Job.getInstance(sparkSession.sessionState.hadoopConf) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) val conf = job.getConfiguration val paths = inputPaths.map(_.getPath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 5be8770790..c689ad08ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -652,7 +652,7 @@ private[sql] object ParquetRelation extends Logging { val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat val serializedConf = - new SerializableConfiguration(sparkSession.sessionState.hadoopConf) + new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()) // !! HACK ALERT !! // diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 61c9c88cb3..70aea7fa49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -45,7 +45,7 @@ class FileStreamSink( private val basePath = new Path(path) private val logPath = new Path(basePath, FileStreamSink.metadataDir) private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString) - private val fs = basePath.getFileSystem(sparkSession.sessionState.hadoopConf) + private val fs = basePath.getFileSystem(sparkSession.sessionState.newHadoopConf()) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index e22a05bd3b..681addea02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -38,7 +38,7 @@ class FileStreamSource( override val schema: StructType, dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging { - private val fs = new Path(path).getFileSystem(sparkSession.sessionState.hadoopConf) + private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 99bf20c746..9fe06a6c36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -211,7 +211,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) } private def createFileManager(): FileManager = { - val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf) + val hadoopConf = sparkSession.sessionState.newHadoopConf() try { new FileContextManager(metadataPath, hadoopConf) } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala index b2bc31634c..4f699719c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -30,7 +30,7 @@ class StreamFileCatalog(sparkSession: SparkSession, path: Path) extends FileCata val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString) - val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) override def paths: Seq[Path] = path :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index 635bb86607..e16dda8a5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -46,7 +46,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = dataRDD.context.broadcast( - new SerializableConfiguration(sessionState.hadoopConf)) + new SerializableConfiguration(sessionState.newHadoopConf())) override protected def getPartitions: Array[Partition] = dataRDD.partitions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala deleted file mode 100644 index 137323583b..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.internal - -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.sql.RuntimeConfig - - -/** - * Implementation for [[RuntimeConfig]]. 
- */ -class RuntimeConfigImpl( - sqlConf: SQLConf = new SQLConf, - hadoopConf: Configuration = new Configuration) - extends RuntimeConfig { - - override def set(key: String, value: String): RuntimeConfig = { - sqlConf.setConfString(key, value) - this - } - - override def set(key: String, value: Boolean): RuntimeConfig = set(key, value.toString) - - override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString) - - @throws[NoSuchElementException]("if the key is not set") - override def get(key: String): String = sqlConf.getConfString(key) - - override def getOption(key: String): Option[String] = { - try Option(get(key)) catch { - case _: NoSuchElementException => None - } - } - - override def unset(key: String): Unit = sqlConf.unsetConf(key) - - override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized { - hadoopConf.set(key, value) - this - } - - @throws[NoSuchElementException]("if the key is not set") - override def getHadoop(key: String): String = hadoopConf.synchronized { - Option(hadoopConf.get(key)).getOrElse { - throw new NoSuchElementException(key) - } - } - - override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized { - Option(hadoopConf.get(key)) - } - - override def unsetHadoop(key: String): Unit = hadoopConf.synchronized { - hadoopConf.unset(key) - } - -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b268a4fef7..6fbf32676f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -516,13 +516,6 @@ object SQLConf { object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" - val EXTERNAL_SORT = "spark.sql.planner.externalSort" - val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2" - val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled" - val CODEGEN_ENABLED = "spark.sql.codegen" - val UNSAFE_ENABLED = "spark.sql.unsafe.enabled" - val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin" - val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader" } } @@ -764,7 +757,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { settings.remove(key) } - private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = { + def unsetConf(entry: ConfigEntry[_]): Unit = { settings.remove(entry.key) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 1bda572e63..63e0dc7e7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -48,8 +48,21 @@ private[sql] class SessionState(sparkSession: SparkSession) { * SQL-specific key-value configurations. 
*/ lazy val conf: SQLConf = new SQLConf - lazy val hadoopConf: Configuration = { - new Configuration(sparkSession.sparkContext.hadoopConfiguration) + + def newHadoopConf(): Configuration = { + val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration) + conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) } + hadoopConf + } + + def newHadoopConfWithOptions(options: Map[String, String]): Configuration = { + val hadoopConf = newHadoopConf() + options.foreach { case (k, v) => + if ((v ne null) && k != "path" && k != "paths") { + hadoopConf.set(k, v) + } + } + hadoopConf } // Automatically extract `spark.sql.*` entries and put it in our SQLConf diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index d49cc103b5..2f62ad4850 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -85,27 +85,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") } - test("Hadoop conf interaction between SQLContext and SparkContext") { - val mySpecialKey = "mai.special.key" - val mySpecialValue = "msv" - try { - sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue) - val sqlContext = SQLContext.getOrCreate(sc) - val sessionState = sqlContext.sessionState - assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue) - assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue) - // mutating hadoop conf in SQL doesn't mutate the underlying one - sessionState.hadoopConf.set(mySpecialKey, "no no no") - assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no") - assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no") - assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue) - sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes") - assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes") - assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes") - assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue) - } finally { - sc.hadoopConfiguration.unset(mySpecialKey) - } - } - } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index f73d485acf..ac2af77a6e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -292,56 +292,50 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } test("Locality support for FileScanRDD - one file per partition") { - withHadoopConf( - "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, - "fs.file.impl.disable.cache" -> "true" - ) { - withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") { - val table = - createTable(files = Seq( - "file1" -> 10, - "file2" -> 10 - )) - - checkScan(table) { partitions => - val Seq(p1, p2) = partitions - assert(p1.files.length == 1) - assert(p1.files.flatMap(_.locations).length == 1) - assert(p2.files.length == 1) - assert(p2.files.flatMap(_.locations).length == 1) - - val fileScanRDD = getFileScanRDD(table) - 
assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 2) - } + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10", + "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, + "fs.file.impl.disable.cache" -> "true") { + val table = + createTable(files = Seq( + "file1" -> 10, + "file2" -> 10 + )) + + checkScan(table) { partitions => + val Seq(p1, p2) = partitions + assert(p1.files.length == 1) + assert(p1.files.flatMap(_.locations).length == 1) + assert(p2.files.length == 1) + assert(p2.files.flatMap(_.locations).length == 1) + + val fileScanRDD = getFileScanRDD(table) + assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 2) } } } test("Locality support for FileScanRDD - large file") { - withHadoopConf( - "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, - "fs.file.impl.disable.cache" -> "true" - ) { - withSQLConf( + withSQLConf( SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10", - SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0" - ) { - val table = - createTable(files = Seq( - "file1" -> 15, - "file2" -> 5 - )) - - checkScan(table) { partitions => - val Seq(p1, p2) = partitions - assert(p1.files.length == 1) - assert(p1.files.flatMap(_.locations).length == 1) - assert(p2.files.length == 2) - assert(p2.files.flatMap(_.locations).length == 2) - - val fileScanRDD = getFileScanRDD(table) - assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 3) - } + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0", + "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, + "fs.file.impl.disable.cache" -> "true") { + val table = + createTable(files = Seq( + "file1" -> 15, + "file2" -> 5 + )) + + checkScan(table) { partitions => + val Seq(p1, p2) = partitions + assert(p1.files.length == 1) + assert(p1.files.flatMap(_.locations).length == 1) + assert(p2.files.length == 2) + assert(p2.files.flatMap(_.locations).length == 2) + + val fileScanRDD = getFileScanRDD(table) + assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 3) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9baae80f15..ceda920ddc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -430,42 +430,37 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("SPARK-13543 Write the output as uncompressed via option()") { - val clonedConf = new Configuration(hadoopConfiguration) - hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) - hadoopConfiguration.set("mapreduce.map.output.compress", "true") - hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) + val extraOptions = Map( + "mapreduce.output.fileoutputformat.compress" -> "true", + "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString, + "mapreduce.map.output.compress" -> "true", + "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName + ) withTempDir { dir => - try { - val csvDir = new File(dir, "csv").getCanonicalPath - val cars = sqlContext.read - .format("csv") - .option("header", "true") - 
.load(testFile(carsFile)) - - cars.coalesce(1).write - .format("csv") - .option("header", "true") - .option("compression", "none") - .save(csvDir) - - val compressedFiles = new File(csvDir).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz"))) - - val carsCopy = sqlContext.read - .format("csv") - .option("header", "true") - .load(csvDir) - - verifyCars(carsCopy, withHeader = true) - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val csvDir = new File(dir, "csv").getCanonicalPath + val cars = sqlContext.read + .format("csv") + .option("header", "true") + .options(extraOptions) + .load(testFile(carsFile)) + + cars.coalesce(1).write + .format("csv") + .option("header", "true") + .option("compression", "none") + .options(extraOptions) + .save(csvDir) + + val compressedFiles = new File(csvDir).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz"))) + + val carsCopy = sqlContext.read + .format("csv") + .option("header", "true") + .options(extraOptions) + .load(csvDir) + + verifyCars(carsCopy, withHeader = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e5588bec4b..b1279abd63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1507,23 +1507,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { df.write.json(path + "/p=2") assert(sqlContext.read.json(path).count() === 4) - val clonedConf = new Configuration(hadoopConfiguration) - try { - // Setting it twice as the name of the propery has changed between hadoop versions. 
- hadoopConfiguration.setClass( - "mapred.input.pathFilter.class", - classOf[TestFileFilter], - classOf[PathFilter]) - hadoopConfiguration.setClass( - "mapreduce.input.pathFilter.class", - classOf[TestFileFilter], - classOf[PathFilter]) - assert(sqlContext.read.json(path).count() === 2) - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val extraOptions = Map( + "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName, + "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName + ) + assert(sqlContext.read.options(extraOptions).json(path).count() === 2) } } @@ -1609,45 +1597,40 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-13543 Write the output as uncompressed via option()") { - val clonedConf = new Configuration(hadoopConfiguration) - hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) - hadoopConfiguration.set("mapreduce.map.output.compress", "true") - hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) + val extraOptions = Map[String, String]( + "mapreduce.output.fileoutputformat.compress" -> "true", + "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString, + "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName, + "mapreduce.map.output.compress" -> "true", + "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName + ) withTempDir { dir => - try { - val dir = Utils.createTempDir() - dir.delete() - - val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - - val jsonDF = sqlContext.read.json(path) - val jsonDir = new File(dir, "json").getCanonicalPath - jsonDF.coalesce(1).write - .format("json") - .option("compression", "none") - .save(jsonDir) - - val compressedFiles = new File(jsonDir).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".json.gz"))) - - val jsonCopy = sqlContext.read - .format("json") - .load(jsonDir) - - assert(jsonCopy.count == jsonDF.count) - val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") - val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") - checkAnswer(jsonCopySome, jsonDFSome) - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val dir = Utils.createTempDir() + dir.delete() + + val path = dir.getCanonicalPath + primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) + + val jsonDF = sqlContext.read.json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write + .format("json") + .option("compression", "none") + .options(extraOptions) + .save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".json.gz"))) + + val jsonCopy = sqlContext.read + .format("json") + .options(extraOptions) + .load(jsonDir) + + assert(jsonCopy.count == jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = 
jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index 4217c81ff3..45cc6810d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -38,14 +38,15 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq } protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = { + val hadoopConf = sqlContext.sessionState.newHadoopConf() val fsPath = new Path(path) - val fs = fsPath.getFileSystem(hadoopConfiguration) + val fs = fsPath.getFileSystem(hadoopConf) val parquetFiles = fs.listStatus(fsPath, new PathFilter { override def accept(path: Path): Boolean = pathFilter(path) }).toSeq.asJava val footers = - ParquetFileReader.readAllFootersInParallel(hadoopConfiguration, parquetFiles, true) + ParquetFileReader.readAllFootersInParallel(hadoopConf, parquetFiles, true) footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 0aab36ae38..32fe5ba127 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = new Configuration(sqlContext.sessionState.hadoopConf) + val conf = sqlContext.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) readParquetFile(path.toString)(df => { val sparkTypes = df.schema.map(_.dataType) @@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = new Configuration(sqlContext.sessionState.hadoopConf) + val conf = sqlContext.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) val errorMessage = intercept[Throwable] { sqlContext.read.parquet(path.toString).printSchema() @@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = new Configuration(sqlContext.sessionState.hadoopConf) + val conf = sqlContext.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType) assert(sparkTypes === expectedSparkTypes) @@ -279,9 +279,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("compression codec") { + val hadoopConf = sqlContext.sessionState.newHadoopConf() def compressionCodecFor(path: String, codecName: String): String = { val codecs = for { - footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration) + footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) block <- 
footer.getParquetMetadata.getBlocks.asScala column <- block.getColumns.asScala } yield column.getCodec.name() @@ -350,17 +351,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("write metadata") { + val hadoopConf = sqlContext.sessionState.newHadoopConf() withTempPath { file => val path = new Path(file.toURI.toString) - val fs = FileSystem.getLocal(hadoopConfiguration) + val fs = FileSystem.getLocal(hadoopConf) val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)]) - writeMetadata(schema, path, hadoopConfiguration) + writeMetadata(schema, path, hadoopConf) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) val expectedSchema = new CatalystSchemaConverter().convert(schema) - val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema + val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) expectedSchema.checkContains(actualSchema) @@ -431,7 +433,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) - val conf = new Configuration(sqlContext.sessionState.hadoopConf) + val conf = sqlContext.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf, extraMetadata) readParquetFile(path.toString) { df => @@ -446,26 +448,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { + val extraOptions = Map( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, + "spark.sql.parquet.output.committer.class" -> + classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName + ) withTempPath { dir => - val clonedConf = new Configuration(hadoopConfiguration) - - hadoopConfiguration.set( - SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName) - - hadoopConfiguration.set( - "spark.sql.parquet.output.committer.class", - classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName) - - try { - val message = intercept[SparkException] { - sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(message === "Intentional exception for testing purposes") - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val message = intercept[SparkException] { + sqlContext.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") } } @@ -482,36 +474,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-7837 Do not close output writer twice when commitTask() fails") { - val clonedConf = new Configuration(hadoopConfiguration) - // Using a output committer that always fail when committing a task, so that both // `commitTask()` and `abortTask()` are invoked. 
- hadoopConfiguration.set( - "spark.sql.parquet.output.committer.class", - classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName) + val extraOptions = Map[String, String]( + "spark.sql.parquet.output.committer.class" -> + classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName + ) - try { - // Before fixing SPARK-7837, the following code results in an NPE because both - // `commitTask()` and `abortTask()` try to close output writers. + // Before fixing SPARK-7837, the following code results in an NPE because both + // `commitTask()` and `abortTask()` try to close output writers. - withTempPath { dir => - val m1 = intercept[SparkException] { - sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m1.contains("Intentional exception for testing purposes")) - } + withTempPath { dir => + val m1 = intercept[SparkException] { + sqlContext.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m1.contains("Intentional exception for testing purposes")) + } - withTempPath { dir => - val m2 = intercept[SparkException] { - val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1) - df.write.partitionBy("a").parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m2.contains("Intentional exception for testing purposes")) - } - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) + withTempPath { dir => + val m2 = intercept[SparkException] { + val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1) + df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m2.contains("Intentional exception for testing purposes")) } } @@ -519,31 +504,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // For dictionary encoding, Parquet changes the encoding types according to its writer // version. So, this test checks one of the encoding types in order to ensure that // the file is written with writer version2. + val extraOptions = Map[String, String]( + // Write a Parquet file with writer version2. + ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, + // By default, dictionary encoding is enabled from Parquet 1.2.0 but + // it is enabled just in case. + ParquetOutputFormat.ENABLE_DICTIONARY -> "true" + ) + + val hadoopConf = sqlContext.sessionState.newHadoopConfWithOptions(extraOptions) + withTempPath { dir => - val clonedConf = new Configuration(hadoopConfiguration) - try { - // Write a Parquet file with writer version2. - hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_2_0.toString) - - // By default, dictionary encoding is enabled from Parquet 1.2.0 but - // it is enabled just in case. - hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true) - val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - sqlContext.range(1 << 16).selectExpr("(id % 4) AS i") - .coalesce(1).write.mode("overwrite").parquet(path) - - val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head - val columnChunkMetadata = blockMetadata.getColumns.asScala.head - - // If the file is written with version2, this should include - // Encoding.RLE_DICTIONARY type. 
For version1, it is Encoding.PLAIN_DICTIONARY - assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) - } finally { - // Manually clear the hadoop configuration for other tests. - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val path = s"${dir.getCanonicalPath}/part-r-0.parquet" + sqlContext.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + + // If the file is written with version2, this should include + // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 47330f1db3..923c0b350e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -87,28 +87,22 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("SPARK-13543 Write the output as uncompressed via option()") { - val clonedConf = new Configuration(hadoopConfiguration) - hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true") - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) - hadoopConfiguration - .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName) - hadoopConfiguration.set("mapreduce.map.output.compress", "true") - hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName) + val extraOptions = Map[String, String]( + "mapreduce.output.fileoutputformat.compress" -> "true", + "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString, + "mapreduce.map.output.compress" -> "true", + "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName, + "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName + ) withTempDir { dir => - try { - val testDf = sqlContext.read.text(testFile) - val tempDir = Utils.createTempDir() - val tempDirPath = tempDir.getAbsolutePath - testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath) - val compressedFiles = new File(tempDirPath).listFiles() - assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz"))) - verifyFrame(sqlContext.read.text(tempDirPath).toDF()) - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } + val testDf = sqlContext.read.text(testFile) + val tempDir = Utils.createTempDir() + val tempDirPath = tempDir.getAbsolutePath + testDf.write.option("compression", "none") + .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath) + val compressedFiles = new File(tempDirPath).listFiles() + assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz"))) + verifyFrame(sqlContext.read.options(extraOptions).text(tempDirPath).toDF()) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index df127d958e..7b413dda1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3", SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") { withFileStreamSinkLog { sinkLog => - val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf) + val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.newHadoopConf()) def listBatchFiles(): Set[String] = { fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 129b5a8c36..5f92c5bb9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") { - sqlContext.sessionState.hadoopConf.set( + sqlContext.conf.setConfString( s"fs.$scheme.impl", classOf[FakeFileSystem].getName) withTempDir { temp => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala index f809e01169..a629b73ac0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.RuntimeConfig class RuntimeConfigSuite extends SparkFunSuite { - private def newConf(): RuntimeConfig = new RuntimeConfigImpl + private def newConf(): RuntimeConfig = new RuntimeConfig test("set and get") { val conf = newConf() @@ -54,33 +54,4 @@ class RuntimeConfigSuite extends SparkFunSuite { conf.get("k1") } } - - test("set and get hadoop configuration") { - val conf = newConf() - conf - .setHadoop("k1", "v1") - .setHadoop("k2", "v2") - - assert(conf.getHadoop("k1") == "v1") - assert(conf.getHadoop("k2") == "v2") - - intercept[NoSuchElementException] { - conf.get("notset") - } - } - - test("getHadoopOption") { - val conf = newConf().setHadoop("k1", "v1") - assert(conf.getHadoopOption("k1") == Some("v1")) - assert(conf.getHadoopOption("notset") == None) - } - - test("unsetHadoop") { - val conf = newConf().setHadoop("k1", "v1") - assert(conf.getHadoop("k1") == "v1") - conf.unsetHadoop("k1") - intercept[NoSuchElementException] { - conf.getHadoop("k1") - } - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 5577c9f3ee..ffb206af0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -84,34 +84,6 @@ private[sql] trait SQLTestUtils } } - /** - * The Hadoop configuration used by the active [[SQLContext]]. 
- */ - protected def hadoopConfiguration: Configuration = { - sqlContext.sessionState.hadoopConf - } - - /** - * Sets all Hadoop configurations specified in `pairs`, calls `f`, and then restore all Hadoop - * configurations. - */ - protected def withHadoopConf(pairs: (String, String)*)(f: => Unit): Unit = { - val (keys, _) = pairs.unzip - val originalValues = keys.map(key => Option(hadoopConfiguration.get(key))) - - try { - pairs.foreach { case (key, value) => - hadoopConfiguration.set(key, value) - } - f - } finally { - keys.zip(originalValues).foreach { - case (key, Some(value)) => hadoopConfiguration.set(key, value) - case (key, None) => hadoopConfiguration.unset(key) - } - } - } - /** * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL * configurations. @@ -164,7 +136,7 @@ private[sql] trait SQLTestUtils } finally { // If the test failed part way, we don't want to mask the failure by failing to remove // temp tables that never got created. - try functions.foreach { case (functionName, isTemporary) => + functions.foreach { case (functionName, isTemporary) => val withTemporary = if (isTemporary) "TEMPORARY" else "" sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName") assert( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 9799c6d42b..5ef80b9aa3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext} -import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf} +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.internal.{SessionState, SQLConf} /** * A special [[SQLContext]] prepared for testing. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c4db4f307c..58c10b7b1d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -549,7 +549,7 @@ private[hive] class MetaStoreFileCatalog( Some(partitionSpecFromHive.partitionColumns)) { override def getStatus(path: Path): Array[FileStatus] = { - val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) fs.listStatus(path) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 4f81967a5b..d6a847f3ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -57,7 +57,7 @@ private[sql] class DefaultSource files: Seq[FileStatus]): Option[StructType] = { OrcFileOperator.readSchema( files.map(_.getPath.toUri.toString), - Some(new Configuration(sparkSession.sessionState.hadoopConf)) + Some(sparkSession.sessionState.newHadoopConf()) ) } @@ -278,7 +278,7 @@ private[orc] case class OrcTableScan( with HiveInspectors { def execute(): RDD[InternalRow] = { - val job = Job.getInstance(new Configuration(sparkSession.sessionState.hadoopConf)) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) val conf = job.getConfiguration // Tries to push down filters if ORC filter push-down is enabled diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index f74e5cd6f5..1d8f24cb27 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION -import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext} +import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. 
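Note on the pattern followed by the call sites above and below: each consumer now asks the session state for a freshly built Hadoop configuration via newHadoopConf() instead of reading a long-lived sessionState.hadoopConf. A minimal sketch of that consumer-side usage, assuming only the calls visible in this patch; the path below is hypothetical.

    import org.apache.hadoop.fs.Path

    // Build a fresh Hadoop Configuration for this operation, then resolve the FileSystem from it.
    val hadoopConf = sqlContext.sessionState.newHadoopConf()
    val tablePath = new Path("/tmp/example-table")   // hypothetical location
    val fs = tablePath.getFileSystem(hadoopConf)
    if (fs.exists(tablePath)) fs.delete(tablePath, true)
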
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 31ba735708..b21ca4f26e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val expectedPath = sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) - val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf) + val fs = filesystemPath.getFileSystem(sqlContext.sessionState.newHadoopConf()) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) // It is a managed table when we do not specify the location. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index a4c6d3c185..8b3f2d1a0c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -266,14 +266,6 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto intercept[AnalysisException] { sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""") } - - // Unset default URI Scheme and Authority: throw exception - val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name") - hiveContext.sessionState.hadoopConf.unset("fs.default.name") - intercept[AnalysisException] { - sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""") - } - hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index e23272de85..687a4a7e51 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val expectedTablePath = hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) val filesystemPath = new Path(expectedTablePath) - val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf) + val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf()) fs.exists(filesystemPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index f11c055fb9..b97da1ffdc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. 
- val conf = sqlContext.sessionState.hadoopConf + val conf = sqlContext.sessionState.newHadoopConf() val fs = FileSystem.getLocal(conf) val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 0a0cdf60e8..62d9f5a339 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // Following codec is supported in hive-0.13.1, ignore it now ignore("Other compression options for writing to an ORC file - 0.13.1 and above") { val data = (1 to 100).map(i => (i, s"val_$i")) - val conf = sqlContext.sessionState.hadoopConf + val conf = sqlContext.sessionState.newHadoopConf() conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY") withOrcFile(data) { file => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala new file mode 100644 index 0000000000..67b403a9bd --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources + +import scala.util.Random + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ + + +abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton { + import sqlContext.implicits._ + + val dataSourceName: String + + protected def supportsDataType(dataType: DataType): Boolean = true + + val dataSchema = + StructType( + Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = false))) + + lazy val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") + + lazy val partitionedTestDF1 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") + + lazy val partitionedTestDF2 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") + + lazy val partitionedTestDF = partitionedTestDF1.union(partitionedTestDF2) + + def checkQueries(df: DataFrame): Unit = { + // Selects everything + checkAnswer( + df, + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + + // Simple filtering and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 === 2), + for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) + + // Simple projection and filtering + checkAnswer( + df.filter('a > 1).select('b, 'a + 1), + for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) + + // Simple projection and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) + + // Project many copies of columns with different types (reproduction for SPARK-7858) + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) + yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1)) + + // Self-join + df.registerTempTable("t") + withTempTable("t") { + checkAnswer( + sql( + """SELECT l.a, r.b, l.p1, r.p2 + |FROM t l JOIN t r + |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 + """.stripMargin), + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + } + } + + private val supportedDataTypes = Seq( + StringType, BinaryType, + NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), + MapType(StringType, LongType), + new StructType() + .add("f1", FloatType, nullable = true) + .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true), + new UDT.MyDenseVectorUDT() + ).filter(supportsDataType) + + for (dataType <- supportedDataTypes) { + for (parquetDictionaryEncodingEnabled <- Seq(true, false)) { + test(s"test all data types - $dataType with parquet.enable.dictionary = " + + 
s"$parquetDictionaryEncodingEnabled") { + + val extraOptions = Map[String, String]( + "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString + ) + + withTempPath { file => + val path = file.getCanonicalPath + + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(System.nanoTime()) + ).getOrElse { + fail(s"Failed to create data generator for schema $dataType") + } + + // Create a DF for the schema with random data. The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = sqlContext + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) + } + } + } + } + + test("save()/load() - non-partitioned table - Overwrite") { + withTempPath { file => + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) + + checkAnswer( + sqlContext.read.format(dataSourceName) + .option("path", file.getCanonicalPath) + .option("dataSchema", dataSchema.json) + .load(), + testDF.collect()) + } + } + + test("save()/load() - non-partitioned table - Append") { + withTempPath { file => + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) + testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath) + + checkAnswer( + sqlContext.read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath).orderBy("a"), + testDF.union(testDF).orderBy("a").collect()) + } + } + + test("save()/load() - non-partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[AnalysisException] { + testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath) + } + } + } + + test("save()/load() - non-partitioned table - Ignore") { + withTempDir { file => + testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf()) + assert(fs.listStatus(path).isEmpty) + } + } + + test("save()/load() - partitioned table - simple queries") { + withTempPath { file => + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + checkQueries( + sqlContext.read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath)) + } + } + + test("save()/load() - partitioned table - Overwrite") { + withTempPath { file => + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + checkAnswer( + sqlContext.read.format(dataSourceName) + .option("dataSchema", 
dataSchema.json) + .load(file.getCanonicalPath), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - Append") { + withTempPath { file => + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Append) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + checkAnswer( + sqlContext.read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath), + partitionedTestDF.union(partitionedTestDF).collect()) + } + } + + test("save()/load() - partitioned table - Append - new partition values") { + withTempPath { file => + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + checkAnswer( + sqlContext.read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[AnalysisException] { + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + } + } + } + + test("save()/load() - partitioned table - Ignore") { + withTempDir { file => + partitionedTestDF.write + .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(SparkHadoopUtil.get.conf) + assert(fs.listStatus(path).isEmpty) + } + } + + test("saveAsTable()/load() - non-partitioned table - Overwrite") { + testDF.write.format(dataSourceName).mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t"), testDF.collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - Append") { + testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t") + testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t"), testDF.union(testDF).orderBy("a").collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t") + } + } + } + + test("saveAsTable()/load() - non-partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") + assert(sqlContext.table("t").collect().isEmpty) + } + } + + test("saveAsTable()/load() - partitioned table - simple queries") { + partitionedTestDF.write.format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .saveAsTable("t") + + withTable("t") { + checkQueries(sqlContext.table("t")) + } + } + + test("saveAsTable()/load() - partitioned table - boolean type") { + sqlContext.range(2) + .select('id, ('id % 2 === 0).as("b")) + .write.partitionBy("b").saveAsTable("t") + + withTable("t") { + checkAnswer( + sqlContext.table("t").sort('id), + Row(0, true) :: Row(1, false) :: Nil + ) + } 
+ } + + test("saveAsTable()/load() - partitioned table - Overwrite") { + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append") { + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t"), partitionedTestDF.union(partitionedTestDF).collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - new partition values") { + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + // Using only a subset of all partition columns + intercept[Throwable] { + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1") + .saveAsTable("t") + } + } + + test("saveAsTable()/load() - partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + } + } + } + + test("saveAsTable()/load() - partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Ignore) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + assert(sqlContext.table("t").collect().isEmpty) + } + } + + test("Hadoop style globbing") { + withTempPath { file => + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + val df = sqlContext.read + .format(dataSourceName) + .option("dataSchema", dataSchema.json) + .option("basePath", file.getCanonicalPath) + .load(s"${file.getCanonicalPath}/p1=*/p2=???") + + val expectedPaths = Set( + s"${file.getCanonicalFile}/p1=1/p2=foo", + s"${file.getCanonicalFile}/p1=2/p2=foo", + s"${file.getCanonicalFile}/p1=1/p2=bar", + s"${file.getCanonicalFile}/p1=2/p2=bar" + ).map { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf()) + path.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toString + } + + val actualPaths = df.queryExecution.analyzed.collectFirst { + case LogicalRelation(relation: HadoopFsRelation, _, _) => + relation.location.paths.map(_.toString).toSet + }.getOrElse { + fail("Expect an FSBasedRelation, but none could be found") + } + + assert(actualPaths === expectedPaths) + checkAnswer(df, partitionedTestDF.collect()) + } + } + + test("SPARK-9735 Partition column type casting") { + withTempPath { file => + val df = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1.0d, p2, 123, 123.123f)).toDF("a", "b", "p1", "p2", "p3", "f") + + val input = df.select( + 'a, + 'b, + 'p1.cast(StringType).as('ps1), + 'p2, + 'p3.cast(FloatType).as('pf1), + 'f) + + withTempTable("t") { + input + .write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("ps1", "p2", "pf1", "f") + .saveAsTable("t") + + input + .write + .format(dataSourceName) + .mode(SaveMode.Append) + .partitionBy("ps1", "p2", "pf1", "f") + .saveAsTable("t") + + val realData = input.collect() + + checkAnswer(sqlContext.table("t"), realData ++ realData) + } + } + } + + test("SPARK-7616: adjust column name order accordingly when saving partitioned table") { + val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") + + df.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("c", "a") + .saveAsTable("t") + + withTable("t") { + checkAnswer(sqlContext.table("t").select('b, 'c, 'a), df.select('b, 'c, 'a).collect()) + } + } + + // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores + // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or + // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this + // requirement. We probably want to move this test case to spark-integration-tests or spark-perf + // later. + test("SPARK-8406: Avoids name collision while writing files") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext + .range(10000) + .repartition(250) + .write + .mode(SaveMode.Overwrite) + .format(dataSourceName) + .save(path) + + assertResult(10000) { + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) + .load(path) + .count() + } + } + } + + test("SPARK-8578 specified custom output committer will not be used to append data") { + val extraOptions = Map[String, String]( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter here. + "spark.sql.parquet.output.committer.class" -> + classOf[AlwaysFailParquetOutputCommitter].getName + ) + + val df = sqlContext.range(1, 10).toDF("i") + withTempPath { dir => + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + // Because the data already exists, + // this append should succeed because we will use the output committer associated + // with the file format and AlwaysFailOutputCommitter will not be used. + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + sqlContext.read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .load(dir.getCanonicalPath), + df.union(df)) + + // This will fail because AlwaysFailOutputCommitter is used when we do append.
+ intercept[Exception] { + df.write.mode("overwrite") + .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + } + } + withTempPath { dir => + // Because there is no existing data, + // this append will fail because AlwaysFailOutputCommitter is used when we do append + // and there is no existing data. + intercept[Exception] { + df.write.mode("append") + .options(extraOptions) + .format(dataSourceName) + .save(dir.getCanonicalPath) + } + } + } + + test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") { + val df = Seq( + (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")), + (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")), + (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e") + withTempDir { file => + intercept[AnalysisException] { + df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath) + } + } + intercept[AnalysisException] { + df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t") + } + } + + test("Locality support for FileScanRDD") { + val options = Map[String, String]( + "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, + "fs.file.impl.disable.cache" -> "true" + ) + withTempPath { dir => + val path = "file://" + dir.getCanonicalPath + val df1 = sqlContext.range(4) + df1.coalesce(1).write.mode("overwrite").options(options).format(dataSourceName).save(path) + df1.coalesce(1).write.mode("append").options(options).format(dataSourceName).save(path) + + def checkLocality(): Unit = { + val df2 = sqlContext.read + .format(dataSourceName) + .option("dataSchema", df1.schema.json) + .options(options) + .load(path) + + val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst { + case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => + scan.rdd.asInstanceOf[FileScanRDD] + } + + val partitions = fileScanRDD.partitions + val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations) + + assert(preferredLocations.distinct.length == 2) + } + + checkLocality() + + withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") { + checkLocality() + } + } + } +} + +// This class is used to test SPARK-8578. We should not use any custom output committer when +// we actually append data to an existing dir. +class AlwaysFailOutputCommitter( + outputPath: Path, + context: TaskAttemptContext) + extends FileOutputCommitter(outputPath, context) { + + override def commitJob(context: JobContext): Unit = { + sys.error("Intentional job commitment failure for testing purpose.") + } +} + +// This class is used to test SPARK-8578. We should not use any custom output committer when +// we actually append data to an existing dir. 
+class AlwaysFailParquetOutputCommitter( + outputPath: Path, + context: TaskAttemptContext) + extends ParquetOutputCommitter(outputPath, context) { + + override def commitJob(context: JobContext): Unit = { + sys.error("Intentional job commitment failure for testing purpose.") + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 19749a9713..1d104889fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -132,7 +132,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { val summaryPath = new Path(path, "_metadata") val commonSummaryPath = new Path(path, "_common_metadata") - val fs = summaryPath.getFileSystem(hadoopConfiguration) + val fs = summaryPath.getFileSystem(sqlContext.sessionState.newHadoopConf()) fs.delete(summaryPath, true) fs.delete(commonSummaryPath, true) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala deleted file mode 100644 index 3b16468e76..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ /dev/null @@ -1,733 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.sources - -import scala.collection.JavaConverters._ -import scala.util.Random - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter -import org.apache.parquet.hadoop.ParquetOutputCommitter - -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types._ - - -abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton { - import sqlContext.implicits._ - - val dataSourceName: String - - protected def supportsDataType(dataType: DataType): Boolean = true - - val dataSchema = - StructType( - Seq( - StructField("a", IntegerType, nullable = false), - StructField("b", StringType, nullable = false))) - - lazy val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") - - lazy val partitionedTestDF1 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") - - lazy val partitionedTestDF2 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") - - lazy val partitionedTestDF = partitionedTestDF1.union(partitionedTestDF2) - - def checkQueries(df: DataFrame): Unit = { - // Selects everything - checkAnswer( - df, - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - - // Simple filtering and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 === 2), - for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) - - // Simple projection and filtering - checkAnswer( - df.filter('a > 1).select('b, 'a + 1), - for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) - - // Simple projection and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 < 2).select('b, 'p1), - for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) - - // Project many copies of columns with different types (reproduction for SPARK-7858) - checkAnswer( - df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1), - for (i <- 2 to 3; _ <- Seq("foo", "bar")) - yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1)) - - // Self-join - df.registerTempTable("t") - withTempTable("t") { - checkAnswer( - sql( - """SELECT l.a, r.b, l.p1, r.p2 - |FROM t l JOIN t r - |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 - """.stripMargin), - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - } - } - - private val supportedDataTypes = Seq( - StringType, BinaryType, - NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), - MapType(StringType, LongType), - new StructType() - .add("f1", FloatType, nullable = true) - .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true), - new UDT.MyDenseVectorUDT() - ).filter(supportsDataType) - - try { - for (dataType <- supportedDataTypes) { - for (parquetDictionaryEncodingEnabled <- Seq(true, 
false)) { - test(s"test all data types - $dataType with parquet.enable.dictionary = " + - s"$parquetDictionaryEncodingEnabled") { - - hadoopConfiguration.setBoolean("parquet.enable.dictionary", - parquetDictionaryEncodingEnabled) - - withTempPath { file => - val path = file.getCanonicalPath - - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(System.nanoTime()) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") - } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .save(path) - - val loadedDF = sqlContext - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) - } - } - } - } - } finally { - hadoopConfiguration.unset("parquet.enable.dictionary") - } - - test("save()/load() - non-partitioned table - Overwrite") { - withTempPath { file => - testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) - testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) - - checkAnswer( - sqlContext.read.format(dataSourceName) - .option("path", file.getCanonicalPath) - .option("dataSchema", dataSchema.json) - .load(), - testDF.collect()) - } - } - - test("save()/load() - non-partitioned table - Append") { - withTempPath { file => - testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) - testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath) - - checkAnswer( - sqlContext.read.format(dataSourceName) - .option("dataSchema", dataSchema.json) - .load(file.getCanonicalPath).orderBy("a"), - testDF.union(testDF).orderBy("a").collect()) - } - } - - test("save()/load() - non-partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[AnalysisException] { - testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath) - } - } - } - - test("save()/load() - non-partitioned table - Ignore") { - withTempDir { file => - testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) - assert(fs.listStatus(path).isEmpty) - } - } - - test("save()/load() - partitioned table - simple queries") { - withTempPath { file => - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.ErrorIfExists) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - checkQueries( - sqlContext.read.format(dataSourceName) - .option("dataSchema", dataSchema.json) - .load(file.getCanonicalPath)) - } - } - - test("save()/load() - partitioned table - Overwrite") { - withTempPath { file => - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - 
checkAnswer( - sqlContext.read.format(dataSourceName) - .option("dataSchema", dataSchema.json) - .load(file.getCanonicalPath), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - Append") { - withTempPath { file => - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Append) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - checkAnswer( - sqlContext.read.format(dataSourceName) - .option("dataSchema", dataSchema.json) - .load(file.getCanonicalPath), - partitionedTestDF.union(partitionedTestDF).collect()) - } - } - - test("save()/load() - partitioned table - Append - new partition values") { - withTempPath { file => - partitionedTestDF1.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - partitionedTestDF2.write - .format(dataSourceName) - .mode(SaveMode.Append) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - checkAnswer( - sqlContext.read.format(dataSourceName) - .option("dataSchema", dataSchema.json) - .load(file.getCanonicalPath), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[AnalysisException] { - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.ErrorIfExists) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - } - } - } - - test("save()/load() - partitioned table - Ignore") { - withTempDir { file => - partitionedTestDF.write - .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(SparkHadoopUtil.get.conf) - assert(fs.listStatus(path).isEmpty) - } - } - - test("saveAsTable()/load() - non-partitioned table - Overwrite") { - testDF.write.format(dataSourceName).mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t"), testDF.collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table - Append") { - testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t") - testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t"), testDF.union(testDF).orderBy("a").collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t") - } - } - } - - test("saveAsTable()/load() - non-partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") - assert(sqlContext.table("t").collect().isEmpty) - } - } - - test("saveAsTable()/load() - partitioned table - simple queries") { - partitionedTestDF.write.format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .saveAsTable("t") - - withTable("t") { - checkQueries(sqlContext.table("t")) - } - } - - test("saveAsTable()/load() - partitioned table - boolean type") { - sqlContext.range(2) - .select('id, ('id % 2 === 0).as("b")) - .write.partitionBy("b").saveAsTable("t") - - withTable("t") { - checkAnswer( - 
sqlContext.table("t").sort('id), - Row(0, true) :: Row(1, false) :: Nil - ) - } - } - - test("saveAsTable()/load() - partitioned table - Overwrite") { - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append") { - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Append) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.union(partitionedTestDF).collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - new partition values") { - partitionedTestDF1.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - partitionedTestDF2.write - .format(dataSourceName) - .mode(SaveMode.Append) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { - partitionedTestDF1.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - // Using only a subset of all partition columns - intercept[Throwable] { - partitionedTestDF2.write - .format(dataSourceName) - .mode(SaveMode.Append) - .option("dataSchema", dataSchema.json) - .partitionBy("p1") - .saveAsTable("t") - } - } - - test("saveAsTable()/load() - partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.ErrorIfExists) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - } - } - } - - test("saveAsTable()/load() - partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Ignore) - .option("dataSchema", dataSchema.json) - .partitionBy("p1", "p2") - .saveAsTable("t") - - assert(sqlContext.table("t").collect().isEmpty) - } - } - - test("Hadoop style globbing") { - withTempPath { file => - partitionedTestDF.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("p1", "p2") - .save(file.getCanonicalPath) - - val df = sqlContext.read - .format(dataSourceName) - .option("dataSchema", dataSchema.json) - .option("basePath", file.getCanonicalPath) - .load(s"${file.getCanonicalPath}/p1=*/p2=???") - - val expectedPaths = Set( - s"${file.getCanonicalFile}/p1=1/p2=foo", - s"${file.getCanonicalFile}/p1=2/p2=foo", - s"${file.getCanonicalFile}/p1=1/p2=bar", - s"${file.getCanonicalFile}/p1=2/p2=bar" - ).map { p => - val path = new Path(p) - val fs = 
path.getFileSystem(sqlContext.sessionState.hadoopConf) - path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString - } - - val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: HadoopFsRelation, _, _) => - relation.location.paths.map(_.toString).toSet - }.getOrElse { - fail("Expect an FSBasedRelation, but none could be found") - } - - assert(actualPaths === expectedPaths) - checkAnswer(df, partitionedTestDF.collect()) - } - } - - test("SPARK-9735 Partition column type casting") { - withTempPath { file => - val df = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 1.0d, p2, 123, 123.123f)).toDF("a", "b", "p1", "p2", "p3", "f") - - val input = df.select( - 'a, - 'b, - 'p1.cast(StringType).as('ps1), - 'p2, - 'p3.cast(FloatType).as('pf1), - 'f) - - withTempTable("t") { - input - .write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("ps1", "p2", "pf1", "f") - .saveAsTable("t") - - input - .write - .format(dataSourceName) - .mode(SaveMode.Append) - .partitionBy("ps1", "p2", "pf1", "f") - .saveAsTable("t") - - val realData = input.collect() - - checkAnswer(sqlContext.table("t"), realData ++ realData) - } - } - } - - test("SPARK-7616: adjust column name order accordingly when saving partitioned table") { - val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") - - df.write - .format(dataSourceName) - .mode(SaveMode.Overwrite) - .partitionBy("c", "a") - .saveAsTable("t") - - withTable("t") { - checkAnswer(sqlContext.table("t").select('b, 'c, 'a), df.select('b, 'c, 'a).collect()) - } - } - - // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores - // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or - // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this - // requirement. We probably want to move this test case to spark-integration-tests or spark-perf - // later. - test("SPARK-8406: Avoids name collision while writing files") { - withTempPath { dir => - val path = dir.getCanonicalPath - sqlContext - .range(10000) - .repartition(250) - .write - .mode(SaveMode.Overwrite) - .format(dataSourceName) - .save(path) - - assertResult(10000) { - sqlContext - .read - .format(dataSourceName) - .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) - .load(path) - .count() - } - } - } - - test("SPARK-8578 specified custom output committer will not be used to append data") { - val clonedConf = new Configuration(hadoopConfiguration) - try { - val df = sqlContext.range(1, 10).toDF("i") - withTempPath { dir => - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - hadoopConfiguration.set( - SQLConf.OUTPUT_COMMITTER_CLASS.key, - classOf[AlwaysFailOutputCommitter].getName) - // Since Parquet has its own output committer setting, also set it - // to AlwaysFailParquetOutputCommitter at here. - hadoopConfiguration.set("spark.sql.parquet.output.committer.class", - classOf[AlwaysFailParquetOutputCommitter].getName) - // Because there data already exists, - // this append should succeed because we will use the output committer associated - // with file format and AlwaysFailOutputCommitter will not be used. 
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - checkAnswer( - sqlContext.read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .load(dir.getCanonicalPath), - df.union(df)) - - // This will fail because AlwaysFailOutputCommitter is used when we do append. - intercept[Exception] { - df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath) - } - } - withTempPath { dir => - hadoopConfiguration.set( - SQLConf.OUTPUT_COMMITTER_CLASS.key, - classOf[AlwaysFailOutputCommitter].getName) - // Since Parquet has its own output committer setting, also set it - // to AlwaysFailParquetOutputCommitter at here. - hadoopConfiguration.set("spark.sql.parquet.output.committer.class", - classOf[AlwaysFailParquetOutputCommitter].getName) - // Because there is no existing data, - // this append will fail because AlwaysFailOutputCommitter is used when we do append - // and there is no existing data. - intercept[Exception] { - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - } - } - } finally { - // Hadoop 1 doesn't have `Configuration.unset` - hadoopConfiguration.clear() - clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue)) - } - } - - test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") { - val df = Seq( - (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")), - (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")), - (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e") - withTempDir { file => - intercept[AnalysisException] { - df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath) - } - } - intercept[AnalysisException] { - df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t") - } - } - - test("Locality support for FileScanRDD") { - withHadoopConf( - "fs.file.impl" -> classOf[LocalityTestFileSystem].getName, - "fs.file.impl.disable.cache" -> "true" - ) { - withTempPath { dir => - val path = "file://" + dir.getCanonicalPath - val df1 = sqlContext.range(4) - df1.coalesce(1).write.mode("overwrite").format(dataSourceName).save(path) - df1.coalesce(1).write.mode("append").format(dataSourceName).save(path) - - def checkLocality(): Unit = { - val df2 = sqlContext.read - .format(dataSourceName) - .option("dataSchema", df1.schema.json) - .load(path) - - val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst { - case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => - scan.rdd.asInstanceOf[FileScanRDD] - } - - val partitions = fileScanRDD.partitions - val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations) - - assert(preferredLocations.distinct.length == 2) - } - - checkLocality() - - withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") { - checkLocality() - } - } - } - } -} - -// This class is used to test SPARK-8578. We should not use any custom output committer when -// we actually append data to an existing dir. -class AlwaysFailOutputCommitter( - outputPath: Path, - context: TaskAttemptContext) - extends FileOutputCommitter(outputPath, context) { - - override def commitJob(context: JobContext): Unit = { - sys.error("Intentional job commitment failure for testing purpose.") - } -} - -// This class is used to test SPARK-8578. We should not use any custom output committer when -// we actually append data to an existing dir. 
-class AlwaysFailParquetOutputCommitter( - outputPath: Path, - context: TaskAttemptContext) - extends ParquetOutputCommitter(outputPath, context) { - - override def commitJob(context: JobContext): Unit = { - sys.error("Intentional job commitment failure for testing purpose.") - } -} -- cgit v1.2.3
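The rewritten tests above drop the clone-and-restore handling of a shared hadoopConfiguration and instead pass Hadoop I/O settings per read or write through options(). A minimal, self-contained sketch of that pattern, assuming only the DataFrameReader/DataFrameWriter calls already used in the diffs; the output path and option values are illustrative.

    import org.apache.hadoop.io.compress.GzipCodec

    // Hadoop output settings scoped to a single write/read pair, passed as data source options
    // rather than set on a global Hadoop Configuration.
    val extraOptions = Map(
      "mapreduce.output.fileoutputformat.compress" -> "true",
      "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName)

    val df = sqlContext.range(10).selectExpr("CAST(id AS STRING) AS value")
    df.write.options(extraOptions).mode("overwrite").text("/tmp/text-out")       // hypothetical path
    val reloaded = sqlContext.read.options(extraOptions).text("/tmp/text-out")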