aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-03-07 14:13:44 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-03-07 14:13:44 -0800
commite1fb857992074164dcaa02498c5a9604fac6f57e (patch)
tree5f2a9de0230df4ebd0ca7317c879472eb8d3fbbc /core
parente9e67b39abb23a88d8be2d0fea5b5fd93184a25b (diff)
downloadspark-e1fb857992074164dcaa02498c5a9604fac6f57e.tar.gz
spark-e1fb857992074164dcaa02498c5a9604fac6f57e.tar.bz2
spark-e1fb857992074164dcaa02498c5a9604fac6f57e.zip
[SPARK-529][CORE][YARN] Add type-safe config keys to SparkConf.
This is, in a way, the basics to enable SPARK-529 (which was closed as won't fix but I think is still valuable). In fact, Spark SQL created something for that, and this change basically factors out that code and inserts it into SparkConf, with some extra bells and whistles. To showcase the usage of this pattern, I modified the YARN backend to use the new config keys (defined in the new `config` package object under `o.a.s.deploy.yarn`). Most of the changes are mechanic, although logic had to be slightly modified in a handful of places. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #10205 from vanzin/conf-opts.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala184
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala111
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala76
-rw-r--r--core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala155
5 files changed, 564 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b81bfb3182..16423e771a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,13 +17,15 @@
package org.apache.spark
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@@ -74,6 +76,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
+ set(entry.key, entry.stringConverter(value))
+ this
+ }
+
+ private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+ set(entry.key, entry.rawStringConverter(value))
+ this
+ }
+
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
@@ -148,6 +160,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = {
+ if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) {
+ logDeprecationWarning(entry.key)
+ }
+ this
+ }
+
+ private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+ if (settings.putIfAbsent(entry.key, entry.rawStringConverter(value)) == null) {
+ logDeprecationWarning(entry.key)
+ }
+ this
+ }
+
/**
* Use Kryo serialization and register the given set of classes with Kryo.
* If called multiple times, this will append the classes from all calls together.
@@ -199,6 +225,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/**
+ * Retrieves the value of a pre-defined configuration entry.
+ *
+ * - This is an internal Spark API.
+ * - The return type if defined by the configuration entry.
+ * - This will throw an exception is the config is not optional and the value is not set.
+ */
+ private[spark] def get[T](entry: ConfigEntry[T]): T = {
+ entry.readFrom(this)
+ }
+
+ /**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
new file mode 100644
index 0000000000..770b43697a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
+private object ConfigHelpers {
+
+ def toNumber[T](s: String, converter: String => T, key: String, configType: String): T = {
+ try {
+ converter(s)
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be $configType, but was $s")
+ }
+ }
+
+ def toBoolean(s: String, key: String): Boolean = {
+ try {
+ s.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(s"$key should be boolean, but was $s")
+ }
+ }
+
+ def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
+ str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+ }
+
+ def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
+ v.map(stringConverter).mkString(",")
+ }
+
+ def timeFromString(str: String, unit: TimeUnit): Long = JavaUtils.timeStringAs(str, unit)
+
+ def timeToString(v: Long, unit: TimeUnit): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms"
+
+ def byteFromString(str: String, unit: ByteUnit): Long = {
+ val (input, multiplier) =
+ if (str.length() > 0 && str.charAt(0) == '-') {
+ (str.substring(1), -1)
+ } else {
+ (str, 1)
+ }
+ multiplier * JavaUtils.byteStringAs(input, unit)
+ }
+
+ def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
+
+}
+
+/**
+ * A type-safe config builder. Provides methods for transforming the input data (which can be
+ * used, e.g., for validation) and creating the final config entry.
+ *
+ * One of the methods that return a [[ConfigEntry]] must be called to create a config entry that
+ * can be used with [[SparkConf]].
+ */
+private[spark] class TypedConfigBuilder[T](
+ val parent: ConfigBuilder,
+ val converter: String => T,
+ val stringConverter: T => String) {
+
+ import ConfigHelpers._
+
+ def this(parent: ConfigBuilder, converter: String => T) = {
+ this(parent, converter, Option(_).map(_.toString).orNull)
+ }
+
+ def transform(fn: T => T): TypedConfigBuilder[T] = {
+ new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
+ }
+
+ def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
+ transform { v =>
+ if (!validValues.contains(v)) {
+ throw new IllegalArgumentException(
+ s"The value of ${parent.key} should be one of ${validValues.mkString(", ")}, but was $v")
+ }
+ v
+ }
+ }
+
+ def toSequence: TypedConfigBuilder[Seq[T]] = {
+ new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
+ }
+
+ /** Creates a [[ConfigEntry]] that does not require a default value. */
+ def optional: OptionalConfigEntry[T] = {
+ new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+ }
+
+ /** Creates a [[ConfigEntry]] that has a default value. */
+ def withDefault(default: T): ConfigEntry[T] = {
+ val transformedDefault = converter(stringConverter(default))
+ new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
+ parent._doc, parent._public)
+ }
+
+ /**
+ * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
+ * [[String]] and must be a valid value for the entry.
+ */
+ def withDefaultString(default: String): ConfigEntry[T] = {
+ val typedDefault = converter(default)
+ new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
+ parent._public)
+ }
+
+}
+
+/**
+ * Basic builder for Spark configurations. Provides methods for creating type-specific builders.
+ *
+ * @see TypedConfigBuilder
+ */
+private[spark] case class ConfigBuilder(key: String) {
+
+ import ConfigHelpers._
+
+ var _public = true
+ var _doc = ""
+
+ def internal: ConfigBuilder = {
+ _public = false
+ this
+ }
+
+ def doc(s: String): ConfigBuilder = {
+ _doc = s
+ this
+ }
+
+ def intConf: TypedConfigBuilder[Int] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
+ }
+
+ def longConf: TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
+ }
+
+ def doubleConf: TypedConfigBuilder[Double] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
+ }
+
+ def booleanConf: TypedConfigBuilder[Boolean] = {
+ new TypedConfigBuilder(this, toBoolean(_, key))
+ }
+
+ def stringConf: TypedConfigBuilder[String] = {
+ new TypedConfigBuilder(this, v => v)
+ }
+
+ def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
+ }
+
+ def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
+ }
+
+ def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
+ new FallbackConfigEntry(key, _doc, _public, fallback)
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
new file mode 100644
index 0000000000..f7296b487c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import org.apache.spark.SparkConf
+
+/**
+ * An entry contains all meta information for a configuration.
+ *
+ * @param key the key for the configuration
+ * @param defaultValue the default value for the configuration
+ * @param valueConverter how to convert a string to the value. It should throw an exception if the
+ * string does not have the required format.
+ * @param stringConverter how to convert a value to a string that the user can use it as a valid
+ * string value. It's usually `toString`. But sometimes, a custom converter
+ * is necessary. E.g., if T is List[String], `a, b, c` is better than
+ * `List(a, b, c)`.
+ * @param doc the documentation for the configuration
+ * @param isPublic if this configuration is public to the user. If it's `false`, this
+ * configuration is only used internally and we should not expose it to users.
+ * @tparam T the value type
+ */
+private[spark] abstract class ConfigEntry[T] (
+ val key: String,
+ val valueConverter: String => T,
+ val stringConverter: T => String,
+ val doc: String,
+ val isPublic: Boolean) {
+
+ def defaultValueString: String
+
+ def readFrom(conf: SparkConf): T
+
+ // This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
+ // use readFrom().
+ def defaultValue: Option[T] = None
+
+ override def toString: String = {
+ s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
+ }
+}
+
+private class ConfigEntryWithDefault[T] (
+ key: String,
+ _defaultValue: T,
+ valueConverter: String => T,
+ stringConverter: T => String,
+ doc: String,
+ isPublic: Boolean)
+ extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+
+ override def defaultValue: Option[T] = Some(_defaultValue)
+
+ override def defaultValueString: String = stringConverter(_defaultValue)
+
+ override def readFrom(conf: SparkConf): T = {
+ conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
+ }
+
+}
+
+/**
+ * A config entry that does not have a default value.
+ */
+private[spark] class OptionalConfigEntry[T](
+ key: String,
+ val rawValueConverter: String => T,
+ val rawStringConverter: T => String,
+ doc: String,
+ isPublic: Boolean)
+ extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
+ v => v.map(rawStringConverter).orNull, doc, isPublic) {
+
+ override def defaultValueString: String = "<undefined>"
+
+ override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
+
+}
+
+/**
+ * A config entry whose default value is defined by another config entry.
+ */
+private class FallbackConfigEntry[T] (
+ key: String,
+ doc: String,
+ isPublic: Boolean,
+ private val fallback: ConfigEntry[T])
+ extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+
+ override def defaultValueString: String = s"<value of ${fallback.key}>"
+
+ override def readFrom(conf: SparkConf): T = {
+ conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
new file mode 100644
index 0000000000..f2f20b3207
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import org.apache.spark.launcher.SparkLauncher
+
+package object config {
+
+ private[spark] val DRIVER_CLASS_PATH =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+
+ private[spark] val DRIVER_JAVA_OPTIONS =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+ private[spark] val DRIVER_LIBRARY_PATH =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+
+ private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
+ ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+
+ private[spark] val EXECUTOR_CLASS_PATH =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+
+ private[spark] val EXECUTOR_JAVA_OPTIONS =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+ private[spark] val EXECUTOR_LIBRARY_PATH =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+
+ private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
+ ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+
+ private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
+ .booleanConf.withDefault(false)
+
+ private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+
+ private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+
+ private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.initialExecutors")
+ .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
+
+ private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+
+ private[spark] val SHUFFLE_SERVICE_ENABLED =
+ ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+
+ private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
+ .doc("Location of user's keytab.")
+ .stringConf.optional
+
+ private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
+ .doc("Name of the Kerberos principal.")
+ .stringConf.optional
+
+ private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+
+}
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
new file mode 100644
index 0000000000..0644148eae
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.ByteUnit
+
+class ConfigEntrySuite extends SparkFunSuite {
+
+ test("conf entry: int") {
+ val conf = new SparkConf()
+ val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ assert(conf.get(iConf) === 1)
+ conf.set(iConf, 2)
+ assert(conf.get(iConf) === 2)
+ }
+
+ test("conf entry: long") {
+ val conf = new SparkConf()
+ val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+ conf.set(lConf, 1234L)
+ assert(conf.get(lConf) === 1234L)
+ }
+
+ test("conf entry: double") {
+ val conf = new SparkConf()
+ val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+ conf.set(dConf, 20.0)
+ assert(conf.get(dConf) === 20.0)
+ }
+
+ test("conf entry: boolean") {
+ val conf = new SparkConf()
+ val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+ assert(!conf.get(bConf))
+ conf.set(bConf, true)
+ assert(conf.get(bConf))
+ }
+
+ test("conf entry: optional") {
+ val conf = new SparkConf()
+ val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+ assert(conf.get(optionalConf) === None)
+ conf.set(optionalConf, 1)
+ assert(conf.get(optionalConf) === Some(1))
+ }
+
+ test("conf entry: fallback") {
+ val conf = new SparkConf()
+ val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
+ assert(conf.get(confWithFallback) === 1)
+ conf.set(confWithFallback, 2)
+ assert(conf.get(parentConf) === 1)
+ assert(conf.get(confWithFallback) === 2)
+ }
+
+ test("conf entry: time") {
+ val conf = new SparkConf()
+ val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+ assert(conf.get(time) === 3600L)
+ conf.set(time.key, "1m")
+ assert(conf.get(time) === 60L)
+ }
+
+ test("conf entry: bytes") {
+ val conf = new SparkConf()
+ val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+ assert(conf.get(bytes) === 1024L)
+ conf.set(bytes.key, "1k")
+ assert(conf.get(bytes) === 1L)
+ }
+
+ test("conf entry: string seq") {
+ val conf = new SparkConf()
+ val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+ conf.set(seq.key, "1,,2, 3 , , 4")
+ assert(conf.get(seq) === Seq("1", "2", "3", "4"))
+ conf.set(seq, Seq("1", "2"))
+ assert(conf.get(seq) === Seq("1", "2"))
+ }
+
+ test("conf entry: int seq") {
+ val conf = new SparkConf()
+ val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+ conf.set(seq.key, "1,,2, 3 , , 4")
+ assert(conf.get(seq) === Seq(1, 2, 3, 4))
+ conf.set(seq, Seq(1, 2))
+ assert(conf.get(seq) === Seq(1, 2))
+ }
+
+ test("conf entry: transformation") {
+ val conf = new SparkConf()
+ val transformationConf = ConfigBuilder("spark.transformation")
+ .stringConf
+ .transform(_.toLowerCase())
+ .withDefault("FOO")
+
+ assert(conf.get(transformationConf) === "foo")
+ conf.set(transformationConf, "BAR")
+ assert(conf.get(transformationConf) === "bar")
+ }
+
+ test("conf entry: valid values check") {
+ val conf = new SparkConf()
+ val enum = ConfigBuilder("spark.enum")
+ .stringConf
+ .checkValues(Set("a", "b", "c"))
+ .withDefault("a")
+ assert(conf.get(enum) === "a")
+
+ conf.set(enum, "b")
+ assert(conf.get(enum) === "b")
+
+ conf.set(enum, "d")
+ val enumError = intercept[IllegalArgumentException] {
+ conf.get(enum)
+ }
+ assert(enumError.getMessage === s"The value of ${enum.key} should be one of a, b, c, but was d")
+ }
+
+ test("conf entry: conversion error") {
+ val conf = new SparkConf()
+ val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+ conf.set(conversionTest.key, "abc")
+ val conversionError = intercept[IllegalArgumentException] {
+ conf.get(conversionTest)
+ }
+ assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc")
+ }
+
+ test("default value handling is null-safe") {
+ val conf = new SparkConf()
+ val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+ assert(conf.get(stringConf) === null)
+ }
+
+}