author    Reynold Xin <rxin@databricks.com>    2016-04-26 22:02:28 -0700
committer Reynold Xin <rxin@databricks.com>    2016-04-26 22:02:28 -0700
commit    8fda5a73dc165fda2229a27c5a9e148b43b91c3a (patch)
tree      da0fe0131f4ff26eafbec226f1668debfb60dab4
parent    d8a83a564ff3fd0281007adbf8aa3757da8a2c2b (diff)
[SPARK-14913][SQL] Simplify configuration API
## What changes were proposed in this pull request?

We currently expose both the Hadoop configuration and the Spark SQL configuration in RuntimeConfig. I think we can remove the Hadoop configuration part and simply generate a Hadoop Configuration on the fly by passing all the SQL configurations into it. This way, there is a single configuration interface (in Java/Scala/Python/SQL) for end-users. As part of this patch, I also removed some config options deprecated in Spark 1.x.

## How was this patch tested?

Updated relevant tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12689 from rxin/SPARK-14913.
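To make the "single interface" point concrete, here is a minimal sketch of how a user would drive both SQL and Hadoop I/O settings through the same API after this change. It assumes a `SparkSession` built with the 2.0 API; the key names and path are illustrative, not taken from this patch.

```scala
import org.apache.spark.sql.SparkSession

object UnifiedConfSketch {
  def run(spark: SparkSession): Unit = {
    // SQL settings and Hadoop I/O settings now go through the same call; non-SQL
    // keys are copied into the Hadoop Configuration built for each read/write.
    spark.conf.set("spark.sql.shuffle.partitions", 4L)
    spark.conf.set("fs.s3a.connection.maximum", "64") // Hadoop-level key, propagated during I/O

    // Per-source overrides ride along as data source options rather than
    // mutations of a shared, session-wide Hadoop configuration.
    spark.read
      .option("mapreduce.input.fileinputformat.split.maxsize", "134217728")
      .json("/tmp/events") // hypothetical path
      .count()
  }
}
```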
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala | 65
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala | 64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala | 73
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 82
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 65
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 93
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 133
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala | 36
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala | 31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 30
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala) | 229
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 2
42 files changed, 368 insertions(+), 671 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c0811f6a4f..6a600c1379 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
// If offsets have already been created, we trying to resume a query.
val checkpointPath = new Path(checkpointLocation, "offsets")
- val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.hadoopConf)
+ val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
if (fs.exists(checkpointPath)) {
throw new AnalysisException(
s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index e90a042431..bf97d728b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -17,33 +17,44 @@
package org.apache.spark.sql
+import org.apache.spark.sql.internal.SQLConf
+
/**
- * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
+ * Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]].
+ *
+ * Options set here are automatically propagated to the Hadoop configuration during I/O.
*
* @since 2.0.0
*/
-abstract class RuntimeConfig {
+class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
/**
* Sets the given Spark runtime configuration property.
*
* @since 2.0.0
*/
- def set(key: String, value: String): RuntimeConfig
+ def set(key: String, value: String): RuntimeConfig = {
+ sqlConf.setConfString(key, value)
+ this
+ }
/**
* Sets the given Spark runtime configuration property.
*
* @since 2.0.0
*/
- def set(key: String, value: Boolean): RuntimeConfig
+ def set(key: String, value: Boolean): RuntimeConfig = {
+ set(key, value.toString)
+ }
/**
* Sets the given Spark runtime configuration property.
*
* @since 2.0.0
*/
- def set(key: String, value: Long): RuntimeConfig
+ def set(key: String, value: Long): RuntimeConfig = {
+ set(key, value.toString)
+ }
/**
* Returns the value of Spark runtime configuration property for the given key.
@@ -52,49 +63,27 @@ abstract class RuntimeConfig {
* @since 2.0.0
*/
@throws[NoSuchElementException]("if the key is not set")
- def get(key: String): String
+ def get(key: String): String = {
+ sqlConf.getConfString(key)
+ }
/**
* Returns the value of Spark runtime configuration property for the given key.
*
* @since 2.0.0
*/
- def getOption(key: String): Option[String]
+ def getOption(key: String): Option[String] = {
+ try Option(get(key)) catch {
+ case _: NoSuchElementException => None
+ }
+ }
/**
* Resets the configuration property for the given key.
*
* @since 2.0.0
*/
- def unset(key: String): Unit
-
- /**
- * Sets the given Hadoop configuration property. This is passed directly to Hadoop during I/O.
- *
- * @since 2.0.0
- */
- def setHadoop(key: String, value: String): RuntimeConfig
-
- /**
- * Returns the value of the Hadoop configuration property.
- *
- * @throws NoSuchElementException if the key is not set
- * @since 2.0.0
- */
- @throws[NoSuchElementException]("if the key is not set")
- def getHadoop(key: String): String
-
- /**
- * Returns the value of the Hadoop configuration property.
- *
- * @since 2.0.0
- */
- def getHadoopOption(key: String): Option[String]
-
- /**
- * Resets the Hadoop configuration property for the given key.
- *
- * @since 2.0.0
- */
- def unsetHadoop(key: String): Unit
+ def unset(key: String): Unit = {
+ sqlConf.unsetConf(key)
+ }
}
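For reference, a hedged usage sketch of the concrete RuntimeConfig above, exercised through SparkSession.conf since the constructor is private[sql]; the keys are made up for illustration and are not real Spark settings.

```scala
import org.apache.spark.sql.SparkSession

object RuntimeConfigUsageSketch {
  def demo(spark: SparkSession): Unit = {
    val conf = spark.conf

    // set(...) returns the config itself, so calls chain.
    conf.set("my.custom.flag", true)
        .set("my.custom.limit", 128L)

    // get throws NoSuchElementException for an unset key; getOption returns None instead.
    assert(conf.get("my.custom.flag") == "true")
    assert(conf.getOption("never.set.anywhere").isEmpty)

    // unset removes the entry again.
    conf.unset("my.custom.limit")
    assert(conf.getOption("my.custom.limit").isEmpty)
  }
}
```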
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 6477f42680..f05546a567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, RuntimeConfigImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -201,9 +201,7 @@ class SparkSession private(
* @group config
* @since 2.0.0
*/
- @transient lazy val conf: RuntimeConfig = {
- new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
- }
+ @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
/**
* Set Spark SQL configuration properties.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index b6f7808398..54ff5ae7d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
try {
- val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
calculateTableSize(fs, path)
} catch {
case NonFatal(e) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 952a0d676f..bbb2a2235f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -62,70 +62,6 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
- case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
- s"External sort will continue to be used.")
- Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " +
- s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " +
- s"continue to be true.")
- Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
- s"will be ignored. Tungsten will continue to be used.")
- Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
- s"will be ignored. Codegen will continue to be used.")
- Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
- s"will be ignored. Unsafe mode will continue to be used.")
- Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
- s"will be ignored. Sort merge join will continue to be used.")
- Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true"))
- }
- (keyValueOutput, runFunc)
-
- case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
- val runFunc = (sparkSession: SparkSession) => {
- logWarning(
- s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
- s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
- Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true"))
- }
- (keyValueOutput, runFunc)
-
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sparkSession: SparkSession) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8d9feec9b8..f38e260bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -210,7 +210,7 @@ case class LoadData(
// Follow Hive's behavior:
// If no schema or authority is provided with non-local inpath,
// we will use hadoop configuration "fs.default.name".
- val defaultFSConf = sparkSession.sessionState.hadoopConf.get("fs.default.name")
+ val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name")
val defaultFS = if (defaultFSConf == null) {
new URI("")
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 8e72e06b1f..2f3826f72b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -131,7 +131,7 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
@@ -226,7 +226,7 @@ case class DataSource(
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
@@ -284,7 +284,7 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
@@ -374,7 +374,7 @@ case class DataSource(
val path = new Path(caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
}))
- val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index c26cae84d7..615906a52e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.internal.Logging
@@ -107,9 +106,6 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
- val hadoopConf = new Configuration(files.sparkSession.sessionState.hadoopConf)
- files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
-
val readFile = files.fileFormat.buildReader(
sparkSession = files.sparkSession,
dataSchema = files.dataSchema,
@@ -117,7 +113,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
requiredSchema = prunedDataSchema,
filters = pushedDownFilters,
options = files.options,
- hadoopConf = hadoopConf)
+ hadoopConf = files.sparkSession.sessionState.newHadoopConfWithOptions(files.options))
val plannedPartitions = files.bucketSpec match {
case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index fa954975b8..4df7d0ce4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -78,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
s"cannot save to file.")
}
- val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -106,10 +106,6 @@ private[sql] case class InsertIntoHadoopFsRelation(
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
-
- // Also set the options in Hadoop Configuration
- options.foreach { case (k, v) => if (v ne null) job.getConfiguration.set(k, v) }
-
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
val partitionSet = AttributeSet(partitionColumns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 3058e79201..25f88d9c39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -274,7 +274,7 @@ class HDFSFileCatalog(
partitionSchema: Option[StructType])
extends FileCatalog with Logging {
- private val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
+ private val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index b6b3907e3e..62446583a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -128,7 +128,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
private def createBaseRdd(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): RDD[String] = {
- val job = Job.getInstance(sparkSession.sessionState.hadoopConf)
+ val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val conf = job.getConfiguration
val paths = inputPaths.map(_.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5be8770790..c689ad08ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -652,7 +652,7 @@ private[sql] object ParquetRelation extends Logging {
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat
val serializedConf =
- new SerializableConfiguration(sparkSession.sessionState.hadoopConf)
+ new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
// !! HACK ALERT !!
//
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 61c9c88cb3..70aea7fa49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,7 +45,7 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
- private val fs = basePath.getFileSystem(sparkSession.sessionState.hadoopConf)
+ private val fs = basePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e22a05bd3b..681addea02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -38,7 +38,7 @@ class FileStreamSource(
override val schema: StructType,
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
- private val fs = new Path(path).getFileSystem(sparkSession.sessionState.hadoopConf)
+ private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 99bf20c746..9fe06a6c36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -211,7 +211,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
}
private def createFileManager(): FileManager = {
- val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
try {
new FileContextManager(metadataPath, hadoopConf)
} catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index b2bc31634c..4f699719c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -30,7 +30,7 @@ class StreamFileCatalog(sparkSession: SparkSession, path: Path) extends FileCata
val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
val metadataLog = new FileStreamSinkLog(sparkSession, metadataDirectory.toUri.toString)
- val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
override def paths: Seq[Path] = path :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index 635bb86607..e16dda8a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -46,7 +46,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = dataRDD.context.broadcast(
- new SerializableConfiguration(sessionState.hadoopConf))
+ new SerializableConfiguration(sessionState.newHadoopConf()))
override protected def getPartitions: Array[Partition] = dataRDD.partitions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
deleted file mode 100644
index 137323583b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.sql.RuntimeConfig
-
-
-/**
- * Implementation for [[RuntimeConfig]].
- */
-class RuntimeConfigImpl(
- sqlConf: SQLConf = new SQLConf,
- hadoopConf: Configuration = new Configuration)
- extends RuntimeConfig {
-
- override def set(key: String, value: String): RuntimeConfig = {
- sqlConf.setConfString(key, value)
- this
- }
-
- override def set(key: String, value: Boolean): RuntimeConfig = set(key, value.toString)
-
- override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString)
-
- @throws[NoSuchElementException]("if the key is not set")
- override def get(key: String): String = sqlConf.getConfString(key)
-
- override def getOption(key: String): Option[String] = {
- try Option(get(key)) catch {
- case _: NoSuchElementException => None
- }
- }
-
- override def unset(key: String): Unit = sqlConf.unsetConf(key)
-
- override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized {
- hadoopConf.set(key, value)
- this
- }
-
- @throws[NoSuchElementException]("if the key is not set")
- override def getHadoop(key: String): String = hadoopConf.synchronized {
- Option(hadoopConf.get(key)).getOrElse {
- throw new NoSuchElementException(key)
- }
- }
-
- override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized {
- Option(hadoopConf.get(key))
- }
-
- override def unsetHadoop(key: String): Unit = hadoopConf.synchronized {
- hadoopConf.unset(key)
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b268a4fef7..6fbf32676f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -516,13 +516,6 @@ object SQLConf {
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
- val EXTERNAL_SORT = "spark.sql.planner.externalSort"
- val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
- val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
- val CODEGEN_ENABLED = "spark.sql.codegen"
- val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
- val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
- val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader"
}
}
@@ -764,7 +757,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
settings.remove(key)
}
- private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = {
+ def unsetConf(entry: ConfigEntry[_]): Unit = {
settings.remove(entry.key)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 1bda572e63..63e0dc7e7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -48,8 +48,21 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* SQL-specific key-value configurations.
*/
lazy val conf: SQLConf = new SQLConf
- lazy val hadoopConf: Configuration = {
- new Configuration(sparkSession.sparkContext.hadoopConfiguration)
+
+ def newHadoopConf(): Configuration = {
+ val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)
+ conf.getAllConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+ hadoopConf
+ }
+
+ def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
+ val hadoopConf = newHadoopConf()
+ options.foreach { case (k, v) =>
+ if ((v ne null) && k != "path" && k != "paths") {
+ hadoopConf.set(k, v)
+ }
+ }
+ hadoopConf
}
// Automatically extract `spark.sql.*` entries and put it in our SQLConf
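A standalone sketch of the merge order the new SessionState helpers implement: the base Hadoop configuration, then all SQL conf entries, then per-relation options, skipping nulls and the data source "path"/"paths" keys. This restates the logic above for illustration; it is not code from the patch.

```scala
import org.apache.hadoop.conf.Configuration

object HadoopConfMergeSketch {
  def merged(
      base: Configuration,
      sqlConfs: Map[String, String],
      options: Map[String, String]): Configuration = {
    val hadoopConf = new Configuration(base)
    // All SQL conf entries are copied in, mirroring newHadoopConf().
    sqlConfs.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
    // Per-relation options are layered on top, mirroring newHadoopConfWithOptions(),
    // except the data source path keys, which are not Hadoop settings.
    options.foreach { case (k, v) =>
      if ((v ne null) && k != "path" && k != "paths") hadoopConf.set(k, v)
    }
    hadoopConf
  }

  def main(args: Array[String]): Unit = {
    val conf = merged(
      base = new Configuration(),
      sqlConfs = Map("spark.sql.shuffle.partitions" -> "4"),
      options = Map("path" -> "/tmp/data", "mapreduce.map.output.compress" -> "true"))
    assert(conf.get("mapreduce.map.output.compress") == "true")
    assert(conf.get("path") == null) // the path option is deliberately skipped
  }
}
```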
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index d49cc103b5..2f62ad4850 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -85,27 +85,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
}
- test("Hadoop conf interaction between SQLContext and SparkContext") {
- val mySpecialKey = "mai.special.key"
- val mySpecialValue = "msv"
- try {
- sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue)
- val sqlContext = SQLContext.getOrCreate(sc)
- val sessionState = sqlContext.sessionState
- assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue)
- assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue)
- // mutating hadoop conf in SQL doesn't mutate the underlying one
- sessionState.hadoopConf.set(mySpecialKey, "no no no")
- assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no")
- assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no")
- assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
- sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes")
- assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes")
- assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes")
- assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
- } finally {
- sc.hadoopConfiguration.unset(mySpecialKey)
- }
- }
-
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index f73d485acf..ac2af77a6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -292,56 +292,50 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
test("Locality support for FileScanRDD - one file per partition") {
- withHadoopConf(
- "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
- "fs.file.impl.disable.cache" -> "true"
- ) {
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
- val table =
- createTable(files = Seq(
- "file1" -> 10,
- "file2" -> 10
- ))
-
- checkScan(table) { partitions =>
- val Seq(p1, p2) = partitions
- assert(p1.files.length == 1)
- assert(p1.files.flatMap(_.locations).length == 1)
- assert(p2.files.length == 1)
- assert(p2.files.flatMap(_.locations).length == 1)
-
- val fileScanRDD = getFileScanRDD(table)
- assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 2)
- }
+ withSQLConf(
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+ "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
+ "fs.file.impl.disable.cache" -> "true") {
+ val table =
+ createTable(files = Seq(
+ "file1" -> 10,
+ "file2" -> 10
+ ))
+
+ checkScan(table) { partitions =>
+ val Seq(p1, p2) = partitions
+ assert(p1.files.length == 1)
+ assert(p1.files.flatMap(_.locations).length == 1)
+ assert(p2.files.length == 1)
+ assert(p2.files.flatMap(_.locations).length == 1)
+
+ val fileScanRDD = getFileScanRDD(table)
+ assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 2)
}
}
}
test("Locality support for FileScanRDD - large file") {
- withHadoopConf(
- "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
- "fs.file.impl.disable.cache" -> "true"
- ) {
- withSQLConf(
+ withSQLConf(
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
- SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0"
- ) {
- val table =
- createTable(files = Seq(
- "file1" -> 15,
- "file2" -> 5
- ))
-
- checkScan(table) { partitions =>
- val Seq(p1, p2) = partitions
- assert(p1.files.length == 1)
- assert(p1.files.flatMap(_.locations).length == 1)
- assert(p2.files.length == 2)
- assert(p2.files.flatMap(_.locations).length == 2)
-
- val fileScanRDD = getFileScanRDD(table)
- assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 3)
- }
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0",
+ "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
+ "fs.file.impl.disable.cache" -> "true") {
+ val table =
+ createTable(files = Seq(
+ "file1" -> 15,
+ "file2" -> 5
+ ))
+
+ checkScan(table) { partitions =>
+ val Seq(p1, p2) = partitions
+ assert(p1.files.length == 1)
+ assert(p1.files.flatMap(_.locations).length == 1)
+ assert(p2.files.length == 2)
+ assert(p2.files.flatMap(_.locations).length == 2)
+
+ val fileScanRDD = getFileScanRDD(table)
+ assert(partitions.flatMap(fileScanRDD.preferredLocations).length == 3)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 9baae80f15..ceda920ddc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -430,42 +430,37 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
test("SPARK-13543 Write the output as uncompressed via option()") {
- val clonedConf = new Configuration(hadoopConfiguration)
- hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
- hadoopConfiguration.set("mapreduce.map.output.compress", "true")
- hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
+ val extraOptions = Map(
+ "mapreduce.output.fileoutputformat.compress" -> "true",
+ "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString,
+ "mapreduce.map.output.compress" -> "true",
+ "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName
+ )
withTempDir { dir =>
- try {
- val csvDir = new File(dir, "csv").getCanonicalPath
- val cars = sqlContext.read
- .format("csv")
- .option("header", "true")
- .load(testFile(carsFile))
-
- cars.coalesce(1).write
- .format("csv")
- .option("header", "true")
- .option("compression", "none")
- .save(csvDir)
-
- val compressedFiles = new File(csvDir).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz")))
-
- val carsCopy = sqlContext.read
- .format("csv")
- .option("header", "true")
- .load(csvDir)
-
- verifyCars(carsCopy, withHeader = true)
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val csvDir = new File(dir, "csv").getCanonicalPath
+ val cars = sqlContext.read
+ .format("csv")
+ .option("header", "true")
+ .options(extraOptions)
+ .load(testFile(carsFile))
+
+ cars.coalesce(1).write
+ .format("csv")
+ .option("header", "true")
+ .option("compression", "none")
+ .options(extraOptions)
+ .save(csvDir)
+
+ val compressedFiles = new File(csvDir).listFiles()
+ assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz")))
+
+ val carsCopy = sqlContext.read
+ .format("csv")
+ .option("header", "true")
+ .options(extraOptions)
+ .load(csvDir)
+
+ verifyCars(carsCopy, withHeader = true)
}
}
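The migration pattern used in this test, pulled out for clarity: Hadoop keys that previously required mutating and later restoring the shared hadoopConfiguration are now scoped to a single write or read via .options(). This is a sketch, not the test itself; the output path is hypothetical.

```scala
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

object PerWriteHadoopOptionsSketch {
  def write(spark: SparkSession): Unit = {
    val extraOptions = Map(
      "mapreduce.output.fileoutputformat.compress" -> "true",
      "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString,
      "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName)

    spark.range(10).selectExpr("id", "id * 2 AS doubled")
      .write
      .format("csv")
      .option("header", "true")
      .option("compression", "none") // the data source-level setting decides the file extension
      .options(extraOptions)         // Hadoop-level settings, scoped to this write only
      .save("/tmp/csv-out") // hypothetical path
  }
}
```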
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index e5588bec4b..b1279abd63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1507,23 +1507,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
df.write.json(path + "/p=2")
assert(sqlContext.read.json(path).count() === 4)
- val clonedConf = new Configuration(hadoopConfiguration)
- try {
- // Setting it twice as the name of the propery has changed between hadoop versions.
- hadoopConfiguration.setClass(
- "mapred.input.pathFilter.class",
- classOf[TestFileFilter],
- classOf[PathFilter])
- hadoopConfiguration.setClass(
- "mapreduce.input.pathFilter.class",
- classOf[TestFileFilter],
- classOf[PathFilter])
- assert(sqlContext.read.json(path).count() === 2)
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val extraOptions = Map(
+ "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+ "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+ )
+ assert(sqlContext.read.options(extraOptions).json(path).count() === 2)
}
}
@@ -1609,45 +1597,40 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("SPARK-13543 Write the output as uncompressed via option()") {
- val clonedConf = new Configuration(hadoopConfiguration)
- hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
- hadoopConfiguration.set("mapreduce.map.output.compress", "true")
- hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
+ val extraOptions = Map[String, String](
+ "mapreduce.output.fileoutputformat.compress" -> "true",
+ "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString,
+ "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName,
+ "mapreduce.map.output.compress" -> "true",
+ "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName
+ )
withTempDir { dir =>
- try {
- val dir = Utils.createTempDir()
- dir.delete()
-
- val path = dir.getCanonicalPath
- primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
-
- val jsonDF = sqlContext.read.json(path)
- val jsonDir = new File(dir, "json").getCanonicalPath
- jsonDF.coalesce(1).write
- .format("json")
- .option("compression", "none")
- .save(jsonDir)
-
- val compressedFiles = new File(jsonDir).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".json.gz")))
-
- val jsonCopy = sqlContext.read
- .format("json")
- .load(jsonDir)
-
- assert(jsonCopy.count == jsonDF.count)
- val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
- val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
- checkAnswer(jsonCopySome, jsonDFSome)
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val dir = Utils.createTempDir()
+ dir.delete()
+
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+ val jsonDF = sqlContext.read.json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write
+ .format("json")
+ .option("compression", "none")
+ .options(extraOptions)
+ .save(jsonDir)
+
+ val compressedFiles = new File(jsonDir).listFiles()
+ assert(compressedFiles.exists(!_.getName.endsWith(".json.gz")))
+
+ val jsonCopy = sqlContext.read
+ .format("json")
+ .options(extraOptions)
+ .load(jsonDir)
+
+ assert(jsonCopy.count == jsonDF.count)
+ val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+ val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+ checkAnswer(jsonCopySome, jsonDFSome)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
index 4217c81ff3..45cc6810d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -38,14 +38,15 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq
}
protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = {
+ val hadoopConf = sqlContext.sessionState.newHadoopConf()
val fsPath = new Path(path)
- val fs = fsPath.getFileSystem(hadoopConfiguration)
+ val fs = fsPath.getFileSystem(hadoopConf)
val parquetFiles = fs.listStatus(fsPath, new PathFilter {
override def accept(path: Path): Boolean = pathFilter(path)
}).toSeq.asJava
val footers =
- ParquetFileReader.readAllFootersInParallel(hadoopConfiguration, parquetFiles, true)
+ ParquetFileReader.readAllFootersInParallel(hadoopConf, parquetFiles, true)
footers.asScala.head.getParquetMetadata.getFileMetaData.getSchema
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0aab36ae38..32fe5ba127 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val conf = sqlContext.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf)
readParquetFile(path.toString)(df => {
val sparkTypes = df.schema.map(_.dataType)
@@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val conf = sqlContext.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
sqlContext.read.parquet(path.toString).printSchema()
@@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val conf = sqlContext.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf)
val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
assert(sparkTypes === expectedSparkTypes)
@@ -279,9 +279,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
test("compression codec") {
+ val hadoopConf = sqlContext.sessionState.newHadoopConf()
def compressionCodecFor(path: String, codecName: String): String = {
val codecs = for {
- footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration)
+ footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
block <- footer.getParquetMetadata.getBlocks.asScala
column <- block.getColumns.asScala
} yield column.getCodec.name()
@@ -350,17 +351,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
test("write metadata") {
+ val hadoopConf = sqlContext.sessionState.newHadoopConf()
withTempPath { file =>
val path = new Path(file.toURI.toString)
- val fs = FileSystem.getLocal(hadoopConfiguration)
+ val fs = FileSystem.getLocal(hadoopConf)
val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
- writeMetadata(schema, path, hadoopConfiguration)
+ writeMetadata(schema, path, hadoopConf)
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
val expectedSchema = new CatalystSchemaConverter().convert(schema)
- val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema
+ val actualSchema = readFooter(path, hadoopConf).getFileMetaData.getSchema
actualSchema.checkContains(expectedSchema)
expectedSchema.checkContains(actualSchema)
@@ -431,7 +433,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
val path = new Path(location.getCanonicalPath)
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val conf = sqlContext.sessionState.newHadoopConf()
writeMetadata(parquetSchema, path, conf, extraMetadata)
readParquetFile(path.toString) { df =>
@@ -446,26 +448,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
+ val extraOptions = Map(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
+ "spark.sql.parquet.output.committer.class" ->
+ classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
+ )
withTempPath { dir =>
- val clonedConf = new Configuration(hadoopConfiguration)
-
- hadoopConfiguration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
-
- hadoopConfiguration.set(
- "spark.sql.parquet.output.committer.class",
- classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
-
- try {
- val message = intercept[SparkException] {
- sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
- }.getCause.getMessage
- assert(message === "Intentional exception for testing purposes")
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val message = intercept[SparkException] {
+ sqlContext.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(message === "Intentional exception for testing purposes")
}
}
@@ -482,36 +474,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
- val clonedConf = new Configuration(hadoopConfiguration)
-
// Using a output committer that always fail when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
- hadoopConfiguration.set(
- "spark.sql.parquet.output.committer.class",
- classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+ val extraOptions = Map[String, String](
+ "spark.sql.parquet.output.committer.class" ->
+ classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+ )
- try {
- // Before fixing SPARK-7837, the following code results in an NPE because both
- // `commitTask()` and `abortTask()` try to close output writers.
+ // Before fixing SPARK-7837, the following code results in an NPE because both
+ // `commitTask()` and `abortTask()` try to close output writers.
- withTempPath { dir =>
- val m1 = intercept[SparkException] {
- sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
- }.getCause.getMessage
- assert(m1.contains("Intentional exception for testing purposes"))
- }
+ withTempPath { dir =>
+ val m1 = intercept[SparkException] {
+ sqlContext.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(m1.contains("Intentional exception for testing purposes"))
+ }
- withTempPath { dir =>
- val m2 = intercept[SparkException] {
- val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
- df.write.partitionBy("a").parquet(dir.getCanonicalPath)
- }.getCause.getMessage
- assert(m2.contains("Intentional exception for testing purposes"))
- }
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+ withTempPath { dir =>
+ val m2 = intercept[SparkException] {
+ val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+ df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(m2.contains("Intentional exception for testing purposes"))
}
}
@@ -519,31 +504,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// For dictionary encoding, Parquet changes the encoding types according to its writer
// version. So, this test checks one of the encoding types in order to ensure that
// the file is written with writer version2.
+ val extraOptions = Map[String, String](
+ // Write a Parquet file with writer version2.
+ ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString,
+ // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+ // it is enabled just in case.
+ ParquetOutputFormat.ENABLE_DICTIONARY -> "true"
+ )
+
+ val hadoopConf = sqlContext.sessionState.newHadoopConfWithOptions(extraOptions)
+
withTempPath { dir =>
- val clonedConf = new Configuration(hadoopConfiguration)
- try {
- // Write a Parquet file with writer version2.
- hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
- ParquetProperties.WriterVersion.PARQUET_2_0.toString)
-
- // By default, dictionary encoding is enabled from Parquet 1.2.0 but
- // it is enabled just in case.
- hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
- val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
- sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
- .coalesce(1).write.mode("overwrite").parquet(path)
-
- val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
- val columnChunkMetadata = blockMetadata.getColumns.asScala.head
-
- // If the file is written with version2, this should include
- // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
- assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
- } finally {
- // Manually clear the hadoop configuration for other tests.
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+ sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+ .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
+
+ val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+ val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+ // If the file is written with version2, this should include
+ // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+ assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 47330f1db3..923c0b350e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -87,28 +87,22 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-13543 Write the output as uncompressed via option()") {
- val clonedConf = new Configuration(hadoopConfiguration)
- hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
- hadoopConfiguration
- .set("mapreduce.output.fileoutputformat.compress.codec", classOf[GzipCodec].getName)
- hadoopConfiguration.set("mapreduce.map.output.compress", "true")
- hadoopConfiguration.set("mapreduce.map.output.compress.codec", classOf[GzipCodec].getName)
+ val extraOptions = Map[String, String](
+ "mapreduce.output.fileoutputformat.compress" -> "true",
+ "mapreduce.output.fileoutputformat.compress.type" -> CompressionType.BLOCK.toString,
+ "mapreduce.map.output.compress" -> "true",
+ "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName,
+ "mapreduce.map.output.compress.codec" -> classOf[GzipCodec].getName
+ )
withTempDir { dir =>
- try {
- val testDf = sqlContext.read.text(testFile)
- val tempDir = Utils.createTempDir()
- val tempDirPath = tempDir.getAbsolutePath
- testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
- val compressedFiles = new File(tempDirPath).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
- verifyFrame(sqlContext.read.text(tempDirPath).toDF())
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
+ val testDf = sqlContext.read.text(testFile)
+ val tempDir = Utils.createTempDir()
+ val tempDirPath = tempDir.getAbsolutePath
+ testDf.write.option("compression", "none")
+ .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
+ val compressedFiles = new File(tempDirPath).listFiles()
+ assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
+ verifyFrame(sqlContext.read.options(extraOptions).text(tempDirPath).toDF())
}
}
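The TextSuite hunk applies the same idea to output compression: the mapreduce.* keys ride along as per-write options, and the explicit "compression" option wins for that write, which is exactly what the test asserts. A minimal sketch, assuming `sqlContext` is in scope and using a hypothetical output path:

    import org.apache.hadoop.io.compress.GzipCodec

    val hadoopCompression = Map(
      "mapreduce.output.fileoutputformat.compress" -> "true",
      "mapreduce.output.fileoutputformat.compress.codec" -> classOf[GzipCodec].getName)

    sqlContext.range(10).selectExpr("CAST(id AS STRING) AS value")
      .write
      .option("compression", "none")   // takes precedence over the Hadoop keys
      .options(hadoopCompression)
      .mode("overwrite")
      .text("/tmp/text-uncompressed-sample")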
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index df127d958e..7b413dda1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
withFileStreamSinkLog { sinkLog =>
- val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.newHadoopConf())
def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 129b5a8c36..5f92c5bb9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
- sqlContext.sessionState.hadoopConf.set(
+ sqlContext.conf.setConfString(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
withTempDir { temp =>
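The HDFSMetadataLogSuite hunk relies on the single-config model: a key set through the SQL conf is expected to surface in any Hadoop Configuration built by sessionState.newHadoopConf(). A minimal sketch of that propagation, assuming `sqlContext` is in scope and reusing the suite's FakeFileSystem with a made-up "fake" scheme:

    sqlContext.conf.setConfString("fs.fake.impl", classOf[FakeFileSystem].getName)

    // newHadoopConf() folds the SQL settings into a fresh Configuration,
    // so the key is visible to Hadoop FileSystem lookups.
    val hadoopConf = sqlContext.sessionState.newHadoopConf()
    assert(hadoopConf.get("fs.fake.impl") == classOf[FakeFileSystem].getName)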
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala
index f809e01169..a629b73ac0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/RuntimeConfigSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.RuntimeConfig
class RuntimeConfigSuite extends SparkFunSuite {
- private def newConf(): RuntimeConfig = new RuntimeConfigImpl
+ private def newConf(): RuntimeConfig = new RuntimeConfig
test("set and get") {
val conf = newConf()
@@ -54,33 +54,4 @@ class RuntimeConfigSuite extends SparkFunSuite {
conf.get("k1")
}
}
-
- test("set and get hadoop configuration") {
- val conf = newConf()
- conf
- .setHadoop("k1", "v1")
- .setHadoop("k2", "v2")
-
- assert(conf.getHadoop("k1") == "v1")
- assert(conf.getHadoop("k2") == "v2")
-
- intercept[NoSuchElementException] {
- conf.get("notset")
- }
- }
-
- test("getHadoopOption") {
- val conf = newConf().setHadoop("k1", "v1")
- assert(conf.getHadoopOption("k1") == Some("v1"))
- assert(conf.getHadoopOption("notset") == None)
- }
-
- test("unsetHadoop") {
- val conf = newConf().setHadoop("k1", "v1")
- assert(conf.getHadoop("k1") == "v1")
- conf.unsetHadoop("k1")
- intercept[NoSuchElementException] {
- conf.getHadoop("k1")
- }
- }
}
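With the Hadoop-specific accessors gone, RuntimeConfig is exercised only through its plain SQL-setting surface. A minimal sketch of what the remaining tests cover, assuming set/get behave as the suite's "set and get" test implies (missing keys raise NoSuchElementException) and that intercept runs inside a ScalaTest suite such as the one above:

    val conf = new RuntimeConfig
    conf.set("k1", "v1")
    assert(conf.get("k1") == "v1")

    // Unknown keys fail fast rather than returning null.
    intercept[NoSuchElementException] {
      conf.get("notset")
    }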
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5577c9f3ee..ffb206af0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -85,34 +85,6 @@ private[sql] trait SQLTestUtils
}
/**
- * The Hadoop configuration used by the active [[SQLContext]].
- */
- protected def hadoopConfiguration: Configuration = {
- sqlContext.sessionState.hadoopConf
- }
-
- /**
- * Sets all Hadoop configurations specified in `pairs`, calls `f`, and then restore all Hadoop
- * configurations.
- */
- protected def withHadoopConf(pairs: (String, String)*)(f: => Unit): Unit = {
- val (keys, _) = pairs.unzip
- val originalValues = keys.map(key => Option(hadoopConfiguration.get(key)))
-
- try {
- pairs.foreach { case (key, value) =>
- hadoopConfiguration.set(key, value)
- }
- f
- } finally {
- keys.zip(originalValues).foreach {
- case (key, Some(value)) => hadoopConfiguration.set(key, value)
- case (key, None) => hadoopConfiguration.unset(key)
- }
- }
- }
-
- /**
* Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
* configurations.
*
@@ -164,7 +136,7 @@ private[sql] trait SQLTestUtils
} finally {
// If the test failed partway, we don't want to mask the failure by failing to remove
// functions that never got created.
- try functions.foreach { case (functionName, isTemporary) =>
+ functions.foreach { case (functionName, isTemporary) =>
val withTemporary = if (isTemporary) "TEMPORARY" else ""
sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName")
assert(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 9799c6d42b..5ef80b9aa3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.test
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
-import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf}
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
/**
* A special [[SQLContext]] prepared for testing.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c4db4f307c..58c10b7b1d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -549,7 +549,7 @@ private[hive] class MetaStoreFileCatalog(
Some(partitionSpecFromHive.partitionColumns)) {
override def getStatus(path: Path): Array[FileStatus] = {
- val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.listStatus(path)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4f81967a5b..d6a847f3ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -57,7 +57,7 @@ private[sql] class DefaultSource
files: Seq[FileStatus]): Option[StructType] = {
OrcFileOperator.readSchema(
files.map(_.getPath.toUri.toString),
- Some(new Configuration(sparkSession.sessionState.hadoopConf))
+ Some(sparkSession.sessionState.newHadoopConf())
)
}
@@ -278,7 +278,7 @@ private[orc] case class OrcTableScan(
with HiveInspectors {
def execute(): RDD[InternalRow] = {
- val job = Job.getInstance(new Configuration(sparkSession.sessionState.hadoopConf))
+ val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
val conf = job.getConfiguration
// Tries to push down filters if ORC filter push-down is enabled
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f74e5cd6f5..1d8f24cb27 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
+import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 31ba735708..b21ca4f26e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val expectedPath =
sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
- val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = filesystemPath.getFileSystem(sqlContext.sessionState.newHadoopConf())
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
// It is a managed table when we do not specify the location.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index a4c6d3c185..8b3f2d1a0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -266,14 +266,6 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
intercept[AnalysisException] {
sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
}
-
- // Unset default URI Scheme and Authority: throw exception
- val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name")
- hiveContext.sessionState.hadoopConf.unset("fs.default.name")
- intercept[AnalysisException] {
- sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
- }
- hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e23272de85..687a4a7e51 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val expectedTablePath =
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
val filesystemPath = new Path(expectedTablePath)
- val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf)
+ val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf())
fs.exists(filesystemPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index f11c055fb9..b97da1ffdc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.orc(path)
// Check if this is compressed as ZLIB.
- val conf = sqlContext.sessionState.hadoopConf
+ val conf = sqlContext.sessionState.newHadoopConf()
val fs = FileSystem.getLocal(conf)
val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
assert(maybeOrcFile.isDefined)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 0a0cdf60e8..62d9f5a339 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
// The following codec is supported in hive-0.13.1; ignore it for now
ignore("Other compression options for writing to an ORC file - 0.13.1 and above") {
val data = (1 to 100).map(i => (i, s"val_$i"))
- val conf = sqlContext.sessionState.hadoopConf
+ val conf = sqlContext.sessionState.newHadoopConf()
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
withOrcFile(data) { file =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 3b16468e76..67b403a9bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.sources
-import scala.collection.JavaConverters._
import scala.util.Random
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -117,56 +115,55 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
new UDT.MyDenseVectorUDT()
).filter(supportsDataType)
- try {
- for (dataType <- supportedDataTypes) {
- for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
- test(s"test all data types - $dataType with parquet.enable.dictionary = " +
- s"$parquetDictionaryEncodingEnabled") {
-
- hadoopConfiguration.setBoolean("parquet.enable.dictionary",
- parquetDictionaryEncodingEnabled)
-
- withTempPath { file =>
- val path = file.getCanonicalPath
-
- val dataGenerator = RandomDataGenerator.forType(
- dataType = dataType,
- nullable = true,
- new Random(System.nanoTime())
- ).getOrElse {
- fail(s"Failed to create data generator for schema $dataType")
- }
-
- // Create a DF for the schema with random data. The index field is used to sort the
- // DataFrame. This is a workaround for SPARK-10591.
- val schema = new StructType()
- .add("index", IntegerType, nullable = false)
- .add("col", dataType, nullable = true)
- val rdd =
- sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
- val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
- df.write
- .mode("overwrite")
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .save(path)
-
- val loadedDF = sqlContext
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .schema(df.schema)
- .load(path)
- .orderBy("index")
-
- checkAnswer(loadedDF, df)
+ for (dataType <- supportedDataTypes) {
+ for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+ test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+ s"$parquetDictionaryEncodingEnabled") {
+
+ val extraOptions = Map[String, String](
+ "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
+ )
+
+ withTempPath { file =>
+ val path = file.getCanonicalPath
+
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
+ nullable = true,
+ new Random(System.nanoTime())
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
}
+
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd =
+ sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+ val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .options(extraOptions)
+ .save(path)
+
+ val loadedDF = sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .options(extraOptions)
+ .load(path)
+ .orderBy("index")
+
+ checkAnswer(loadedDF, df)
}
}
}
- } finally {
- hadoopConfiguration.unset("parquet.enable.dictionary")
}
test("save()/load() - non-partitioned table - Overwrite") {
@@ -209,7 +206,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath)
val path = new Path(file.getCanonicalPath)
- val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf())
assert(fs.listStatus(path).isEmpty)
}
}
@@ -510,7 +507,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
s"${file.getCanonicalFile}/p1=2/p2=bar"
).map { p =>
val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}
@@ -605,53 +602,45 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
test("SPARK-8578 specified custom output committer will not be used to append data") {
- val clonedConf = new Configuration(hadoopConfiguration)
- try {
- val df = sqlContext.range(1, 10).toDF("i")
- withTempPath { dir =>
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- hadoopConfiguration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- classOf[AlwaysFailOutputCommitter].getName)
- // Since Parquet has its own output committer setting, also set it
- // to AlwaysFailParquetOutputCommitter at here.
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- classOf[AlwaysFailParquetOutputCommitter].getName)
- // Because there data already exists,
- // this append should succeed because we will use the output committer associated
- // with file format and AlwaysFailOutputCommitter will not be used.
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- checkAnswer(
- sqlContext.read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .load(dir.getCanonicalPath),
- df.union(df))
-
- // This will fail because AlwaysFailOutputCommitter is used when we do append.
- intercept[Exception] {
- df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
- }
+ val extraOptions = Map[String, String](
+ SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
+ // Since Parquet has its own output committer setting, also set it
+ // to AlwaysFailParquetOutputCommitter here.
+ "spark.sql.parquet.output.committer.class" ->
+ classOf[AlwaysFailParquetOutputCommitter].getName
+ )
+
+ val df = sqlContext.range(1, 10).toDF("i")
+ withTempPath { dir =>
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ // Because the data already exists, this append should succeed: we will
+ // use the output committer associated with the file format, and
+ // AlwaysFailOutputCommitter will not be used.
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .options(extraOptions)
+ .load(dir.getCanonicalPath),
+ df.union(df))
+
+ // This will fail because AlwaysFailOutputCommitter is used when we do append.
+ intercept[Exception] {
+ df.write.mode("overwrite")
+ .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
}
- withTempPath { dir =>
- hadoopConfiguration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS.key,
- classOf[AlwaysFailOutputCommitter].getName)
- // Since Parquet has its own output committer setting, also set it
- // to AlwaysFailParquetOutputCommitter at here.
- hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
- classOf[AlwaysFailParquetOutputCommitter].getName)
- // Because there is no existing data,
- // this append will fail because AlwaysFailOutputCommitter is used when we do append
- // and there is no existing data.
- intercept[Exception] {
- df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
- }
+ }
+ withTempPath { dir =>
+ // Because there is no existing data, this append will fail:
+ // AlwaysFailOutputCommitter is used when we append to a path that
+ // does not yet contain data.
+ intercept[Exception] {
+ df.write.mode("append")
+ .options(extraOptions)
+ .format(dataSourceName)
+ .save(dir.getCanonicalPath)
}
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
@@ -671,38 +660,38 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
test("Locality support for FileScanRDD") {
- withHadoopConf(
+ val options = Map[String, String](
"fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
"fs.file.impl.disable.cache" -> "true"
- ) {
- withTempPath { dir =>
- val path = "file://" + dir.getCanonicalPath
- val df1 = sqlContext.range(4)
- df1.coalesce(1).write.mode("overwrite").format(dataSourceName).save(path)
- df1.coalesce(1).write.mode("append").format(dataSourceName).save(path)
-
- def checkLocality(): Unit = {
- val df2 = sqlContext.read
- .format(dataSourceName)
- .option("dataSchema", df1.schema.json)
- .load(path)
-
- val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
- case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
- scan.rdd.asInstanceOf[FileScanRDD]
- }
+ )
+ withTempPath { dir =>
+ val path = "file://" + dir.getCanonicalPath
+ val df1 = sqlContext.range(4)
+ df1.coalesce(1).write.mode("overwrite").options(options).format(dataSourceName).save(path)
+ df1.coalesce(1).write.mode("append").options(options).format(dataSourceName).save(path)
- val partitions = fileScanRDD.partitions
- val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations)
+ def checkLocality(): Unit = {
+ val df2 = sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", df1.schema.json)
+ .options(options)
+ .load(path)
- assert(preferredLocations.distinct.length == 2)
+ val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst {
+ case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
+ scan.rdd.asInstanceOf[FileScanRDD]
}
- checkLocality()
+ val partitions = fileScanRDD.partitions
+ val preferredLocations = partitions.flatMap(fileScanRDD.preferredLocations)
- withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
- checkLocality()
- }
+ assert(preferredLocations.distinct.length == 2)
+ }
+
+ checkLocality()
+
+ withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+ checkLocality()
}
}
}
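Across the HadoopFsRelationTest changes, filesystem and committer settings that used to be poked into the shared hadoopConfiguration now travel as per-operation options. A minimal sketch of the read side, assuming `sqlContext` is in scope, reusing the suite's LocalityTestFileSystem helper, and with a hypothetical parquet path:

    val fsOptions = Map(
      "fs.file.impl" -> classOf[LocalityTestFileSystem].getName,
      "fs.file.impl.disable.cache" -> "true")

    // The options are merged into the Hadoop Configuration used for this
    // read only; other readers keep the default local FileSystem.
    val df = sqlContext.read
      .format("parquet")
      .options(fsOptions)
      .load("file:///tmp/locality-sample")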
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 19749a9713..1d104889fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -132,7 +132,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
val summaryPath = new Path(path, "_metadata")
val commonSummaryPath = new Path(path, "_common_metadata")
- val fs = summaryPath.getFileSystem(hadoopConfiguration)
+ val fs = summaryPath.getFileSystem(sqlContext.sessionState.newHadoopConf())
fs.delete(summaryPath, true)
fs.delete(commonSummaryPath, true)
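The ParquetHadoopFsRelationSuite change above shows the other common replacement: instead of reaching for a shared hadoopConfiguration, callers ask the session state for a fresh Configuration and derive the FileSystem from it. A minimal sketch, assuming `sqlContext` is in scope and using a hypothetical output directory:

    import org.apache.hadoop.fs.Path

    val conf = sqlContext.sessionState.newHadoopConf()
    val summaryPath = new Path("/tmp/parquet-out/_metadata")

    // The FileSystem is resolved from the per-call Configuration, so no
    // session-wide state needs to be mutated or restored afterwards.
    val fs = summaryPath.getFileSystem(conf)
    if (fs.exists(summaryPath)) fs.delete(summaryPath, true)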