From cfa64882fc0728d7becf55b8a424926e4ca93887 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 25 Apr 2016 17:52:25 -0700
Subject: [SPARK-14902][SQL] Expose RuntimeConfig in SparkSession

## What changes were proposed in this pull request?

`RuntimeConfig` is the new user-facing API in 2.0 added in #11378. Until now, however, it's been dead code. This patch uses `RuntimeConfig` in `SessionState` and exposes that through the `SparkSession`.

## How was this patch tested?

New test in `SQLContextSuite`.

Author: Andrew Or

Closes #12669 from andrewor14/use-runtime-conf.
---
 .../org/apache/spark/sql/DataFrameWriter.scala     |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala    |  3 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  | 37 +++++++++++++++------
 .../spark/sql/execution/command/AnalyzeTable.scala |  2 +-
 .../spark/sql/execution/command/tables.scala       |  2 +-
 .../sql/execution/datasources/DataSource.scala     |  8 ++---
 .../datasources/InsertIntoHadoopFsRelation.scala   |  3 +-
 .../execution/datasources/csv/DefaultSource.scala  |  2 +-
 .../datasources/fileSourceInterfaces.scala         |  2 +-
 .../execution/datasources/json/JSONRelation.scala  |  4 +--
 .../datasources/parquet/ParquetRelation.scala      |  4 +--
 .../execution/datasources/text/DefaultSource.scala |  2 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  2 +-
 .../sql/execution/streaming/FileStreamSource.scala |  2 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  2 +-
 .../execution/streaming/StatefulAggregate.scala    |  4 +--
 .../execution/streaming/StreamFileCatalog.scala    |  2 +-
 .../execution/streaming/state/StateStoreRDD.scala  |  7 ++--
 .../sql/execution/streaming/state/package.scala    |  7 ++--
 .../spark/sql/internal/RuntimeConfigImpl.scala     | 38 +++++++++++-----------
 .../apache/spark/sql/internal/SessionState.scala   |  3 ++
 .../org/apache/spark/sql/SQLContextSuite.scala     | 23 +++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala       |  8 ++---
 .../streaming/FileStreamSinkLogSuite.scala         |  2 +-
 .../execution/streaming/HDFSMetadataLogSuite.scala |  2 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala   |  2 +-
 .../org/apache/spark/sql/test/TestSQLContext.scala |  4 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../apache/spark/sql/hive/HiveSessionState.scala   |  1 +
 .../apache/spark/sql/hive/orc/OrcRelation.scala    |  6 ++--
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  4 +--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  2 +-
 .../sql/hive/execution/HiveCommandSuite.scala      |  6 ++--
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  2 +-
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala    |  2 +-
 .../apache/spark/sql/hive/orc/OrcQuerySuite.scala  |  2 +-
 .../spark/sql/sources/SimpleTextRelation.scala     |  2 +-
 .../spark/sql/sources/hadoopFsRelationSuites.scala |  4 +--
 38 files changed, 131 insertions(+), 81 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 0745ef47ff..99d92b9257 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     // If offsets have already been created, we trying to resume a query.
val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration) + val fs = checkpointPath.getFileSystem(df.sqlContext.sessionState.hadoopConf) if (fs.exists(checkpointPath)) { throw new AnalysisException( s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4c9977c8c7..dde139608a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -103,7 +103,8 @@ class SQLContext private[sql]( protected[sql] def sessionState: SessionState = sparkSession.sessionState protected[sql] def sharedState: SharedState = sparkSession.sharedState - protected[sql] def conf: SQLConf = sparkSession.conf + protected[sql] def conf: SQLConf = sessionState.conf + protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager protected[sql] def listener: SQLListener = sparkSession.listener protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 3561765642..00256bd0be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation} import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager @@ -76,8 +76,8 @@ class SparkSession private( } /** - * State isolated across sessions, including SQL configurations, temporary tables, - * registered functions, and everything else that accepts a [[SQLConf]]. + * State isolated across sessions, including SQL configurations, temporary tables, registered + * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]]. */ @transient protected[sql] lazy val sessionState: SessionState = { @@ -103,7 +103,6 @@ class SparkSession private( _wrapped = sqlContext } - protected[sql] def conf: SQLConf = sessionState.conf protected[sql] def cacheManager: CacheManager = sharedState.cacheManager protected[sql] def listener: SQLListener = sharedState.listener protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog @@ -191,6 +190,22 @@ class SparkSession private( | Methods for accessing or mutating configurations | * -------------------------------------------------- */ + @transient private lazy val _conf: RuntimeConfig = { + new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf) + } + + /** + * Runtime configuration interface for Spark. + * + * This is the interface through which the user can get and set all Spark and Hadoop + * configurations that are relevant to Spark SQL. 
When getting the value of a config, + * this defaults to the value set in the underlying [[SparkContext]], if any. + * + * @group config + * @since 2.0.0 + */ + def conf: RuntimeConfig = _conf + /** * Set Spark SQL configuration properties. * @@ -213,7 +228,7 @@ class SparkSession private( * @group config * @since 2.0.0 */ - def getConf(key: String): String = conf.getConfString(key) + def getConf(key: String): String = sessionState.conf.getConfString(key) /** * Return the value of Spark SQL configuration property for the given key. If the key is not set @@ -222,7 +237,9 @@ class SparkSession private( * @group config * @since 2.0.0 */ - def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue) + def getConf(key: String, defaultValue: String): String = { + sessionState.conf.getConfString(key, defaultValue) + } /** * Return all the configuration properties that have been set (i.e. not the default). @@ -231,7 +248,7 @@ class SparkSession private( * @group config * @since 2.0.0 */ - def getAllConfs: immutable.Map[String, String] = conf.getAllConfs + def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs /** * Set the given Spark SQL configuration property. @@ -244,7 +261,7 @@ class SparkSession private( * Return the value of Spark SQL configuration property for the given key. If the key is not set * yet, return `defaultValue` in [[ConfigEntry]]. */ - protected[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry) + protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry) /** * Return the value of Spark SQL configuration property for the given key. If the key is not set @@ -252,7 +269,7 @@ class SparkSession private( * desired one. */ protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = { - conf.getConf(entry, defaultValue) + sessionState.conf.getConf(entry, defaultValue) } @@ -601,7 +618,7 @@ class SparkSession private( */ @Experimental def createExternalTable(tableName: String, path: String): DataFrame = { - val dataSourceName = conf.defaultDataSourceName + val dataSourceName = sessionState.conf.defaultDataSourceName createExternalTable(tableName, path, dataSourceName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala index 7fa246ba51..e6c5351106 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala @@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand { catalogTable.storage.locationUri.map { p => val path = new Path(p) try { - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) calculateTableSize(fs, path) } catch { case NonFatal(e) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index eae8fe8975..5cac9d879f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -210,7 +210,7 @@ case class LoadData( // Follow Hive's behavior: // If no schema or authority is provided with non-local inpath, // we will use hadoop configuration "fs.default.name". 
- val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name") + val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name") val defaultFS = if (defaultFSConf == null) { new URI("") } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 07bc8ae148..4e7214ce83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -131,7 +131,7 @@ case class DataSource( val allPaths = caseInsensitiveOptions.get("path") val globbedPaths = allPaths.toSeq.flatMap { path => val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray @@ -225,7 +225,7 @@ case class DataSource( case Seq(singlePath) => try { val hdfsPath = new Path(singlePath) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf) val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) val res = fs.exists(metadataPath) res @@ -284,7 +284,7 @@ case class DataSource( val allPaths = caseInsensitiveOptions.get("path") ++ paths val globbedPaths = allPaths.flatMap { path => val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified) @@ -374,7 +374,7 @@ case class DataSource( val path = new Path(caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") })) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 95629a9923..a636ca2f29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.io.IOException +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat @@ -77,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation( s"cannot save to file.") } - val hadoopConf = sqlContext.sparkContext.hadoopConfiguration + val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 61ec7ed2b1..7d407a7747 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -103,7 +103,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { val csvOptions = new CSVOptions(options) val headers = requiredSchema.fields.map(_.name) - val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val conf = new Configuration(sqlContext.sessionState.hadoopConf) val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) (file: PartitionedFile) => { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 4063c6ebce..731b0047e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -271,7 +271,7 @@ class HDFSFileCatalog( val partitionSchema: Option[StructType]) extends FileCatalog with Logging { - private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + private val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf) var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 7773ff550f..580a0e1de6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -98,7 +98,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { - val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val conf = new Configuration(sqlContext.sessionState.hadoopConf) val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) @@ -126,7 +126,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { } private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = { - val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val job = Job.getInstance(sqlContext.sessionState.hadoopConf) val conf = job.getConfiguration val paths = inputPaths.map(_.getPath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index bbbbc5ebe9..28c6664085 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -263,7 +263,7 @@ private[sql] class DefaultSource requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { - val parquetConf = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration) + val parquetConf = new Configuration(sqlContext.sessionState.hadoopConf) parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) parquetConf.set( CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, @@ -648,7 +648,7 @@ private[sql] object ParquetRelation extends Logging { val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat - val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration) + val serializedConf = new SerializableConfiguration(sqlContext.sessionState.hadoopConf) // !! HACK ALERT !! // diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index fa0df61ca5..f7ac1ac8e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -90,7 +90,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { - val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val conf = new Configuration(sqlContext.sessionState.hadoopConf) val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 4f722a514b..a86108862f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -45,7 +45,7 @@ class FileStreamSink( private val basePath = new Path(path) private val logPath = new Path(basePath, FileStreamSink.metadataDir) private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString) - private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + private val fs = basePath.getFileSystem(sqlContext.sessionState.hadoopConf) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 51c3aee835..aeb64c929c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -38,7 +38,7 @@ class FileStreamSource( override val schema: StructType, dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging { - private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + private val fs = new Path(path).getFileSystem(sqlContext.sessionState.hadoopConf) private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index b52f7a28b4..dd6760d341 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -212,7 +212,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) } private def createFileManager(): FileManager = { - val hadoopConf = sqlContext.sparkContext.hadoopConfiguration + val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf) try { new FileContextManager(metadataPath, hadoopConf) } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index de4305f564..d5e4dd8f78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -63,7 +63,7 @@ case class StateStoreRestoreExec( storeVersion = getStateId.batchId, keyExpressions.toStructType, child.output.toStructType, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) iter.flatMap { row => @@ -92,7 +92,7 @@ case class StateStoreSaveExec( storeVersion = getStateId.batchId, keyExpressions.toStructType, child.output.toStructType, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => new Iterator[InternalRow] { private[this] val baseIterator = iter diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala index 95b5129351..a08a4bb4c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala @@ -30,7 +30,7 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) override def paths: Seq[Path] = path :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala index d708486d8e..635bb86607 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -37,13 +38,15 @@ class 
StateStoreRDD[T: ClassTag, U: ClassTag]( storeVersion: Long, keySchema: StructType, valueSchema: StructType, - storeConf: StateStoreConf, + sessionState: SessionState, @transient private val storeCoordinator: Option[StateStoreCoordinatorRef]) extends RDD[U](dataRDD) { + private val storeConf = new StateStoreConf(sessionState.conf) + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it private val confBroadcast = dataRDD.context.broadcast( - new SerializableConfiguration(dataRDD.context.hadoopConfiguration)) + new SerializableConfiguration(sessionState.hadoopConf)) override protected def getPartitions: Array[Partition] = dataRDD.partitions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala index 9b6d0918e2..4914a9d722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.types.StructType package object state { @@ -43,7 +44,7 @@ package object state { storeVersion, keySchema, valueSchema, - new StateStoreConf(sqlContext.conf), + sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator))( storeUpdateFunction) } @@ -55,7 +56,7 @@ package object state { storeVersion: Long, keySchema: StructType, valueSchema: StructType, - storeConf: StateStoreConf, + sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef])( storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = { val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction) @@ -67,7 +68,7 @@ package object state { storeVersion, keySchema, valueSchema, - storeConf, + sessionState, storeCoordinator) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala index 058df1e3c1..137323583b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala @@ -17,20 +17,21 @@ package org.apache.spark.sql.internal +import org.apache.hadoop.conf.Configuration + import org.apache.spark.sql.RuntimeConfig + /** * Implementation for [[RuntimeConfig]]. 
*/ -class RuntimeConfigImpl extends RuntimeConfig { - - private val conf = new SQLConf - - private val hadoopConf = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, String]()) +class RuntimeConfigImpl( + sqlConf: SQLConf = new SQLConf, + hadoopConf: Configuration = new Configuration) + extends RuntimeConfig { override def set(key: String, value: String): RuntimeConfig = { - conf.setConfString(key, value) + sqlConf.setConfString(key, value) this } @@ -39,7 +40,7 @@ class RuntimeConfigImpl extends RuntimeConfig { override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString) @throws[NoSuchElementException]("if the key is not set") - override def get(key: String): String = conf.getConfString(key) + override def get(key: String): String = sqlConf.getConfString(key) override def getOption(key: String): Option[String] = { try Option(get(key)) catch { @@ -47,27 +48,26 @@ class RuntimeConfigImpl extends RuntimeConfig { } } - override def unset(key: String): Unit = conf.unsetConf(key) + override def unset(key: String): Unit = sqlConf.unsetConf(key) - override def setHadoop(key: String, value: String): RuntimeConfig = { - hadoopConf.put(key, value) + override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized { + hadoopConf.set(key, value) this } @throws[NoSuchElementException]("if the key is not set") override def getHadoop(key: String): String = hadoopConf.synchronized { - if (hadoopConf.containsKey(key)) { - hadoopConf.get(key) - } else { + Option(hadoopConf.get(key)).getOrElse { throw new NoSuchElementException(key) } } - override def getHadoopOption(key: String): Option[String] = { - try Option(getHadoop(key)) catch { - case _: NoSuchElementException => None - } + override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized { + Option(hadoopConf.get(key)) + } + + override def unsetHadoop(key: String): Unit = hadoopConf.synchronized { + hadoopConf.unset(key) } - override def unsetHadoop(key: String): Unit = hadoopConf.remove(key) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f683bbbeb5..04ad729659 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -21,6 +21,8 @@ import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.hadoop.conf.Configuration + import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} @@ -46,6 +48,7 @@ private[sql] class SessionState(ctx: SQLContext) { * SQL-specific key-value configurations. 
*/ lazy val conf: SQLConf = new SQLConf + lazy val hadoopConf: Configuration = new Configuration(ctx.sparkContext.hadoopConfiguration) // Automatically extract `spark.sql.*` entries and put it in our SQLConf setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 2f62ad4850..d49cc103b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -85,4 +85,27 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") } + test("Hadoop conf interaction between SQLContext and SparkContext") { + val mySpecialKey = "mai.special.key" + val mySpecialValue = "msv" + try { + sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue) + val sqlContext = SQLContext.getOrCreate(sc) + val sessionState = sqlContext.sessionState + assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue) + assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue) + // mutating hadoop conf in SQL doesn't mutate the underlying one + sessionState.hadoopConf.set(mySpecialKey, "no no no") + assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no") + assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no") + assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue) + sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes") + assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes") + assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes") + assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue) + } finally { + sc.hadoopConfiguration.unset(mySpecialKey) + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 581095d3dc..0aab36ae38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sparkContext.hadoopConfiguration + val conf = new Configuration(sqlContext.sessionState.hadoopConf) writeMetadata(parquetSchema, path, conf) readParquetFile(path.toString)(df => { val sparkTypes = df.schema.map(_.dataType) @@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sparkContext.hadoopConfiguration + val conf = new Configuration(sqlContext.sessionState.hadoopConf) writeMetadata(parquetSchema, path, conf) val errorMessage = intercept[Throwable] { sqlContext.read.parquet(path.toString).printSchema() @@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sparkContext.hadoopConfiguration + val conf = new Configuration(sqlContext.sessionState.hadoopConf) writeMetadata(parquetSchema, path, conf) val sparkTypes = 
sqlContext.read.parquet(path.toString).schema.map(_.dataType) assert(sparkTypes === expectedSparkTypes) @@ -431,7 +431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) - val conf = sparkContext.hadoopConfiguration + val conf = new Configuration(sqlContext.sessionState.hadoopConf) writeMetadata(parquetSchema, path, conf, extraMetadata) readParquetFile(path.toString) { df => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 70c2a82990..a164f4c733 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3", SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") { withFileStreamSinkLog { sinkLog => - val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf) def listBatchFiles(): Set[String] = { fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 1328142704..22e011cfb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") { - sqlContext.sparkContext.hadoopConfiguration.set( + sqlContext.sessionState.hadoopConf.set( s"fs.$scheme.impl", classOf[FakeFileSystem].getName) withTempDir { temp => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 5691105235..fcfac359f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -88,7 +88,7 @@ private[sql] trait SQLTestUtils * The Hadoop configuration used by the active [[SQLContext]]. 
*/ protected def hadoopConfiguration: Configuration = { - sparkContext.hadoopConfiguration + sqlContext.sessionState.hadoopConf } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 431ac8e2c8..d270775af6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.test import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext} +import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf} /** * A special [[SQLContext]] prepared for testing. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 13f29e08fb..edb87b94ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -544,7 +544,7 @@ private[hive] class MetaStoreFileCatalog( extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) { override def getStatus(path: Path): Array[FileStatus] = { - val fs = path.getFileSystem(ctx.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(ctx.sessionState.hadoopConf) fs.listStatus(path) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 6457a904eb..bf0288c9f7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -223,6 +223,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) } + // TODO: why do we get this from SparkConf but not SQLConf? 
def hiveThriftServerSingleSession: Boolean = { ctx.sparkContext.conf.getBoolean( "spark.sql.hive.thriftServer.singleSession", defaultValue = false) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 4250a87341..1095f5fd95 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -57,7 +57,7 @@ private[sql] class DefaultSource files: Seq[FileStatus]): Option[StructType] = { OrcFileOperator.readSchema( files.map(_.getPath.toUri.toString), - Some(sqlContext.sparkContext.hadoopConfiguration) + Some(new Configuration(sqlContext.sessionState.hadoopConf)) ) } @@ -115,7 +115,7 @@ private[sql] class DefaultSource requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { - val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val orcConf = new Configuration(sqlContext.sessionState.hadoopConf) if (sqlContext.conf.orcFilterPushDown) { // Sets pushed predicates @@ -278,7 +278,7 @@ private[orc] case class OrcTableScan( with HiveInspectors { def execute(): RDD[InternalRow] = { - val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf)) val conf = job.getConfiguration // Tries to push down filters if ORC filter push-down is enabled diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index bf099e09e3..04b2494043 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION -import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf} import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 68244cdb11..5965cdc81c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val expectedPath = sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable")) val filesystemPath = new Path(expectedPath) - val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf) if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true) // It is a managed table when we do not specify the location. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 014c1009ed..8b4e4dced8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -239,12 +239,12 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } // Unset default URI Scheme and Authority: throw exception - val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name") - hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name") + val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name") + hiveContext.sessionState.hadoopConf.unset("fs.default.name") intercept[AnalysisException] { sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""") } - hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName) + hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 206d911e0d..fd19fcbd4e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val expectedTablePath = hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) val filesystemPath = new Path(expectedTablePath) - val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration) + val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf) fs.exists(filesystemPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 2345c1cf9c..f11c055fb9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. 
- val conf = sparkContext.hadoopConfiguration + val conf = sqlContext.sessionState.hadoopConf val fs = FileSystem.getLocal(conf) val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 4fb78ac02c..0a0cdf60e8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // Following codec is supported in hive-0.13.1, ignore it now ignore("Other compression options for writing to an ORC file - 0.13.1 and above") { val data = (1 to 100).map(i => (i, s"val_$i")) - val conf = sparkContext.hadoopConfiguration + val conf = sqlContext.sessionState.hadoopConf conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY") withOrcFile(data) { file => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index dad4f87ae3..eced8ed57f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -74,7 +74,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { inputAttributes.find(_.name == field.name) } - val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val conf = new Configuration(sqlContext.sessionState.hadoopConf) val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 5378336ff8..3b16468e76 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -209,7 +209,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath) val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) assert(fs.listStatus(path).isEmpty) } } @@ -510,7 +510,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes s"${file.getCanonicalFile}/p1=2/p2=bar" ).map { p => val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf) path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString } -- cgit v1.2.3
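
For reference, a minimal usage sketch of the API this patch exposes (not part of the patch itself). It assumes a `SparkSession` instance, here named `spark`, is already in scope, and uses only the `RuntimeConfig` methods that appear in the diff (`set`, `get`, `getOption`, `setHadoop`, `getHadoop`, `unsetHadoop`):

```scala
// Sketch only: `spark` is assumed to be an existing SparkSession (its constructor
// is private[sql] in this branch, so sessions are normally reached via a SQLContext).
import org.apache.spark.sql.SparkSession

def runtimeConfDemo(spark: SparkSession): Unit = {
  // SQL configurations go through the session-scoped RuntimeConfig,
  // which is backed by the SessionState's SQLConf.
  spark.conf.set("spark.sql.shuffle.partitions", "10")
  assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")
  assert(spark.conf.getOption("some.unset.key").isEmpty)

  // Hadoop configurations are scoped to the session's copy of the Hadoop conf
  // (SessionState.hadoopConf), so setting them here does not mutate
  // SparkContext#hadoopConfiguration, matching the new SQLContextSuite test.
  spark.conf.setHadoop("my.special.key", "some value")
  assert(spark.conf.getHadoop("my.special.key") == "some value")
  spark.conf.unsetHadoop("my.special.key")
}
```

Because `SessionState.hadoopConf` is created as `new Configuration(ctx.sparkContext.hadoopConfiguration)`, each session starts from the context-level Hadoop settings but keeps its own copy afterwards, which is the isolation the new test asserts.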