aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-04-05 15:19:51 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-04-05 15:19:51 -0700
commitd5ee9d5c240fca5c15b21efc4a760b06a1f39fd6 (patch)
treeb0c0d55466cdd5678849cc32d914408f9dd84472 /core
parent7329fe272d3ead7db9bc3e1e32adb7329dabc607 (diff)
downloadspark-d5ee9d5c240fca5c15b21efc4a760b06a1f39fd6.tar.gz
spark-d5ee9d5c240fca5c15b21efc4a760b06a1f39fd6.tar.bz2
spark-d5ee9d5c240fca5c15b21efc4a760b06a1f39fd6.zip
[SPARK-529][SQL] Modify SQLConf to use new config API from core.
Because SQL keeps track of all known configs, some customization was needed in SQLConf to allow that, since the core API does not have that feature. Tested via existing (and slightly updated) unit tests. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #11570 from vanzin/SPARK-529-sql.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala44
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala44
-rw-r--r--core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala28
3 files changed, 69 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 770b43697a..5d50e3851a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -85,10 +85,12 @@ private[spark] class TypedConfigBuilder[T](
this(parent, converter, Option(_).map(_.toString).orNull)
}
+ /** Apply a transformation to the user-provided values of the config entry. */
def transform(fn: T => T): TypedConfigBuilder[T] = {
new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
}
+ /** Check that user-provided values for the config match a pre-defined set. */
def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
transform { v =>
if (!validValues.contains(v)) {
@@ -99,30 +101,38 @@ private[spark] class TypedConfigBuilder[T](
}
}
+ /** Turns the config entry into a sequence of values of the underlying type. */
def toSequence: TypedConfigBuilder[Seq[T]] = {
new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
}
- /** Creates a [[ConfigEntry]] that does not require a default value. */
- def optional: OptionalConfigEntry[T] = {
- new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+ /** Creates a [[ConfigEntry]] that does not have a default value. */
+ def createOptional: OptionalConfigEntry[T] = {
+ val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
+ parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
/** Creates a [[ConfigEntry]] that has a default value. */
- def withDefault(default: T): ConfigEntry[T] = {
+ def createWithDefault(default: T): ConfigEntry[T] = {
val transformedDefault = converter(stringConverter(default))
- new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
- parent._doc, parent._public)
+ val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+ stringConverter, parent._doc, parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
/**
* Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
* [[String]] and must be a valid value for the entry.
*/
- def withDefaultString(default: String): ConfigEntry[T] = {
+ def createWithDefaultString(default: String): ConfigEntry[T] = {
val typedDefault = converter(default)
- new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
- parent._public)
+ val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+ parent._doc, parent._public)
+ parent._onCreate.foreach(_(entry))
+ entry
}
}
@@ -136,10 +146,11 @@ private[spark] case class ConfigBuilder(key: String) {
import ConfigHelpers._
- var _public = true
- var _doc = ""
+ private[config] var _public = true
+ private[config] var _doc = ""
+ private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
- def internal: ConfigBuilder = {
+ def internal(): ConfigBuilder = {
_public = false
this
}
@@ -149,6 +160,15 @@ private[spark] case class ConfigBuilder(key: String) {
this
}
+ /**
+ * Registers a callback for when the config entry is finally instantiated. Currently used by
+ * SQLConf to keep track of SQL configuration entries.
+ */
+ def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+ _onCreate = Option(callback)
+ this
+ }
+
def intConf: TypedConfigBuilder[Int] = {
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 968c5192ac..94b50ee065 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -23,68 +23,70 @@ import org.apache.spark.network.util.ByteUnit
package object config {
private[spark] val DRIVER_CLASS_PATH =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
private[spark] val DRIVER_JAVA_OPTIONS =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
private[spark] val DRIVER_LIBRARY_PATH =
- ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
- ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+ ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.bytesConf(ByteUnit.MiB)
- .withDefaultString("1g")
+ .createWithDefaultString("1g")
private[spark] val EXECUTOR_CLASS_PATH =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
private[spark] val EXECUTOR_JAVA_OPTIONS =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
private[spark] val EXECUTOR_LIBRARY_PATH =
- ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
- ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+ ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.bytesConf(ByteUnit.MiB)
- .withDefaultString("1g")
+ .createWithDefaultString("1g")
- private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
- .booleanConf.withDefault(false)
+ private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
+ .booleanConf.createWithDefault(false)
- private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+ private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
- ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+ ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
ConfigBuilder("spark.dynamicAllocation.initialExecutors")
.fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
- ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+ ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
private[spark] val SHUFFLE_SERVICE_ENABLED =
- ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+ ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
.doc("Location of user's keytab.")
- .stringConf.optional
+ .stringConf.createOptional
private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
.doc("Name of the Kerberos principal.")
- .stringConf.optional
+ .stringConf.createOptional
- private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+ private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
+ .intConf
+ .createOptional
private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
- .internal
+ .internal()
.stringConf
.toSequence
- .withDefault(Nil)
+ .createWithDefault(Nil)
}
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 0644148eae..337fd7e85e 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -26,7 +26,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: int") {
val conf = new SparkConf()
- val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val iConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
assert(conf.get(iConf) === 1)
conf.set(iConf, 2)
assert(conf.get(iConf) === 2)
@@ -34,21 +34,21 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: long") {
val conf = new SparkConf()
- val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+ val lConf = ConfigBuilder("spark.long").longConf.createWithDefault(0L)
conf.set(lConf, 1234L)
assert(conf.get(lConf) === 1234L)
}
test("conf entry: double") {
val conf = new SparkConf()
- val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+ val dConf = ConfigBuilder("spark.double").doubleConf.createWithDefault(0.0)
conf.set(dConf, 20.0)
assert(conf.get(dConf) === 20.0)
}
test("conf entry: boolean") {
val conf = new SparkConf()
- val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+ val bConf = ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false)
assert(!conf.get(bConf))
conf.set(bConf, true)
assert(conf.get(bConf))
@@ -56,7 +56,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: optional") {
val conf = new SparkConf()
- val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+ val optionalConf = ConfigBuilder("spark.optional").intConf.createOptional
assert(conf.get(optionalConf) === None)
conf.set(optionalConf, 1)
assert(conf.get(optionalConf) === Some(1))
@@ -64,7 +64,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: fallback") {
val conf = new SparkConf()
- val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val parentConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
assert(conf.get(confWithFallback) === 1)
conf.set(confWithFallback, 2)
@@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: time") {
val conf = new SparkConf()
- val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+ val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).createWithDefaultString("1h")
assert(conf.get(time) === 3600L)
conf.set(time.key, "1m")
assert(conf.get(time) === 60L)
@@ -82,7 +82,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: bytes") {
val conf = new SparkConf()
- val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+ val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).createWithDefaultString("1m")
assert(conf.get(bytes) === 1024L)
conf.set(bytes.key, "1k")
assert(conf.get(bytes) === 1L)
@@ -90,7 +90,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: string seq") {
val conf = new SparkConf()
- val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+ val seq = ConfigBuilder("spark.seq").stringConf.toSequence.createWithDefault(Seq())
conf.set(seq.key, "1,,2, 3 , , 4")
assert(conf.get(seq) === Seq("1", "2", "3", "4"))
conf.set(seq, Seq("1", "2"))
@@ -99,7 +99,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: int seq") {
val conf = new SparkConf()
- val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+ val seq = ConfigBuilder("spark.seq").intConf.toSequence.createWithDefault(Seq())
conf.set(seq.key, "1,,2, 3 , , 4")
assert(conf.get(seq) === Seq(1, 2, 3, 4))
conf.set(seq, Seq(1, 2))
@@ -111,7 +111,7 @@ class ConfigEntrySuite extends SparkFunSuite {
val transformationConf = ConfigBuilder("spark.transformation")
.stringConf
.transform(_.toLowerCase())
- .withDefault("FOO")
+ .createWithDefault("FOO")
assert(conf.get(transformationConf) === "foo")
conf.set(transformationConf, "BAR")
@@ -123,7 +123,7 @@ class ConfigEntrySuite extends SparkFunSuite {
val enum = ConfigBuilder("spark.enum")
.stringConf
.checkValues(Set("a", "b", "c"))
- .withDefault("a")
+ .createWithDefault("a")
assert(conf.get(enum) === "a")
conf.set(enum, "b")
@@ -138,7 +138,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("conf entry: conversion error") {
val conf = new SparkConf()
- val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+ val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.createOptional
conf.set(conversionTest.key, "abc")
val conversionError = intercept[IllegalArgumentException] {
conf.get(conversionTest)
@@ -148,7 +148,7 @@ class ConfigEntrySuite extends SparkFunSuite {
test("default value handling is null-safe") {
val conf = new SparkConf()
- val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+ val stringConf = ConfigBuilder("spark.string").stringConf.createWithDefault(null)
assert(conf.get(stringConf) === null)
}