author     Andrew Or <andrew@databricks.com>     2016-04-25 17:52:25 -0700
committer  Reynold Xin <rxin@databricks.com>     2016-04-25 17:52:25 -0700
commit     cfa64882fc0728d7becf55b8a424926e4ca93887 (patch)
tree       b2fc0df57667f548809cdbabb0d142bec101fcec
parent     f36c9c83798877256efa1447a6b9be5aa47a7e87 (diff)
[SPARK-14902][SQL] Expose RuntimeConfig in SparkSession
## What changes were proposed in this pull request?

`RuntimeConfig` is the new user-facing configuration API in 2.0, added in #11378. Until now, however, it has been dead code. This patch uses `RuntimeConfig` in `SessionState` and exposes it through the `SparkSession`.

## How was this patch tested?

New test in `SQLContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #12669 from andrewor14/use-runtime-conf.
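Usage sketch (not part of the patch): the snippet below loosely mirrors the new SQLContextSuite test to show the API surface this change exposes. It assumes it runs inside the org.apache.spark.sql package (runtimeConf is protected[sql] on SQLContext; the same object is exposed publicly as SparkSession.conf), that a local SparkContext can be created, and the configuration key is made up for illustration.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical local context, purely for illustration.
val sc = new SparkContext("local", "runtime-conf-sketch")
val sqlContext = SQLContext.getOrCreate(sc)

// SQL settings go through the session-scoped RuntimeConfig.
sqlContext.runtimeConf.set("spark.sql.shuffle.partitions", "4")
assert(sqlContext.runtimeConf.get("spark.sql.shuffle.partitions") == "4")

// Hadoop settings are written to the session's copy of the Hadoop
// configuration, not to the SparkContext's hadoopConfiguration.
sqlContext.runtimeConf.setHadoop("my.made.up.key", "session-only")
assert(sqlContext.runtimeConf.getHadoop("my.made.up.key") == "session-only")
assert(sc.hadoopConfiguration.get("my.made.up.key") == null)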
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala | 38
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 4
38 files changed, 131 insertions, 81 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 0745ef47ff..99d92b9257 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
// If offsets have already been created, we trying to resume a query.
val checkpointPath = new Path(checkpointLocation, "offsets")
- val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+ val fs = checkpointPath.getFileSystem(df.sqlContext.sessionState.hadoopConf)
if (fs.exists(checkpointPath)) {
throw new AnalysisException(
s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4c9977c8c7..dde139608a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -103,7 +103,8 @@ class SQLContext private[sql](
protected[sql] def sessionState: SessionState = sparkSession.sessionState
protected[sql] def sharedState: SharedState = sparkSession.sharedState
- protected[sql] def conf: SQLConf = sparkSession.conf
+ protected[sql] def conf: SQLConf = sessionState.conf
+ protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
protected[sql] def listener: SQLListener = sparkSession.listener
protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 3561765642..00256bd0be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -76,8 +76,8 @@ class SparkSession private(
}
/**
- * State isolated across sessions, including SQL configurations, temporary tables,
- * registered functions, and everything else that accepts a [[SQLConf]].
+ * State isolated across sessions, including SQL configurations, temporary tables, registered
+ * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
*/
@transient
protected[sql] lazy val sessionState: SessionState = {
@@ -103,7 +103,6 @@ class SparkSession private(
_wrapped = sqlContext
}
- protected[sql] def conf: SQLConf = sessionState.conf
protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
protected[sql] def listener: SQLListener = sharedState.listener
protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
@@ -191,6 +190,22 @@ class SparkSession private(
| Methods for accessing or mutating configurations |
* -------------------------------------------------- */
+ @transient private lazy val _conf: RuntimeConfig = {
+ new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+ }
+
+ /**
+ * Runtime configuration interface for Spark.
+ *
+ * This is the interface through which the user can get and set all Spark and Hadoop
+ * configurations that are relevant to Spark SQL. When getting the value of a config,
+ * this defaults to the value set in the underlying [[SparkContext]], if any.
+ *
+ * @group config
+ * @since 2.0.0
+ */
+ def conf: RuntimeConfig = _conf
+
/**
* Set Spark SQL configuration properties.
*
@@ -213,7 +228,7 @@ class SparkSession private(
* @group config
* @since 2.0.0
*/
- def getConf(key: String): String = conf.getConfString(key)
+ def getConf(key: String): String = sessionState.conf.getConfString(key)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -222,7 +237,9 @@ class SparkSession private(
* @group config
* @since 2.0.0
*/
- def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
+ def getConf(key: String, defaultValue: String): String = {
+ sessionState.conf.getConfString(key, defaultValue)
+ }
/**
* Return all the configuration properties that have been set (i.e. not the default).
@@ -231,7 +248,7 @@ class SparkSession private(
* @group config
* @since 2.0.0
*/
- def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+ def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
/**
* Set the given Spark SQL configuration property.
@@ -244,7 +261,7 @@ class SparkSession private(
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue` in [[ConfigEntry]].
*/
- protected[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
+ protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -252,7 +269,7 @@ class SparkSession private(
* desired one.
*/
protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
- conf.getConf(entry, defaultValue)
+ sessionState.conf.getConf(entry, defaultValue)
}
@@ -601,7 +618,7 @@ class SparkSession private(
*/
@Experimental
def createExternalTable(tableName: String, path: String): DataFrame = {
- val dataSourceName = conf.defaultDataSourceName
+ val dataSourceName = sessionState.conf.defaultDataSourceName
createExternalTable(tableName, path, dataSourceName)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index 7fa246ba51..e6c5351106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
catalogTable.storage.locationUri.map { p =>
val path = new Path(p)
try {
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
calculateTableSize(fs, path)
} catch {
case NonFatal(e) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index eae8fe8975..5cac9d879f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -210,7 +210,7 @@ case class LoadData(
// Follow Hive's behavior:
// If no schema or authority is provided with non-local inpath,
// we will use hadoop configuration "fs.default.name".
- val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+ val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name")
val defaultFS = if (defaultFSConf == null) {
new URI("")
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 07bc8ae148..4e7214ce83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -131,7 +131,7 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
@@ -225,7 +225,7 @@ case class DataSource(
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
val res = fs.exists(metadataPath)
res
@@ -284,7 +284,7 @@ case class DataSource(
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
@@ -374,7 +374,7 @@ case class DataSource(
val path = new Path(caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
}))
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 95629a9923..a636ca2f29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -77,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
s"cannot save to file.")
}
- val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 61ec7ed2b1..7d407a7747 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -103,7 +103,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val csvOptions = new CSVOptions(options)
val headers = requiredSchema.fields.map(_.name)
- val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
(file: PartitionedFile) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4063c6ebce..731b0047e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -271,7 +271,7 @@ class HDFSFileCatalog(
val partitionSchema: Option[StructType])
extends FileCatalog with Logging {
- private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ private val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 7773ff550f..580a0e1de6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -98,7 +98,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
val broadcastedConf =
sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
@@ -126,7 +126,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
- val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val job = Job.getInstance(sqlContext.sessionState.hadoopConf)
val conf = job.getConfiguration
val paths = inputPaths.map(_.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bbbbc5ebe9..28c6664085 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -263,7 +263,7 @@ private[sql] class DefaultSource
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val parquetConf = new Configuration(sqlContext.sessionState.hadoopConf)
parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
parquetConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -648,7 +648,7 @@ private[sql] object ParquetRelation extends Logging {
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
- val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+ val serializedConf = new SerializableConfiguration(sqlContext.sessionState.hadoopConf)
// !! HACK ALERT !!
//
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fa0df61ca5..f7ac1ac8e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -90,7 +90,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
val broadcastedConf =
sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 4f722a514b..a86108862f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,7 +45,7 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
- private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ private val fs = basePath.getFileSystem(sqlContext.sessionState.hadoopConf)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 51c3aee835..aeb64c929c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -38,7 +38,7 @@ class FileStreamSource(
override val schema: StructType,
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
- private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ private val fs = new Path(path).getFileSystem(sqlContext.sessionState.hadoopConf)
private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index b52f7a28b4..dd6760d341 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -212,7 +212,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
private def createFileManager(): FileManager = {
- val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
try {
new FileContextManager(metadataPath, hadoopConf)
} catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index de4305f564..d5e4dd8f78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -63,7 +63,7 @@ case class StateStoreRestoreExec(
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
iter.flatMap { row =>
@@ -92,7 +92,7 @@ case class StateStoreSaveExec(
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
new Iterator[InternalRow] {
private[this] val baseIterator = iter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index 95b5129351..a08a4bb4c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -30,7 +30,7 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog
val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
override def paths: Seq[Path] = path :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index d708486d8e..635bb86607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -37,13 +38,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
- storeConf: StateStoreConf,
+ sessionState: SessionState,
@transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
extends RDD[U](dataRDD) {
+ private val storeConf = new StateStoreConf(sessionState.conf)
+
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = dataRDD.context.broadcast(
- new SerializableConfiguration(dataRDD.context.hadoopConfiguration))
+ new SerializableConfiguration(sessionState.hadoopConf))
override protected def getPartitions: Array[Partition] = dataRDD.partitions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 9b6d0918e2..4914a9d722 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.types.StructType
package object state {
@@ -43,7 +44,7 @@ package object state {
storeVersion,
keySchema,
valueSchema,
- new StateStoreConf(sqlContext.conf),
+ sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator))(
storeUpdateFunction)
}
@@ -55,7 +56,7 @@ package object state {
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
- storeConf: StateStoreConf,
+ sessionState: SessionState,
storeCoordinator: Option[StateStoreCoordinatorRef])(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
@@ -67,7 +68,7 @@ package object state {
storeVersion,
keySchema,
valueSchema,
- storeConf,
+ sessionState,
storeCoordinator)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
index 058df1e3c1..137323583b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
@@ -17,20 +17,21 @@
package org.apache.spark.sql.internal
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.sql.RuntimeConfig
+
/**
* Implementation for [[RuntimeConfig]].
*/
-class RuntimeConfigImpl extends RuntimeConfig {
-
- private val conf = new SQLConf
-
- private val hadoopConf = java.util.Collections.synchronizedMap(
- new java.util.HashMap[String, String]())
+class RuntimeConfigImpl(
+ sqlConf: SQLConf = new SQLConf,
+ hadoopConf: Configuration = new Configuration)
+ extends RuntimeConfig {
override def set(key: String, value: String): RuntimeConfig = {
- conf.setConfString(key, value)
+ sqlConf.setConfString(key, value)
this
}
@@ -39,7 +40,7 @@ class RuntimeConfigImpl extends RuntimeConfig {
override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString)
@throws[NoSuchElementException]("if the key is not set")
- override def get(key: String): String = conf.getConfString(key)
+ override def get(key: String): String = sqlConf.getConfString(key)
override def getOption(key: String): Option[String] = {
try Option(get(key)) catch {
@@ -47,27 +48,26 @@ class RuntimeConfigImpl extends RuntimeConfig {
}
}
- override def unset(key: String): Unit = conf.unsetConf(key)
+ override def unset(key: String): Unit = sqlConf.unsetConf(key)
- override def setHadoop(key: String, value: String): RuntimeConfig = {
- hadoopConf.put(key, value)
+ override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized {
+ hadoopConf.set(key, value)
this
}
@throws[NoSuchElementException]("if the key is not set")
override def getHadoop(key: String): String = hadoopConf.synchronized {
- if (hadoopConf.containsKey(key)) {
- hadoopConf.get(key)
- } else {
+ Option(hadoopConf.get(key)).getOrElse {
throw new NoSuchElementException(key)
}
}
- override def getHadoopOption(key: String): Option[String] = {
- try Option(getHadoop(key)) catch {
- case _: NoSuchElementException => None
- }
+ override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized {
+ Option(hadoopConf.get(key))
+ }
+
+ override def unsetHadoop(key: String): Unit = hadoopConf.synchronized {
+ hadoopConf.unset(key)
}
- override def unsetHadoop(key: String): Unit = hadoopConf.remove(key)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f683bbbeb5..04ad729659 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -21,6 +21,8 @@ import java.util.Properties
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -46,6 +48,7 @@ private[sql] class SessionState(ctx: SQLContext) {
* SQL-specific key-value configurations.
*/
lazy val conf: SQLConf = new SQLConf
+ lazy val hadoopConf: Configuration = new Configuration(ctx.sparkContext.hadoopConfiguration)
// Automatically extract `spark.sql.*` entries and put it in our SQLConf
setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2f62ad4850..d49cc103b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -85,4 +85,27 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
}
+ test("Hadoop conf interaction between SQLContext and SparkContext") {
+ val mySpecialKey = "mai.special.key"
+ val mySpecialValue = "msv"
+ try {
+ sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue)
+ val sqlContext = SQLContext.getOrCreate(sc)
+ val sessionState = sqlContext.sessionState
+ assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue)
+ assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue)
+ // mutating hadoop conf in SQL doesn't mutate the underlying one
+ sessionState.hadoopConf.set(mySpecialKey, "no no no")
+ assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no")
+ assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no")
+ assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+ sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes")
+ assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes")
+ assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes")
+ assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+ } finally {
+ sc.hadoopConfiguration.unset(mySpecialKey)
+ }
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 581095d3dc..0aab36ae38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = sparkContext.hadoopConfiguration
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
writeMetadata(parquetSchema, path, conf)
readParquetFile(path.toString)(df => {
val sparkTypes = df.schema.map(_.dataType)
@@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = sparkContext.hadoopConfiguration
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
sqlContext.read.parquet(path.toString).printSchema()
@@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val path = new Path(location.getCanonicalPath)
- val conf = sparkContext.hadoopConfiguration
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
writeMetadata(parquetSchema, path, conf)
val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
assert(sparkTypes === expectedSparkTypes)
@@ -431,7 +431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
withTempPath { location =>
val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
val path = new Path(location.getCanonicalPath)
- val conf = sparkContext.hadoopConfiguration
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
writeMetadata(parquetSchema, path, conf, extraMetadata)
readParquetFile(path.toString) { df =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 70c2a82990..a164f4c733 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
withFileStreamSinkLog { sinkLog =>
- val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf)
def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 1328142704..22e011cfb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
- sqlContext.sparkContext.hadoopConfiguration.set(
+ sqlContext.sessionState.hadoopConf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
withTempDir { temp =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5691105235..fcfac359f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -88,7 +88,7 @@ private[sql] trait SQLTestUtils
* The Hadoop configuration used by the active [[SQLContext]].
*/
protected def hadoopConfiguration: Configuration = {
- sparkContext.hadoopConfiguration
+ sqlContext.sessionState.hadoopConf
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 431ac8e2c8..d270775af6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -18,8 +18,8 @@
package org.apache.spark.sql.test
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf}
/**
* A special [[SQLContext]] prepared for testing.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 13f29e08fb..edb87b94ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -544,7 +544,7 @@ private[hive] class MetaStoreFileCatalog(
extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
override def getStatus(path: Path): Array[FileStatus] = {
- val fs = path.getFileSystem(ctx.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(ctx.sessionState.hadoopConf)
fs.listStatus(path)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6457a904eb..bf0288c9f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -223,6 +223,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
+ // TODO: why do we get this from SparkConf but not SQLConf?
def hiveThriftServerSingleSession: Boolean = {
ctx.sparkContext.conf.getBoolean(
"spark.sql.hive.thriftServer.singleSession", defaultValue = false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4250a87341..1095f5fd95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -57,7 +57,7 @@ private[sql] class DefaultSource
files: Seq[FileStatus]): Option[StructType] = {
OrcFileOperator.readSchema(
files.map(_.getPath.toUri.toString),
- Some(sqlContext.sparkContext.hadoopConfiguration)
+ Some(new Configuration(sqlContext.sessionState.hadoopConf))
)
}
@@ -115,7 +115,7 @@ private[sql] class DefaultSource
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
- val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val orcConf = new Configuration(sqlContext.sessionState.hadoopConf)
if (sqlContext.conf.orcFilterPushDown) {
// Sets pushed predicates
@@ -278,7 +278,7 @@ private[orc] case class OrcTableScan(
with HiveInspectors {
def execute(): RDD[InternalRow] = {
- val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf))
val conf = job.getConfiguration
// Tries to push down filters if ORC filter push-down is enabled
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index bf099e09e3..04b2494043 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf}
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 68244cdb11..5965cdc81c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val expectedPath =
sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
- val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+ val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf)
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
// It is a managed table when we do not specify the location.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 014c1009ed..8b4e4dced8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -239,12 +239,12 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
}
// Unset default URI Scheme and Authority: throw exception
- val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
- hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+ val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name")
+ hiveContext.sessionState.hadoopConf.unset("fs.default.name")
intercept[AnalysisException] {
sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
}
- hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+ hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 206d911e0d..fd19fcbd4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val expectedTablePath =
hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
val filesystemPath = new Path(expectedTablePath)
- val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+ val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf)
fs.exists(filesystemPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 2345c1cf9c..f11c055fb9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.orc(path)
// Check if this is compressed as ZLIB.
- val conf = sparkContext.hadoopConfiguration
+ val conf = sqlContext.sessionState.hadoopConf
val fs = FileSystem.getLocal(conf)
val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
assert(maybeOrcFile.isDefined)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 4fb78ac02c..0a0cdf60e8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
// Following codec is supported in hive-0.13.1, ignore it now
ignore("Other compression options for writing to an ORC file - 0.13.1 and above") {
val data = (1 to 100).map(i => (i, s"val_$i"))
- val conf = sparkContext.hadoopConfiguration
+ val conf = sqlContext.sessionState.hadoopConf
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
withOrcFile(data) { file =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index dad4f87ae3..eced8ed57f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -74,7 +74,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
inputAttributes.find(_.name == field.name)
}
- val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = new Configuration(sqlContext.sessionState.hadoopConf)
val broadcastedConf =
sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 5378336ff8..3b16468e76 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -209,7 +209,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath)
val path = new Path(file.getCanonicalPath)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
assert(fs.listStatus(path).isEmpty)
}
}
@@ -510,7 +510,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
s"${file.getCanonicalFile}/p1=2/p2=bar"
).map { p =>
val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}