aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-04-29 20:46:07 -0700
committerYin Huai <yhuai@databricks.com>2016-04-29 20:46:07 -0700
commit66773eb8a55bfe6437dd4096c2c55685aca29dcd (patch)
tree35e6a60ea8d70b2bc487c607ae605b9df4a05576 /sql
parentb056e8cb0a7c58c3e4d199af3ee13be50305b747 (diff)
downloadspark-66773eb8a55bfe6437dd4096c2c55685aca29dcd.tar.gz
spark-66773eb8a55bfe6437dd4096c2c55685aca29dcd.tar.bz2
spark-66773eb8a55bfe6437dd4096c2c55685aca29dcd.zip
[SPARK-15012][SQL] Simplify configuration API further
## What changes were proposed in this pull request? 1. Remove all the `spark.setConf` etc. Just expose `spark.conf` 2. Make `spark.conf` take in things set in the core `SparkConf` as well, otherwise users may get confused This was done for both the Python and Scala APIs. ## How was this patch tested? `SQLConfSuite`, python tests. This one fixes the failed tests in #12787 Closes #12787 Author: Andrew Or <andrew@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12798 from yhuai/conf-api.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala28
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala95
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala30
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala19
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala16
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala8
15 files changed, 104 insertions, 154 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 9e2e2d0bc5..f82130cfa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -184,7 +184,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
- if (sparkSession.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6a600c1379..28f5ccd26b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -284,9 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
new Path(userSpecified).toUri.toString
}.orElse {
val checkpointConfig: Option[String] =
- df.sparkSession.getConf(
- SQLConf.CHECKPOINT_LOCATION,
- None)
+ df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
checkpointConfig.map { location =>
new Path(location, queryName).toUri.toString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 7ee9732fa1..4f5bf633fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -302,7 +302,7 @@ class RelationalGroupedDataset protected[sql](
*/
def pivot(pivotColumn: String): RelationalGroupedDataset = {
// This is to prevent unintended OOM errors when the number of distinct values is large
- val maxValues = df.sparkSession.getConf(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+ val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
// Get the distinct values of the column and sort them so its consistent
val values = df.select(pivotColumn)
.distinct()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index f2e851520e..670288b234 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.internal.SQLConf
+
/**
* Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]].
*
@@ -78,6 +80,30 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
/**
* Returns the value of Spark runtime configuration property for the given key.
+ */
+ @throws[NoSuchElementException]("if the key is not set")
+ protected[sql] def get[T](entry: ConfigEntry[T]): T = {
+ sqlConf.getConf(entry)
+ }
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given key.
+ */
+ protected[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
+ sqlConf.getConf(entry, default)
+ }
+
+ /**
+ * Returns all properties set in this conf.
+ *
+ * @since 2.0.0
+ */
+ def getAll: Map[String, String] = {
+ sqlConf.getAllConfs
+ }
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given key.
*
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6dfac3d7ae..ff633cf837 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -134,13 +134,15 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def setConf(props: Properties): Unit = sparkSession.setConf(props)
+ def setConf(props: Properties): Unit = {
+ sessionState.conf.setConf(props)
+ }
/**
* Set the given Spark SQL configuration property.
*/
private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
- sparkSession.setConf(entry, value)
+ sessionState.conf.setConf(entry, value)
}
/**
@@ -149,7 +151,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def setConf(key: String, value: String): Unit = sparkSession.setConf(key, value)
+ def setConf(key: String, value: String): Unit = {
+ sparkSession.conf.set(key, value)
+ }
/**
* Return the value of Spark SQL configuration property for the given key.
@@ -157,13 +161,17 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def getConf(key: String): String = sparkSession.getConf(key)
+ def getConf(key: String): String = {
+ sparkSession.conf.get(key)
+ }
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue` in [[ConfigEntry]].
*/
- private[sql] def getConf[T](entry: ConfigEntry[T]): T = sparkSession.getConf(entry)
+ private[sql] def getConf[T](entry: ConfigEntry[T]): T = {
+ sparkSession.conf.get(entry)
+ }
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -171,7 +179,7 @@ class SQLContext private[sql](
* desired one.
*/
private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
- sparkSession.getConf(entry, defaultValue)
+ sparkSession.conf.get(entry, defaultValue)
}
/**
@@ -181,7 +189,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def getConf(key: String, defaultValue: String): String = sparkSession.getConf(key, defaultValue)
+ def getConf(key: String, defaultValue: String): String = {
+ sparkSession.conf.get(key, defaultValue)
+ }
/**
* Return all the configuration properties that have been set (i.e. not the default).
@@ -190,7 +200,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
- def getAllConfs: immutable.Map[String, String] = sparkSession.getAllConfs
+ def getAllConfs: immutable.Map[String, String] = {
+ sparkSession.conf.getAll
+ }
protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 11c0aaab23..7d3ff9e947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -109,6 +109,18 @@ class SparkSession private(
protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
/**
+ * Runtime configuration interface for Spark.
+ *
+ * This is the interface through which the user can get and set all Spark and Hadoop
+ * configurations that are relevant to Spark SQL. When getting the value of a config,
+ * this defaults to the value set in the underlying [[SparkContext]], if any.
+ *
+ * @group config
+ * @since 2.0.0
+ */
+ @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
+
+ /**
* :: Experimental ::
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
* that listen for execution metrics.
@@ -187,89 +199,6 @@ class SparkSession private(
}
- /* -------------------------------------------------- *
- | Methods for accessing or mutating configurations |
- * -------------------------------------------------- */
-
- /**
- * Runtime configuration interface for Spark.
- *
- * This is the interface through which the user can get and set all Spark and Hadoop
- * configurations that are relevant to Spark SQL. When getting the value of a config,
- * this defaults to the value set in the underlying [[SparkContext]], if any.
- *
- * @group config
- * @since 2.0.0
- */
- @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
-
- /**
- * Set Spark SQL configuration properties.
- *
- * @group config
- * @since 2.0.0
- */
- def setConf(props: Properties): Unit = sessionState.setConf(props)
-
- /**
- * Set the given Spark SQL configuration property.
- *
- * @group config
- * @since 2.0.0
- */
- def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)
-
- /**
- * Return the value of Spark SQL configuration property for the given key.
- *
- * @group config
- * @since 2.0.0
- */
- def getConf(key: String): String = sessionState.conf.getConfString(key)
-
- /**
- * Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`.
- *
- * @group config
- * @since 2.0.0
- */
- def getConf(key: String, defaultValue: String): String = {
- sessionState.conf.getConfString(key, defaultValue)
- }
-
- /**
- * Return all the configuration properties that have been set (i.e. not the default).
- * This creates a new copy of the config properties in the form of a Map.
- *
- * @group config
- * @since 2.0.0
- */
- def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
-
- /**
- * Set the given Spark SQL configuration property.
- */
- protected[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
- sessionState.setConf(entry, value)
- }
-
- /**
- * Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[ConfigEntry]].
- */
- protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
-
- /**
- * Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
- * desired one.
- */
- protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
- sessionState.conf.getConf(entry, defaultValue)
- }
-
-
/* --------------------------------- *
| Methods for creating DataFrames |
* --------------------------------- */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index bbb2a2235f..2409b5d203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -56,7 +56,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
"determining the number of reducers is not supported."
throw new IllegalArgumentException(msg)
} else {
- sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+ sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
}
}
@@ -65,7 +65,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sparkSession: SparkSession) => {
- sparkSession.setConf(key, value)
+ sparkSession.conf.set(key, value)
Seq(Row(key, value))
}
(keyValueOutput, runFunc)
@@ -74,7 +74,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Queries all key-value pairs that are set in the SQLConf of the sparkSession.
case None =>
val runFunc = (sparkSession: SparkSession) => {
- sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+ sparkSession.conf.getAll.map { case (k, v) => Row(k, v) }.toSeq
}
(keyValueOutput, runFunc)
@@ -107,10 +107,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Queries a single property.
case Some((key, None)) =>
val runFunc = (sparkSession: SparkSession) => {
- val value =
- try sparkSession.getConf(key) catch {
- case _: NoSuchElementException => "<undefined>"
- }
+ val value = sparkSession.conf.getOption(key).getOrElse("<undefined>")
Seq(Row(key, value))
}
(keyValueOutput, runFunc)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 4df7d0ce4c..4921e4ca6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
dataColumns = dataColumns,
inputSchema = query.output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
- sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
+ sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
isAppend)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c689ad08ca..b1513bbe94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -143,10 +143,9 @@ private[sql] class DefaultSource
parameters
.get(ParquetRelation.MERGE_SCHEMA)
.map(_.toBoolean)
- .getOrElse(sparkSession.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+ .getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
- val mergeRespectSummaries =
- sparkSession.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+ val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
val filesByType = splitFiles(files)
@@ -281,22 +280,23 @@ private[sql] class DefaultSource
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
+ sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+ sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
// Try to push down filters when filter push-down is enabled.
- val pushed = if (sparkSession.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(ParquetFilters.createFilter(requiredSchema, _))
- .reduceOption(FilterApi.and)
- } else {
- None
- }
+ val pushed =
+ if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index c548fbd369..b694b6155a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -80,11 +80,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
* a live lock may happen if the compaction happens too frequently: one processing keeps deleting
* old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
*/
- private val fileCleanupDelayMs = sparkSession.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+ private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
- private val isDeletingExpiredLog = sparkSession.getConf(SQLConf.FILE_SINK_LOG_DELETION)
+ private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
- private val compactInterval = sparkSession.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+ private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7de7748211..0bcf0f817a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -763,9 +763,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
}
private def setConfWithCheck(key: String, value: String): Unit = {
- if (key.startsWith("spark.") && !key.startsWith("spark.sql.")) {
- logWarning(s"Attempt to set non-Spark SQL config in SQLConf: key = $key, value = $value")
- }
settings.put(key, value)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cacf50ec7a..6fa044aee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -152,9 +152,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader
- // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+ // Automatically extract all entries and put it in our SQLConf
// We need to call it after all of vals have been initialized.
- setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
+ sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
+ conf.setConfString(k, v)
+ }
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -170,19 +172,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
}
- final def setConf(properties: Properties): Unit = {
- properties.asScala.foreach { case (k, v) => setConf(k, v) }
- }
-
- final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
- conf.setConf(entry, value)
- setConf(entry.key, entry.stringConverter(value))
- }
-
- def setConf(key: String, value: String): Unit = {
- conf.setConfString(key, value)
- }
-
def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2f62ad4850..1d5fc570c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -79,10 +79,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
}
- test("SQLContext can access `spark.sql.*` configs") {
- sc.conf.set("spark.sql.with.or.without.you", "my love")
- val sqlContext = new SQLContext(sc)
- assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
- }
-
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e687e6a5ce..b87f482941 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.internal
-import org.apache.spark.sql.{QueryTest, SQLContext}
+import org.apache.spark.sql.{QueryTest, SparkSession, SQLContext}
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
class SQLConfSuite extends QueryTest with SharedSQLContext {
@@ -125,4 +125,18 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
sqlContext.conf.clear()
}
+
+ test("SparkSession can access configs set in SparkConf") {
+ try {
+ sparkContext.conf.set("spark.to.be.or.not.to.be", "my love")
+ sparkContext.conf.set("spark.sql.with.or.without.you", "my love")
+ val spark = new SparkSession(sparkContext)
+ assert(spark.conf.get("spark.to.be.or.not.to.be") == "my love")
+ assert(spark.conf.get("spark.sql.with.or.without.you") == "my love")
+ } finally {
+ sparkContext.conf.remove("spark.to.be.or.not.to.be")
+ sparkContext.conf.remove("spark.sql.with.or.without.you")
+ }
+ }
+
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b17a88b2ef..f3076912cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.sql._
@@ -114,12 +113,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
* - allow SQL11 keywords to be used as identifiers
*/
def setDefaultOverrideConfs(): Unit = {
- setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
- }
-
- override def setConf(key: String, value: String): Unit = {
- super.setConf(key, value)
- metadataHive.runSqlHive(s"SET $key=$value")
+ conf.setConfString(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
}
override def addJar(path: String): Unit = {