Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                 |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala           | 771
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala |  29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala      |   7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala           |  96
5 files changed, 420 insertions(+), 495 deletions(-)
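
The net effect of this patch is to retire the SQL-specific SQLConfEntry helpers in favor of the shared ConfigEntry/ConfigBuilder infrastructure in org.apache.spark.internal.config. As a minimal sketch of the migration pattern, not part of the patch itself: the key spark.sql.myFlag is hypothetical, and the definition assumes it lives inside object SQLConf so that SQLConfigBuilder registers the entry on creation.

    // Old style: SQL-specific helper with named arguments.
    // val MY_FLAG = booleanConf("spark.sql.myFlag",
    //   defaultValue = Some(true),
    //   doc = "Illustrative flag.",
    //   isPublic = false)

    // New style: the shared fluent ConfigBuilder, registered via SQLConfigBuilder.
    val MY_FLAG = SQLConfigBuilder("spark.sql.myFlag")
      .internal()                 // replaces isPublic = false
      .doc("Illustrative flag.")
      .booleanConf
      .createWithDefault(true)    // replaces defaultValue = Some(true)

Entries that previously had no default now end in .createOptional and become OptionalConfigEntry values, as the THRIFTSERVER_POOL, OUTPUT_COMMITTER_CLASS and CHECKPOINT_LOCATION definitions below show.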
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 587ba1ea05..1c9cb79ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
@@ -41,7 +42,6 @@ import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -138,7 +138,7 @@ class SQLContext private[sql](
def setConf(props: Properties): Unit = conf.setConf(props)
/** Set the given Spark SQL configuration property. */
- private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+ private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
/**
* Set the given Spark SQL configuration property.
@@ -158,16 +158,16 @@ class SQLContext private[sql](
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+ private[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
conf.getConf(entry, defaultValue)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a7c0be63fc..927af89949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -25,6 +25,8 @@ import scala.collection.immutable
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils
@@ -36,418 +38,305 @@ import org.apache.spark.util.Utils
object SQLConf {
private val sqlConfEntries = java.util.Collections.synchronizedMap(
- new java.util.HashMap[String, SQLConfEntry[_]]())
+ new java.util.HashMap[String, ConfigEntry[_]]())
- /**
- * An entry contains all meta information for a configuration.
- *
- * @param key the key for the configuration
- * @param defaultValue the default value for the configuration
- * @param valueConverter how to convert a string to the value. It should throw an exception if the
- * string does not have the required format.
- * @param stringConverter how to convert a value to a string that the user can use it as a valid
- * string value. It's usually `toString`. But sometimes, a custom converter
- * is necessary. E.g., if T is List[String], `a, b, c` is better than
- * `List(a, b, c)`.
- * @param doc the document for the configuration
- * @param isPublic if this configuration is public to the user. If it's `false`, this
- * configuration is only used internally and we should not expose it to the user.
- * @tparam T the value type
- */
- class SQLConfEntry[T] private(
- val key: String,
- val defaultValue: Option[T],
- val valueConverter: String => T,
- val stringConverter: T => String,
- val doc: String,
- val isPublic: Boolean) {
-
- def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
-
- override def toString: String = {
- s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
- }
+ private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+ require(!sqlConfEntries.containsKey(entry.key),
+ s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
+ sqlConfEntries.put(entry.key, entry)
}
- object SQLConfEntry {
-
- private def apply[T](
- key: String,
- defaultValue: Option[T],
- valueConverter: String => T,
- stringConverter: T => String,
- doc: String,
- isPublic: Boolean): SQLConfEntry[T] =
- sqlConfEntries.synchronized {
- if (sqlConfEntries.containsKey(key)) {
- throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
- }
- val entry =
- new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
- sqlConfEntries.put(key, entry)
- entry
- }
-
- def intConf(
- key: String,
- defaultValue: Option[Int] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Int] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be int, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longMemConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- try {
- Utils.byteStringAsBytes(v)
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }
- }, _.toString, doc, isPublic)
-
- def doubleConf(
- key: String,
- defaultValue: Option[Double] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Double] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toDouble
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be double, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def booleanConf(
- key: String,
- defaultValue: Option[Boolean] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Boolean] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(s"$key should be boolean, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def stringConf(
- key: String,
- defaultValue: Option[String] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[String] =
- SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
-
- def enumConf[T](
- key: String,
- valueConverter: String => T,
- validValues: Set[T],
- defaultValue: Option[T] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[T] =
- SQLConfEntry(key, defaultValue, v => {
- val _v = valueConverter(v)
- if (!validValues.contains(_v)) {
- throw new IllegalArgumentException(
- s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
- }
- _v
- }, _.toString, doc, isPublic)
-
- def seqConf[T](
- key: String,
- valueConverter: String => T,
- defaultValue: Option[Seq[T]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
- SQLConfEntry(
- key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
- }
+ private[sql] object SQLConfigBuilder {
- def stringSeqConf(
- key: String,
- defaultValue: Option[Seq[String]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
- seqConf(key, s => s, defaultValue, doc, isPublic)
- }
- }
+ def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)
- import SQLConfEntry._
+ }
- val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
- defaultValue = Some(true),
- doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+ val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
+ .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
"through the constructor (new SQLContexts/HiveContexts created through newSession " +
"method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
"a SQLContext/HiveContext has been created, changing the value of this conf will not " +
- "have effect.",
- isPublic = true)
-
- val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
- defaultValue = Some(true),
- doc = "When set to true Spark SQL will automatically select a compression codec for each " +
- "column based on statistics of the data.",
- isPublic = false)
-
- val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
- defaultValue = Some(10000),
- doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
- "memory utilization and compression, but risk OOMs when caching data.",
- isPublic = false)
+ "have effect.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
+ .internal()
+ .doc("When set to true Spark SQL will automatically select a compression codec for each " +
+ "column based on statistics of the data.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
+ .internal()
+ .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+ "memory utilization and compression, but risk OOMs when caching data.")
+ .intConf
+ .createWithDefault(10000)
val IN_MEMORY_PARTITION_PRUNING =
- booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
- defaultValue = Some(true),
- doc = "When true, enable partition pruning for in-memory columnar tables.",
- isPublic = false)
-
- val PREFER_SORTMERGEJOIN = booleanConf("spark.sql.join.preferSortMergeJoin",
- defaultValue = Some(true),
- doc = "When true, prefer sort merge join over shuffle hash join.",
- isPublic = false)
-
- val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
- defaultValue = Some(10 * 1024 * 1024),
- doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+ SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
+ .internal()
+ .doc("When true, enable partition pruning for in-memory columnar tables.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
+ .internal()
+ .doc("When true, prefer sort merge join over shuffle hash join.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
+ .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
"Note that currently statistics are only supported for Hive Metastore tables where the " +
"command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+ .intConf
+ .createWithDefault(10 * 1024 * 1024)
- val DEFAULT_SIZE_IN_BYTES = longConf(
- "spark.sql.defaultSizeInBytes",
- doc = "The default table size used in query planning. By default, it is set to a larger " +
+ val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
+ .internal()
+ .doc("The default table size used in query planning. By default, it is set to a larger " +
"value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
"by default the optimizer will not choose to broadcast a table unless it knows for sure " +
- "its size is small enough.",
- isPublic = false)
+ "its size is small enough.")
+ .longConf
+ .createWithDefault(-1)
- val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
- defaultValue = Some(200),
- doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
+ val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
+ .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+ .intConf
+ .createWithDefault(200)
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
- longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
- defaultValue = Some(64 * 1024 * 1024),
- doc = "The target post-shuffle input size in bytes of a task.")
+ SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+ .doc("The target post-shuffle input size in bytes of a task.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(64 * 1024 * 1024)
- val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled",
- defaultValue = Some(false),
- doc = "When true, enable adaptive query execution.")
+ val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled")
+ .doc("When true, enable adaptive query execution.")
+ .booleanConf
+ .createWithDefault(false)
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
- intConf("spark.sql.adaptive.minNumPostShufflePartitions",
- defaultValue = Some(-1),
- doc = "The advisory minimal number of post-shuffle partitions provided to " +
+ SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions")
+ .internal()
+ .doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
- "not be provided to ExchangeCoordinator.",
- isPublic = false)
-
- val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
- defaultValue = Some(true),
- doc = "When true, common subexpressions will be eliminated.",
- isPublic = false)
-
- val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
- defaultValue = Some(true),
- doc = "Whether the query analyzer should be case sensitive or not.")
-
- val USE_FILE_SCAN = booleanConf("spark.sql.sources.fileScan",
- defaultValue = Some(true),
- doc = "Use the new FileScanRDD path for reading HDSF based data sources.",
- isPublic = false)
-
- val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, the Parquet data source merges schemas collected from all data files, " +
- "otherwise the schema is picked from the summary file or a random data file " +
- "if no summary file is available.")
-
- val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
- defaultValue = Some(false),
- doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
- "summary files and we will ignore them when merging schema. Otherwise, if this is " +
- "false, which is the default, we will merge all part-files. This should be considered " +
- "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
-
- val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
- defaultValue = Some(false),
- doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+ "not be provided to ExchangeCoordinator.")
+ .intConf
+ .createWithDefault(-1)
+
+ val SUBEXPRESSION_ELIMINATION_ENABLED =
+ SQLConfigBuilder("spark.sql.subexpressionElimination.enabled")
+ .internal()
+ .doc("When true, common subexpressions will be eliminated.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive")
+ .doc("Whether the query analyzer should be case sensitive or not.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
+ .internal()
+ .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
+ .doc("When true, the Parquet data source merges schemas collected from all data files, " +
+ "otherwise the schema is picked from the summary file or a random data file " +
+ "if no summary file is available.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
+ .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
+ "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+ "false, which is the default, we will merge all part-files. This should be considered " +
+ "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
+ .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
- defaultValue = Some(true),
- doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+ val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
+ .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
- defaultValue = Some(true),
- doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
+ .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
- valueConverter = v => v.toLowerCase,
- validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
- defaultValue = Some("gzip"),
- doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " +
+ val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
+ .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
-
- val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
- defaultValue = Some(true),
- doc = "Enables Parquet filter push-down optimization when set to true.")
-
- val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
- key = "spark.sql.parquet.writeLegacyFormat",
- defaultValue = Some(false),
- doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+ .stringConf
+ .transform(_.toLowerCase())
+ .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+ .createWithDefault("gzip")
+
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
+ .doc("Enables Parquet filter push-down optimization when set to true.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
+ .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
- key = "spark.sql.parquet.output.committer.class",
- defaultValue = Some(classOf[ParquetOutputCommitter].getName),
- doc = "The output committer class used by Parquet. The specified class needs to be a " +
+ val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
+ .doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")
-
- val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
- key = "spark.sql.parquet.enableVectorizedReader",
- defaultValue = Some(true),
- doc = "Enables vectorized parquet decoding.")
-
- val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
- defaultValue = Some(false),
- doc = "When true, enable filter pushdown for ORC files.")
-
- val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
- defaultValue = Some(false),
- doc = "When true, check all the partition paths under the table\'s root directory " +
- "when reading data stored in HDFS.")
-
- val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
- defaultValue = Some(false),
- doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
- "unmatching partitions can be eliminated earlier.")
-
- val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
- defaultValue = Some(true),
- doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
- "Note that this function is experimental and should ony be used when you are using " +
- "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
- "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
- "possible, or you may get wrong result.",
- isPublic = false)
-
- val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
- defaultValue = Some(true),
- doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
- "CREATE VIEW statement using SQL query string generated from view definition logical " +
- "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
- "original native view implementation.",
- isPublic = false)
-
- val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
- defaultValue = Some("_corrupt_record"),
- doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
-
- val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
- defaultValue = Some(5 * 60),
- doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .stringConf
+ .createWithDefault(classOf[ParquetOutputCommitter].getName)
+
+ val PARQUET_VECTORIZED_READER_ENABLED =
+ SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
+ .doc("Enables vectorized parquet decoding.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
+ .doc("When true, enable filter pushdown for ORC files.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
+ .doc("When true, check all the partition paths under the table\'s root directory " +
+ "when reading data stored in HDFS.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_METASTORE_PARTITION_PRUNING =
+ SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
+ .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
+ "unmatching partitions can be eliminated earlier.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
+ .internal()
+ .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+ "Note that this function is experimental and should ony be used when you are using " +
+ "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
+ "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+ "possible, or you may get wrong result.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
+ .internal()
+ .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+ "CREATE VIEW statement using SQL query string generated from view definition logical " +
+ "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
+ "original native view implementation.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
+ .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
+ .stringConf
+ .createWithDefault("_corrupt_record")
+
+ val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
+ .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .intConf
+ .createWithDefault(5 * 60)
// This is only used for the thriftserver
- val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
- doc = "Set a Fair Scheduler pool for a JDBC client session.")
-
- val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
- defaultValue = Some(200),
- doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
-
- val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
- defaultValue = Some(200),
- doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
+ .doc("Set a Fair Scheduler pool for a JDBC client session.")
+ .stringConf
+ .createOptional
+
+ val THRIFTSERVER_UI_STATEMENT_LIMIT =
+ SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
+ .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
+
+ val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
+ .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
// This is used to set the default data source
- val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
- defaultValue = Some("org.apache.spark.sql.parquet"),
- doc = "The default data source to use in input/output.")
+ val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
+ .doc("The default data source to use in input/output.")
+ .stringConf
+ .createWithDefault("org.apache.spark.sql.parquet")
// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
// to its length exceeds the threshold.
- val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
- defaultValue = Some(4000),
- doc = "The maximum length allowed in a single cell when " +
- "storing additional schema information in Hive's metastore.",
- isPublic = false)
-
- val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically discover data partitions.")
+ val SCHEMA_STRING_LENGTH_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
+ .doc("The maximum length allowed in a single cell when " +
+ "storing additional schema information in Hive's metastore.")
+ .internal()
+ .intConf
+ .createWithDefault(4000)
+
+ val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
+ .doc("When true, automatically discover data partitions.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_COLUMN_TYPE_INFERENCE =
- booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically infer the data types for partitioned columns.")
+ SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
+ .doc("When true, automatically infer the data types for partitioned columns.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_MAX_FILES =
- intConf("spark.sql.sources.maxConcurrentWrites",
- defaultValue = Some(1),
- doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+ SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
+ .doc("The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
-
- val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
- defaultValue = Some(true),
- doc = "When false, we will treat bucketed table as normal table.")
-
- val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers are treated as the position in the select list. " +
- "When false, the ordinal numbers in order/sort By clause are ignored.")
-
- val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers in group by clauses are treated as the position " +
+ .intConf
+ .createWithDefault(1)
+
+ val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
+ .doc("When false, we will treat bucketed table as normal table")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
+ .doc("When true, the ordinal numbers are treated as the position in the select list. " +
+ "When false, the ordinal numbers in order/sort By clause are ignored.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
+ .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
"in the select list. When false, the ordinal numbers are ignored.")
+ .booleanConf
+ .createWithDefault(true)
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
@@ -457,89 +346,95 @@ object SQLConf {
// 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
// 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
- stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
+ SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
- val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
- key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
- defaultValue = Some(32),
- doc = "The degree of parallelism for schema merging and partition discovery of " +
- "Parquet data sources.")
+ val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
+ .doc("The degree of parallelism for schema merging and partition discovery of " +
+ "Parquet data sources.")
+ .intConf
+ .createWithDefault(32)
// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
- val DATAFRAME_EAGER_ANALYSIS = booleanConf(
- "spark.sql.eagerAnalysis",
- defaultValue = Some(true),
- doc = "When true, eagerly applies query analysis on DataFrame operations.",
- isPublic = false)
+ val DATAFRAME_EAGER_ANALYSIS = SQLConfigBuilder("spark.sql.eagerAnalysis")
+ .internal()
+ .doc("When true, eagerly applies query analysis on DataFrame operations.")
+ .booleanConf
+ .createWithDefault(true)
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
- val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
- "spark.sql.selfJoinAutoResolveAmbiguity",
- defaultValue = Some(true),
- isPublic = false)
+ val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+ SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
// Whether to retain group by columns or not in GroupedData.agg.
- val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
- "spark.sql.retainGroupColumns",
- defaultValue = Some(true),
- isPublic = false)
-
- val DATAFRAME_PIVOT_MAX_VALUES = intConf(
- "spark.sql.pivotMaxValues",
- defaultValue = Some(10000),
- doc = "When doing a pivot without specifying values for the pivot column this is the maximum " +
- "number of (distinct) values that will be collected without error."
- )
-
- val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
- defaultValue = Some(true),
- isPublic = false,
- doc = "When true, we could use `datasource`.`path` as table in SQL query."
- )
-
- val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage",
- defaultValue = Some(true),
- doc = "When true, the whole stage (of multiple operators) will be compiled into single java" +
- " method.",
- isPublic = false)
-
- val FILES_MAX_PARTITION_BYTES = longConf("spark.sql.files.maxPartitionBytes",
- defaultValue = Some(128 * 1024 * 1024), // parquet.block.size
- doc = "The maximum number of bytes to pack into a single partition when reading files.",
- isPublic = true)
-
- val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
- defaultValue = Some(4 * 1024 * 1024),
- doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+ val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
+ val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
+ .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
+ "number of (distinct) values that will be collected without error.")
+ .intConf
+ .createWithDefault(10000)
+
+ val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
+ .internal()
+ .doc("When true, we could use `datasource`.`path` as table in SQL query.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
+ .internal()
+ .doc("When true, the whole stage (of multiple operators) will be compiled into single java" +
+ " method.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
+ .doc("The maximum number of bytes to pack into a single partition when reading files.")
+ .longConf
+ .createWithDefault(128 * 1024 * 1024) // parquet.block.size
+
+ val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
+ .internal()
+ .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
" the same time. This is used when putting multiple files into a partition. It's better to" +
" over estimated, then the partitions with small files will be faster than partitions with" +
- " bigger files (which is scheduled first).",
- isPublic = false)
-
- val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
- defaultValue = Some(true),
- doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
- isPublic = false)
-
- val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = intConf(
- "spark.sql.streaming.stateStore.minDeltasForSnapshot",
- defaultValue = Some(10),
- doc = "Minimum number of state store delta files that needs to be generated before they " +
- "consolidated into snapshots.",
- isPublic = false)
-
- val STATE_STORE_MIN_VERSIONS_TO_RETAIN = intConf(
- "spark.sql.streaming.stateStore.minBatchesToRetain",
- defaultValue = Some(2),
- doc = "Minimum number of versions of a state store's data to retain after cleaning.",
- isPublic = false)
-
- val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
- defaultValue = None,
- doc = "The default location for storing checkpoint data for continuously executing queries.",
- isPublic = true)
+ " bigger files (which is scheduled first).")
+ .longConf
+ .createWithDefault(4 * 1024 * 1024)
+
+ val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
+ .internal()
+ .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
+ .internal()
+ .doc("Minimum number of state store delta files that needs to be generated before they " +
+ "consolidated into snapshots.")
+ .intConf
+ .createWithDefault(10)
+
+ val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
+ .internal()
+ .doc("Minimum number of versions of a state store's data to retain after cleaning.")
+ .intConf
+ .createWithDefault(2)
+
+ val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
+ .doc("The default location for storing checkpoint data for continuously executing queries.")
+ .stringConf
+ .createOptional
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -562,7 +457,7 @@ object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
-class SQLConf extends Serializable with CatalystConf with Logging {
+private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -686,7 +581,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
}
/** Set the given Spark SQL configuration property. */
- def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
require(entry != null, "entry cannot be null")
require(value != null, s"value cannot be null for key: ${entry.key}")
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
@@ -706,25 +601,35 @@ class SQLConf extends Serializable with CatalystConf with Logging {
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- def getConf[T](entry: SQLConfEntry[T]): T = {
+ def getConf[T](entry: ConfigEntry[T]): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
getOrElse(throw new NoSuchElementException(entry.key))
}
/**
+ * Return the value of an optional Spark SQL configuration property for the given key. If the key
+ * is not set yet, throw an exception.
+ */
+ def getConf[T](entry: OptionalConfigEntry[T]): T = {
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.rawValueConverter).
+ getOrElse(throw new NoSuchElementException(entry.key))
+ }
+
+ /**
* Return the `string` value of Spark SQL configuration property for the given key. If the key is
* not set yet, return `defaultValue`.
*/
@@ -765,7 +670,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
settings.remove(key)
}
- def unsetConf(entry: SQLConfEntry[_]): Unit = {
+ private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = {
settings.remove(entry.key)
}
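
With the SQLConf.scala changes above in place, here is a hedged sketch of how the reworked accessors behave; it assumes calling code inside the org.apache.spark.sql package, since SQLConf itself is now private[sql], and the values are purely illustrative.

    val conf = new SQLConf

    // Typed set/get through a registered ConfigEntry[Int].
    conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 50)
    conf.getConf(SQLConf.SHUFFLE_PARTITIONS)        // 50
    conf.getConf(SQLConf.SHUFFLE_PARTITIONS, 10)    // still 50; the explicit default applies only when the key is unset

    // An OptionalConfigEntry has no default, so reading it while unset throws NoSuchElementException.
    // conf.getConf(SQLConf.CHECKPOINT_LOCATION)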
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 2b89fa9f23..cc69199139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -26,7 +26,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("intConf") {
val key = "spark.sql.SQLConfEntrySuite.int"
- val confEntry = SQLConfEntry.intConf(key)
+ val confEntry = SQLConfigBuilder(key).intConf.createWithDefault(1)
assert(conf.getConf(confEntry, 5) === 5)
conf.setConf(confEntry, 10)
@@ -45,7 +45,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("longConf") {
val key = "spark.sql.SQLConfEntrySuite.long"
- val confEntry = SQLConfEntry.longConf(key)
+ val confEntry = SQLConfigBuilder(key).longConf.createWithDefault(1L)
assert(conf.getConf(confEntry, 5L) === 5L)
conf.setConf(confEntry, 10L)
@@ -64,7 +64,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("booleanConf") {
val key = "spark.sql.SQLConfEntrySuite.boolean"
- val confEntry = SQLConfEntry.booleanConf(key)
+ val confEntry = SQLConfigBuilder(key).booleanConf.createWithDefault(true)
assert(conf.getConf(confEntry, false) === false)
conf.setConf(confEntry, true)
@@ -83,7 +83,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("doubleConf") {
val key = "spark.sql.SQLConfEntrySuite.double"
- val confEntry = SQLConfEntry.doubleConf(key)
+ val confEntry = SQLConfigBuilder(key).doubleConf.createWithDefault(1d)
assert(conf.getConf(confEntry, 5.0) === 5.0)
conf.setConf(confEntry, 10.0)
@@ -102,7 +102,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringConf") {
val key = "spark.sql.SQLConfEntrySuite.string"
- val confEntry = SQLConfEntry.stringConf(key)
+ val confEntry = SQLConfigBuilder(key).stringConf.createWithDefault(null)
assert(conf.getConf(confEntry, "abc") === "abc")
conf.setConf(confEntry, "abcd")
@@ -116,7 +116,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("enumConf") {
val key = "spark.sql.SQLConfEntrySuite.enum"
- val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .checkValues(Set("a", "b", "c"))
+ .createWithDefault("a")
assert(conf.getConf(confEntry) === "a")
conf.setConf(confEntry, "b")
@@ -135,8 +138,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringSeqConf") {
val key = "spark.sql.SQLConfEntrySuite.stringSeq"
- val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
- defaultValue = Some(Nil))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
conf.setConf(confEntry, Seq("a", "b", "c", "d"))
@@ -147,4 +152,12 @@ class SQLConfEntrySuite extends SparkFunSuite {
assert(conf.getConfString(key) === "a,b,c,d,e")
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
}
+
+ test("duplicate entry") {
+ val key = "spark.sql.SQLConfEntrySuite.duplicate"
+ SQLConfigBuilder(key).stringConf.createOptional
+ intercept[IllegalArgumentException] {
+ SQLConfigBuilder(key).stringConf.createOptional
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e944d328a3..e687e6a5ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -119,15 +119,10 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
intercept[IllegalArgumentException] {
- // This value less than Int.MinValue
+ // This value less than Long.MinValue
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
}
- // Test invalid input
- intercept[IllegalArgumentException] {
- // This value exceeds Long.MaxValue
- sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
- }
sqlContext.conf.clear()
}
}
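
The deleted assertion reflects the switch from longMemConf to bytesConf(ByteUnit.BYTE) for spark.sql.adaptive.shuffle.targetPostShuffleInputSize: negative byte strings such as "-1g" now parse through the shared byte-string logic instead of failing, while values whose byte count falls outside the Long range are still rejected. A sketch of the surviving behavior, assuming the same SharedSQLContext test fixture used in this suite:

    // Suffixed byte strings are accepted and converted to bytes.
    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "64m")

    // -90000000000g is below Long.MinValue, so the converter rejects it.
    intercept[IllegalArgumentException] {
      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
    }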
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 073b954a5f..505e5c0bb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
@@ -54,8 +55,7 @@ import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -318,7 +318,7 @@ class HiveContext private[hive](
hiveconf.set(key, value)
}
- override private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
setConf(entry.key, entry.stringConverter(value))
}
@@ -413,19 +413,19 @@ private[hive] object HiveContext extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
- val HIVE_METASTORE_VERSION = stringConf("spark.sql.hive.metastore.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of the Hive metastore. Available options are " +
+ val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
+ .doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_EXECUTION_VERSION = stringConf(
- key = "spark.sql.hive.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of Hive used internally by Spark SQL.")
+ val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
+ .doc("Version of Hive used internally by Spark SQL.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
- defaultValue = Some("builtin"),
- doc = s"""
+ val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
+ .doc(s"""
| Location of the jars that should be used to instantiate the HiveMetastoreClient.
| This property can be one of three options: "
| 1. "builtin"
@@ -436,49 +436,61 @@ private[hive] object HiveContext extends Logging {
| 2. "maven"
| Use Hive jars of specified version downloaded from Maven repositories.
| 3. A classpath in the standard format for both Hive and Hadoop.
- """.stripMargin)
- val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
- "the built in support.")
-
- val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
- "spark.sql.hive.convertMetastoreParquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, also tries to merge possibly different but compatible Parquet schemas in " +
- "different Parquet data files. This configuration is only effective " +
- "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("builtin")
- val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
"the built in support.")
-
- val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
- defaultValue = Some(false),
- doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
+ .booleanConf
+ .createWithDefault(true)
+
+ val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
+ SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
+ .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
+ "different Parquet data files. This configuration is only effective " +
+ "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+ .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
"converted to a data source table, using the data source set by spark.sql.sources.default.")
+ .booleanConf
+ .createWithDefault(false)
- val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
- defaultValue = Some(jdbcPrefixes),
- doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+ val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ "the built in support.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
+ .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +
"that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
"classes that need to be shared are those that interact with classes that are already " +
"shared. For example, custom appenders that are used by log4j.")
+ .stringConf
+ .toSequence
+ .createWithDefault(jdbcPrefixes)
private def jdbcPrefixes = Seq(
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
- val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
- defaultValue = Some(Seq()),
- doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+ val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
+ .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
"version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
"declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
-
- val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
- defaultValue = Some(true),
- doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
+ .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .booleanConf
+ .createWithDefault(true)
/**
* The version of the hive client that will be used to communicate with the metastore. Note that