diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-10-11 20:27:08 -0700 |
---|---|---|
committer | gatorsmile <gatorsmile@gmail.com> | 2016-10-11 20:27:08 -0700 |
commit | b9a147181d5e38d9abed0c7215f4c5cb695f579c (patch) | |
tree | e2c3bda2b680a67b914b2fc737bbafb0350efb93 /sql/core | |
parent | 5b77e66dd6a128c5992ab3bde418613f84be7009 (diff) | |
download | spark-b9a147181d5e38d9abed0c7215f4c5cb695f579c.tar.gz spark-b9a147181d5e38d9abed0c7215f4c5cb695f579c.tar.bz2 spark-b9a147181d5e38d9abed0c7215f4c5cb695f579c.zip |
[SPARK-17720][SQL] introduce static SQL conf
## What changes were proposed in this pull request?
SQLConf is session-scoped and mutable. However, we do have the requirement for a static SQL conf, which is global and immutable, e.g. the `schemaStringThreshold` in `HiveExternalCatalog`, the flag to enable/disable hive support, the global temp view database in https://github.com/apache/spark/pull/14897.
Actually we've already implemented static SQL conf implicitly via `SparkConf`, this PR just make it explicit and expose it to users, so that they can see the config value via SQL command or `SparkSession.conf`, and forbid users to set/unset static SQL conf.
## How was this patch tested?
new tests in SQLConfSuite
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15295 from cloud-fan/global-conf.
Diffstat (limited to 'sql/core')
7 files changed, 79 insertions, 26 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index c2baa74ed7..9108d19d0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import org.apache.spark.annotation.InterfaceStability import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** @@ -38,6 +38,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { * @since 2.0.0 */ def set(key: String, value: String): Unit = { + requireNonStaticConf(key) sqlConf.setConfString(key, value) } @@ -47,6 +48,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { * @since 2.0.0 */ def set(key: String, value: Boolean): Unit = { + requireNonStaticConf(key) set(key, value.toString) } @@ -56,6 +58,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { * @since 2.0.0 */ def set(key: String, value: Long): Unit = { + requireNonStaticConf(key) set(key, value.toString) } @@ -124,6 +127,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { * @since 2.0.0 */ def unset(key: String): Unit = { + requireNonStaticConf(key) sqlConf.unsetConf(key) } @@ -134,4 +138,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { sqlConf.contains(key) } + private def requireNonStaticConf(key: String): Unit = { + if (StaticSQLConf.globalConfKeys.contains(key)) { + throw new AnalysisException(s"Cannot modify the value of a static config: $key") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index d26eea5072..137c426b4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -29,7 +29,6 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalog.Catalog @@ -41,6 +40,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{DataType, LongType, StructType} @@ -812,7 +812,7 @@ object SparkSession { // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.conf.set(k, v) } + options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } if (options.nonEmpty) { logWarning("Use an existing SparkSession, some configuration may not take effect.") } @@ -824,7 +824,7 @@ object SparkSession { // If the current thread does not have an active session, get it from the global session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - options.foreach { case (k, v) => session.conf.set(k, v) } + options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } if (options.nonEmpty) { logWarning("Use an existing SparkSession, some configuration may not take effect.") } @@ -850,7 +850,7 @@ object SparkSession { sc } session = new SparkSession(sparkContext) - options.foreach { case (k, v) => session.conf.set(k, v) } + options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } defaultSession.set(session) // Register a successfully instantiated context to the singleton. This should be at the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 7d8ea03a27..9de6510c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -28,11 +28,11 @@ import org.apache.spark.SparkContext import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.r.SerDe import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.execution.command.ShowTablesCommand +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types._ private[sql] object SQLUtils extends Logging { @@ -64,7 +64,7 @@ private[sql] object SQLUtils extends Logging { spark: SparkSession, sparkConfigMap: JMap[Object, Object]): Unit = { for ((name, value) <- sparkConfigMap.asScala) { - spark.conf.set(name.toString, value.toString) + spark.sessionState.conf.setConfString(name.toString, value.toString) } for ((name, value) <- sparkConfigMap.asScala) { spark.sparkContext.conf.set(name.toString, value.toString) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fecdf792fd..8cbfc4c762 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -41,7 +41,7 @@ object SQLConf { private val sqlConfEntries = java.util.Collections.synchronizedMap( new java.util.HashMap[String, ConfigEntry[_]]()) - private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized { + private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized { require(!sqlConfEntries.containsKey(entry.key), s"Duplicate SQLConfigEntry. ${entry.key} has been registered") sqlConfEntries.put(entry.key, entry) @@ -326,18 +326,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - // This is used to control the when we will split a schema's JSON string to multiple pieces - // in order to fit the JSON string in metastore's table property (by default, the value has - // a length restriction of 4000 characters). We will split the JSON string of a schema - // to its length exceeds the threshold. - val SCHEMA_STRING_LENGTH_THRESHOLD = - SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold") - .doc("The maximum length allowed in a single cell when " + - "storing additional schema information in Hive's metastore.") - .internal() - .intConf - .createWithDefault(4000) - val PARTITION_COLUMN_TYPE_INFERENCE = SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled") .doc("When true, automatically infer the data types for partitioned columns.") @@ -736,10 +724,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) - // Do not use a value larger than 4000 as the default value of this property. - // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information. - def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD) - def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS) def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = @@ -886,3 +870,41 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { } } +/** + * Static SQL configuration is a cross-session, immutable Spark configuration. External users can + * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them. + */ +object StaticSQLConf { + val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]()) + + private def buildConf(key: String): ConfigBuilder = { + ConfigBuilder(key).onCreate { entry => + globalConfKeys.add(entry.key) + SQLConf.register(entry) + } + } + + val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation") + .internal() + .stringConf + .checkValues(Set("hive", "in-memory")) + .createWithDefault("in-memory") + + val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase") + .internal() + .stringConf + .createWithDefault("global_temp") + + // This is used to control when we will split a schema's JSON string to multiple pieces + // in order to fit the JSON string in metastore's table property (by default, the value has + // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default + // value of this property). We will split the JSON string of a schema to its length exceeds the + // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session, + // that's why this conf has to be a static SQL conf. + val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold") + .doc("The maximum length allowed in a single cell when " + + "storing additional schema information in Hive's metastore.") + .internal() + .intConf + .createWithDefault(4000) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index c555a43cd2..c6083b372a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} +import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.util.{MutableURLClassLoader, Utils} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 19885156cc..097dc24413 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -22,7 +22,6 @@ import java.io.File import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach -import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} @@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index 3c60b233c2..f545de0e10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -19,11 +19,14 @@ package org.apache.spark.sql.internal import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext} +import org.apache.spark.sql._ import org.apache.spark.sql.execution.WholeStageCodegenExec +import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} class SQLConfSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + private val testKey = "test.key.0" private val testVal = "test.val.0" @@ -250,4 +253,22 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } } } + + test("global SQL conf comes from SparkConf") { + val newSession = SparkSession.builder() + .config(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000") + .getOrCreate() + + assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD.key) == "2000") + checkAnswer( + newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"), + Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000")) + } + + test("cannot set/unset global SQL conf") { + val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10")) + assert(e1.message.contains("Cannot modify the value of a static config")) + val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key)) + assert(e2.message.contains("Cannot modify the value of a static config")) + } } |