Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala    | 44 ++++++++++++++++++++++++++++++++------------
 core/src/main/scala/org/apache/spark/internal/config/package.scala          | 44 +++++++++++++++++++++++---------------------
 core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala | 28 ++++++++++++++--------------
3 files changed, 69 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 770b43697a..5d50e3851a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -85,10 +85,12 @@ private[spark] class TypedConfigBuilder[T](
     this(parent, converter, Option(_).map(_.toString).orNull)
   }
 
+  /** Apply a transformation to the user-provided values of the config entry. */
   def transform(fn: T => T): TypedConfigBuilder[T] = {
     new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
   }
 
+  /** Check that user-provided values for the config match a pre-defined set. */
   def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
     transform { v =>
       if (!validValues.contains(v)) {
@@ -99,30 +101,38 @@ private[spark] class TypedConfigBuilder[T](
     }
   }
 
+  /** Turns the config entry into a sequence of values of the underlying type. */
   def toSequence: TypedConfigBuilder[Seq[T]] = {
     new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
   }
 
-  /** Creates a [[ConfigEntry]] that does not require a default value. */
-  def optional: OptionalConfigEntry[T] = {
-    new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+  /** Creates a [[ConfigEntry]] that does not have a default value. */
+  def createOptional: OptionalConfigEntry[T] = {
+    val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
+      parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /** Creates a [[ConfigEntry]] that has a default value. */
-  def withDefault(default: T): ConfigEntry[T] = {
+  def createWithDefault(default: T): ConfigEntry[T] = {
     val transformedDefault = converter(stringConverter(default))
-    new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
-      parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+      stringConverter, parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /**
    * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
    * [[String]] and must be a valid value for the entry.
    */
-  def withDefaultString(default: String): ConfigEntry[T] = {
+  def createWithDefaultString(default: String): ConfigEntry[T] = {
     val typedDefault = converter(default)
-    new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
-      parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+      parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
 }
@@ -136,10 +146,11 @@ private[spark] case class ConfigBuilder(key: String) {
 
   import ConfigHelpers._
 
-  var _public = true
-  var _doc = ""
+  private[config] var _public = true
+  private[config] var _doc = ""
+  private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
 
-  def internal: ConfigBuilder = {
+  def internal(): ConfigBuilder = {
     _public = false
     this
   }
@@ -149,6 +160,15 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  /**
+   * Registers a callback for when the config entry is finally instantiated. Currently used by
+   * SQLConf to keep track of SQL configuration entries.
+   */
+  def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+    _onCreate = Option(callback)
+    this
+  }
+
   def intConf: TypedConfigBuilder[Int] = {
     new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
   }
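As a usage note (not part of this commit): a minimal sketch of how the new onCreate hook can be consumed. The ConfigRegistry object and key below are hypothetical; in this change the intended client is SQLConf, which registers SQL config entries through the callback. Since ConfigBuilder and ConfigEntry are private[spark], the sketch assumes code living under the org.apache.spark package.

package org.apache.spark.example  // hypothetical subpackage, for private[spark] access

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.internal.config._

object ConfigRegistry {
  // Hypothetical registry keyed by config name; every entry finished with
  // createOptional / createWithDefault / createWithDefaultString fires the
  // onCreate callback and is recorded here.
  private val entries = new ConcurrentHashMap[String, ConfigEntry[_]]()

  def register(entry: ConfigEntry[_]): Unit = entries.put(entry.key, entry)

  def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)

  // The create* prefix now marks the terminal operation, and internal is an
  // explicit method call.
  val MAX_RETRIES = buildConf("spark.example.maxRetries")
    .internal()
    .intConf
    .createWithDefault(3)
}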
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 968c5192ac..94b50ee065 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -23,68 +23,70 @@ import org.apache.spark.network.util.ByteUnit
 package object config {
 
   private[spark] val DRIVER_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val DRIVER_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val DRIVER_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
   private[spark] val EXECUTOR_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val EXECUTOR_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
-  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
-    .booleanConf.withDefault(false)
+  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
+    .booleanConf.createWithDefault(false)
 
-  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
 
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
 
   private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.initialExecutors")
       .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
 
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
   private[spark] val SHUFFLE_SERVICE_ENABLED =
-    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
-    .stringConf.optional
+    .stringConf.createOptional
 
   private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
     .doc("Name of the Kerberos principal.")
-    .stringConf.optional
+    .stringConf.createOptional
 
-  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
+    .intConf
+    .createOptional
 
   private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
-    .internal
+    .internal()
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 }
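For completeness, a short sketch of how entries like these read back through SparkConf (object and value names hypothetical; the entries and the typed get overload are private[spark], so this again assumes code under org.apache.spark):

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

object EntryReads {
  val conf = new SparkConf()

  // bytesConf(ByteUnit.MiB) + createWithDefaultString("1g"): the string default
  // is parsed once at definition time and reads come back in MiB.
  val driverMemoryMiB: Long = conf.get(DRIVER_MEMORY)  // 1024 unless overridden

  // createOptional entries have no default, so reads yield an Option.
  val executorInstances: Option[Int] = conf.get(EXECUTOR_INSTANCES)

  // toSequence + createWithDefault(Nil): an empty Seq when unset.
  val pyFiles: Seq[String] = conf.get(PY_FILES)
}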
ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue) + ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue) private[spark] val SHUFFLE_SERVICE_ENABLED = - ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false) + ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false) private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab") .doc("Location of user's keytab.") - .stringConf.optional + .stringConf.createOptional private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal") .doc("Name of the Kerberos principal.") - .stringConf.optional + .stringConf.createOptional - private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional + private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") + .intConf + .createOptional private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") - .internal + .internal() .stringConf .toSequence - .withDefault(Nil) + .createWithDefault(Nil) } diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala index 0644148eae..337fd7e85e 100644 --- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala @@ -26,7 +26,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: int") { val conf = new SparkConf() - val iConf = ConfigBuilder("spark.int").intConf.withDefault(1) + val iConf = ConfigBuilder("spark.int").intConf.createWithDefault(1) assert(conf.get(iConf) === 1) conf.set(iConf, 2) assert(conf.get(iConf) === 2) @@ -34,21 +34,21 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: long") { val conf = new SparkConf() - val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L) + val lConf = ConfigBuilder("spark.long").longConf.createWithDefault(0L) conf.set(lConf, 1234L) assert(conf.get(lConf) === 1234L) } test("conf entry: double") { val conf = new SparkConf() - val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0) + val dConf = ConfigBuilder("spark.double").doubleConf.createWithDefault(0.0) conf.set(dConf, 20.0) assert(conf.get(dConf) === 20.0) } test("conf entry: boolean") { val conf = new SparkConf() - val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false) + val bConf = ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false) assert(!conf.get(bConf)) conf.set(bConf, true) assert(conf.get(bConf)) @@ -56,7 +56,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: optional") { val conf = new SparkConf() - val optionalConf = ConfigBuilder("spark.optional").intConf.optional + val optionalConf = ConfigBuilder("spark.optional").intConf.createOptional assert(conf.get(optionalConf) === None) conf.set(optionalConf, 1) assert(conf.get(optionalConf) === Some(1)) @@ -64,7 +64,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: fallback") { val conf = new SparkConf() - val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1) + val parentConf = ConfigBuilder("spark.int").intConf.createWithDefault(1) val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf) assert(conf.get(confWithFallback) === 1) conf.set(confWithFallback, 2) @@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: time") { val conf = 
@@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: time") {
     val conf = new SparkConf()
-    val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+    val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).createWithDefaultString("1h")
     assert(conf.get(time) === 3600L)
     conf.set(time.key, "1m")
     assert(conf.get(time) === 60L)
@@ -82,7 +82,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: bytes") {
     val conf = new SparkConf()
-    val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+    val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).createWithDefaultString("1m")
     assert(conf.get(bytes) === 1024L)
     conf.set(bytes.key, "1k")
     assert(conf.get(bytes) === 1L)
@@ -90,7 +90,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: string seq") {
     val conf = new SparkConf()
-    val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+    val seq = ConfigBuilder("spark.seq").stringConf.toSequence.createWithDefault(Seq())
     conf.set(seq.key, "1,,2, 3 , , 4")
     assert(conf.get(seq) === Seq("1", "2", "3", "4"))
     conf.set(seq, Seq("1", "2"))
@@ -99,7 +99,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: int seq") {
     val conf = new SparkConf()
-    val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+    val seq = ConfigBuilder("spark.seq").intConf.toSequence.createWithDefault(Seq())
     conf.set(seq.key, "1,,2, 3 , , 4")
     assert(conf.get(seq) === Seq(1, 2, 3, 4))
     conf.set(seq, Seq(1, 2))
@@ -111,7 +111,7 @@ class ConfigEntrySuite extends SparkFunSuite {
     val transformationConf = ConfigBuilder("spark.transformation")
       .stringConf
       .transform(_.toLowerCase())
-      .withDefault("FOO")
+      .createWithDefault("FOO")
 
     assert(conf.get(transformationConf) === "foo")
     conf.set(transformationConf, "BAR")
@@ -123,7 +123,7 @@ class ConfigEntrySuite extends SparkFunSuite {
     val enum = ConfigBuilder("spark.enum")
       .stringConf
       .checkValues(Set("a", "b", "c"))
-      .withDefault("a")
+      .createWithDefault("a")
 
     assert(conf.get(enum) === "a")
     conf.set(enum, "b")
@@ -138,7 +138,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: conversion error") {
     val conf = new SparkConf()
-    val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+    val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.createOptional
     conf.set(conversionTest.key, "abc")
     val conversionError = intercept[IllegalArgumentException] {
       conf.get(conversionTest)
@@ -148,7 +148,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("default value handling is null-safe") {
     val conf = new SparkConf()
-    val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+    val stringConf = ConfigBuilder("spark.string").stringConf.createWithDefault(null)
     assert(conf.get(stringConf) === null)
   }
 }
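One interaction the suite only exercises piecewise: transform is applied before checkValues in the converter chain, so user input is normalized before it is validated. A sketch with a hypothetical key (same private[spark] caveat as above):

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ModeConf {
  val MODE = ConfigBuilder("spark.example.mode")
    .stringConf
    .transform(_.toLowerCase())
    .checkValues(Set("local", "cluster"))
    .createWithDefault("local")

  val conf = new SparkConf()
  conf.set(MODE.key, "CLUSTER")
  assert(conf.get(MODE) == "cluster")  // lowercased by transform, then validated
  // Setting an out-of-set value such as "standalone" would make conf.get(MODE)
  // throw IllegalArgumentException from the checkValues guard.
}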