From d5ee9d5c240fca5c15b21efc4a760b06a1f39fd6 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 5 Apr 2016 15:19:51 -0700
Subject: [SPARK-529][SQL] Modify SQLConf to use new config API from core.

Because SQL keeps track of all known configs, some customization was needed
in SQLConf to allow that, since the core API does not have that feature.

Tested via existing (and slightly updated) unit tests.

Author: Marcelo Vanzin

Closes #11570 from vanzin/SPARK-529-sql.
---
 .../spark/internal/config/ConfigBuilder.scala       | 44 ++++++++++++++++------
 .../org/apache/spark/internal/config/package.scala  | 44 +++++++++++-----------
 .../spark/internal/config/ConfigEntrySuite.scala    | 28 +++++++-------
 3 files changed, 69 insertions(+), 47 deletions(-)
(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 770b43697a..5d50e3851a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -85,10 +85,12 @@ private[spark] class TypedConfigBuilder[T](
     this(parent, converter, Option(_).map(_.toString).orNull)
   }
 
+  /** Apply a transformation to the user-provided values of the config entry. */
   def transform(fn: T => T): TypedConfigBuilder[T] = {
     new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
   }
 
+  /** Check that user-provided values for the config match a pre-defined set. */
   def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
     transform { v =>
       if (!validValues.contains(v)) {
@@ -99,30 +101,38 @@ private[spark] class TypedConfigBuilder[T](
     }
   }
 
+  /** Turns the config entry into a sequence of values of the underlying type. */
   def toSequence: TypedConfigBuilder[Seq[T]] = {
     new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
   }
 
-  /** Creates a [[ConfigEntry]] that does not require a default value. */
-  def optional: OptionalConfigEntry[T] = {
-    new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+  /** Creates a [[ConfigEntry]] that does not have a default value. */
+  def createOptional: OptionalConfigEntry[T] = {
+    val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
+      parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /** Creates a [[ConfigEntry]] that has a default value. */
-  def withDefault(default: T): ConfigEntry[T] = {
+  def createWithDefault(default: T): ConfigEntry[T] = {
     val transformedDefault = converter(stringConverter(default))
-    new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
-      parent._doc, parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
+      stringConverter, parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
   /**
    * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
    * [[String]] and must be a valid value for the entry.
    */
-  def withDefaultString(default: String): ConfigEntry[T] = {
+  def createWithDefaultString(default: String): ConfigEntry[T] = {
     val typedDefault = converter(default)
-    new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
-      parent._public)
+    val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
+      parent._doc, parent._public)
+    parent._onCreate.foreach(_(entry))
+    entry
   }
 
 }
@@ -136,10 +146,11 @@ private[spark] case class ConfigBuilder(key: String) {
 
   import ConfigHelpers._
 
-  var _public = true
-  var _doc = ""
+  private[config] var _public = true
+  private[config] var _doc = ""
+  private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
 
-  def internal: ConfigBuilder = {
+  def internal(): ConfigBuilder = {
     _public = false
     this
   }
@@ -149,6 +160,15 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  /**
+   * Registers a callback for when the config entry is finally instantiated. Currently used by
+   * SQLConf to keep track of SQL configuration entries.
+   */
+  def onCreate(callback: ConfigEntry[_] => Unit): ConfigBuilder = {
+    _onCreate = Option(callback)
+    this
+  }
+
   def intConf: TypedConfigBuilder[Int] = {
     new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
   }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 968c5192ac..94b50ee065 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -23,68 +23,70 @@ import org.apache.spark.network.util.ByteUnit
 
 package object config {
 
   private[spark] val DRIVER_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val DRIVER_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val DRIVER_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
   private[spark] val EXECUTOR_CLASS_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_JAVA_OPTIONS =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 
   private[spark] val EXECUTOR_LIBRARY_PATH =
-    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional
 
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
-    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
 
   private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
     .bytesConf(ByteUnit.MiB)
-    .withDefaultString("1g")
+    .createWithDefaultString("1g")
 
-  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
-    .booleanConf.withDefault(false)
+  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
+    .booleanConf.createWithDefault(false)
 
-  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
 
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
 
   private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.initialExecutors")
       .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
 
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
-    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
   private[spark] val SHUFFLE_SERVICE_ENABLED =
-    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
-    .stringConf.optional
+    .stringConf.createOptional
 
   private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
     .doc("Name of the Kerberos principal.")
-    .stringConf.optional
+    .stringConf.createOptional
 
-  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
+    .intConf
+    .createOptional
 
   private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
-    .internal
+    .internal()
     .stringConf
     .toSequence
-    .withDefault(Nil)
+    .createWithDefault(Nil)
 
 }
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 0644148eae..337fd7e85e 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -26,7 +26,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: int") {
     val conf = new SparkConf()
-    val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+    val iConf = ConfigBuilder("spark.int").intConf.createWithDefault(1)
     assert(conf.get(iConf) === 1)
     conf.set(iConf, 2)
     assert(conf.get(iConf) === 2)
@@ -34,21 +34,21 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   test("conf entry: long") {
     val conf = new SparkConf()
-    val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+    val lConf = ConfigBuilder("spark.long").longConf.createWithDefault(0L)
     conf.set(lConf, 1234L)
     assert(conf.get(lConf) === 1234L)
   }
 
   test("conf entry: double") {
     val conf = new SparkConf()
-    val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+    val dConf = ConfigBuilder("spark.double").doubleConf.createWithDefault(0.0)
     conf.set(dConf, 20.0)
     assert(conf.get(dConf) === 20.0)
   }
 
   test("conf entry: boolean") {
     val conf = new SparkConf()
-    val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+    val bConf = ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false)
ConfigBuilder("spark.boolean").booleanConf.createWithDefault(false) assert(!conf.get(bConf)) conf.set(bConf, true) assert(conf.get(bConf)) @@ -56,7 +56,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: optional") { val conf = new SparkConf() - val optionalConf = ConfigBuilder("spark.optional").intConf.optional + val optionalConf = ConfigBuilder("spark.optional").intConf.createOptional assert(conf.get(optionalConf) === None) conf.set(optionalConf, 1) assert(conf.get(optionalConf) === Some(1)) @@ -64,7 +64,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: fallback") { val conf = new SparkConf() - val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1) + val parentConf = ConfigBuilder("spark.int").intConf.createWithDefault(1) val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf) assert(conf.get(confWithFallback) === 1) conf.set(confWithFallback, 2) @@ -74,7 +74,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: time") { val conf = new SparkConf() - val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h") + val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).createWithDefaultString("1h") assert(conf.get(time) === 3600L) conf.set(time.key, "1m") assert(conf.get(time) === 60L) @@ -82,7 +82,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: bytes") { val conf = new SparkConf() - val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m") + val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).createWithDefaultString("1m") assert(conf.get(bytes) === 1024L) conf.set(bytes.key, "1k") assert(conf.get(bytes) === 1L) @@ -90,7 +90,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: string seq") { val conf = new SparkConf() - val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq()) + val seq = ConfigBuilder("spark.seq").stringConf.toSequence.createWithDefault(Seq()) conf.set(seq.key, "1,,2, 3 , , 4") assert(conf.get(seq) === Seq("1", "2", "3", "4")) conf.set(seq, Seq("1", "2")) @@ -99,7 +99,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: int seq") { val conf = new SparkConf() - val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq()) + val seq = ConfigBuilder("spark.seq").intConf.toSequence.createWithDefault(Seq()) conf.set(seq.key, "1,,2, 3 , , 4") assert(conf.get(seq) === Seq(1, 2, 3, 4)) conf.set(seq, Seq(1, 2)) @@ -111,7 +111,7 @@ class ConfigEntrySuite extends SparkFunSuite { val transformationConf = ConfigBuilder("spark.transformation") .stringConf .transform(_.toLowerCase()) - .withDefault("FOO") + .createWithDefault("FOO") assert(conf.get(transformationConf) === "foo") conf.set(transformationConf, "BAR") @@ -123,7 +123,7 @@ class ConfigEntrySuite extends SparkFunSuite { val enum = ConfigBuilder("spark.enum") .stringConf .checkValues(Set("a", "b", "c")) - .withDefault("a") + .createWithDefault("a") assert(conf.get(enum) === "a") conf.set(enum, "b") @@ -138,7 +138,7 @@ class ConfigEntrySuite extends SparkFunSuite { test("conf entry: conversion error") { val conf = new SparkConf() - val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional + val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.createOptional conf.set(conversionTest.key, "abc") val conversionError = intercept[IllegalArgumentException] { conf.get(conversionTest) @@ -148,7 +148,7 @@ class ConfigEntrySuite extends 
 
   test("default value handling is null-safe") {
     val conf = new SparkConf()
-    val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+    val stringConf = ConfigBuilder("spark.string").stringConf.createWithDefault(null)
     assert(conf.get(stringConf) === null)
   }
--
cgit v1.2.3
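
A minimal usage sketch of the reworked API follows. The commit message says SQL keeps
track of all known configs, which is what the new onCreate hook enables; the SQLConf side
itself is outside this core-only diff, so the registry below is only an illustration. The
object name, config keys, defaults, and the buildConf helper are made up for the example;
only ConfigBuilder, ConfigEntry, onCreate, doc, intConf, stringConf, createWithDefault,
and createOptional come from the patch. ConfigBuilder is private[spark], so code shaped
like this would have to live under an org.apache.spark package.

import scala.collection.mutable

import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}

// Hypothetical registry, standing in for what SQLConf could do with onCreate.
object ExampleConfRegistry {

  // All entries created through buildConf, keyed by their config key.
  private val entries = mutable.HashMap[String, ConfigEntry[_]]()

  // Helper that records every entry at the moment it is instantiated.
  private def buildConf(key: String): ConfigBuilder =
    ConfigBuilder(key).onCreate(entry => entries.put(entry.key, entry))

  // createWithDefault replaces the old withDefault; key and default are invented.
  val SHUFFLE_TARGET_SIZE: ConfigEntry[Int] =
    buildConf("spark.example.shuffle.targetSize")
      .doc("Illustrative entry registered through the onCreate callback.")
      .intConf
      .createWithDefault(200)

  // createOptional replaces the old optional; again an invented key.
  val WAREHOUSE_DIR = buildConf("spark.example.warehouse.dir")
    .stringConf
    .createOptional

  // Snapshot of everything registered so far.
  def registeredEntries: Map[String, ConfigEntry[_]] = entries.toMap
}

Reading such an entry still goes through SparkConf, e.g. conf.get(ExampleConfRegistry.WAREHOUSE_DIR)
yields an Option value, exactly as the createOptional test above exercises.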