author     Marcelo Vanzin <vanzin@cloudera.com>    2016-04-05 15:19:51 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>    2016-04-05 15:19:51 -0700
commit     d5ee9d5c240fca5c15b21efc4a760b06a1f39fd6 (patch)
tree       b0c0d55466cdd5678849cc32d914408f9dd84472 /sql
parent     7329fe272d3ead7db9bc3e1e32adb7329dabc607 (diff)
[SPARK-529][SQL] Modify SQLConf to use new config API from core.
Because SQL keeps track of all known configs, some customization was needed in SQLConf to allow that, since the core API does not have that feature.

Tested via existing (and slightly updated) unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #11570 from vanzin/SPARK-529-sql.
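
The registration that the message refers to is wired in through the builder's onCreate hook: every entry built via SQLConfigBuilder lands in SQLConf's map of known configs, even though the core API has no registry of its own. A minimal, self-contained sketch of that builder-plus-callback shape, using toy names only (it mimics the structure of org.apache.spark.internal.config, not its real API):

// Toy sketch of the builder + registration pattern this patch adopts; all names
// here are made up and only mimic the shape of the core config API.
import scala.collection.mutable

final class ToyConfigEntry[T](val key: String, val defaultValue: Option[T], val doc: String)

final class ToyTypedBuilder[T](
    key: String, doc: String, onCreate: Option[ToyConfigEntry[_] => Unit]) {
  def createWithDefault(default: T): ToyConfigEntry[T] = {
    val entry = new ToyConfigEntry[T](key, Some(default), doc)
    onCreate.foreach(_(entry))   // the onCreate hook is what SQLConfigBuilder uses to register
    entry
  }
}

final class ToyConfigBuilder(key: String) {
  private var _doc = ""
  private var _onCreate: Option[ToyConfigEntry[_] => Unit] = None

  def doc(s: String): ToyConfigBuilder = { _doc = s; this }
  def onCreate(f: ToyConfigEntry[_] => Unit): ToyConfigBuilder = { _onCreate = Some(f); this }
  def intConf: ToyTypedBuilder[Int] = new ToyTypedBuilder[Int](key, _doc, _onCreate)
  def booleanConf: ToyTypedBuilder[Boolean] = new ToyTypedBuilder[Boolean](key, _doc, _onCreate)
}

object ToySQLConf {
  private val entries = mutable.Map.empty[String, ToyConfigEntry[_]]

  // Mirrors SQLConf.register: refuse duplicate keys so the registry stays consistent.
  private def register(e: ToyConfigEntry[_]): Unit = entries.synchronized {
    require(!entries.contains(e.key), s"Duplicate entry: ${e.key}")
    entries(e.key) = e
  }

  // Mirrors SQLConfigBuilder: every entry built through here lands in the registry.
  def builder(key: String): ToyConfigBuilder = new ToyConfigBuilder(key).onCreate(register)

  val SHUFFLE_PARTITIONS: ToyConfigEntry[Int] = builder("toy.sql.shuffle.partitions")
    .doc("Default number of partitions for shuffles.")
    .intConf
    .createWithDefault(200)

  def knownKeys: Set[String] = entries.synchronized { entries.keySet.toSet }
}

object ToyDemo extends App {
  println(ToySQLConf.SHUFFLE_PARTITIONS.defaultValue)   // Some(200)
  println(ToySQLConf.knownKeys)                         // Set(toy.sql.shuffle.partitions)
}
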
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 771
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala | 29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 96
5 files changed, 420 insertions, 495 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 587ba1ea05..1c9cb79ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
@@ -41,7 +42,6 @@ import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -138,7 +138,7 @@ class SQLContext private[sql](
def setConf(props: Properties): Unit = conf.setConf(props)
/** Set the given Spark SQL configuration property. */
- private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+ private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
/**
* Set the given Spark SQL configuration property.
@@ -158,16 +158,16 @@ class SQLContext private[sql](
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+ private[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
conf.getConf(entry, defaultValue)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a7c0be63fc..927af89949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -25,6 +25,8 @@ import scala.collection.immutable
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils
@@ -36,418 +38,305 @@ import org.apache.spark.util.Utils
object SQLConf {
private val sqlConfEntries = java.util.Collections.synchronizedMap(
- new java.util.HashMap[String, SQLConfEntry[_]]())
+ new java.util.HashMap[String, ConfigEntry[_]]())
- /**
- * An entry contains all meta information for a configuration.
- *
- * @param key the key for the configuration
- * @param defaultValue the default value for the configuration
- * @param valueConverter how to convert a string to the value. It should throw an exception if the
- * string does not have the required format.
- * @param stringConverter how to convert a value to a string that the user can use it as a valid
- * string value. It's usually `toString`. But sometimes, a custom converter
- * is necessary. E.g., if T is List[String], `a, b, c` is better than
- * `List(a, b, c)`.
- * @param doc the document for the configuration
- * @param isPublic if this configuration is public to the user. If it's `false`, this
- * configuration is only used internally and we should not expose it to the user.
- * @tparam T the value type
- */
- class SQLConfEntry[T] private(
- val key: String,
- val defaultValue: Option[T],
- val valueConverter: String => T,
- val stringConverter: T => String,
- val doc: String,
- val isPublic: Boolean) {
-
- def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
-
- override def toString: String = {
- s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
- }
+ private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+ require(!sqlConfEntries.containsKey(entry.key),
+ s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
+ sqlConfEntries.put(entry.key, entry)
}
- object SQLConfEntry {
-
- private def apply[T](
- key: String,
- defaultValue: Option[T],
- valueConverter: String => T,
- stringConverter: T => String,
- doc: String,
- isPublic: Boolean): SQLConfEntry[T] =
- sqlConfEntries.synchronized {
- if (sqlConfEntries.containsKey(key)) {
- throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
- }
- val entry =
- new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
- sqlConfEntries.put(key, entry)
- entry
- }
-
- def intConf(
- key: String,
- defaultValue: Option[Int] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Int] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be int, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longMemConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- try {
- Utils.byteStringAsBytes(v)
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }
- }, _.toString, doc, isPublic)
-
- def doubleConf(
- key: String,
- defaultValue: Option[Double] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Double] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toDouble
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be double, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def booleanConf(
- key: String,
- defaultValue: Option[Boolean] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Boolean] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(s"$key should be boolean, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def stringConf(
- key: String,
- defaultValue: Option[String] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[String] =
- SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
-
- def enumConf[T](
- key: String,
- valueConverter: String => T,
- validValues: Set[T],
- defaultValue: Option[T] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[T] =
- SQLConfEntry(key, defaultValue, v => {
- val _v = valueConverter(v)
- if (!validValues.contains(_v)) {
- throw new IllegalArgumentException(
- s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
- }
- _v
- }, _.toString, doc, isPublic)
-
- def seqConf[T](
- key: String,
- valueConverter: String => T,
- defaultValue: Option[Seq[T]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
- SQLConfEntry(
- key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
- }
+ private[sql] object SQLConfigBuilder {
- def stringSeqConf(
- key: String,
- defaultValue: Option[Seq[String]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
- seqConf(key, s => s, defaultValue, doc, isPublic)
- }
- }
+ def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)
- import SQLConfEntry._
+ }
- val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
- defaultValue = Some(true),
- doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+ val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
+ .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
"through the constructor (new SQLContexts/HiveContexts created through newSession " +
"method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
"a SQLContext/HiveContext has been created, changing the value of this conf will not " +
- "have effect.",
- isPublic = true)
-
- val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
- defaultValue = Some(true),
- doc = "When set to true Spark SQL will automatically select a compression codec for each " +
- "column based on statistics of the data.",
- isPublic = false)
-
- val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
- defaultValue = Some(10000),
- doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
- "memory utilization and compression, but risk OOMs when caching data.",
- isPublic = false)
+ "have effect.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
+ .internal()
+ .doc("When set to true Spark SQL will automatically select a compression codec for each " +
+ "column based on statistics of the data.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
+ .internal()
+ .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+ "memory utilization and compression, but risk OOMs when caching data.")
+ .intConf
+ .createWithDefault(10000)
val IN_MEMORY_PARTITION_PRUNING =
- booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
- defaultValue = Some(true),
- doc = "When true, enable partition pruning for in-memory columnar tables.",
- isPublic = false)
-
- val PREFER_SORTMERGEJOIN = booleanConf("spark.sql.join.preferSortMergeJoin",
- defaultValue = Some(true),
- doc = "When true, prefer sort merge join over shuffle hash join.",
- isPublic = false)
-
- val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
- defaultValue = Some(10 * 1024 * 1024),
- doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+ SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
+ .internal()
+ .doc("When true, enable partition pruning for in-memory columnar tables.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
+ .internal()
+ .doc("When true, prefer sort merge join over shuffle hash join.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
+ .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
"Note that currently statistics are only supported for Hive Metastore tables where the " +
"command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+ .intConf
+ .createWithDefault(10 * 1024 * 1024)
- val DEFAULT_SIZE_IN_BYTES = longConf(
- "spark.sql.defaultSizeInBytes",
- doc = "The default table size used in query planning. By default, it is set to a larger " +
+ val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
+ .internal()
+ .doc("The default table size used in query planning. By default, it is set to a larger " +
"value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
"by default the optimizer will not choose to broadcast a table unless it knows for sure " +
- "its size is small enough.",
- isPublic = false)
+ "its size is small enough.")
+ .longConf
+ .createWithDefault(-1)
- val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
- defaultValue = Some(200),
- doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
+ val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
+ .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+ .intConf
+ .createWithDefault(200)
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
- longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
- defaultValue = Some(64 * 1024 * 1024),
- doc = "The target post-shuffle input size in bytes of a task.")
+ SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+ .doc("The target post-shuffle input size in bytes of a task.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(64 * 1024 * 1024)
- val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled",
- defaultValue = Some(false),
- doc = "When true, enable adaptive query execution.")
+ val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled")
+ .doc("When true, enable adaptive query execution.")
+ .booleanConf
+ .createWithDefault(false)
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
- intConf("spark.sql.adaptive.minNumPostShufflePartitions",
- defaultValue = Some(-1),
- doc = "The advisory minimal number of post-shuffle partitions provided to " +
+ SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions")
+ .internal()
+ .doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
- "not be provided to ExchangeCoordinator.",
- isPublic = false)
-
- val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
- defaultValue = Some(true),
- doc = "When true, common subexpressions will be eliminated.",
- isPublic = false)
-
- val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
- defaultValue = Some(true),
- doc = "Whether the query analyzer should be case sensitive or not.")
-
- val USE_FILE_SCAN = booleanConf("spark.sql.sources.fileScan",
- defaultValue = Some(true),
- doc = "Use the new FileScanRDD path for reading HDSF based data sources.",
- isPublic = false)
-
- val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, the Parquet data source merges schemas collected from all data files, " +
- "otherwise the schema is picked from the summary file or a random data file " +
- "if no summary file is available.")
-
- val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
- defaultValue = Some(false),
- doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
- "summary files and we will ignore them when merging schema. Otherwise, if this is " +
- "false, which is the default, we will merge all part-files. This should be considered " +
- "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
-
- val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
- defaultValue = Some(false),
- doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+ "not be provided to ExchangeCoordinator.")
+ .intConf
+ .createWithDefault(-1)
+
+ val SUBEXPRESSION_ELIMINATION_ENABLED =
+ SQLConfigBuilder("spark.sql.subexpressionElimination.enabled")
+ .internal()
+ .doc("When true, common subexpressions will be eliminated.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive")
+ .doc("Whether the query analyzer should be case sensitive or not.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
+ .internal()
+ .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
+ .doc("When true, the Parquet data source merges schemas collected from all data files, " +
+ "otherwise the schema is picked from the summary file or a random data file " +
+ "if no summary file is available.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
+ .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
+ "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+ "false, which is the default, we will merge all part-files. This should be considered " +
+ "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
+ .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
- defaultValue = Some(true),
- doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+ val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
+ .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
- defaultValue = Some(true),
- doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
+ .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
- valueConverter = v => v.toLowerCase,
- validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
- defaultValue = Some("gzip"),
- doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " +
+ val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
+ .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
-
- val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
- defaultValue = Some(true),
- doc = "Enables Parquet filter push-down optimization when set to true.")
-
- val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
- key = "spark.sql.parquet.writeLegacyFormat",
- defaultValue = Some(false),
- doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+ .stringConf
+ .transform(_.toLowerCase())
+ .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+ .createWithDefault("gzip")
+
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
+ .doc("Enables Parquet filter push-down optimization when set to true.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
+ .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
- key = "spark.sql.parquet.output.committer.class",
- defaultValue = Some(classOf[ParquetOutputCommitter].getName),
- doc = "The output committer class used by Parquet. The specified class needs to be a " +
+ val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
+ .doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")
-
- val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
- key = "spark.sql.parquet.enableVectorizedReader",
- defaultValue = Some(true),
- doc = "Enables vectorized parquet decoding.")
-
- val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
- defaultValue = Some(false),
- doc = "When true, enable filter pushdown for ORC files.")
-
- val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
- defaultValue = Some(false),
- doc = "When true, check all the partition paths under the table\'s root directory " +
- "when reading data stored in HDFS.")
-
- val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
- defaultValue = Some(false),
- doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
- "unmatching partitions can be eliminated earlier.")
-
- val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
- defaultValue = Some(true),
- doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
- "Note that this function is experimental and should ony be used when you are using " +
- "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
- "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
- "possible, or you may get wrong result.",
- isPublic = false)
-
- val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
- defaultValue = Some(true),
- doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
- "CREATE VIEW statement using SQL query string generated from view definition logical " +
- "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
- "original native view implementation.",
- isPublic = false)
-
- val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
- defaultValue = Some("_corrupt_record"),
- doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
-
- val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
- defaultValue = Some(5 * 60),
- doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .stringConf
+ .createWithDefault(classOf[ParquetOutputCommitter].getName)
+
+ val PARQUET_VECTORIZED_READER_ENABLED =
+ SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
+ .doc("Enables vectorized parquet decoding.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
+ .doc("When true, enable filter pushdown for ORC files.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
+ .doc("When true, check all the partition paths under the table\'s root directory " +
+ "when reading data stored in HDFS.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_METASTORE_PARTITION_PRUNING =
+ SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
+ .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
+ "unmatching partitions can be eliminated earlier.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
+ .internal()
+ .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+ "Note that this function is experimental and should ony be used when you are using " +
+ "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
+ "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+ "possible, or you may get wrong result.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
+ .internal()
+ .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+ "CREATE VIEW statement using SQL query string generated from view definition logical " +
+ "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
+ "original native view implementation.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
+ .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
+ .stringConf
+ .createWithDefault("_corrupt_record")
+
+ val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
+ .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .intConf
+ .createWithDefault(5 * 60)
// This is only used for the thriftserver
- val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
- doc = "Set a Fair Scheduler pool for a JDBC client session.")
-
- val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
- defaultValue = Some(200),
- doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
-
- val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
- defaultValue = Some(200),
- doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
+ .doc("Set a Fair Scheduler pool for a JDBC client session.")
+ .stringConf
+ .createOptional
+
+ val THRIFTSERVER_UI_STATEMENT_LIMIT =
+ SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
+ .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
+
+ val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
+ .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
// This is used to set the default data source
- val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
- defaultValue = Some("org.apache.spark.sql.parquet"),
- doc = "The default data source to use in input/output.")
+ val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
+ .doc("The default data source to use in input/output.")
+ .stringConf
+ .createWithDefault("org.apache.spark.sql.parquet")
// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
// to its length exceeds the threshold.
- val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
- defaultValue = Some(4000),
- doc = "The maximum length allowed in a single cell when " +
- "storing additional schema information in Hive's metastore.",
- isPublic = false)
-
- val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically discover data partitions.")
+ val SCHEMA_STRING_LENGTH_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
+ .doc("The maximum length allowed in a single cell when " +
+ "storing additional schema information in Hive's metastore.")
+ .internal()
+ .intConf
+ .createWithDefault(4000)
+
+ val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
+ .doc("When true, automatically discover data partitions.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_COLUMN_TYPE_INFERENCE =
- booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically infer the data types for partitioned columns.")
+ SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
+ .doc("When true, automatically infer the data types for partitioned columns.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_MAX_FILES =
- intConf("spark.sql.sources.maxConcurrentWrites",
- defaultValue = Some(1),
- doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+ SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
+ .doc("The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
-
- val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
- defaultValue = Some(true),
- doc = "When false, we will treat bucketed table as normal table.")
-
- val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers are treated as the position in the select list. " +
- "When false, the ordinal numbers in order/sort By clause are ignored.")
-
- val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers in group by clauses are treated as the position " +
+ .intConf
+ .createWithDefault(1)
+
+ val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
+ .doc("When false, we will treat bucketed table as normal table")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
+ .doc("When true, the ordinal numbers are treated as the position in the select list. " +
+ "When false, the ordinal numbers in order/sort By clause are ignored.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
+ .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
"in the select list. When false, the ordinal numbers are ignored.")
+ .booleanConf
+ .createWithDefault(true)
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
@@ -457,89 +346,95 @@ object SQLConf {
// 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
// 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
- stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
+ SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
- val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
- key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
- defaultValue = Some(32),
- doc = "The degree of parallelism for schema merging and partition discovery of " +
- "Parquet data sources.")
+ val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
+ .doc("The degree of parallelism for schema merging and partition discovery of " +
+ "Parquet data sources.")
+ .intConf
+ .createWithDefault(32)
// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
- val DATAFRAME_EAGER_ANALYSIS = booleanConf(
- "spark.sql.eagerAnalysis",
- defaultValue = Some(true),
- doc = "When true, eagerly applies query analysis on DataFrame operations.",
- isPublic = false)
+ val DATAFRAME_EAGER_ANALYSIS = SQLConfigBuilder("spark.sql.eagerAnalysis")
+ .internal()
+ .doc("When true, eagerly applies query analysis on DataFrame operations.")
+ .booleanConf
+ .createWithDefault(true)
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
- val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
- "spark.sql.selfJoinAutoResolveAmbiguity",
- defaultValue = Some(true),
- isPublic = false)
+ val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+ SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
// Whether to retain group by columns or not in GroupedData.agg.
- val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
- "spark.sql.retainGroupColumns",
- defaultValue = Some(true),
- isPublic = false)
-
- val DATAFRAME_PIVOT_MAX_VALUES = intConf(
- "spark.sql.pivotMaxValues",
- defaultValue = Some(10000),
- doc = "When doing a pivot without specifying values for the pivot column this is the maximum " +
- "number of (distinct) values that will be collected without error."
- )
-
- val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
- defaultValue = Some(true),
- isPublic = false,
- doc = "When true, we could use `datasource`.`path` as table in SQL query."
- )
-
- val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage",
- defaultValue = Some(true),
- doc = "When true, the whole stage (of multiple operators) will be compiled into single java" +
- " method.",
- isPublic = false)
-
- val FILES_MAX_PARTITION_BYTES = longConf("spark.sql.files.maxPartitionBytes",
- defaultValue = Some(128 * 1024 * 1024), // parquet.block.size
- doc = "The maximum number of bytes to pack into a single partition when reading files.",
- isPublic = true)
-
- val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
- defaultValue = Some(4 * 1024 * 1024),
- doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+ val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
+ val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
+ .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
+ "number of (distinct) values that will be collected without error.")
+ .intConf
+ .createWithDefault(10000)
+
+ val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
+ .internal()
+ .doc("When true, we could use `datasource`.`path` as table in SQL query.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
+ .internal()
+ .doc("When true, the whole stage (of multiple operators) will be compiled into single java" +
+ " method.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
+ .doc("The maximum number of bytes to pack into a single partition when reading files.")
+ .longConf
+ .createWithDefault(128 * 1024 * 1024) // parquet.block.size
+
+ val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
+ .internal()
+ .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
" the same time. This is used when putting multiple files into a partition. It's better to" +
" over estimated, then the partitions with small files will be faster than partitions with" +
- " bigger files (which is scheduled first).",
- isPublic = false)
-
- val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
- defaultValue = Some(true),
- doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
- isPublic = false)
-
- val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = intConf(
- "spark.sql.streaming.stateStore.minDeltasForSnapshot",
- defaultValue = Some(10),
- doc = "Minimum number of state store delta files that needs to be generated before they " +
- "consolidated into snapshots.",
- isPublic = false)
-
- val STATE_STORE_MIN_VERSIONS_TO_RETAIN = intConf(
- "spark.sql.streaming.stateStore.minBatchesToRetain",
- defaultValue = Some(2),
- doc = "Minimum number of versions of a state store's data to retain after cleaning.",
- isPublic = false)
-
- val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
- defaultValue = None,
- doc = "The default location for storing checkpoint data for continuously executing queries.",
- isPublic = true)
+ " bigger files (which is scheduled first).")
+ .longConf
+ .createWithDefault(4 * 1024 * 1024)
+
+ val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
+ .internal()
+ .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
+ .internal()
+ .doc("Minimum number of state store delta files that needs to be generated before they " +
+ "consolidated into snapshots.")
+ .intConf
+ .createWithDefault(10)
+
+ val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
+ .internal()
+ .doc("Minimum number of versions of a state store's data to retain after cleaning.")
+ .intConf
+ .createWithDefault(2)
+
+ val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
+ .doc("The default location for storing checkpoint data for continuously executing queries.")
+ .stringConf
+ .createOptional
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -562,7 +457,7 @@ object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
-class SQLConf extends Serializable with CatalystConf with Logging {
+private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -686,7 +581,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
}
/** Set the given Spark SQL configuration property. */
- def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
require(entry != null, "entry cannot be null")
require(value != null, s"value cannot be null for key: ${entry.key}")
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
@@ -706,25 +601,35 @@ class SQLConf extends Serializable with CatalystConf with Logging {
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- def getConf[T](entry: SQLConfEntry[T]): T = {
+ def getConf[T](entry: ConfigEntry[T]): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
getOrElse(throw new NoSuchElementException(entry.key))
}
/**
+ * Return the value of an optional Spark SQL configuration property for the given key. If the key
+ * is not set yet, throw an exception.
+ */
+ def getConf[T](entry: OptionalConfigEntry[T]): T = {
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.rawValueConverter).
+ getOrElse(throw new NoSuchElementException(entry.key))
+ }
+
+ /**
* Return the `string` value of Spark SQL configuration property for the given key. If the key is
* not set yet, return `defaultValue`.
*/
@@ -765,7 +670,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
settings.remove(key)
}
- def unsetConf(entry: SQLConfEntry[_]): Unit = {
+ private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = {
settings.remove(entry.key)
}
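
With SQLConfEntry gone, callers now hand core ConfigEntry objects straight to SQLConf, and optional entries (built with createOptional) get their own getConf overload that throws when the key is unset. A hedged usage sketch of the calls shown in this hunk; the object name is hypothetical, and it assumes a file inside the Spark build under org.apache.spark.sql, since SQLConf is private[sql] after this patch:

package org.apache.spark.sql.internal   // assumption: SQLConf is private[sql] after this patch

object ConfUsageSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SQLConf

    // Typed reads fall back to the entry's own default (createWithDefault(200)).
    assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 200)

    conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 50)
    assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 50)

    // A caller-supplied fallback wins over the entry default while the key is unset.
    assert(!conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, false))

    // Optional entries have no default, so the new overload throws until the key is set.
    try {
      conf.getConf(SQLConf.CHECKPOINT_LOCATION)
    } catch {
      case _: NoSuchElementException =>
        println(s"${SQLConf.CHECKPOINT_LOCATION.key} is not set")
    }
  }
}
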
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 2b89fa9f23..cc69199139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -26,7 +26,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("intConf") {
val key = "spark.sql.SQLConfEntrySuite.int"
- val confEntry = SQLConfEntry.intConf(key)
+ val confEntry = SQLConfigBuilder(key).intConf.createWithDefault(1)
assert(conf.getConf(confEntry, 5) === 5)
conf.setConf(confEntry, 10)
@@ -45,7 +45,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("longConf") {
val key = "spark.sql.SQLConfEntrySuite.long"
- val confEntry = SQLConfEntry.longConf(key)
+ val confEntry = SQLConfigBuilder(key).longConf.createWithDefault(1L)
assert(conf.getConf(confEntry, 5L) === 5L)
conf.setConf(confEntry, 10L)
@@ -64,7 +64,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("booleanConf") {
val key = "spark.sql.SQLConfEntrySuite.boolean"
- val confEntry = SQLConfEntry.booleanConf(key)
+ val confEntry = SQLConfigBuilder(key).booleanConf.createWithDefault(true)
assert(conf.getConf(confEntry, false) === false)
conf.setConf(confEntry, true)
@@ -83,7 +83,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("doubleConf") {
val key = "spark.sql.SQLConfEntrySuite.double"
- val confEntry = SQLConfEntry.doubleConf(key)
+ val confEntry = SQLConfigBuilder(key).doubleConf.createWithDefault(1d)
assert(conf.getConf(confEntry, 5.0) === 5.0)
conf.setConf(confEntry, 10.0)
@@ -102,7 +102,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringConf") {
val key = "spark.sql.SQLConfEntrySuite.string"
- val confEntry = SQLConfEntry.stringConf(key)
+ val confEntry = SQLConfigBuilder(key).stringConf.createWithDefault(null)
assert(conf.getConf(confEntry, "abc") === "abc")
conf.setConf(confEntry, "abcd")
@@ -116,7 +116,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("enumConf") {
val key = "spark.sql.SQLConfEntrySuite.enum"
- val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .checkValues(Set("a", "b", "c"))
+ .createWithDefault("a")
assert(conf.getConf(confEntry) === "a")
conf.setConf(confEntry, "b")
@@ -135,8 +138,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringSeqConf") {
val key = "spark.sql.SQLConfEntrySuite.stringSeq"
- val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
- defaultValue = Some(Nil))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
conf.setConf(confEntry, Seq("a", "b", "c", "d"))
@@ -147,4 +152,12 @@ class SQLConfEntrySuite extends SparkFunSuite {
assert(conf.getConfString(key) === "a,b,c,d,e")
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
}
+
+ test("duplicate entry") {
+ val key = "spark.sql.SQLConfEntrySuite.duplicate"
+ SQLConfigBuilder(key).stringConf.createOptional
+ intercept[IllegalArgumentException] {
+ SQLConfigBuilder(key).stringConf.createOptional
+ }
+ }
}
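
The removed enumConf and stringSeqConf helpers map onto builder combinators: transform covers the old valueConverter, checkValues covers validValues, and toSequence covers comma-separated lists, as the updated suite above shows. A hedged, suite-style sketch of the validation behaviour; the suite name and key are hypothetical, and it is assumed to sit next to SQLConfEntrySuite in org.apache.spark.sql.internal:

package org.apache.spark.sql.internal   // assumption: same package as SQLConfEntrySuite

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder

class SQLConfigBuilderSketchSuite extends SparkFunSuite {

  private val conf = new SQLConf

  test("transform + checkValues replace enumConf") {
    // Hypothetical key, registered once for this sketch.
    val codec = SQLConfigBuilder("spark.sql.SQLConfigBuilderSketch.codec")
      .stringConf
      .transform(_.toLowerCase())                                 // old enumConf valueConverter
      .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))  // old enumConf validValues
      .createWithDefault("gzip")

    assert(conf.getConf(codec) === "gzip")

    // The raw string is normalized before validation and before every read.
    conf.setConfString(codec.key, "SNAPPY")
    assert(conf.getConf(codec) === "snappy")

    // Anything outside the allowed set is rejected when it is set.
    intercept[IllegalArgumentException] {
      conf.setConfString(codec.key, "zstd")
    }
  }
}
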
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e944d328a3..e687e6a5ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -119,15 +119,10 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
intercept[IllegalArgumentException] {
- // This value less than Int.MinValue
+ // This value less than Long.MinValue
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
}
- // Test invalid input
- intercept[IllegalArgumentException] {
- // This value exceeds Long.MaxValue
- sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
- }
sqlContext.conf.clear()
}
}
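
The dropped intercept block follows from switching the entry to bytesConf(ByteUnit.BYTE): as the test change implies, the core parser treats a leading minus as a sign, so "-1g" is now a legal (negative) size, while a magnitude that overflows Long still fails at set time. A hedged sketch of that behaviour; the suite name is hypothetical and it is assumed to live inside org.apache.spark.sql next to this suite:

package org.apache.spark.sql.internal   // assumption: same package as the SQL conf suites

import org.apache.spark.SparkFunSuite

class PostShuffleInputSizeSketchSuite extends SparkFunSuite {

  test("bytesConf accepts signed sizes but still rejects Long overflow") {
    val conf = new SQLConf
    val entry = SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE

    // "-1g" parses to a negative byte count now, which is why the second
    // intercept block above could be deleted.
    conf.setConfString(entry.key, "-1g")
    assert(conf.getConf(entry) === -1L * 1024 * 1024 * 1024)

    // 90000000000 GiB does not fit in a Long, so validation still fails.
    intercept[IllegalArgumentException] {
      conf.setConfString(entry.key, "-90000000000g")
    }
  }
}
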
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 073b954a5f..505e5c0bb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
@@ -54,8 +55,7 @@ import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -318,7 +318,7 @@ class HiveContext private[hive](
hiveconf.set(key, value)
}
- override private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
setConf(entry.key, entry.stringConverter(value))
}
@@ -413,19 +413,19 @@ private[hive] object HiveContext extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
- val HIVE_METASTORE_VERSION = stringConf("spark.sql.hive.metastore.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of the Hive metastore. Available options are " +
+ val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
+ .doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_EXECUTION_VERSION = stringConf(
- key = "spark.sql.hive.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of Hive used internally by Spark SQL.")
+ val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
+ .doc("Version of Hive used internally by Spark SQL.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
- defaultValue = Some("builtin"),
- doc = s"""
+ val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
+ .doc(s"""
| Location of the jars that should be used to instantiate the HiveMetastoreClient.
| This property can be one of three options: "
| 1. "builtin"
@@ -436,49 +436,61 @@ private[hive] object HiveContext extends Logging {
| 2. "maven"
| Use Hive jars of specified version downloaded from Maven repositories.
| 3. A classpath in the standard format for both Hive and Hadoop.
- """.stripMargin)
- val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
- "the built in support.")
-
- val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
- "spark.sql.hive.convertMetastoreParquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, also tries to merge possibly different but compatible Parquet schemas in " +
- "different Parquet data files. This configuration is only effective " +
- "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("builtin")
- val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
"the built in support.")
-
- val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
- defaultValue = Some(false),
- doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
+ .booleanConf
+ .createWithDefault(true)
+
+ val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
+ SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
+ .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
+ "different Parquet data files. This configuration is only effective " +
+ "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+ .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
"converted to a data source table, using the data source set by spark.sql.sources.default.")
+ .booleanConf
+ .createWithDefault(false)
- val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
- defaultValue = Some(jdbcPrefixes),
- doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+ val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ "the built in support.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
+ .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +
"that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
"classes that need to be shared are those that interact with classes that are already " +
"shared. For example, custom appenders that are used by log4j.")
+ .stringConf
+ .toSequence
+ .createWithDefault(jdbcPrefixes)
private def jdbcPrefixes = Seq(
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
- val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
- defaultValue = Some(Seq()),
- doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+ val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
+ .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
"version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
"declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
-
- val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
- defaultValue = Some(true),
- doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
+ .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .booleanConf
+ .createWithDefault(true)
/**
* The version of the hive client that will be used to communicate with the metastore. Note that