author     Wenchen Fan <wenchen@databricks.com>    2016-10-11 20:27:08 -0700
committer  gatorsmile <gatorsmile@gmail.com>       2016-10-11 20:27:08 -0700
commit     b9a147181d5e38d9abed0c7215f4c5cb695f579c
tree       e2c3bda2b680a67b914b2fc737bbafb0350efb93
parent     5b77e66dd6a128c5992ab3bde418613f84be7009
[SPARK-17720][SQL] introduce static SQL conf
## What changes were proposed in this pull request?

SQLConf is session-scoped and mutable. However, we do have a requirement for a static SQL conf, which is global and immutable, e.g. the `schemaStringLengthThreshold` in `HiveExternalCatalog`, the flag to enable/disable hive support, and the global temp view database in https://github.com/apache/spark/pull/14897.

Actually we've already implemented static SQL conf implicitly via `SparkConf`; this PR just makes it explicit and exposes it to users, so that they can see the config values via SQL commands or `SparkSession.conf`, and it forbids users from setting/unsetting static SQL confs.

## How was this patch tested?

New tests in `SQLConfSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15295 from cloud-fan/global-conf.
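For illustration, here is a minimal sketch of the user-facing behavior this introduces, modeled on the new `SQLConfSuite` tests; it assumes a fresh JVM with no pre-existing `SparkSession`, and the master/app-name settings are placeholders for the example:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Static SQL confs come from the SparkConf that the session is built with.
val spark = SparkSession.builder()
  .master("local[*]")                                               // placeholder for the example
  .appName("static-sql-conf-demo")                                  // placeholder for the example
  .config("spark.sql.sources.schemaStringLengthThreshold", "2000")  // a static SQL conf
  .getOrCreate()

// Reading a static conf works, via SparkSession.conf and via SQL.
assert(spark.conf.get("spark.sql.sources.schemaStringLengthThreshold") == "2000")
spark.sql("SET spark.sql.sources.schemaStringLengthThreshold").show()

// Modifying a static conf on a live session is rejected.
try {
  spark.conf.set("spark.sql.sources.schemaStringLengthThreshold", "10")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // "Cannot modify the value of a static config: ..."
}
```

`SET key=value` in SQL and `spark.conf.unset` on a static key raise the same `AnalysisException`, per the new `requireNonStaticConf` check in `RuntimeConfig`.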
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala | 14
-rw-r--r--  python/pyspark/sql/session.py | 2
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 2
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala | 1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala | 23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 45
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2
18 files changed, 111 insertions, 78 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 6d8cfad5c1..61554248ee 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2609,7 +2609,7 @@ test_that("enableHiveSupport on SparkSession", {
unsetHiveContext()
# if we are still here, it must be built with hive
conf <- callJMethod(sparkSession, "conf")
- value <- callJMethod(conf, "get", "spark.sql.catalogImplementation", "")
+ value <- callJMethod(conf, "get", "spark.sql.catalogImplementation")
expect_equal(value, "hive")
})
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0896e68eca..5a710158db 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -91,20 +91,6 @@ package object config {
.toSequence
.createWithDefault(Nil)
- // Note: This is a SQL config but needs to be in core because the REPL depends on it
- private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")
- .internal()
- .stringConf
- .checkValues(Set("hive", "in-memory"))
- .createWithDefault("in-memory")
-
- // Note: This is a SQL config but needs to be in core because it's cross-session and can not put
- // in SQLConf.
- private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
- .internal()
- .stringConf
- .createWithDefault("global_temp")
-
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 8418abf99c..1e40b9c39f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -176,7 +176,7 @@ class SparkSession(object):
sc._conf.set(key, value)
session = SparkSession(sc)
for key, value in self._options.items():
- session.conf.set(key, value)
+ session._jsparkSession.sessionState().conf().setConfString(key, value)
for key, value in self._options.items():
session.sparkContext._conf.set(key, value)
return session
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5dfe18ad49..fec4d49379 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -22,9 +22,9 @@ import java.io.File
import scala.tools.nsc.GenericRunnerSettings
import org.apache.spark._
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils
object Main extends Logging {
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f7d7a4f041..9262e938c2 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -24,8 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config._
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils
class ReplSuite extends SparkFunSuite {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 5863c6a71c..fe41c41a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -63,7 +62,7 @@ class SessionCatalog(
conf: CatalystConf) {
this(
externalCatalog,
- new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
+ new GlobalTempViewManager("global_temp"),
DummyFunctionResourceLoader,
functionRegistry,
conf,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index c2baa74ed7..9108d19d0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
/**
@@ -38,6 +38,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: String): Unit = {
+ requireNonStaticConf(key)
sqlConf.setConfString(key, value)
}
@@ -47,6 +48,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: Boolean): Unit = {
+ requireNonStaticConf(key)
set(key, value.toString)
}
@@ -56,6 +58,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def set(key: String, value: Long): Unit = {
+ requireNonStaticConf(key)
set(key, value.toString)
}
@@ -124,6 +127,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
* @since 2.0.0
*/
def unset(key: String): Unit = {
+ requireNonStaticConf(key)
sqlConf.unsetConf(key)
}
@@ -134,4 +138,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
sqlConf.contains(key)
}
+ private def requireNonStaticConf(key: String): Unit = {
+ if (StaticSQLConf.globalConfKeys.contains(key)) {
+ throw new AnalysisException(s"Cannot modify the value of a static config: $key")
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d26eea5072..137c426b4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,7 +29,6 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
@@ -41,6 +40,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
@@ -812,7 +812,7 @@ object SparkSession {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.conf.set(k, v) }
+ options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
@@ -824,7 +824,7 @@ object SparkSession {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.conf.set(k, v) }
+ options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Use an existing SparkSession, some configuration may not take effect.")
}
@@ -850,7 +850,7 @@ object SparkSession {
sc
}
session = new SparkSession(sparkContext)
- options.foreach { case (k, v) => session.conf.set(k, v) }
+ options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
defaultSession.set(session)
// Register a successfully instantiated context to the singleton. This should be at the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 7d8ea03a27..9de6510c63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -28,11 +28,11 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.r.SerDe
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.command.ShowTablesCommand
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
private[sql] object SQLUtils extends Logging {
@@ -64,7 +64,7 @@ private[sql] object SQLUtils extends Logging {
spark: SparkSession,
sparkConfigMap: JMap[Object, Object]): Unit = {
for ((name, value) <- sparkConfigMap.asScala) {
- spark.conf.set(name.toString, value.toString)
+ spark.sessionState.conf.setConfString(name.toString, value.toString)
}
for ((name, value) <- sparkConfigMap.asScala) {
spark.sparkContext.conf.set(name.toString, value.toString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fecdf792fd..8cbfc4c762 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -41,7 +41,7 @@ object SQLConf {
private val sqlConfEntries = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, ConfigEntry[_]]())
- private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+ private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
require(!sqlConfEntries.containsKey(entry.key),
s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
sqlConfEntries.put(entry.key, entry)
@@ -326,18 +326,6 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
- // This is used to control the when we will split a schema's JSON string to multiple pieces
- // in order to fit the JSON string in metastore's table property (by default, the value has
- // a length restriction of 4000 characters). We will split the JSON string of a schema
- // to its length exceeds the threshold.
- val SCHEMA_STRING_LENGTH_THRESHOLD =
- SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
- .doc("The maximum length allowed in a single cell when " +
- "storing additional schema information in Hive's metastore.")
- .internal()
- .intConf
- .createWithDefault(4000)
-
val PARTITION_COLUMN_TYPE_INFERENCE =
SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
.doc("When true, automatically infer the data types for partitioned columns.")
@@ -736,10 +724,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
- // Do not use a value larger than 4000 as the default value of this property.
- // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
- def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
-
def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
@@ -886,3 +870,41 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
}
}
+/**
+ * Static SQL configuration is a cross-session, immutable Spark configuration. External users can
+ * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
+ */
+object StaticSQLConf {
+ val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
+
+ private def buildConf(key: String): ConfigBuilder = {
+ ConfigBuilder(key).onCreate { entry =>
+ globalConfKeys.add(entry.key)
+ SQLConf.register(entry)
+ }
+ }
+
+ val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
+ .internal()
+ .stringConf
+ .checkValues(Set("hive", "in-memory"))
+ .createWithDefault("in-memory")
+
+ val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase")
+ .internal()
+ .stringConf
+ .createWithDefault("global_temp")
+
+ // This is used to control when we will split a schema's JSON string into multiple pieces
+ // in order to fit the JSON string in metastore's table property (by default, the value has
+ // a length restriction of 4000 characters, so do not use a value larger than 4000 as the
+ // default value of this property). We will split the JSON string of a schema if its length
+ // exceeds the threshold. Note that this conf is only read in HiveExternalCatalog, which is
+ // cross-session; that's why this conf has to be a static SQL conf.
+ val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold")
+ .doc("The maximum length allowed in a single cell when " +
+ "storing additional schema information in Hive's metastore.")
+ .internal()
+ .intConf
+ .createWithDefault(4000)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c555a43cd2..c6083b372a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
+import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.{MutableURLClassLoader, Utils}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 19885156cc..097dc24413 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 3c60b233c2..f545de0e10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.internal
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
class SQLConfSuite extends QueryTest with SharedSQLContext {
+ import testImplicits._
+
private val testKey = "test.key.0"
private val testVal = "test.val.0"
@@ -250,4 +253,22 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("global SQL conf comes from SparkConf") {
+ val newSession = SparkSession.builder()
+ .config(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000")
+ .getOrCreate()
+
+ assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD.key) == "2000")
+ checkAnswer(
+ newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
+ Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
+ }
+
+ test("cannot set/unset global SQL conf") {
+ val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
+ assert(e1.message.contains("Cannot modify the value of a static config"))
+ val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+ assert(e2.message.contains("Cannot modify the value of a static config"))
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 261cc6feff..e1c0cad907 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.{DataType, StructType}
@@ -201,11 +202,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
- // TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`,
- // however the current SQLConf is session isolated, which is not applicable to external
- // catalog. We should re-enable this conf instead of hard code the value here, after we have
- // global SQLConf.
- val threshold = 4000
+ val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = tableDefinition.schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 39d71e164b..a5ef8723c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.sql.Timestamp
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
@@ -36,11 +35,11 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 163f210802..6eb571b91f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
@@ -40,6 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 51670649ad..0477122fc6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -26,12 +26,12 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -699,28 +699,27 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
test("SPARK-6024 wide schema support") {
- withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
- withTable("wide_schema") {
- withTempDir { tempDir =>
- // We will need 80 splits for this schema if the threshold is 4000.
- val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType)))
-
- val tableDesc = CatalogTable(
- identifier = TableIdentifier("wide_schema"),
- tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(
- properties = Map("path" -> tempDir.getCanonicalPath)
- ),
- schema = schema,
- provider = Some("json")
- )
- spark.sessionState.catalog.createTable(tableDesc, ignoreIfExists = false)
-
- sessionState.refreshTable("wide_schema")
-
- val actualSchema = table("wide_schema").schema
- assert(schema === actualSchema)
- }
+ assert(spark.sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 4000)
+ withTable("wide_schema") {
+ withTempDir { tempDir =>
+ // We will need 80 splits for this schema if the threshold is 4000.
+ val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType)))
+
+ val tableDesc = CatalogTable(
+ identifier = TableIdentifier("wide_schema"),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = CatalogStorageFormat.empty.copy(
+ properties = Map("path" -> tempDir.getCanonicalPath)
+ ),
+ schema = schema,
+ provider = Some("json")
+ )
+ spark.sessionState.catalog.createTable(tableDesc, ignoreIfExists = false)
+
+ sessionState.refreshTable("wide_schema")
+
+ val actualSchema = table("wide_schema").schema
+ assert(schema === actualSchema)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 751e976c7b..8bff6de008 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
-import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
@@ -32,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
class HiveDDLSuite