author | Reynold Xin <rxin@databricks.com> | 2017-02-07 18:55:19 +0100
committer | Reynold Xin <rxin@databricks.com> | 2017-02-07 18:55:19 +0100
commit | b7277e03d1038e2a19495c0ef7707e2d77937ccf (patch)
tree | 2472f54a41aaa8fd47e27ae1641b57c1f0477141 /sql
parent | 7a7ce272fe9a703f58b0180a9d2001ecb5c4b8db (diff)
download | spark-b7277e03d1038e2a19495c0ef7707e2d77937ccf.tar.gz, spark-b7277e03d1038e2a19495c0ef7707e2d77937ccf.tar.bz2, spark-b7277e03d1038e2a19495c0ef7707e2d77937ccf.zip
# [SPARK-19495][SQL] Make SQLConf slightly more extensible
## What changes were proposed in this pull request?
This pull request makes SQLConf slightly more extensible by removing the visibility limitations on the build* functions.
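With the `build*` functions now exposed on the `SQLConf` object, code outside `SQLConf.scala` can register its own SQL config entries. The sketch below is illustrative only — the `MyModuleConf` object and the `spark.sql.myModule.*` keys are hypothetical, not part of this patch — and assumes the caller can see `org.apache.spark.sql.internal.SQLConf`:

```scala
import org.apache.spark.sql.internal.SQLConf

// Hypothetical external module registering its own entries through the
// now-visible builders instead of the removed private SQLConfigBuilder.
object MyModuleConf {
  // A regular session conf; onCreate registers the entry with SQLConf.
  val SCAN_BATCH_SIZE = SQLConf.buildConf("spark.sql.myModule.scanBatchSize")
    .doc("Hypothetical: number of rows fetched per scan batch.")
    .intConf
    .createWithDefault(1024)

  // A static conf; its key is also added to SQLConf.staticConfKeys, so
  // RuntimeConfig refuses to modify it after the session has started.
  val CACHE_DIR = SQLConf.buildStaticConf("spark.sql.myModule.cacheDir")
    .doc("Hypothetical: directory used for cached metadata.")
    .stringConf
    .createOptional
}
```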
## How was this patch tested?
N/A - there are no logic changes and everything should be covered by existing unit tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #16835 from rxin/SPARK-19495.
Diffstat (limited to 'sql')
 sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
 sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
5 files changed, 139 insertions(+), 137 deletions(-)
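Before the diff itself, a small behavioral sketch of the static-config check that this change backs with `SQLConf.staticConfKeys`. Nothing below is in the patch; the local session setup and the `/tmp` path are illustrative only:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("static-conf-demo").getOrCreate()

// Ordinary session confs can still be changed at runtime.
spark.conf.set("spark.sql.shuffle.partitions", "50")

// spark.sql.warehouse.dir is built with buildStaticConf, so its key is in
// SQLConf.staticConfKeys and RuntimeConfig.requireNonStaticConf rejects the update.
try {
  spark.conf.set("spark.sql.warehouse.dir", "/tmp/other-warehouse")
} catch {
  case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
}

spark.stop()
```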
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index edfcd7d56d..b352e332bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import org.apache.spark.annotation.InterfaceStability import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.SQLConf /** @@ -140,7 +140,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { } private def requireNonStaticConf(key: String): Unit = { - if (StaticSQLConf.globalConfKeys.contains(key)) { + if (SQLConf.staticConfKeys.contains(key)) { throw new AnalysisException(s"Cannot modify the value of a static config: $key") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5ba4192512..8c77da1763 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -44,7 +44,10 @@ object SQLConf { private val sqlConfEntries = java.util.Collections.synchronizedMap( new java.util.HashMap[String, ConfigEntry[_]]()) - private[sql] def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized { + val staticConfKeys: java.util.Set[String] = + java.util.Collections.synchronizedSet(new java.util.HashSet[String]()) + + private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized { require(!sqlConfEntries.containsKey(entry.key), s"Duplicate SQLConfigEntry. ${entry.key} has been registered") sqlConfEntries.put(entry.key, entry) @@ -55,33 +58,36 @@ object SQLConf { sqlConfEntries.remove(entry.key) } - private[sql] object SQLConfigBuilder { - - def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register) + def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register) + def buildStaticConf(key: String): ConfigBuilder = { + ConfigBuilder(key).onCreate { entry => + staticConfKeys.add(entry.key) + SQLConf.register(entry) + } } - val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations") + val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations") .internal() .doc("The max number of iterations the optimizer and analyzer runs.") .intConf .createWithDefault(100) val OPTIMIZER_INSET_CONVERSION_THRESHOLD = - SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold") + buildConf("spark.sql.optimizer.inSetConversionThreshold") .internal() .doc("The threshold of set size for InSet conversion.") .intConf .createWithDefault(10) - val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed") + val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .internal() .doc("When set to true Spark SQL will automatically select a compression codec for each " + "column based on statistics of the data.") .booleanConf .createWithDefault(true) - val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize") + val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize") .internal() .doc("Controls the size of batches for columnar caching. 
Larger batch sizes can improve " + "memory utilization and compression, but risk OOMs when caching data.") @@ -89,19 +95,19 @@ object SQLConf { .createWithDefault(10000) val IN_MEMORY_PARTITION_PRUNING = - SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning") + buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning") .internal() .doc("When true, enable partition pruning for in-memory columnar tables.") .booleanConf .createWithDefault(true) - val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin") + val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin") .internal() .doc("When true, prefer sort merge join over shuffle hash join.") .booleanConf .createWithDefault(true) - val RADIX_SORT_ENABLED = SQLConfigBuilder("spark.sql.sort.enableRadixSort") + val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort") .internal() .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " + "requires additional memory to be reserved up-front. The memory overhead may be " + @@ -109,7 +115,7 @@ object SQLConf { .booleanConf .createWithDefault(true) - val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold") + val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold") .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " + "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " + "Note that currently statistics are only supported for Hive Metastore tables where the " + @@ -119,7 +125,7 @@ object SQLConf { .longConf .createWithDefault(10L * 1024 * 1024) - val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor") + val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor") .internal() .doc("Minimal increase rate in number of partitions between attempts when executing a take " + "on a query. Higher values lead to more partitions read. Lower values might lead to " + @@ -128,13 +134,13 @@ object SQLConf { .createWithDefault(4) val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = - SQLConfigBuilder("spark.sql.statistics.fallBackToHdfs") + buildConf("spark.sql.statistics.fallBackToHdfs") .doc("If the table statistics are not available from table metadata enable fall back to hdfs." + " This is useful in determining if a table is small enough to use auto broadcast joins.") .booleanConf .createWithDefault(false) - val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes") + val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes") .internal() .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " + "which is larger than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. 
" + @@ -143,24 +149,24 @@ object SQLConf { .longConf .createWithDefault(Long.MaxValue) - val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions") + val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions") .doc("The default number of partitions to use when shuffling data for joins or aggregations.") .intConf .createWithDefault(200) val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE = - SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize") + buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize") .doc("The target post-shuffle input size in bytes of a task.") .bytesConf(ByteUnit.BYTE) .createWithDefault(64 * 1024 * 1024) - val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled") + val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled") .doc("When true, enable adaptive query execution.") .booleanConf .createWithDefault(false) val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = - SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions") + buildConf("spark.sql.adaptive.minNumPostShufflePartitions") .internal() .doc("The advisory minimal number of post-shuffle partitions provided to " + "ExchangeCoordinator. This setting is used in our test to make sure we " + @@ -171,27 +177,27 @@ object SQLConf { .createWithDefault(-1) val SUBEXPRESSION_ELIMINATION_ENABLED = - SQLConfigBuilder("spark.sql.subexpressionElimination.enabled") + buildConf("spark.sql.subexpressionElimination.enabled") .internal() .doc("When true, common subexpressions will be eliminated.") .booleanConf .createWithDefault(true) - val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive") + val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive") .internal() .doc("Whether the query analyzer should be case sensitive or not. " + "Default to case insensitive. It is highly discouraged to turn on case sensitive mode.") .booleanConf .createWithDefault(false) - val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema") + val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema") .doc("When true, the Parquet data source merges schemas collected from all data files, " + "otherwise the schema is picked from the summary file or a random data file " + "if no summary file is available.") .booleanConf .createWithDefault(false) - val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles") + val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles") .doc("When true, we make assumption that all part-files of Parquet are consistent with " + "summary files and we will ignore them when merging schema. Otherwise, if this is " + "false, which is the default, we will merge all part-files. This should be considered " + @@ -199,7 +205,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString") + val PARQUET_BINARY_AS_STRING = buildConf("spark.sql.parquet.binaryAsString") .doc("Some other Parquet-producing systems, in particular Impala and older versions of " + "Spark SQL, do not differentiate between binary data and strings when writing out the " + "Parquet schema. 
This flag tells Spark SQL to interpret binary data as a string to provide " + @@ -207,7 +213,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp") + val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp") .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " + "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " + "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " + @@ -215,12 +221,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata") + val PARQUET_CACHE_METADATA = buildConf("spark.sql.parquet.cacheMetadata") .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.") .booleanConf .createWithDefault(true) - val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec") + val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " + "uncompressed, snappy, gzip, lzo.") .stringConf @@ -228,18 +234,18 @@ object SQLConf { .checkValues(Set("uncompressed", "snappy", "gzip", "lzo")) .createWithDefault("snappy") - val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown") + val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown") .doc("Enables Parquet filter push-down optimization when set to true.") .booleanConf .createWithDefault(true) - val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat") + val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat") .doc("Whether to follow Parquet's format specification when converting Parquet schema to " + "Spark SQL schema and vice versa.") .booleanConf .createWithDefault(false) - val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class") + val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class") .doc("The output committer class used by Parquet. The specified class needs to be a " + "subclass of org.apache.hadoop.mapreduce.OutputCommitter. 
Typically, it's also a subclass " + "of org.apache.parquet.hadoop.ParquetOutputCommitter.") @@ -248,24 +254,24 @@ object SQLConf { .createWithDefault(classOf[ParquetOutputCommitter].getName) val PARQUET_VECTORIZED_READER_ENABLED = - SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader") + buildConf("spark.sql.parquet.enableVectorizedReader") .doc("Enables vectorized parquet decoding.") .booleanConf .createWithDefault(true) - val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown") + val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf .createWithDefault(false) - val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath") + val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + "when reading data stored in HDFS.") .booleanConf .createWithDefault(false) val HIVE_METASTORE_PARTITION_PRUNING = - SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning") + buildConf("spark.sql.hive.metastorePartitionPruning") .doc("When true, some predicates will be pushed down into the Hive metastore so that " + "unmatching partitions can be eliminated earlier. This only affects Hive tables " + "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " + @@ -274,7 +280,7 @@ object SQLConf { .createWithDefault(true) val HIVE_MANAGE_FILESOURCE_PARTITIONS = - SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions") + buildConf("spark.sql.hive.manageFilesourcePartitions") .doc("When true, enable metastore partition management for file source tables as well. " + "This includes both datasource and converted Hive tables. When partition managment " + "is enabled, datasource tables store partition in the Hive metastore, and use the " + @@ -283,14 +289,14 @@ object SQLConf { .createWithDefault(true) val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE = - SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize") + buildConf("spark.sql.hive.filesourcePartitionFileCacheSize") .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " + "a cache that can use up to specified num bytes for file metadata. This conf only " + "has an effect when hive filesource partition management is enabled.") .longConf .createWithDefault(250 * 1024 * 1024) - val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly") + val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly") .doc("When true, enable the metadata-only query optimization that use the table's metadata " + "to produce the partition columns instead of table scans. 
It applies when all the columns " + "scanned are partition columns and the query has an aggregate operator that satisfies " + @@ -298,47 +304,47 @@ object SQLConf { .booleanConf .createWithDefault(true) - val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord") + val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord") .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.") .stringConf .createWithDefault("_corrupt_record") - val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout") + val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout") .doc("Timeout in seconds for the broadcast wait time in broadcast joins.") .intConf .createWithDefault(5 * 60) // This is only used for the thriftserver - val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool") + val THRIFTSERVER_POOL = buildConf("spark.sql.thriftserver.scheduler.pool") .doc("Set a Fair Scheduler pool for a JDBC client session.") .stringConf .createOptional val THRIFTSERVER_INCREMENTAL_COLLECT = - SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect") + buildConf("spark.sql.thriftServer.incrementalCollect") .internal() .doc("When true, enable incremental collection for execution in Thrift Server.") .booleanConf .createWithDefault(false) val THRIFTSERVER_UI_STATEMENT_LIMIT = - SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements") + buildConf("spark.sql.thriftserver.ui.retainedStatements") .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.") .intConf .createWithDefault(200) - val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions") + val THRIFTSERVER_UI_SESSION_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedSessions") .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.") .intConf .createWithDefault(200) // This is used to set the default data source - val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default") + val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default") .doc("The default data source to use in input/output.") .stringConf .createWithDefault("parquet") - val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS") + val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS") .internal() .doc("When true, a table created by a Hive CTAS statement (no USING clause) " + "without specifying any storage property will be converted to a data source table, " + @@ -346,7 +352,7 @@ object SQLConf { .booleanConf .createWithDefault(false) - val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats") + val GATHER_FASTSTAT = buildConf("spark.sql.hive.gatherFastStats") .internal() .doc("When true, fast stats (number of files and total size of all files) will be gathered" + " in parallel while repairing table partitions to avoid the sequential listing in Hive" + @@ -355,29 +361,29 @@ object SQLConf { .createWithDefault(true) val PARTITION_COLUMN_TYPE_INFERENCE = - SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled") + buildConf("spark.sql.sources.partitionColumnTypeInference.enabled") .doc("When true, automatically infer the data types for partitioned columns.") .booleanConf .createWithDefault(true) - val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled") + val BUCKETING_ENABLED = buildConf("spark.sql.sources.bucketing.enabled") .doc("When false, we will treat bucketed table as normal table") 
.booleanConf .createWithDefault(true) - val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled") + val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .doc("When false, we will throw an error if a query contains a cartesian product without " + "explicit CROSS JOIN syntax.") .booleanConf .createWithDefault(false) - val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal") + val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal") .doc("When true, the ordinal numbers are treated as the position in the select list. " + "When false, the ordinal numbers in order/sort by clause are ignored.") .booleanConf .createWithDefault(true) - val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal") + val GROUP_BY_ORDINAL = buildConf("spark.sql.groupByOrdinal") .doc("When true, the ordinal numbers in group by clauses are treated as the position " + "in the select list. When false, the ordinal numbers are ignored.") .booleanConf @@ -386,16 +392,16 @@ object SQLConf { // The output committer class used by data sources. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. val OUTPUT_COMMITTER_CLASS = - SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional + buildConf("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional val FILE_COMMIT_PROTOCOL_CLASS = - SQLConfigBuilder("spark.sql.sources.commitProtocolClass") + buildConf("spark.sql.sources.commitProtocolClass") .internal() .stringConf .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName) val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = - SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") + buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") .doc("The maximum number of files allowed for listing files at driver side. If the number " + "of detected files exceeds this value during partition discovery, it tries to list the " + "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " + @@ -404,7 +410,7 @@ object SQLConf { .createWithDefault(32) val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = - SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism") + buildConf("spark.sql.sources.parallelPartitionDiscovery.parallelism") .doc("The number of parallelism to list a collection of path recursively, Set the " + "number to prevent file listing from generating too many tasks.") .internal() @@ -414,62 +420,62 @@ object SQLConf { // Whether to automatically resolve ambiguity in join conditions for self-joins. // See SPARK-6231. val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = - SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity") + buildConf("spark.sql.selfJoinAutoResolveAmbiguity") .internal() .booleanConf .createWithDefault(true) // Whether to retain group by columns or not in GroupedData.agg. 
- val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns") + val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns") .internal() .booleanConf .createWithDefault(true) - val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues") + val DATAFRAME_PIVOT_MAX_VALUES = buildConf("spark.sql.pivotMaxValues") .doc("When doing a pivot without specifying values for the pivot column this is the maximum " + "number of (distinct) values that will be collected without error.") .intConf .createWithDefault(10000) - val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles") + val RUN_SQL_ON_FILES = buildConf("spark.sql.runSQLOnFiles") .internal() .doc("When true, we could use `datasource`.`path` as table in SQL query.") .booleanConf .createWithDefault(true) - val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage") + val WHOLESTAGE_CODEGEN_ENABLED = buildConf("spark.sql.codegen.wholeStage") .internal() .doc("When true, the whole stage (of multiple operators) will be compiled into single java" + " method.") .booleanConf .createWithDefault(true) - val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields") + val WHOLESTAGE_MAX_NUM_FIELDS = buildConf("spark.sql.codegen.maxFields") .internal() .doc("The maximum number of fields (including nested fields) that will be supported before" + " deactivating whole-stage codegen.") .intConf .createWithDefault(100) - val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback") + val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback") .internal() .doc("When true, whole stage codegen could be temporary disabled for the part of query that" + " fail to compile generated code") .booleanConf .createWithDefault(true) - val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches") + val MAX_CASES_BRANCHES = buildConf("spark.sql.codegen.maxCaseBranches") .internal() .doc("The maximum number of switches supported with codegen.") .intConf .createWithDefault(20) - val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes") + val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") .longConf .createWithDefault(128 * 1024 * 1024) // parquet.block.size - val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes") + val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes") .internal() .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" + " the same time. This is used when putting multiple files into a partition. It's better to" + @@ -478,46 +484,46 @@ object SQLConf { .longConf .createWithDefault(4 * 1024 * 1024) - val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles") + val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles") .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " + "encountering corrupted or non-existing and contents that have been read will still be " + "returned.") .booleanConf .createWithDefault(false) - val MAX_RECORDS_PER_FILE = SQLConfigBuilder("spark.sql.files.maxRecordsPerFile") + val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile") .doc("Maximum number of records to write out to a single file. 
" + "If this value is zero or negative, there is no limit.") .longConf .createWithDefault(0) - val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse") + val EXCHANGE_REUSE_ENABLED = buildConf("spark.sql.exchange.reuse") .internal() .doc("When true, the planner will try to find out duplicated exchanges and re-use them.") .booleanConf .createWithDefault(true) val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = - SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot") + buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot") .internal() .doc("Minimum number of state store delta files that needs to be generated before they " + "consolidated into snapshots.") .intConf .createWithDefault(10) - val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation") + val CHECKPOINT_LOCATION = buildConf("spark.sql.streaming.checkpointLocation") .doc("The default location for storing checkpoint data for streaming queries.") .stringConf .createOptional - val MIN_BATCHES_TO_RETAIN = SQLConfigBuilder("spark.sql.streaming.minBatchesToRetain") + val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain") .internal() .doc("The minimum number of batches that must be retained and made recoverable.") .intConf .createWithDefault(100) val UNSUPPORTED_OPERATION_CHECK_ENABLED = - SQLConfigBuilder("spark.sql.streaming.unsupportedOperationCheck") + buildConf("spark.sql.streaming.unsupportedOperationCheck") .internal() .doc("When true, the logical plan for streaming query will be checked for unsupported" + " operations.") @@ -525,20 +531,20 @@ object SQLConf { .createWithDefault(true) val VARIABLE_SUBSTITUTE_ENABLED = - SQLConfigBuilder("spark.sql.variable.substitute") + buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.") .booleanConf .createWithDefault(true) val VARIABLE_SUBSTITUTE_DEPTH = - SQLConfigBuilder("spark.sql.variable.substitute.depth") + buildConf("spark.sql.variable.substitute.depth") .internal() .doc("Deprecated: The maximum replacements the substitution engine will do.") .intConf .createWithDefault(40) val ENABLE_TWOLEVEL_AGG_MAP = - SQLConfigBuilder("spark.sql.codegen.aggregate.map.twolevel.enable") + buildConf("spark.sql.codegen.aggregate.map.twolevel.enable") .internal() .doc("Enable two-level aggregate hash map. When enabled, records will first be " + "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " + @@ -548,13 +554,13 @@ object SQLConf { .createWithDefault(true) val STREAMING_FILE_COMMIT_PROTOCOL_CLASS = - SQLConfigBuilder("spark.sql.streaming.commitProtocolClass") + buildConf("spark.sql.streaming.commitProtocolClass") .internal() .stringConf .createWithDefault(classOf[ManifestFileCommitProtocol].getName) val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD = - SQLConfigBuilder("spark.sql.objectHashAggregate.sortBased.fallbackThreshold") + buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold") .internal() .doc("In the case of ObjectHashAggregateExec, when the size of the in-memory hash map " + "grows too large, we will fall back to sort-based aggregation. This option sets a row " + @@ -565,20 +571,20 @@ object SQLConf { // percentile_approx). 
.createWithDefault(128) - val USE_OBJECT_HASH_AGG = SQLConfigBuilder("spark.sql.execution.useObjectHashAggregateExec") + val USE_OBJECT_HASH_AGG = buildConf("spark.sql.execution.useObjectHashAggregateExec") .internal() .doc("Decides if we use ObjectHashAggregateExec") .booleanConf .createWithDefault(true) - val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") + val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion") .internal() .doc("Whether to delete the expired log files in file stream sink.") .booleanConf .createWithDefault(true) val FILE_SINK_LOG_COMPACT_INTERVAL = - SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval") + buildConf("spark.sql.streaming.fileSink.log.compactInterval") .internal() .doc("Number of log files after which all the previous files " + "are compacted into the next log file.") @@ -586,20 +592,20 @@ object SQLConf { .createWithDefault(10) val FILE_SINK_LOG_CLEANUP_DELAY = - SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay") + buildConf("spark.sql.streaming.fileSink.log.cleanupDelay") .internal() .doc("How long that a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes - val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion") + val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion") .internal() .doc("Whether to delete the expired log files in file stream source.") .booleanConf .createWithDefault(true) val FILE_SOURCE_LOG_COMPACT_INTERVAL = - SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval") + buildConf("spark.sql.streaming.fileSource.log.compactInterval") .internal() .doc("Number of log files after which all the previous files " + "are compacted into the next log file.") @@ -607,47 +613,47 @@ object SQLConf { .createWithDefault(10) val FILE_SOURCE_LOG_CLEANUP_DELAY = - SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay") + buildConf("spark.sql.streaming.fileSource.log.cleanupDelay") .internal() .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes val STREAMING_SCHEMA_INFERENCE = - SQLConfigBuilder("spark.sql.streaming.schemaInference") + buildConf("spark.sql.streaming.schemaInference") .internal() .doc("Whether file-based streaming sources will infer its own schema") .booleanConf .createWithDefault(false) val STREAMING_POLLING_DELAY = - SQLConfigBuilder("spark.sql.streaming.pollingDelay") + buildConf("spark.sql.streaming.pollingDelay") .internal() .doc("How long to delay polling new data when no data is available") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10L) val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL = - SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval") + buildConf("spark.sql.streaming.noDataProgressEventInterval") .internal() .doc("How long to wait between two progress events when there is no data") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(10000L) val STREAMING_METRICS_ENABLED = - SQLConfigBuilder("spark.sql.streaming.metricsEnabled") + buildConf("spark.sql.streaming.metricsEnabled") .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.") .booleanConf .createWithDefault(false) val STREAMING_PROGRESS_RETENTION = - 
SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates") + buildConf("spark.sql.streaming.numRecentProgressUpdates") .doc("The number of progress updates to retain for a streaming query") .intConf .createWithDefault(100) val NDV_MAX_ERROR = - SQLConfigBuilder("spark.sql.statistics.ndv.maxError") + buildConf("spark.sql.statistics.ndv.maxError") .internal() .doc("The maximum estimation error allowed in HyperLogLog++ algorithm when generating " + "column level statistics.") @@ -655,13 +661,13 @@ object SQLConf { .createWithDefault(0.05) val CBO_ENABLED = - SQLConfigBuilder("spark.sql.cbo.enabled") + buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) val SESSION_LOCAL_TIMEZONE = - SQLConfigBuilder("spark.sql.session.timeZone") + buildConf("spark.sql.session.timeZone") .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""") .stringConf .createWithDefault(TimeZone.getDefault().getID()) @@ -995,27 +1001,21 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them. */ object StaticSQLConf { - val globalConfKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[String]()) - private def buildConf(key: String): ConfigBuilder = { - ConfigBuilder(key).onCreate { entry => - globalConfKeys.add(entry.key) - SQLConf.register(entry) - } - } + import SQLConf.buildStaticConf - val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir") + val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir") .doc("The default location for managed databases and tables.") .stringConf .createWithDefault(Utils.resolveURI("spark-warehouse").toString) - val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation") + val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation") .internal() .stringConf .checkValues(Set("hive", "in-memory")) .createWithDefault("in-memory") - val GLOBAL_TEMP_DATABASE = buildConf("spark.sql.globalTempDatabase") + val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase") .internal() .stringConf .createWithDefault("global_temp") @@ -1026,25 +1026,27 @@ object StaticSQLConf { // value of this property). We will split the JSON string of a schema to its length exceeds the // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session, // that's why this conf has to be a static SQL conf. - val SCHEMA_STRING_LENGTH_THRESHOLD = buildConf("spark.sql.sources.schemaStringLengthThreshold") - .doc("The maximum length allowed in a single cell when " + - "storing additional schema information in Hive's metastore.") - .internal() - .intConf - .createWithDefault(4000) + val SCHEMA_STRING_LENGTH_THRESHOLD = + buildStaticConf("spark.sql.sources.schemaStringLengthThreshold") + .doc("The maximum length allowed in a single cell when " + + "storing additional schema information in Hive's metastore.") + .internal() + .intConf + .createWithDefault(4000) // When enabling the debug, Spark SQL internal table properties are not filtered out; however, // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly. - val DEBUG_MODE = buildConf("spark.sql.debug") + val DEBUG_MODE = buildStaticConf("spark.sql.debug") .internal() .doc("Only used for internal debugging. 
Not all functions are supported when it is enabled.") .booleanConf .createWithDefault(false) - val HIVE_THRIFT_SERVER_SINGLESESSION = buildConf("spark.sql.hive.thriftServer.singleSession") - .doc("When set to true, Hive Thrift server is running in a single session mode. " + - "All the JDBC/ODBC connections share the temporary views, function registries, " + - "SQL configuration and the current database.") - .booleanConf - .createWithDefault(false) + val HIVE_THRIFT_SERVER_SINGLESESSION = + buildStaticConf("spark.sql.hive.thriftServer.singleSession") + .doc("When set to true, Hive Thrift server is running in a single session mode. " + + "All the JDBC/ODBC connections share the temporary views, function registries, " + + "SQL configuration and the current database.") + .booleanConf + .createWithDefault(false) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index a77f920598..9c95b12795 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -999,7 +999,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-19218 `SET -v` should not fail with null value configuration") { import SQLConf._ - val confEntry = SQLConfigBuilder("spark.test").doc("doc").stringConf.createWithDefault(null) + val confEntry = buildConf("spark.test").doc("doc").stringConf.createWithDefault(null) try { val result = sql("SET -v").collect() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala index 95bfd05c1f..6c12f0ff7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala @@ -26,7 +26,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("intConf") { val key = "spark.sql.SQLConfEntrySuite.int" - val confEntry = SQLConfigBuilder(key).intConf.createWithDefault(1) + val confEntry = buildConf(key).intConf.createWithDefault(1) assert(conf.getConf(confEntry, 5) === 5) conf.setConf(confEntry, 10) @@ -45,7 +45,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("longConf") { val key = "spark.sql.SQLConfEntrySuite.long" - val confEntry = SQLConfigBuilder(key).longConf.createWithDefault(1L) + val confEntry = buildConf(key).longConf.createWithDefault(1L) assert(conf.getConf(confEntry, 5L) === 5L) conf.setConf(confEntry, 10L) @@ -64,7 +64,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("booleanConf") { val key = "spark.sql.SQLConfEntrySuite.boolean" - val confEntry = SQLConfigBuilder(key).booleanConf.createWithDefault(true) + val confEntry = buildConf(key).booleanConf.createWithDefault(true) assert(conf.getConf(confEntry, false) === false) conf.setConf(confEntry, true) @@ -83,7 +83,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("doubleConf") { val key = "spark.sql.SQLConfEntrySuite.double" - val confEntry = SQLConfigBuilder(key).doubleConf.createWithDefault(1d) + val confEntry = buildConf(key).doubleConf.createWithDefault(1d) assert(conf.getConf(confEntry, 5.0) === 5.0) conf.setConf(confEntry, 10.0) @@ -102,7 +102,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("stringConf") { val key = "spark.sql.SQLConfEntrySuite.string" - val confEntry = SQLConfigBuilder(key).stringConf.createWithDefault(null) + val confEntry = 
buildConf(key).stringConf.createWithDefault(null) assert(conf.getConf(confEntry, "abc") === "abc") conf.setConf(confEntry, "abcd") @@ -116,7 +116,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("enumConf") { val key = "spark.sql.SQLConfEntrySuite.enum" - val confEntry = SQLConfigBuilder(key) + val confEntry = buildConf(key) .stringConf .checkValues(Set("a", "b", "c")) .createWithDefault("a") @@ -138,7 +138,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("stringSeqConf") { val key = "spark.sql.SQLConfEntrySuite.stringSeq" - val confEntry = SQLConfigBuilder(key) + val confEntry = buildConf(key) .stringConf .toSequence .createWithDefault(Nil) @@ -155,7 +155,7 @@ class SQLConfEntrySuite extends SparkFunSuite { test("optionalConf") { val key = "spark.sql.SQLConfEntrySuite.optional" - val confEntry = SQLConfigBuilder(key) + val confEntry = buildConf(key) .stringConf .createOptional @@ -166,9 +166,9 @@ class SQLConfEntrySuite extends SparkFunSuite { test("duplicate entry") { val key = "spark.sql.SQLConfEntrySuite.duplicate" - SQLConfigBuilder(key).stringConf.createOptional + buildConf(key).stringConf.createOptional intercept[IllegalArgumentException] { - SQLConfigBuilder(key).stringConf.createOptional + buildConf(key).stringConf.createOptional } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 2822a55e3d..30abc62803 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -69,18 +69,18 @@ private[spark] object HiveUtils extends Logging { */ val hiveTypeString: String = "HIVE_TYPE_STRING" - val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version") + val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. Available options are " + s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.") .stringConf .createWithDefault(hiveExecutionVersion) - val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version") + val HIVE_EXECUTION_VERSION = buildConf("spark.sql.hive.version") .doc("Version of Hive used internally by Spark SQL.") .stringConf .createWithDefault(hiveExecutionVersion) - val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars") + val HIVE_METASTORE_JARS = buildConf("spark.sql.hive.metastore.jars") .doc(s""" | Location of the jars that should be used to instantiate the HiveMetastoreClient. | This property can be one of three options: " @@ -96,28 +96,28 @@ private[spark] object HiveUtils extends Logging { .stringConf .createWithDefault("builtin") - val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet") + val CONVERT_METASTORE_PARQUET = buildConf("spark.sql.hive.convertMetastoreParquet") .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " + "the built in support.") .booleanConf .createWithDefault(true) val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = - SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema") + buildConf("spark.sql.hive.convertMetastoreParquet.mergeSchema") .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " + "different Parquet data files. 
This configuration is only effective " + "when \"spark.sql.hive.convertMetastoreParquet\" is true.") .booleanConf .createWithDefault(false) - val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc") + val CONVERT_METASTORE_ORC = buildConf("spark.sql.hive.convertMetastoreOrc") .internal() .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " + "the built in support.") .booleanConf .createWithDefault(false) - val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes") + val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes") .doc("A comma separated list of class prefixes that should be loaded using the classloader " + "that is shared between Spark SQL and a specific version of Hive. An example of classes " + "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " + @@ -130,7 +130,7 @@ private[spark] object HiveUtils extends Logging { private def jdbcPrefixes = Seq( "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc") - val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes") + val HIVE_METASTORE_BARRIER_PREFIXES = buildConf("spark.sql.hive.metastore.barrierPrefixes") .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " + "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " + "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).") @@ -138,7 +138,7 @@ private[spark] object HiveUtils extends Logging { .toSequence .createWithDefault(Nil) - val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async") + val HIVE_THRIFT_SERVER_ASYNC = buildConf("spark.sql.hive.thriftServer.async") .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.") .booleanConf .createWithDefault(true) |
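For completeness, a test-style sketch of the pattern the updated suites use: calling `SQLConf.buildConf` directly and exercising the resulting entry on a `SQLConf` instance. The `spark.sql.test.exampleFlag` key is hypothetical, and because the `SQLConf` class is `private[sql]`, code like this has to live under `org.apache.spark.sql` (as the suites do):

```scala
package org.apache.spark.sql.internal

object BuildConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    // Building an entry also registers it; building the same key twice
    // fails with "Duplicate SQLConfigEntry".
    val entry = SQLConf.buildConf("spark.sql.test.exampleFlag").booleanConf.createWithDefault(false)

    assert(conf.getConf(entry) == false)  // default value
    conf.setConf(entry, true)
    assert(conf.getConf(entry) == true)
    conf.unsetConf(entry)                 // falls back to the default
    assert(conf.getConf(entry) == false)
  }
}
```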