-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala      |  44
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala            |  44
-rw-r--r--  core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala   |  28
-rw-r--r--  project/SparkBuild.scala                                                       |  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                  |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala            | 771
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala  |  29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala       |   7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala            |  96
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala                  |  92
10 files changed, 551 insertions(+), 590 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 770b43697a..5d50e3851a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -85,10 +85,12 @@ private[spark] class TypedConfigBuilder[T](
this(parent, converter, Option(_).map(_.toString).orNull)
}
+ /** Apply a transformation to the user-provided values of the config entry. */
def transform(fn: T => T): TypedConfigBuilder[T] = {
new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
}
+ /** Check that user-provided values for the config match a pre-defined set. */
def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
transform { v =>
if (!validValues.contains(v)) {
@@ -99,30 +101,38 @@ private[spark] class TypedConfigBuilder[T](
}
}
+ /** Turns the config entry into a sequence of values of the underlying type. */
def toSequence: TypedConfigBuilder[Seq[T]] = {
new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
}
- /** Creates a [[ConfigEntry]] that does not require a default value. */
- def optional: OptionalConfigEntry[T] = {
- new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+ /** Creates a [[ConfigEntry]] that does not have a default value. */
+ def createOptional: OptionalConfigEntry[T] = {
+ val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
+ parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
/** Creates a [[ConfigEntry]] that has a default value. */
- def withDefault(default: T): ConfigEntry[T] = {
+ def createWithDefault(default: T): ConfigEntry[T] = {
val transformedDefault = converter(stringConverter(default))
- new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
- parent._doc, parent._public)
+ val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+ stringConverter, parent._doc, parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
/**
* Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
* [[String]] and must be a valid value for the entry.
*/
- def withDefaultString(default: String): ConfigEntry[T] = {
+ def createWithDefaultString(default: String): ConfigEntry[T] = {
val typedDefault = converter(default)
- new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
- parent._public)
+ val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+ parent._doc, parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
}
@@ -136,10 +146,11 @@ private[spark] case class ConfigBuilder(key: String) {
import ConfigHelpers._
- var _public = true
- var _doc = ""
+ private[config] var _public = true
+ private[config] var _doc = ""
+ private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
- def internal: ConfigBuilder = {
+ def internal(): ConfigBuilder = {
_public = false
this
}
@@ -149,6 +160,15 @@ private[spark] case class ConfigBuilder(key: String) {
this
}
+ /**
+ * Registers a callback for when the config entry is finally instantiated. Currently used by
+ * SQLConf to keep track of SQL configuration entries.
+ */
+ def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+ _onCreate = Option(callback)
+ this
+ }
+
def intConf: TypedConfigBuilder[Int] = {
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 968c5192ac..94b50ee065 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -23,68 +23,70 @@ import org.apache.spark.network.util.ByteUnit
package object config {
private[spark] val DRIVER_CLASS_PATH =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
private[spark] val DRIVER_JAVA_OPTIONS =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
private[spark] val DRIVER_LIBRARY_PATH =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
- ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+ ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.bytesConf(ByteUnit.MiB)
- .withDefaultString("1g")
+ .createWithDefaultString("1g")
private[spark] val EXECUTOR_CLASS_PATH =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
private[spark] val EXECUTOR_JAVA_OPTIONS =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
private[spark] val EXECUTOR_LIBRARY_PATH =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
- ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+ ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.bytesConf(ByteUnit.MiB)
- .withDefaultString("1g")
+ .createWithDefaultString("1g")
- private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
- .booleanConf.withDefault(false)
+ private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
+ .booleanConf.createWithDefault(false)
- private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+ private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
- ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+ ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
ConfigBuilder("spark.dynamicAllocation.initialExecutors")
.fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
- ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+ ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
private[spark] val SHUFFLE_SERVICE_ENABLED =
- ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+ ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
.doc("Location of user's keytab.")
- .stringConf.optional
+ .stringConf.createOptional
private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
.doc("Name of the Kerberos principal.")
- .stringConf.optional
+ .stringConf.createOptional
- private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+ private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
+ .intConf
+ .createOptional
private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
- .internal
+ .internal()
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
}
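As a usage sketch (it only compiles inside the org.apache.spark package, since both these entries and SparkConf.get(entry) are private[spark]), reading the migrated entries looks like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    val conf = new SparkConf()
    val driverMemoryMiB: Long = conf.get(DRIVER_MEMORY)               // 1024, from the "1g" default
    val extraClassPath: Option[String] = conf.get(DRIVER_CLASS_PATH)  // None when the key is unset
    val pyFiles: Seq[String] = conf.get(PY_FILES)                     // Nil by default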
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 0644148eae..337fd7e85e 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -26,7 +26,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: int") {
val conf = new SparkConf()
- val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val iConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
assert(conf.get(iConf) === 1)
conf.set(iConf, 2)
assert(conf.get(iConf) === 2)
@@ -34,21 +34,21 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: long") {
val conf = new SparkConf()
- val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+ val lConf = ConfigBuilder("spark.long").longConf.createWithDefault(0L)
conf.set(lConf, 1234L)
assert(conf.get(lConf) === 1234L)
}
test("conf entry: double") {
val conf = new SparkConf()
- val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+ val dConf = ConfigBuilder("spark.double").doubleConf.createWithDefault(0.0)
conf.set(dConf, 20.0)
assert(conf.get(dConf) === 20.0)
}
test("conf entry: boolean") {
val conf = new SparkConf()
- val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+ val bConf = ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false)
assert(!conf.get(bConf))
conf.set(bConf, true)
assert(conf.get(bConf))
@@ -56,7 +56,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: optional") {
val conf = new SparkConf()
- val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+ val optionalConf = ConfigBuilder("spark.optional").intConf.createOptional
assert(conf.get(optionalConf) === None)
conf.set(optionalConf, 1)
assert(conf.get(optionalConf) === Some(1))
@@ -64,7 +64,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: fallback") {
val conf = new SparkConf()
- val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val parentConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
assert(conf.get(confWithFallback) === 1)
conf.set(confWithFallback, 2)
@@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: time") {
val conf = new SparkConf()
- val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+ val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).createWithDefaultString("1h")
assert(conf.get(time) === 3600L)
conf.set(time.key, "1m")
assert(conf.get(time) === 60L)
@@ -82,7 +82,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: bytes") {
val conf = new SparkConf()
- val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+ val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).createWithDefaultString("1m")
assert(conf.get(bytes) === 1024L)
conf.set(bytes.key, "1k")
assert(conf.get(bytes) === 1L)
@@ -90,7 +90,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: string seq") {
val conf = new SparkConf()
- val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+ val seq = ConfigBuilder("spark.seq").stringConf.toSequence.createWithDefault(Seq())
conf.set(seq.key, "1,,2, 3 , , 4")
assert(conf.get(seq) === Seq("1", "2", "3", "4"))
conf.set(seq, Seq("1", "2"))
@@ -99,7 +99,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: int seq") {
val conf = new SparkConf()
- val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+ val seq = ConfigBuilder("spark.seq").intConf.toSequence.createWithDefault(Seq())
conf.set(seq.key, "1,,2, 3 , , 4")
assert(conf.get(seq) === Seq(1, 2, 3, 4))
conf.set(seq, Seq(1, 2))
@@ -111,7 +111,7 @@ class ConfigEntrySuite extends SparkFunSuite {
val transformationConf = ConfigBuilder("spark.transformation")
.stringConf
.transform(_.toLowerCase())
- .withDefault("FOO")
+ .createWithDefault("FOO")
assert(conf.get(transformationConf) === "foo")
conf.set(transformationConf, "BAR")
@@ -123,7 +123,7 @@ class ConfigEntrySuite extends SparkFunSuite {
val enum = ConfigBuilder("spark.enum")
.stringConf
.checkValues(Set("a", "b", "c"))
- .withDefault("a")
+ .createWithDefault("a")
assert(conf.get(enum) === "a")
conf.set(enum, "b")
@@ -138,7 +138,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: conversion error") {
val conf = new SparkConf()
- val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+ val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.createOptional
conf.set(conversionTest.key, "abc")
val conversionError = intercept[IllegalArgumentException] {
conf.get(conversionTest)
@@ -148,7 +148,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("default value handling is null-safe") {
val conf = new SparkConf()
- val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+ val stringConf = ConfigBuilder("spark.string").stringConf.createWithDefault(null)
assert(conf.get(stringConf) === null)
}
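For completeness, a sketch of the failure mode the checkValues test exercises (hypothetical key, written as it would sit inside a SparkFunSuite test body): setting a raw string is not validated eagerly, and the IllegalArgumentException surfaces when the entry is read.

    val conf = new SparkConf()
    val mode = ConfigBuilder("spark.example.mode")
      .stringConf
      .checkValues(Set("local", "remote"))
      .createWithDefault("local")

    conf.set(mode.key, "bogus")           // stored as a plain string, no eager validation
    intercept[IllegalArgumentException] {
      conf.get(mode)                      // conversion runs here and rejects the value
    }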
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b32480b164..60124ef0a1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -20,6 +20,7 @@ import java.nio.file.Files
import scala.util.Properties
import scala.collection.JavaConverters._
+import scala.collection.mutable.Stack
import sbt._
import sbt.Classpaths.publishTask
@@ -742,8 +743,21 @@ object TestSettings {
parallelExecution in Test := false,
// Make sure the test temp directory exists.
resourceGenerators in Test <+= resourceManaged in Test map { outDir: File =>
- if (!new File(testTempDir).isDirectory()) {
- require(new File(testTempDir).mkdirs(), s"Error creating temp directory $testTempDir.")
+ var dir = new File(testTempDir)
+ if (!dir.isDirectory()) {
+ // Because File.mkdirs() can fail if multiple callers are trying to create the same
+ // parent directory, this code tries to create parents one at a time, and avoids
+ // failures when the directories have been created by somebody else.
+ val stack = new Stack[File]()
+ while (!dir.isDirectory()) {
+ stack.push(dir)
+ dir = dir.getParentFile()
+ }
+
+ while (stack.nonEmpty) {
+ val d = stack.pop()
+ require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
+ }
}
Seq[File]()
},
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 587ba1ea05..1c9cb79ba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
@@ -41,7 +42,6 @@ import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -138,7 +138,7 @@ class SQLContext private[sql](
def setConf(props: Properties): Unit = conf.setConf(props)
/** Set the given Spark SQL configuration property. */
- private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+ private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
/**
* Set the given Spark SQL configuration property.
@@ -158,16 +158,16 @@ class SQLContext private[sql](
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+ private[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
conf.getConf(entry, defaultValue)
}
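A sketch of the resulting call sites (the setters and getters remain private[sql], so this only applies to Spark SQL's own code): they now take the shared ConfigEntry type instead of the removed SQLConfEntry.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.internal.SQLConf

    def tuneShufflePartitions(sqlContext: SQLContext): Unit = {
      sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)
      assert(sqlContext.getConf(SQLConf.SHUFFLE_PARTITIONS) == 10)
    }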
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a7c0be63fc..927af89949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -25,6 +25,8 @@ import scala.collection.immutable
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.util.Utils
@@ -36,418 +38,305 @@ import org.apache.spark.util.Utils
object SQLConf {
private val sqlConfEntries = java.util.Collections.synchronizedMap(
- new java.util.HashMap[String, SQLConfEntry[_]]())
+ new java.util.HashMap[String, ConfigEntry[_]]())
- /**
- * An entry contains all meta information for a configuration.
- *
- * @param key the key for the configuration
- * @param defaultValue the default value for the configuration
- * @param valueConverter how to convert a string to the value. It should throw an exception if the
- * string does not have the required format.
- * @param stringConverter how to convert a value to a string that the user can use it as a valid
- * string value. It's usually `toString`. But sometimes, a custom converter
- * is necessary. E.g., if T is List[String], `a, b, c` is better than
- * `List(a, b, c)`.
- * @param doc the document for the configuration
- * @param isPublic if this configuration is public to the user. If it's `false`, this
- * configuration is only used internally and we should not expose it to the user.
- * @tparam T the value type
- */
- class SQLConfEntry[T] private(
- val key: String,
- val defaultValue: Option[T],
- val valueConverter: String => T,
- val stringConverter: T => String,
- val doc: String,
- val isPublic: Boolean) {
-
- def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
-
- override def toString: String = {
- s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
- }
+ private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
+ require(!sqlConfEntries.containsKey(entry.key),
+ s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
+ sqlConfEntries.put(entry.key, entry)
}
- object SQLConfEntry {
-
- private def apply[T](
- key: String,
- defaultValue: Option[T],
- valueConverter: String => T,
- stringConverter: T => String,
- doc: String,
- isPublic: Boolean): SQLConfEntry[T] =
- sqlConfEntries.synchronized {
- if (sqlConfEntries.containsKey(key)) {
- throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
- }
- val entry =
- new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
- sqlConfEntries.put(key, entry)
- entry
- }
-
- def intConf(
- key: String,
- defaultValue: Option[Int] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Int] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be int, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def longMemConf(
- key: String,
- defaultValue: Option[Long] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Long] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toLong
- } catch {
- case _: NumberFormatException =>
- try {
- Utils.byteStringAsBytes(v)
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be long, but was $v")
- }
- }
- }, _.toString, doc, isPublic)
-
- def doubleConf(
- key: String,
- defaultValue: Option[Double] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Double] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toDouble
- } catch {
- case _: NumberFormatException =>
- throw new IllegalArgumentException(s"$key should be double, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def booleanConf(
- key: String,
- defaultValue: Option[Boolean] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Boolean] =
- SQLConfEntry(key, defaultValue, { v =>
- try {
- v.toBoolean
- } catch {
- case _: IllegalArgumentException =>
- throw new IllegalArgumentException(s"$key should be boolean, but was $v")
- }
- }, _.toString, doc, isPublic)
-
- def stringConf(
- key: String,
- defaultValue: Option[String] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[String] =
- SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
-
- def enumConf[T](
- key: String,
- valueConverter: String => T,
- validValues: Set[T],
- defaultValue: Option[T] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[T] =
- SQLConfEntry(key, defaultValue, v => {
- val _v = valueConverter(v)
- if (!validValues.contains(_v)) {
- throw new IllegalArgumentException(
- s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
- }
- _v
- }, _.toString, doc, isPublic)
-
- def seqConf[T](
- key: String,
- valueConverter: String => T,
- defaultValue: Option[Seq[T]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
- SQLConfEntry(
- key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
- }
+ private[sql] object SQLConfigBuilder {
- def stringSeqConf(
- key: String,
- defaultValue: Option[Seq[String]] = None,
- doc: String = "",
- isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
- seqConf(key, s => s, defaultValue, doc, isPublic)
- }
- }
+ def apply(key: String): ConfigBuilder = new ConfigBuilder(key).onCreate(register)
- import SQLConfEntry._
+ }
- val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
- defaultValue = Some(true),
- doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+ val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
+ .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
"through the constructor (new SQLContexts/HiveContexts created through newSession " +
"method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
"a SQLContext/HiveContext has been created, changing the value of this conf will not " +
- "have effect.",
- isPublic = true)
-
- val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
- defaultValue = Some(true),
- doc = "When set to true Spark SQL will automatically select a compression codec for each " +
- "column based on statistics of the data.",
- isPublic = false)
-
- val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
- defaultValue = Some(10000),
- doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
- "memory utilization and compression, but risk OOMs when caching data.",
- isPublic = false)
+ "have effect.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COMPRESS_CACHED = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compressed")
+ .internal()
+ .doc("When set to true Spark SQL will automatically select a compression codec for each " +
+ "column based on statistics of the data.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_BATCH_SIZE = SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.batchSize")
+ .internal()
+ .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+ "memory utilization and compression, but risk OOMs when caching data.")
+ .intConf
+ .createWithDefault(10000)
val IN_MEMORY_PARTITION_PRUNING =
- booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
- defaultValue = Some(true),
- doc = "When true, enable partition pruning for in-memory columnar tables.",
- isPublic = false)
-
- val PREFER_SORTMERGEJOIN = booleanConf("spark.sql.join.preferSortMergeJoin",
- defaultValue = Some(true),
- doc = "When true, prefer sort merge join over shuffle hash join.",
- isPublic = false)
-
- val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
- defaultValue = Some(10 * 1024 * 1024),
- doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+ SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.partitionPruning")
+ .internal()
+ .doc("When true, enable partition pruning for in-memory columnar tables.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin")
+ .internal()
+ .doc("When true, prefer sort merge join over shuffle hash join.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val AUTO_BROADCASTJOIN_THRESHOLD = SQLConfigBuilder("spark.sql.autoBroadcastJoinThreshold")
+ .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
"nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
"Note that currently statistics are only supported for Hive Metastore tables where the " +
"command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+ .intConf
+ .createWithDefault(10 * 1024 * 1024)
- val DEFAULT_SIZE_IN_BYTES = longConf(
- "spark.sql.defaultSizeInBytes",
- doc = "The default table size used in query planning. By default, it is set to a larger " +
+ val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
+ .internal()
+ .doc("The default table size used in query planning. By default, it is set to a larger " +
"value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
"by default the optimizer will not choose to broadcast a table unless it knows for sure " +
- "its size is small enough.",
- isPublic = false)
+ "its size is small enough.")
+ .longConf
+ .createWithDefault(-1)
- val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
- defaultValue = Some(200),
- doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
+ val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
+ .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
+ .intConf
+ .createWithDefault(200)
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
- longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
- defaultValue = Some(64 * 1024 * 1024),
- doc = "The target post-shuffle input size in bytes of a task.")
+ SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+ .doc("The target post-shuffle input size in bytes of a task.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(64 * 1024 * 1024)
- val ADAPTIVE_EXECUTION_ENABLED = booleanConf("spark.sql.adaptive.enabled",
- defaultValue = Some(false),
- doc = "When true, enable adaptive query execution.")
+ val ADAPTIVE_EXECUTION_ENABLED = SQLConfigBuilder("spark.sql.adaptive.enabled")
+ .doc("When true, enable adaptive query execution.")
+ .booleanConf
+ .createWithDefault(false)
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
- intConf("spark.sql.adaptive.minNumPostShufflePartitions",
- defaultValue = Some(-1),
- doc = "The advisory minimal number of post-shuffle partitions provided to " +
+ SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions")
+ .internal()
+ .doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
- "not be provided to ExchangeCoordinator.",
- isPublic = false)
-
- val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
- defaultValue = Some(true),
- doc = "When true, common subexpressions will be eliminated.",
- isPublic = false)
-
- val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
- defaultValue = Some(true),
- doc = "Whether the query analyzer should be case sensitive or not.")
-
- val USE_FILE_SCAN = booleanConf("spark.sql.sources.fileScan",
- defaultValue = Some(true),
- doc = "Use the new FileScanRDD path for reading HDSF based data sources.",
- isPublic = false)
-
- val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, the Parquet data source merges schemas collected from all data files, " +
- "otherwise the schema is picked from the summary file or a random data file " +
- "if no summary file is available.")
-
- val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
- defaultValue = Some(false),
- doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
- "summary files and we will ignore them when merging schema. Otherwise, if this is " +
- "false, which is the default, we will merge all part-files. This should be considered " +
- "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
-
- val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
- defaultValue = Some(false),
- doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+ "not be provided to ExchangeCoordinator.")
+ .intConf
+ .createWithDefault(-1)
+
+ val SUBEXPRESSION_ELIMINATION_ENABLED =
+ SQLConfigBuilder("spark.sql.subexpressionElimination.enabled")
+ .internal()
+ .doc("When true, common subexpressions will be eliminated.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CASE_SENSITIVE = SQLConfigBuilder("spark.sql.caseSensitive")
+ .doc("Whether the query analyzer should be case sensitive or not.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val USE_FILE_SCAN = SQLConfigBuilder("spark.sql.sources.fileScan")
+ .internal()
+ .doc("Use the new FileScanRDD path for reading HDSF based data sources.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_SCHEMA_MERGING_ENABLED = SQLConfigBuilder("spark.sql.parquet.mergeSchema")
+ .doc("When true, the Parquet data source merges schemas collected from all data files, " +
+ "otherwise the schema is picked from the summary file or a random data file " +
+ "if no summary file is available.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_SCHEMA_RESPECT_SUMMARIES = SQLConfigBuilder("spark.sql.parquet.respectSummaryFiles")
+ .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
+ "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+ "false, which is the default, we will merge all part-files. This should be considered " +
+ "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val PARQUET_BINARY_AS_STRING = SQLConfigBuilder("spark.sql.parquet.binaryAsString")
+ .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
"Spark SQL, do not differentiate between binary data and strings when writing out the " +
"Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
"compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
- defaultValue = Some(true),
- doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+ val PARQUET_INT96_AS_TIMESTAMP = SQLConfigBuilder("spark.sql.parquet.int96AsTimestamp")
+ .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
- defaultValue = Some(true),
- doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ val PARQUET_CACHE_METADATA = SQLConfigBuilder("spark.sql.parquet.cacheMetadata")
+ .doc("Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+ .booleanConf
+ .createWithDefault(true)
- val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
- valueConverter = v => v.toLowerCase,
- validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
- defaultValue = Some("gzip"),
- doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " +
+ val PARQUET_COMPRESSION = SQLConfigBuilder("spark.sql.parquet.compression.codec")
+ .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
"uncompressed, snappy, gzip, lzo.")
-
- val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
- defaultValue = Some(true),
- doc = "Enables Parquet filter push-down optimization when set to true.")
-
- val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
- key = "spark.sql.parquet.writeLegacyFormat",
- defaultValue = Some(false),
- doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+ .stringConf
+ .transform(_.toLowerCase())
+ .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+ .createWithDefault("gzip")
+
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
+ .doc("Enables Parquet filter push-down optimization when set to true.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PARQUET_WRITE_LEGACY_FORMAT = SQLConfigBuilder("spark.sql.parquet.writeLegacyFormat")
+ .doc("Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.")
+ .booleanConf
+ .createWithDefault(false)
- val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
- key = "spark.sql.parquet.output.committer.class",
- defaultValue = Some(classOf[ParquetOutputCommitter].getName),
- doc = "The output committer class used by Parquet. The specified class needs to be a " +
+ val PARQUET_OUTPUT_COMMITTER_CLASS = SQLConfigBuilder("spark.sql.parquet.output.committer.class")
+ .doc("The output committer class used by Parquet. The specified class needs to be a " +
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")
-
- val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
- key = "spark.sql.parquet.enableVectorizedReader",
- defaultValue = Some(true),
- doc = "Enables vectorized parquet decoding.")
-
- val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
- defaultValue = Some(false),
- doc = "When true, enable filter pushdown for ORC files.")
-
- val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
- defaultValue = Some(false),
- doc = "When true, check all the partition paths under the table\'s root directory " +
- "when reading data stored in HDFS.")
-
- val HIVE_METASTORE_PARTITION_PRUNING = booleanConf("spark.sql.hive.metastorePartitionPruning",
- defaultValue = Some(false),
- doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
- "unmatching partitions can be eliminated earlier.")
-
- val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
- defaultValue = Some(true),
- doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
- "Note that this function is experimental and should ony be used when you are using " +
- "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
- "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
- "possible, or you may get wrong result.",
- isPublic = false)
-
- val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
- defaultValue = Some(true),
- doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
- "CREATE VIEW statement using SQL query string generated from view definition logical " +
- "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
- "original native view implementation.",
- isPublic = false)
-
- val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
- defaultValue = Some("_corrupt_record"),
- doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
-
- val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
- defaultValue = Some(5 * 60),
- doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .stringConf
+ .createWithDefault(classOf[ParquetOutputCommitter].getName)
+
+ val PARQUET_VECTORIZED_READER_ENABLED =
+ SQLConfigBuilder("spark.sql.parquet.enableVectorizedReader")
+ .doc("Enables vectorized parquet decoding.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
+ .doc("When true, enable filter pushdown for ORC files.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_VERIFY_PARTITION_PATH = SQLConfigBuilder("spark.sql.hive.verifyPartitionPath")
+ .doc("When true, check all the partition paths under the table\'s root directory " +
+ "when reading data stored in HDFS.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val HIVE_METASTORE_PARTITION_PRUNING =
+ SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
+ .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
+ "unmatching partitions can be eliminated earlier.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView")
+ .internal()
+ .doc("When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+ "Note that this function is experimental and should ony be used when you are using " +
+ "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
+ "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+ "possible, or you may get wrong result.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val CANONICAL_NATIVE_VIEW = SQLConfigBuilder("spark.sql.nativeView.canonical")
+ .internal()
+ .doc("When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+ "CREATE VIEW statement using SQL query string generated from view definition logical " +
+ "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
+ "original native view implementation.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val COLUMN_NAME_OF_CORRUPT_RECORD = SQLConfigBuilder("spark.sql.columnNameOfCorruptRecord")
+ .doc("The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
+ .stringConf
+ .createWithDefault("_corrupt_record")
+
+ val BROADCAST_TIMEOUT = SQLConfigBuilder("spark.sql.broadcastTimeout")
+ .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
+ .intConf
+ .createWithDefault(5 * 60)
// This is only used for the thriftserver
- val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
- doc = "Set a Fair Scheduler pool for a JDBC client session.")
-
- val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
- defaultValue = Some(200),
- doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
-
- val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
- defaultValue = Some(200),
- doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ val THRIFTSERVER_POOL = SQLConfigBuilder("spark.sql.thriftserver.scheduler.pool")
+ .doc("Set a Fair Scheduler pool for a JDBC client session.")
+ .stringConf
+ .createOptional
+
+ val THRIFTSERVER_UI_STATEMENT_LIMIT =
+ SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
+ .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
+
+ val THRIFTSERVER_UI_SESSION_LIMIT = SQLConfigBuilder("spark.sql.thriftserver.ui.retainedSessions")
+ .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
+ .intConf
+ .createWithDefault(200)
// This is used to set the default data source
- val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
- defaultValue = Some("org.apache.spark.sql.parquet"),
- doc = "The default data source to use in input/output.")
+ val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
+ .doc("The default data source to use in input/output.")
+ .stringConf
+ .createWithDefault("org.apache.spark.sql.parquet")
// This is used to control the when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
// to its length exceeds the threshold.
- val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
- defaultValue = Some(4000),
- doc = "The maximum length allowed in a single cell when " +
- "storing additional schema information in Hive's metastore.",
- isPublic = false)
-
- val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically discover data partitions.")
+ val SCHEMA_STRING_LENGTH_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.schemaStringLengthThreshold")
+ .doc("The maximum length allowed in a single cell when " +
+ "storing additional schema information in Hive's metastore.")
+ .internal()
+ .intConf
+ .createWithDefault(4000)
+
+ val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
+ .doc("When true, automatically discover data partitions.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_COLUMN_TYPE_INFERENCE =
- booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
- defaultValue = Some(true),
- doc = "When true, automatically infer the data types for partitioned columns.")
+ SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
+ .doc("When true, automatically infer the data types for partitioned columns.")
+ .booleanConf
+ .createWithDefault(true)
val PARTITION_MAX_FILES =
- intConf("spark.sql.sources.maxConcurrentWrites",
- defaultValue = Some(1),
- doc = "The maximum number of concurrent files to open before falling back on sorting when " +
+ SQLConfigBuilder("spark.sql.sources.maxConcurrentWrites")
+ .doc("The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
-
- val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
- defaultValue = Some(true),
- doc = "When false, we will treat bucketed table as normal table.")
-
- val ORDER_BY_ORDINAL = booleanConf("spark.sql.orderByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers are treated as the position in the select list. " +
- "When false, the ordinal numbers in order/sort By clause are ignored.")
-
- val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal",
- defaultValue = Some(true),
- doc = "When true, the ordinal numbers in group by clauses are treated as the position " +
+ .intConf
+ .createWithDefault(1)
+
+ val BUCKETING_ENABLED = SQLConfigBuilder("spark.sql.sources.bucketing.enabled")
+ .doc("When false, we will treat bucketed table as normal table")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ORDER_BY_ORDINAL = SQLConfigBuilder("spark.sql.orderByOrdinal")
+ .doc("When true, the ordinal numbers are treated as the position in the select list. " +
+ "When false, the ordinal numbers in order/sort By clause are ignored.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val GROUP_BY_ORDINAL = SQLConfigBuilder("spark.sql.groupByOrdinal")
+ .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
"in the select list. When false, the ordinal numbers are ignored.")
+ .booleanConf
+ .createWithDefault(true)
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
@@ -457,89 +346,95 @@ object SQLConf {
// 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
// 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
- stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
+ SQLConfigBuilder("spark.sql.sources.outputCommitterClass").internal().stringConf.createOptional
- val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = intConf(
- key = "spark.sql.sources.parallelPartitionDiscovery.threshold",
- defaultValue = Some(32),
- doc = "The degree of parallelism for schema merging and partition discovery of " +
- "Parquet data sources.")
+ val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
+ SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
+ .doc("The degree of parallelism for schema merging and partition discovery of " +
+ "Parquet data sources.")
+ .intConf
+ .createWithDefault(32)
// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
- val DATAFRAME_EAGER_ANALYSIS = booleanConf(
- "spark.sql.eagerAnalysis",
- defaultValue = Some(true),
- doc = "When true, eagerly applies query analysis on DataFrame operations.",
- isPublic = false)
+ val DATAFRAME_EAGER_ANALYSIS = SQLConfigBuilder("spark.sql.eagerAnalysis")
+ .internal()
+ .doc("When true, eagerly applies query analysis on DataFrame operations.")
+ .booleanConf
+ .createWithDefault(true)
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
- val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
- "spark.sql.selfJoinAutoResolveAmbiguity",
- defaultValue = Some(true),
- isPublic = false)
+ val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+ SQLConfigBuilder("spark.sql.selfJoinAutoResolveAmbiguity")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
// Whether to retain group by columns or not in GroupedData.agg.
- val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
- "spark.sql.retainGroupColumns",
- defaultValue = Some(true),
- isPublic = false)
-
- val DATAFRAME_PIVOT_MAX_VALUES = intConf(
- "spark.sql.pivotMaxValues",
- defaultValue = Some(10000),
- doc = "When doing a pivot without specifying values for the pivot column this is the maximum " +
- "number of (distinct) values that will be collected without error."
- )
-
- val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
- defaultValue = Some(true),
- isPublic = false,
- doc = "When true, we could use `datasource`.`path` as table in SQL query."
- )
-
- val WHOLESTAGE_CODEGEN_ENABLED = booleanConf("spark.sql.codegen.wholeStage",
- defaultValue = Some(true),
- doc = "When true, the whole stage (of multiple operators) will be compiled into single java" +
- " method.",
- isPublic = false)
-
- val FILES_MAX_PARTITION_BYTES = longConf("spark.sql.files.maxPartitionBytes",
- defaultValue = Some(128 * 1024 * 1024), // parquet.block.size
- doc = "The maximum number of bytes to pack into a single partition when reading files.",
- isPublic = true)
-
- val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
- defaultValue = Some(4 * 1024 * 1024),
- doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+ val DATAFRAME_RETAIN_GROUP_COLUMNS = SQLConfigBuilder("spark.sql.retainGroupColumns")
+ .internal()
+ .booleanConf
+ .createWithDefault(true)
+
+ val DATAFRAME_PIVOT_MAX_VALUES = SQLConfigBuilder("spark.sql.pivotMaxValues")
+ .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
+ "number of (distinct) values that will be collected without error.")
+ .intConf
+ .createWithDefault(10000)
+
+ val RUN_SQL_ON_FILES = SQLConfigBuilder("spark.sql.runSQLOnFiles")
+ .internal()
+ .doc("When true, we could use `datasource`.`path` as table in SQL query.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val WHOLESTAGE_CODEGEN_ENABLED = SQLConfigBuilder("spark.sql.codegen.wholeStage")
+ .internal()
+ .doc("When true, the whole stage (of multiple operators) will be compiled into single java" +
+ " method.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
+ .doc("The maximum number of bytes to pack into a single partition when reading files.")
+ .longConf
+ .createWithDefault(128 * 1024 * 1024) // parquet.block.size
+
+ val FILES_OPEN_COST_IN_BYTES = SQLConfigBuilder("spark.sql.files.openCostInBytes")
+ .internal()
+ .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
" the same time. This is used when putting multiple files into a partition. It's better to" +
" over estimated, then the partitions with small files will be faster than partitions with" +
- " bigger files (which is scheduled first).",
- isPublic = false)
-
- val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
- defaultValue = Some(true),
- doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
- isPublic = false)
-
- val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT = intConf(
- "spark.sql.streaming.stateStore.minDeltasForSnapshot",
- defaultValue = Some(10),
- doc = "Minimum number of state store delta files that needs to be generated before they " +
- "consolidated into snapshots.",
- isPublic = false)
-
- val STATE_STORE_MIN_VERSIONS_TO_RETAIN = intConf(
- "spark.sql.streaming.stateStore.minBatchesToRetain",
- defaultValue = Some(2),
- doc = "Minimum number of versions of a state store's data to retain after cleaning.",
- isPublic = false)
-
- val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
- defaultValue = None,
- doc = "The default location for storing checkpoint data for continuously executing queries.",
- isPublic = true)
+ " bigger files (which is scheduled first).")
+ .longConf
+ .createWithDefault(4 * 1024 * 1024)
+
+ val EXCHANGE_REUSE_ENABLED = SQLConfigBuilder("spark.sql.exchange.reuse")
+ .internal()
+ .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
+ .internal()
+ .doc("Minimum number of state store delta files that needs to be generated before they " +
+ "consolidated into snapshots.")
+ .intConf
+ .createWithDefault(10)
+
+ val STATE_STORE_MIN_VERSIONS_TO_RETAIN =
+ SQLConfigBuilder("spark.sql.streaming.stateStore.minBatchesToRetain")
+ .internal()
+ .doc("Minimum number of versions of a state store's data to retain after cleaning.")
+ .intConf
+ .createWithDefault(2)
+
+ val CHECKPOINT_LOCATION = SQLConfigBuilder("spark.sql.streaming.checkpointLocation")
+ .doc("The default location for storing checkpoint data for continuously executing queries.")
+ .stringConf
+ .createOptional
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -562,7 +457,7 @@ object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
-class SQLConf extends Serializable with CatalystConf with Logging {
+private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -686,7 +581,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
}
/** Set the given Spark SQL configuration property. */
- def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
require(entry != null, "entry cannot be null")
require(value != null, s"value cannot be null for key: ${entry.key}")
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
@@ -706,25 +601,35 @@ class SQLConf extends Serializable with CatalystConf with Logging {
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
- def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue` in [[SQLConfEntry]].
+ * yet, return `defaultValue` in [[ConfigEntry]].
*/
- def getConf[T](entry: SQLConfEntry[T]): T = {
+ def getConf[T](entry: ConfigEntry[T]): T = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
getOrElse(throw new NoSuchElementException(entry.key))
}
/**
+ * Return the value of an optional Spark SQL configuration property for the given key. If the key
+ * is not set yet, throw an exception.
+ */
+ def getConf[T](entry: OptionalConfigEntry[T]): T = {
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.rawValueConverter).
+ getOrElse(throw new NoSuchElementException(entry.key))
+ }
+
+ /**
* Return the `string` value of Spark SQL configuration property for the given key. If the key is
* not set yet, return `defaultValue`.
*/
@@ -765,7 +670,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
settings.remove(key)
}
- def unsetConf(entry: SQLConfEntry[_]): Unit = {
+ private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = {
settings.remove(entry.key)
}
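
For reference, a minimal sketch of how entries declared with SQLConfigBuilder are read and written once the SQLConf changes above are in place. The EXAMPLE_* keys and the local `conf` value are invented for illustration, and the snippet assumes it lives inside the org.apache.spark.sql.internal package; only SQLConfigBuilder, createWithDefault/createOptional, getConf and setConf come from this patch.

  // Hypothetical entries, declared next to the real ones inside object SQLConf.
  val EXAMPLE_THRESHOLD = SQLConfigBuilder("spark.sql.example.threshold")
    .doc("Illustrative threshold; not a real Spark key.")
    .intConf
    .createWithDefault(100)

  val EXAMPLE_PATH = SQLConfigBuilder("spark.sql.example.path")
    .stringConf
    .createOptional                        // no default: an OptionalConfigEntry[String]

  // Reading and writing through a SQLConf instance:
  val conf = new SQLConf
  conf.getConf(EXAMPLE_THRESHOLD)          // 100, the declared default
  conf.setConf(EXAMPLE_THRESHOLD, 200)
  conf.getConf(EXAMPLE_THRESHOLD, 50)      // 200; the explicit fallback applies only while unset
  // conf.getConf(EXAMPLE_PATH)            // would throw NoSuchElementException while unset

Registering the same key twice fails fast with an IllegalArgumentException, which the new "duplicate entry" test below exercises.
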
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 2b89fa9f23..cc69199139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -26,7 +26,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("intConf") {
val key = "spark.sql.SQLConfEntrySuite.int"
- val confEntry = SQLConfEntry.intConf(key)
+ val confEntry = SQLConfigBuilder(key).intConf.createWithDefault(1)
assert(conf.getConf(confEntry, 5) === 5)
conf.setConf(confEntry, 10)
@@ -45,7 +45,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("longConf") {
val key = "spark.sql.SQLConfEntrySuite.long"
- val confEntry = SQLConfEntry.longConf(key)
+ val confEntry = SQLConfigBuilder(key).longConf.createWithDefault(1L)
assert(conf.getConf(confEntry, 5L) === 5L)
conf.setConf(confEntry, 10L)
@@ -64,7 +64,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("booleanConf") {
val key = "spark.sql.SQLConfEntrySuite.boolean"
- val confEntry = SQLConfEntry.booleanConf(key)
+ val confEntry = SQLConfigBuilder(key).booleanConf.createWithDefault(true)
assert(conf.getConf(confEntry, false) === false)
conf.setConf(confEntry, true)
@@ -83,7 +83,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("doubleConf") {
val key = "spark.sql.SQLConfEntrySuite.double"
- val confEntry = SQLConfEntry.doubleConf(key)
+ val confEntry = SQLConfigBuilder(key).doubleConf.createWithDefault(1d)
assert(conf.getConf(confEntry, 5.0) === 5.0)
conf.setConf(confEntry, 10.0)
@@ -102,7 +102,7 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringConf") {
val key = "spark.sql.SQLConfEntrySuite.string"
- val confEntry = SQLConfEntry.stringConf(key)
+ val confEntry = SQLConfigBuilder(key).stringConf.createWithDefault(null)
assert(conf.getConf(confEntry, "abc") === "abc")
conf.setConf(confEntry, "abcd")
@@ -116,7 +116,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("enumConf") {
val key = "spark.sql.SQLConfEntrySuite.enum"
- val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .checkValues(Set("a", "b", "c"))
+ .createWithDefault("a")
assert(conf.getConf(confEntry) === "a")
conf.setConf(confEntry, "b")
@@ -135,8 +138,10 @@ class SQLConfEntrySuite extends SparkFunSuite {
test("stringSeqConf") {
val key = "spark.sql.SQLConfEntrySuite.stringSeq"
- val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
- defaultValue = Some(Nil))
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
conf.setConf(confEntry, Seq("a", "b", "c", "d"))
@@ -147,4 +152,12 @@ class SQLConfEntrySuite extends SparkFunSuite {
assert(conf.getConfString(key) === "a,b,c,d,e")
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
}
+
+ test("duplicate entry") {
+ val key = "spark.sql.SQLConfEntrySuite.duplicate"
+ SQLConfigBuilder(key).stringConf.createOptional
+ intercept[IllegalArgumentException] {
+ SQLConfigBuilder(key).stringConf.createOptional
+ }
+ }
}
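
The enumConf and stringSeqConf replacements above reduce to the checkValues and toSequence combinators. A hedged sketch with invented keys (EXAMPLE_MODE, EXAMPLE_PATHS), under the same package assumption as before:

  val EXAMPLE_MODE = SQLConfigBuilder("spark.sql.example.mode")
    .stringConf
    .checkValues(Set("strict", "lenient"))  // values outside the set are rejected when parsed
    .createWithDefault("strict")

  val EXAMPLE_PATHS = SQLConfigBuilder("spark.sql.example.paths")
    .stringConf
    .toSequence                             // "a,b,c" round-trips to Seq("a", "b", "c")
    .createWithDefault(Nil)
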
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index e944d328a3..e687e6a5ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -119,15 +119,10 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
}
intercept[IllegalArgumentException] {
- // This value less than Int.MinValue
+ // This value is less than Long.MinValue
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
}
- // Test invalid input
- intercept[IllegalArgumentException] {
- // This value exceeds Long.MaxValue
- sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
- }
sqlContext.conf.clear()
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 073b954a5f..505e5c0bb6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
@@ -54,8 +55,7 @@ import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
-import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -318,7 +318,7 @@ class HiveContext private[hive](
hiveconf.set(key, value)
}
- override private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
setConf(entry.key, entry.stringConverter(value))
}
@@ -413,19 +413,19 @@ private[hive] object HiveContext extends Logging {
/** The version of hive used internally by Spark SQL. */
val hiveExecutionVersion: String = "1.2.1"
- val HIVE_METASTORE_VERSION = stringConf("spark.sql.hive.metastore.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of the Hive metastore. Available options are " +
+ val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
+ .doc("Version of the Hive metastore. Available options are " +
s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_EXECUTION_VERSION = stringConf(
- key = "spark.sql.hive.version",
- defaultValue = Some(hiveExecutionVersion),
- doc = "Version of Hive used internally by Spark SQL.")
+ val HIVE_EXECUTION_VERSION = SQLConfigBuilder("spark.sql.hive.version")
+ .doc("Version of Hive used internally by Spark SQL.")
+ .stringConf
+ .createWithDefault(hiveExecutionVersion)
- val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
- defaultValue = Some("builtin"),
- doc = s"""
+ val HIVE_METASTORE_JARS = SQLConfigBuilder("spark.sql.hive.metastore.jars")
+ .doc(s"""
| Location of the jars that should be used to instantiate the HiveMetastoreClient.
| This property can be one of three options: "
| 1. "builtin"
@@ -436,49 +436,61 @@ private[hive] object HiveContext extends Logging {
| 2. "maven"
| Use Hive jars of specified version downloaded from Maven repositories.
| 3. A classpath in the standard format for both Hive and Hadoop.
- """.stripMargin)
- val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
- "the built in support.")
-
- val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
- "spark.sql.hive.convertMetastoreParquet.mergeSchema",
- defaultValue = Some(false),
- doc = "When true, also tries to merge possibly different but compatible Parquet schemas in " +
- "different Parquet data files. This configuration is only effective " +
- "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ """.stripMargin)
+ .stringConf
+ .createWithDefault("builtin")
- val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc",
- defaultValue = Some(true),
- doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ val CONVERT_METASTORE_PARQUET = SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
"the built in support.")
-
- val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
- defaultValue = Some(false),
- doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " +
+ .booleanConf
+ .createWithDefault(true)
+
+ val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING =
+ SQLConfigBuilder("spark.sql.hive.convertMetastoreParquet.mergeSchema")
+ .doc("When true, also tries to merge possibly different but compatible Parquet schemas in " +
+ "different Parquet data files. This configuration is only effective " +
+ "when \"spark.sql.hive.convertMetastoreParquet\" is true.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val CONVERT_CTAS = SQLConfigBuilder("spark.sql.hive.convertCTAS")
+ .doc("When true, a table created by a Hive CTAS statement (no USING clause) will be " +
"converted to a data source table, using the data source set by spark.sql.sources.default.")
+ .booleanConf
+ .createWithDefault(false)
- val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
- defaultValue = Some(jdbcPrefixes),
- doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+ val CONVERT_METASTORE_ORC = SQLConfigBuilder("spark.sql.hive.convertMetastoreOrc")
+ .doc("When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " +
+ "the built in support.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val HIVE_METASTORE_SHARED_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.sharedPrefixes")
+ .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +
"that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
"classes that need to be shared are those that interact with classes that are already " +
"shared. For example, custom appenders that are used by log4j.")
+ .stringConf
+ .toSequence
+ .createWithDefault(jdbcPrefixes)
private def jdbcPrefixes = Seq(
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
- val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
- defaultValue = Some(Seq()),
- doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+ val HIVE_METASTORE_BARRIER_PREFIXES = SQLConfigBuilder("spark.sql.hive.metastore.barrierPrefixes")
+ .doc("A comma separated list of class prefixes that should explicitly be reloaded for each " +
"version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
"declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
-
- val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
- defaultValue = Some(true),
- doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val HIVE_THRIFT_SERVER_ASYNC = SQLConfigBuilder("spark.sql.hive.thriftServer.async")
+ .doc("When set to true, Hive Thrift server executes SQL queries in an asynchronous way.")
+ .booleanConf
+ .createWithDefault(true)
/**
* The version of the hive client that will be used to communicate with the metastore. Note that
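
A non-authoritative sketch of how the migrated Hive constants above are consumed; it assumes code inside the org.apache.spark.sql.hive package, where the private[hive] HiveContext object and the private[sql] SQLConf are visible, and the defaults in the comments are the ones declared above:

  import org.apache.spark.sql.hive.HiveContext._
  import org.apache.spark.sql.internal.SQLConf

  val conf = new SQLConf
  val metastoreVersion: String    = conf.getConf(HIVE_METASTORE_VERSION)         // "1.2.1" by default
  val convertParquet: Boolean     = conf.getConf(CONVERT_METASTORE_PARQUET)      // true by default
  val sharedPrefixes: Seq[String] = conf.getConf(HIVE_METASTORE_SHARED_PREFIXES) // the jdbcPrefixes above
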
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 5188a3e229..8d576bebb0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -31,82 +31,82 @@ package object config {
"in YARN Application Reports, which can be used for filtering when querying YARN.")
.stringConf
.toSequence
- .optional
+ .createOptional
private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
.doc("Interval after which AM failures will be considered independent and " +
"not accumulate towards the attempt count.")
.timeConf(TimeUnit.MILLISECONDS)
- .optional
+ .createOptional
private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
.doc("Maximum number of AM attempts before failing the app.")
.intConf
- .optional
+ .createOptional
private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
.doc("Whether to place user jars in front of Spark's classpath.")
.booleanConf
- .withDefault(false)
+ .createWithDefault(false)
private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
.doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
"with the corresponding path in cluster machines.")
.stringConf
- .withDefault(null)
+ .createWithDefault(null)
private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
.doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
"in the YARN cluster.")
.stringConf
- .withDefault(null)
+ .createWithDefault(null)
private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
.stringConf
- .withDefault("default")
+ .createWithDefault("default")
private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
.stringConf
- .optional
+ .createOptional
/* File distribution. */
private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
.doc("Location of archive containing jars files with Spark classes.")
.stringConf
- .optional
+ .createOptional
private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
.doc("Location of jars containing Spark classes.")
.stringConf
.toSequence
- .optional
+ .createOptional
private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars")
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
.doc("Whether to preserve temporary files created by the job in HDFS.")
.booleanConf
- .withDefault(false)
+ .createWithDefault(false)
private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
.doc("Replication factor for files uploaded by Spark to HDFS.")
.intConf
- .optional
+ .createOptional
private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
@@ -119,146 +119,146 @@ package object config {
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
"launcher process.")
.booleanConf
- .withDefault(true)
+ .createWithDefault(true)
private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
.doc("Interval between reports of the current app status in cluster mode.")
.timeConf(TimeUnit.MILLISECONDS)
- .withDefaultString("1s")
+ .createWithDefaultString("1s")
/* Shared Client-mode AM / Driver configuration. */
private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
.timeConf(TimeUnit.MILLISECONDS)
- .withDefaultString("100s")
+ .createWithDefaultString("100s")
private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
.doc("Node label expression for the AM.")
.stringConf
- .optional
+ .createOptional
private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
.intConf
- .withDefault(25)
+ .createWithDefault(25)
private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
.intConf
- .optional
+ .createOptional
private[spark] val MAX_REPORTER_THREAD_FAILURES =
ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
.intConf
- .withDefault(5)
+ .createWithDefault(5)
private[spark] val RM_HEARTBEAT_INTERVAL =
ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
.timeConf(TimeUnit.MILLISECONDS)
- .withDefaultString("3s")
+ .createWithDefaultString("3s")
private[spark] val INITIAL_HEARTBEAT_INTERVAL =
ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
.timeConf(TimeUnit.MILLISECONDS)
- .withDefaultString("200ms")
+ .createWithDefaultString("200ms")
private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
.doc("A comma-separated list of class names of services to add to the scheduler.")
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
/* Client-mode AM configuration. */
private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
.intConf
- .withDefault(1)
+ .createWithDefault(1)
private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
.doc("Extra Java options for the client-mode AM.")
.stringConf
- .optional
+ .createOptional
private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
.doc("Extra native library path for the client-mode AM.")
.stringConf
- .optional
+ .createOptional
private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
.bytesConf(ByteUnit.MiB)
- .optional
+ .createOptional
private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
.bytesConf(ByteUnit.MiB)
- .withDefaultString("512m")
+ .createWithDefaultString("512m")
/* Driver configuration. */
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
.intConf
- .withDefault(1)
+ .createWithDefault(1)
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
.bytesConf(ByteUnit.MiB)
- .optional
+ .createOptional
/* Executor configuration. */
private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
.intConf
- .withDefault(1)
+ .createWithDefault(1)
private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
- .optional
+ .createOptional
private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
.doc("Node label expression for executors.")
.stringConf
- .optional
+ .createOptional
/* Security configuration. */
private[spark] val CREDENTIAL_FILE_MAX_COUNT =
ConfigBuilder("spark.yarn.credentials.file.retention.count")
.intConf
- .withDefault(5)
+ .createWithDefault(5)
private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
ConfigBuilder("spark.yarn.credentials.file.retention.days")
.intConf
- .withDefault(5)
+ .createWithDefault(5)
private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
.doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
"fs.defaultFS does not need to be listed here.")
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
- .internal
+ .internal()
.timeConf(TimeUnit.MILLISECONDS)
- .optional
+ .createOptional
/* Private configs. */
private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
- .internal
+ .internal()
.stringConf
- .withDefault(null)
+ .createWithDefault(null)
// Internal config to propagate the location of the user's jar to the driver/executors
private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
- .internal
+ .internal()
.stringConf
- .optional
+ .createOptional
// Internal config to propagate the locations of any extra jars to add to the classpath
// of the executors
private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
- .internal
+ .internal()
.stringConf
.toSequence
- .optional
+ .createOptional
}
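
The YARN entries use the core ConfigBuilder directly rather than SQLConfigBuilder. A hedged sketch of the two remaining builder patterns seen above, a time config created from a string default and an optional bytes config, using invented keys inside an invented ExampleYarnConfigs object; it assumes the code sits under the org.apache.spark namespace where the private[spark] ConfigBuilder is visible:

  import java.util.concurrent.TimeUnit

  import org.apache.spark.internal.config.ConfigBuilder
  import org.apache.spark.network.util.ByteUnit

  object ExampleYarnConfigs {
    val EXAMPLE_TIMEOUT = ConfigBuilder("spark.yarn.example.timeout")
      .doc("Illustrative timeout, stored internally in milliseconds.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")      // the default string is parsed like a user-supplied value

    val EXAMPLE_OVERHEAD = ConfigBuilder("spark.yarn.example.memoryOverhead")
      .bytesConf(ByteUnit.MiB)
      .createOptional                      // yields an OptionalConfigEntry[Long]; absent unless set
  }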