 common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java | 25
 core/src/main/scala/org/apache/spark/SparkConf.scala | 39
 core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala | 184
 core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala | 111
 core/src/main/scala/org/apache/spark/internal/config/package.scala | 76
 core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala | 155
 yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala | 14
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 28
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 230
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 53
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala | 3
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 14
 yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala | 6
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 10
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 3
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 18
 yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala | 243
 yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala | 32
 yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 26
 yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala | 4
 20 files changed, 1019 insertions(+), 255 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b3d8e0cd7c..ccc527306d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -159,10 +159,10 @@ public class JavaUtils {
.build();
/**
- * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
- * internal use. If no suffix is provided a direct conversion is attempted.
+ * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
+ * The unit is also considered the default if the given string does not specify a unit.
*/
- private static long parseTimeString(String str, TimeUnit unit) {
+ public static long timeStringAs(String str, TimeUnit unit) {
String lower = str.toLowerCase().trim();
try {
@@ -195,7 +195,7 @@ public class JavaUtils {
* no suffix is provided, the passed number is assumed to be in ms.
*/
public static long timeStringAsMs(String str) {
- return parseTimeString(str, TimeUnit.MILLISECONDS);
+ return timeStringAs(str, TimeUnit.MILLISECONDS);
}
/**
@@ -203,15 +203,14 @@ public class JavaUtils {
* no suffix is provided, the passed number is assumed to be in seconds.
*/
public static long timeStringAsSec(String str) {
- return parseTimeString(str, TimeUnit.SECONDS);
+ return timeStringAs(str, TimeUnit.SECONDS);
}
/**
- * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a ByteUnit for
- * internal use. If no suffix is provided a direct conversion of the provided default is
- * attempted.
+ * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given unit. If no suffix is
+ * provided, a direct conversion to the provided unit is attempted.
*/
- private static long parseByteString(String str, ByteUnit unit) {
+ public static long byteStringAs(String str, ByteUnit unit) {
String lower = str.toLowerCase().trim();
try {
@@ -252,7 +251,7 @@ public class JavaUtils {
* If no suffix is provided, the passed number is assumed to be in bytes.
*/
public static long byteStringAsBytes(String str) {
- return parseByteString(str, ByteUnit.BYTE);
+ return byteStringAs(str, ByteUnit.BYTE);
}
/**
@@ -262,7 +261,7 @@ public class JavaUtils {
* If no suffix is provided, the passed number is assumed to be in kibibytes.
*/
public static long byteStringAsKb(String str) {
- return parseByteString(str, ByteUnit.KiB);
+ return byteStringAs(str, ByteUnit.KiB);
}
/**
@@ -272,7 +271,7 @@ public class JavaUtils {
* If no suffix is provided, the passed number is assumed to be in mebibytes.
*/
public static long byteStringAsMb(String str) {
- return parseByteString(str, ByteUnit.MiB);
+ return byteStringAs(str, ByteUnit.MiB);
}
/**
@@ -282,7 +281,7 @@ public class JavaUtils {
* If no suffix is provided, the passed number is assumed to be in gibibytes.
*/
public static long byteStringAsGb(String str) {
- return parseByteString(str, ByteUnit.GiB);
+ return byteStringAs(str, ByteUnit.GiB);
}
/**
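For reference, a minimal sketch (not part of the patch) of how the now-public helpers behave, following the semantics described above and exercised by ConfigEntrySuite further down:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    JavaUtils.timeStringAs("1h", TimeUnit.SECONDS)        // 3600: suffix parsed, result in the given unit
    JavaUtils.timeStringAs("200", TimeUnit.MILLISECONDS)  // 200: no suffix, so the given unit is assumed
    JavaUtils.byteStringAs("1m", ByteUnit.KiB)            // 1024: 1 MiB expressed in KiB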
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b81bfb3182..16423e771a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,13 +17,15 @@
package org.apache.spark
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet
import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@@ -74,6 +76,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
+ set(entry.key, entry.stringConverter(value))
+ this
+ }
+
+ private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+ set(entry.key, entry.rawStringConverter(value))
+ this
+ }
+
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
@@ -148,6 +160,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = {
+ if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) {
+ logDeprecationWarning(entry.key)
+ }
+ this
+ }
+
+ private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
+ if (settings.putIfAbsent(entry.key, entry.rawStringConverter(value)) == null) {
+ logDeprecationWarning(entry.key)
+ }
+ this
+ }
+
/**
* Use Kryo serialization and register the given set of classes with Kryo.
* If called multiple times, this will append the classes from all calls together.
@@ -199,6 +225,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/**
+ * Retrieves the value of a pre-defined configuration entry.
+ *
+ * - This is an internal Spark API.
+ * - The return type is defined by the configuration entry.
+ * - This will throw an exception if the config is not optional and the value is not set.
+ */
+ private[spark] def get[T](entry: ConfigEntry[T]): T = {
+ entry.readFrom(this)
+ }
+
+ /**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
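As an illustration (not part of the patch), the new typed set/get overloads let internal code avoid raw string keys and ad-hoc parsing; the entry below is hypothetical and uses the ConfigBuilder API added in the next file:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    val NETWORK_TIMEOUT = ConfigBuilder("spark.example.networkTimeout")
      .timeConf(TimeUnit.SECONDS)
      .withDefaultString("120s")

    val conf = new SparkConf()
    conf.get(NETWORK_TIMEOUT)       // 120: default value, expressed in seconds
    conf.set(NETWORK_TIMEOUT, 60L)  // stored as "60000ms" via the entry's stringConverter
    conf.get(NETWORK_TIMEOUT)       // 60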
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
new file mode 100644
index 0000000000..770b43697a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
+private object ConfigHelpers {
+
+ def toNumber[T](s: String, converter: String => T, key: String, configType: String): T = {
+ try {
+ converter(s)
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be $configType, but was $s")
+ }
+ }
+
+ def toBoolean(s: String, key: String): Boolean = {
+ try {
+ s.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(s"$key should be boolean, but was $s")
+ }
+ }
+
+ def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
+ str.split(",").map(_.trim()).filter(_.nonEmpty).map(converter)
+ }
+
+ def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
+ v.map(stringConverter).mkString(",")
+ }
+
+ def timeFromString(str: String, unit: TimeUnit): Long = JavaUtils.timeStringAs(str, unit)
+
+ def timeToString(v: Long, unit: TimeUnit): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms"
+
+ def byteFromString(str: String, unit: ByteUnit): Long = {
+ val (input, multiplier) =
+ if (str.length() > 0 && str.charAt(0) == '-') {
+ (str.substring(1), -1)
+ } else {
+ (str, 1)
+ }
+ multiplier * JavaUtils.byteStringAs(input, unit)
+ }
+
+ def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
+
+}
+
+/**
+ * A type-safe config builder. Provides methods for transforming the input data (which can be
+ * used, e.g., for validation) and creating the final config entry.
+ *
+ * One of the methods that return a [[ConfigEntry]] must be called to create a config entry that
+ * can be used with [[SparkConf]].
+ */
+private[spark] class TypedConfigBuilder[T](
+ val parent: ConfigBuilder,
+ val converter: String => T,
+ val stringConverter: T => String) {
+
+ import ConfigHelpers._
+
+ def this(parent: ConfigBuilder, converter: String => T) = {
+ this(parent, converter, Option(_).map(_.toString).orNull)
+ }
+
+ def transform(fn: T => T): TypedConfigBuilder[T] = {
+ new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
+ }
+
+ def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
+ transform { v =>
+ if (!validValues.contains(v)) {
+ throw new IllegalArgumentException(
+ s"The value of ${parent.key} should be one of ${validValues.mkString(", ")}, but was $v")
+ }
+ v
+ }
+ }
+
+ def toSequence: TypedConfigBuilder[Seq[T]] = {
+ new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
+ }
+
+ /** Creates a [[ConfigEntry]] that does not require a default value. */
+ def optional: OptionalConfigEntry[T] = {
+ new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc, parent._public)
+ }
+
+ /** Creates a [[ConfigEntry]] that has a default value. */
+ def withDefault(default: T): ConfigEntry[T] = {
+ val transformedDefault = converter(stringConverter(default))
+ new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter, stringConverter,
+ parent._doc, parent._public)
+ }
+
+ /**
+ * Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
+ * [[String]] and must be a valid value for the entry.
+ */
+ def withDefaultString(default: String): ConfigEntry[T] = {
+ val typedDefault = converter(default)
+ new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter, parent._doc,
+ parent._public)
+ }
+
+}
+
+/**
+ * Basic builder for Spark configurations. Provides methods for creating type-specific builders.
+ *
+ * @see TypedConfigBuilder
+ */
+private[spark] case class ConfigBuilder(key: String) {
+
+ import ConfigHelpers._
+
+ var _public = true
+ var _doc = ""
+
+ def internal: ConfigBuilder = {
+ _public = false
+ this
+ }
+
+ def doc(s: String): ConfigBuilder = {
+ _doc = s
+ this
+ }
+
+ def intConf: TypedConfigBuilder[Int] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
+ }
+
+ def longConf: TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
+ }
+
+ def doubleConf: TypedConfigBuilder[Double] = {
+ new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
+ }
+
+ def booleanConf: TypedConfigBuilder[Boolean] = {
+ new TypedConfigBuilder(this, toBoolean(_, key))
+ }
+
+ def stringConf: TypedConfigBuilder[String] = {
+ new TypedConfigBuilder(this, v => v)
+ }
+
+ def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
+ }
+
+ def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
+ new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
+ }
+
+ def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
+ new FallbackConfigEntry(key, _doc, _public, fallback)
+ }
+
+}
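A rough sketch (hypothetical keys, not part of the patch) of how the builder methods compose; each call either refines the value converter or finalizes the entry:

    import org.apache.spark.internal.config.ConfigBuilder

    // Normalization and validation are expressed as converter transformations.
    val SHUFFLE_MODE = ConfigBuilder("spark.example.shuffleMode")
      .stringConf
      .transform(_.toLowerCase())
      .checkValues(Set("sort", "hash"))
      .withDefault("sort")

    // toSequence re-wraps the converters to handle comma-separated values, and
    // optional produces an entry that reads as None when the key is unset.
    val EXTRA_TAGS = ConfigBuilder("spark.example.tags")
      .stringConf
      .toSequence
      .optional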
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
new file mode 100644
index 0000000000..f7296b487c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import org.apache.spark.SparkConf
+
+/**
+ * An entry contains all meta information for a configuration.
+ *
+ * @param key the key for the configuration
+ * @param defaultValue the default value for the configuration
+ * @param valueConverter how to convert a string to the value. It should throw an exception if the
+ * string does not have the required format.
+ * @param stringConverter how to convert a value to a string that the user can use as a valid
+ * string value. It's usually `toString`. But sometimes, a custom converter
+ * is necessary. E.g., if T is List[String], `a, b, c` is better than
+ * `List(a, b, c)`.
+ * @param doc the documentation for the configuration
+ * @param isPublic if this configuration is public to the user. If it's `false`, this
+ * configuration is only used internally and we should not expose it to users.
+ * @tparam T the value type
+ */
+private[spark] abstract class ConfigEntry[T] (
+ val key: String,
+ val valueConverter: String => T,
+ val stringConverter: T => String,
+ val doc: String,
+ val isPublic: Boolean) {
+
+ def defaultValueString: String
+
+ def readFrom(conf: SparkConf): T
+
+ // This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
+ // use readFrom().
+ def defaultValue: Option[T] = None
+
+ override def toString: String = {
+ s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
+ }
+}
+
+private class ConfigEntryWithDefault[T] (
+ key: String,
+ _defaultValue: T,
+ valueConverter: String => T,
+ stringConverter: T => String,
+ doc: String,
+ isPublic: Boolean)
+ extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
+
+ override def defaultValue: Option[T] = Some(_defaultValue)
+
+ override def defaultValueString: String = stringConverter(_defaultValue)
+
+ override def readFrom(conf: SparkConf): T = {
+ conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
+ }
+
+}
+
+/**
+ * A config entry that does not have a default value.
+ */
+private[spark] class OptionalConfigEntry[T](
+ key: String,
+ val rawValueConverter: String => T,
+ val rawStringConverter: T => String,
+ doc: String,
+ isPublic: Boolean)
+ extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
+ v => v.map(rawStringConverter).orNull, doc, isPublic) {
+
+ override def defaultValueString: String = "<undefined>"
+
+ override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
+
+}
+
+/**
+ * A config entry whose default value is defined by another config entry.
+ */
+private class FallbackConfigEntry[T] (
+ key: String,
+ doc: String,
+ isPublic: Boolean,
+ private val fallback: ConfigEntry[T])
+ extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
+
+ override def defaultValueString: String = s"<value of ${fallback.key}>"
+
+ override def readFrom(conf: SparkConf): T = {
+ conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
+ }
+
+}
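To summarize the read semantics of the entry flavors, a sketch (hypothetical keys, not part of the patch) that mirrors the classes above and the fallback test below:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    val conf = new SparkConf()
    val minExec  = ConfigBuilder("spark.example.minExecutors").intConf.withDefault(0)
    val initExec = ConfigBuilder("spark.example.initialExecutors").fallbackConf(minExec)

    conf.get(initExec)     // 0: key unset, falls back to minExec's default
    conf.set(minExec, 2)
    conf.get(initExec)     // 2: key still unset, reads the fallback entry's value
    conf.set(initExec, 5)
    conf.get(initExec)     // 5: an explicit value wins over the fallback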
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
new file mode 100644
index 0000000000..f2f20b3207
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal
+
+import org.apache.spark.launcher.SparkLauncher
+
+package object config {
+
+ private[spark] val DRIVER_CLASS_PATH =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.optional
+
+ private[spark] val DRIVER_JAVA_OPTIONS =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+ private[spark] val DRIVER_LIBRARY_PATH =
+ ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.optional
+
+ private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
+ ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)
+
+ private[spark] val EXECUTOR_CLASS_PATH =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional
+
+ private[spark] val EXECUTOR_JAVA_OPTIONS =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.optional
+
+ private[spark] val EXECUTOR_LIBRARY_PATH =
+ ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.optional
+
+ private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
+ ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)
+
+ private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
+ .booleanConf.withDefault(false)
+
+ private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.withDefault(1)
+
+ private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.withDefault(0)
+
+ private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.initialExecutors")
+ .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)
+
+ private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
+ ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.withDefault(Int.MaxValue)
+
+ private[spark] val SHUFFLE_SERVICE_ENABLED =
+ ConfigBuilder("spark.shuffle.service.enabled").booleanConf.withDefault(false)
+
+ private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
+ .doc("Location of user's keytab.")
+ .stringConf.optional
+
+ private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
+ .doc("Name of the Kerberos principal.")
+ .stringConf.optional
+
+ private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional
+
+}
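With the entries centralized in this package object, call sites import it and read typed values, as the YARN changes below do; a condensed sketch of the two access patterns:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    val conf = new SparkConf()
    // Optional entry: returns Option[Int], so the caller picks the fallback.
    val executors = conf.get(EXECUTOR_INSTANCES).getOrElse(0)
    // Entry with a default: returns the default when the key is unset.
    val maxExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)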
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
new file mode 100644
index 0000000000..0644148eae
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.ByteUnit
+
+class ConfigEntrySuite extends SparkFunSuite {
+
+ test("conf entry: int") {
+ val conf = new SparkConf()
+ val iConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ assert(conf.get(iConf) === 1)
+ conf.set(iConf, 2)
+ assert(conf.get(iConf) === 2)
+ }
+
+ test("conf entry: long") {
+ val conf = new SparkConf()
+ val lConf = ConfigBuilder("spark.long").longConf.withDefault(0L)
+ conf.set(lConf, 1234L)
+ assert(conf.get(lConf) === 1234L)
+ }
+
+ test("conf entry: double") {
+ val conf = new SparkConf()
+ val dConf = ConfigBuilder("spark.double").doubleConf.withDefault(0.0)
+ conf.set(dConf, 20.0)
+ assert(conf.get(dConf) === 20.0)
+ }
+
+ test("conf entry: boolean") {
+ val conf = new SparkConf()
+ val bConf = ConfigBuilder("spark.boolean").booleanConf.withDefault(false)
+ assert(!conf.get(bConf))
+ conf.set(bConf, true)
+ assert(conf.get(bConf))
+ }
+
+ test("conf entry: optional") {
+ val conf = new SparkConf()
+ val optionalConf = ConfigBuilder("spark.optional").intConf.optional
+ assert(conf.get(optionalConf) === None)
+ conf.set(optionalConf, 1)
+ assert(conf.get(optionalConf) === Some(1))
+ }
+
+ test("conf entry: fallback") {
+ val conf = new SparkConf()
+ val parentConf = ConfigBuilder("spark.int").intConf.withDefault(1)
+ val confWithFallback = ConfigBuilder("spark.fallback").fallbackConf(parentConf)
+ assert(conf.get(confWithFallback) === 1)
+ conf.set(confWithFallback, 2)
+ assert(conf.get(parentConf) === 1)
+ assert(conf.get(confWithFallback) === 2)
+ }
+
+ test("conf entry: time") {
+ val conf = new SparkConf()
+ val time = ConfigBuilder("spark.time").timeConf(TimeUnit.SECONDS).withDefaultString("1h")
+ assert(conf.get(time) === 3600L)
+ conf.set(time.key, "1m")
+ assert(conf.get(time) === 60L)
+ }
+
+ test("conf entry: bytes") {
+ val conf = new SparkConf()
+ val bytes = ConfigBuilder("spark.bytes").bytesConf(ByteUnit.KiB).withDefaultString("1m")
+ assert(conf.get(bytes) === 1024L)
+ conf.set(bytes.key, "1k")
+ assert(conf.get(bytes) === 1L)
+ }
+
+ test("conf entry: string seq") {
+ val conf = new SparkConf()
+ val seq = ConfigBuilder("spark.seq").stringConf.toSequence.withDefault(Seq())
+ conf.set(seq.key, "1,,2, 3 , , 4")
+ assert(conf.get(seq) === Seq("1", "2", "3", "4"))
+ conf.set(seq, Seq("1", "2"))
+ assert(conf.get(seq) === Seq("1", "2"))
+ }
+
+ test("conf entry: int seq") {
+ val conf = new SparkConf()
+ val seq = ConfigBuilder("spark.seq").intConf.toSequence.withDefault(Seq())
+ conf.set(seq.key, "1,,2, 3 , , 4")
+ assert(conf.get(seq) === Seq(1, 2, 3, 4))
+ conf.set(seq, Seq(1, 2))
+ assert(conf.get(seq) === Seq(1, 2))
+ }
+
+ test("conf entry: transformation") {
+ val conf = new SparkConf()
+ val transformationConf = ConfigBuilder("spark.transformation")
+ .stringConf
+ .transform(_.toLowerCase())
+ .withDefault("FOO")
+
+ assert(conf.get(transformationConf) === "foo")
+ conf.set(transformationConf, "BAR")
+ assert(conf.get(transformationConf) === "bar")
+ }
+
+ test("conf entry: valid values check") {
+ val conf = new SparkConf()
+ val enum = ConfigBuilder("spark.enum")
+ .stringConf
+ .checkValues(Set("a", "b", "c"))
+ .withDefault("a")
+ assert(conf.get(enum) === "a")
+
+ conf.set(enum, "b")
+ assert(conf.get(enum) === "b")
+
+ conf.set(enum, "d")
+ val enumError = intercept[IllegalArgumentException] {
+ conf.get(enum)
+ }
+ assert(enumError.getMessage === s"The value of ${enum.key} should be one of a, b, c, but was d")
+ }
+
+ test("conf entry: conversion error") {
+ val conf = new SparkConf()
+ val conversionTest = ConfigBuilder("spark.conversionTest").doubleConf.optional
+ conf.set(conversionTest.key, "abc")
+ val conversionError = intercept[IllegalArgumentException] {
+ conf.get(conversionTest)
+ }
+ assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc")
+ }
+
+ test("default value handling is null-safe") {
+ val conf = new SparkConf()
+ val stringConf = ConfigBuilder("spark.string").stringConf.withDefault(null)
+ assert(conf.get(stringConf) === null)
+ }
+
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
index 70b67d21ec..6e95bb9710 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.util.ThreadUtils
/*
@@ -60,11 +62,9 @@ private[yarn] class AMDelegationTokenRenewer(
private val hadoopUtil = YarnSparkHadoopUtil.get
- private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
- private val daysToKeepFiles =
- sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
- private val numFilesToKeep =
- sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
+ private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+ private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
+ private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
private val freshHadoopConf =
hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
@@ -76,8 +76,8 @@ private[yarn] class AMDelegationTokenRenewer(
*
*/
private[spark] def scheduleLoginFromKeytab(): Unit = {
- val principal = sparkConf.get("spark.yarn.principal")
- val keytab = sparkConf.get("spark.yarn.keytab")
+ val principal = sparkConf.get(PRINCIPAL).get
+ val keytab = sparkConf.get(KEYTAB).get
/**
* Schedule re-login and creation of new tokens. If tokens have already expired, this method
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9f586bf4c1..7d7bf88b9e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -65,16 +67,15 @@ private[spark] class ApplicationMaster(
// allocation is enabled), with a minimum of 3.
private val maxNumExecutorFailures = {
- val defaultKey =
+ val effectiveNumExecutors =
if (Utils.isDynamicAllocationEnabled(sparkConf)) {
- "spark.dynamicAllocation.maxExecutors"
+ sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
- "spark.executor.instances"
+ sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
}
- val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
- sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+ sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
}
@volatile private var exitCode = 0
@@ -95,14 +96,13 @@ private[spark] class ApplicationMaster(
private val heartbeatInterval = {
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- math.max(0, math.min(expiryInterval / 2,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+ math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
}
// Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
// being requested.
private val initialAllocationInterval = math.min(heartbeatInterval,
- sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+ sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))
// Next wait interval before allocator poll.
private var nextAllocationInterval = initialAllocationInterval
@@ -178,7 +178,7 @@ private[spark] class ApplicationMaster(
// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
- if (sparkConf.contains("spark.yarn.credentials.file")) {
+ if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
@@ -275,7 +275,7 @@ private[spark] class ApplicationMaster(
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress =
- sparkConf.getOption("spark.yarn.historyServer.address")
+ sparkConf.get(HISTORY_SERVER_ADDRESS)
.map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
.getOrElse("")
@@ -355,7 +355,7 @@ private[spark] class ApplicationMaster(
private def launchReporterThread(): Thread = {
// The number of failures in a row until Reporter thread give up
- val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
+ val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
val t = new Thread {
override def run() {
@@ -429,7 +429,7 @@ private[spark] class ApplicationMaster(
private def cleanupStagingDir(fs: FileSystem) {
var stagingDirPath: Path = null
try {
- val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+ val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {
@@ -448,7 +448,7 @@ private[spark] class ApplicationMaster(
private def waitForSparkContextInitialized(): SparkContext = {
logInfo("Waiting for spark context initialization")
sparkContextRef.synchronized {
- val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
+ val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
val deadline = System.currentTimeMillis() + totalWaitTime
while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
@@ -473,7 +473,7 @@ private[spark] class ApplicationMaster(
// Spark driver should already be up since it launched us, but we don't want to
// wait forever, so wait 100 seconds max to match the cluster mode setting.
- val totalWaitTimeMs = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
+ val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
val deadline = System.currentTimeMillis + totalWaitTimeMs
while (!driverUp && !finished && System.currentTimeMillis < deadline) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index be45e9597f..36073de90d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.Utils
@@ -87,8 +89,7 @@ private[spark] class Client(
}
}
}
- private val fireAndForget = isClusterMode &&
- !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
+ private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION)
private var appId: ApplicationId = null
@@ -156,7 +157,7 @@ private[spark] class Client(
private def cleanupStagingDir(appId: ApplicationId): Unit = {
val appStagingDir = getAppStagingDir(appId)
try {
- val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
+ val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
if (!preserveFiles && fs.exists(stagingDirPath)) {
@@ -181,39 +182,36 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
- sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
- .map(StringUtils.getTrimmedStringCollection(_))
- .filter(!_.isEmpty())
- .foreach { tagCollection =>
- try {
- // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
- // reflection to set it, printing a warning if a tag was specified but the YARN version
- // doesn't support it.
- val method = appContext.getClass().getMethod(
- "setApplicationTags", classOf[java.util.Set[String]])
- method.invoke(appContext, new java.util.HashSet[String](tagCollection))
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
- "YARN does not support it")
- }
+
+ sparkConf.get(APPLICATION_TAGS).foreach { tags =>
+ try {
+ // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+ // reflection to set it, printing a warning if a tag was specified but the YARN version
+ // doesn't support it.
+ val method = appContext.getClass().getMethod(
+ "setApplicationTags", classOf[java.util.Set[String]])
+ method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
+ "YARN does not support it")
}
- sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+ }
+ sparkConf.get(MAX_APP_ATTEMPTS) match {
case Some(v) => appContext.setMaxAppAttempts(v)
- case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+ case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
"Cluster's default value will be used.")
}
- if (sparkConf.contains("spark.yarn.am.attemptFailuresValidityInterval")) {
+ sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
try {
- val interval = sparkConf.getTimeAsMs("spark.yarn.am.attemptFailuresValidityInterval")
val method = appContext.getClass().getMethod(
"setAttemptFailuresValidityInterval", classOf[Long])
method.invoke(appContext, interval: java.lang.Long)
} catch {
case e: NoSuchMethodException =>
- logWarning("Ignoring spark.yarn.am.attemptFailuresValidityInterval because the version " +
- "of YARN does not support it")
+ logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+ "the version of YARN does not support it")
}
}
@@ -221,28 +219,28 @@ private[spark] class Client(
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setVirtualCores(args.amCores)
- if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
- try {
- val amRequest = Records.newRecord(classOf[ResourceRequest])
- amRequest.setResourceName(ResourceRequest.ANY)
- amRequest.setPriority(Priority.newInstance(0))
- amRequest.setCapability(capability)
- amRequest.setNumContainers(1)
- val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
- val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
- method.invoke(amRequest, amLabelExpression)
-
- val setResourceRequestMethod =
- appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
- setResourceRequestMethod.invoke(appContext, amRequest)
- } catch {
- case e: NoSuchMethodException =>
- logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
- "of YARN does not support it")
- appContext.setResource(capability)
- }
- } else {
- appContext.setResource(capability)
+ sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
+ case Some(expr) =>
+ try {
+ val amRequest = Records.newRecord(classOf[ResourceRequest])
+ amRequest.setResourceName(ResourceRequest.ANY)
+ amRequest.setPriority(Priority.newInstance(0))
+ amRequest.setCapability(capability)
+ amRequest.setNumContainers(1)
+ val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
+ method.invoke(amRequest, expr)
+
+ val setResourceRequestMethod =
+ appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
+ setResourceRequestMethod.invoke(appContext, amRequest)
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
+ "of YARN does not support it")
+ appContext.setResource(capability)
+ }
+ case None =>
+ appContext.setResource(capability)
}
appContext
@@ -345,8 +343,8 @@ private[spark] class Client(
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
- val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
- fs.getDefaultReplication(dst)).toShort
+ val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
+ .getOrElse(fs.getDefaultReplication(dst))
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
@@ -419,7 +417,7 @@ private[spark] class Client(
logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
" via the YARN Secure Distributed Cache.")
val (_, localizedPath) = distribute(keytab,
- destName = Some(sparkConf.get("spark.yarn.keytab")),
+ destName = sparkConf.get(KEYTAB),
appMasterOnly = true)
require(localizedPath != null, "Keytab file already distributed.")
}
@@ -433,8 +431,8 @@ private[spark] class Client(
* (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
- (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+ (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
+ (APP_JAR_NAME, args.userJar, APP_JAR.key),
("log4j.properties", oldLog4jConf.orNull, null)
).foreach { case (destName, path, confKey) =>
if (path != null && !path.trim().isEmpty()) {
@@ -472,7 +470,7 @@ private[spark] class Client(
}
}
if (cachedSecondaryJarLinks.nonEmpty) {
- sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+ sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
}
if (isClusterMode && args.primaryPyFile != null) {
@@ -586,7 +584,7 @@ private[spark] class Client(
val creds = new Credentials()
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
- nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+ nns, hadoopConf, creds, sparkConf.get(PRINCIPAL))
val t = creds.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.head
@@ -606,8 +604,7 @@ private[spark] class Client(
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
- val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
- populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
+ populateClasspath(args, yarnConf, sparkConf, env, true, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -615,11 +612,10 @@ private[spark] class Client(
val remoteFs = FileSystem.get(hadoopConf)
val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
val credentialsFile = "credentials-" + UUID.randomUUID().toString
- sparkConf.set(
- "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
+ sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
val renewalInterval = getTokenRenewalInterval(stagingDirPath)
- sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString)
+ sparkConf.set(TOKEN_RENEWAL_INTERVAL, renewalInterval)
}
// Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
@@ -713,7 +709,7 @@ private[spark] class Client(
val appId = newAppResponse.getApplicationId
val appStagingDir = getAppStagingDir(appId)
val pySparkArchives =
- if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
+ if (sparkConf.get(IS_PYTHON_APP)) {
findPySparkArchives()
} else {
Nil
@@ -766,36 +762,33 @@ private[spark] class Client(
// Include driver-specific java options if we are launching a driver
if (isClusterMode) {
- val driverOpts = sparkConf.getOption("spark.driver.extraJavaOptions")
- .orElse(sys.env.get("SPARK_JAVA_OPTS"))
+ val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS"))
driverOpts.foreach { opts =>
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
- val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
+ val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) {
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
}
- if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
- logWarning("spark.yarn.am.extraJavaOptions will not take effect in cluster mode")
+ if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
+ logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
}
} else {
// Validate and include yarn am specific java options in yarn-client mode.
- val amOptsKey = "spark.yarn.am.extraJavaOptions"
- val amOpts = sparkConf.getOption(amOptsKey)
- amOpts.foreach { opts =>
+ sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
if (opts.contains("-Dspark")) {
- val msg = s"$amOptsKey is not allowed to set Spark options (was '$opts'). "
+ val msg = s"$${amJavaOptions.key} is not allowed to set Spark options (was '$opts'). "
throw new SparkException(msg)
}
if (opts.contains("-Xmx") || opts.contains("-Xms")) {
- val msg = s"$amOptsKey is not allowed to alter memory settings (was '$opts')."
+ val msg = s"$${amJavaOptions.key} is not allowed to alter memory settings (was '$opts')."
throw new SparkException(msg)
}
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
- sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
+ sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
}
}
@@ -883,17 +876,10 @@ private[spark] class Client(
}
def setupCredentials(): Unit = {
- loginFromKeytab = args.principal != null || sparkConf.contains("spark.yarn.principal")
+ loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
- principal =
- if (args.principal != null) args.principal else sparkConf.get("spark.yarn.principal")
- keytab = {
- if (args.keytab != null) {
- args.keytab
- } else {
- sparkConf.getOption("spark.yarn.keytab").orNull
- }
- }
+ principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get
+ keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
@@ -902,8 +888,8 @@ private[spark] class Client(
// Generate a file name that can be used for the keytab file, that does not conflict
// with any user file.
val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
- sparkConf.set("spark.yarn.keytab", keytabFileName)
- sparkConf.set("spark.yarn.principal", principal)
+ sparkConf.set(KEYTAB.key, keytabFileName)
+ sparkConf.set(PRINCIPAL.key, principal)
}
credentials = UserGroupInformation.getCurrentUser.getCredentials
}
@@ -923,7 +909,7 @@ private[spark] class Client(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
- val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+ val interval = sparkConf.get(REPORT_INTERVAL)
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
@@ -1071,14 +1057,14 @@ object Client extends Logging {
val args = new ClientArguments(argStrings, sparkConf)
// to maintain backwards-compatibility
if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
- sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+ sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors)
}
new Client(args, sparkConf).run()
}
// Alias for the Spark assembly jar and the user jar
- val SPARK_JAR: String = "__spark__.jar"
- val APP_JAR: String = "__app__.jar"
+ val SPARK_JAR_NAME: String = "__spark__.jar"
+ val APP_JAR_NAME: String = "__app__.jar"
// URI scheme that identifies local resources
val LOCAL_SCHEME = "local"
@@ -1087,20 +1073,8 @@ object Client extends Logging {
val SPARK_STAGING: String = ".sparkStaging"
// Location of any user-defined Spark jars
- val CONF_SPARK_JAR = "spark.yarn.jar"
val ENV_SPARK_JAR = "SPARK_JAR"
- // Internal config to propagate the location of the user's jar to the driver/executors
- val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
-
- // Internal config to propagate the locations of any extra jars to add to the classpath
- // of the executors
- val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
-
- // Comma-separated list of strings to pass through as YARN application tags appearing
- // in YARN ApplicationReports, which can be used for filtering when querying YARN.
- val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
-
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
@@ -1125,23 +1099,23 @@ object Client extends Logging {
* Find the user-defined Spark jar if configured, or return the jar containing this
* class if not.
*
- * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
+ * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
* user environment if that is not found (for backwards compatibility).
*/
private def sparkJar(conf: SparkConf): String = {
- if (conf.contains(CONF_SPARK_JAR)) {
- conf.get(CONF_SPARK_JAR)
- } else if (System.getenv(ENV_SPARK_JAR) != null) {
- logWarning(
- s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
- s"in favor of the $CONF_SPARK_JAR configuration variable.")
- System.getenv(ENV_SPARK_JAR)
- } else {
- SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
- + "find jar containing Spark classes. The jar can be defined using the "
- + "spark.yarn.jar configuration option. If testing Spark, either set that option or "
- + "make sure SPARK_PREPEND_CLASSES is not set."))
- }
+ conf.get(SPARK_JAR).getOrElse(
+ if (System.getenv(ENV_SPARK_JAR) != null) {
+ logWarning(
+ s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
+ s"in favor of the ${SPARK_JAR.key} configuration variable.")
+ System.getenv(ENV_SPARK_JAR)
+ } else {
+ SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
+ + "find jar containing Spark classes. The jar can be defined using the "
+ + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
+ + "or make sure SPARK_PREPEND_CLASSES is not set."))
+ }
+ )
}
/**
@@ -1240,7 +1214,7 @@ object Client extends Logging {
LOCALIZED_CONF_DIR, env)
}
- if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
+ if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
// in order to properly add the app jar when user classpath is first
// we have to do the mainJar separate in order to send the right thing
// into addFileToClasspath
@@ -1248,21 +1222,21 @@ object Client extends Logging {
if (args != null) {
getMainJarUri(Option(args.userJar))
} else {
- getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR))
+ getMainJarUri(sparkConf.get(APP_JAR))
}
- mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env))
+ mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR_NAME, env))
val secondaryJars =
if (args != null) {
- getSecondaryJarUris(Option(args.addJars))
+ getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq))
} else {
- getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
}
secondaryJars.foreach { x =>
addFileToClasspath(sparkConf, conf, x, null, env)
}
}
- addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1275,8 +1249,8 @@ object Client extends Logging {
* @param conf Spark configuration.
*/
def getUserClasspath(conf: SparkConf): Array[URI] = {
- val mainUri = getMainJarUri(conf.getOption(CONF_SPARK_USER_JAR))
- val secondaryUris = getSecondaryJarUris(conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS))
+ val mainUri = getMainJarUri(conf.get(APP_JAR))
+ val secondaryUris = getSecondaryJarUris(conf.get(SECONDARY_JARS))
(mainUri ++ secondaryUris).toArray
}
@@ -1284,11 +1258,11 @@ object Client extends Logging {
mainJar.flatMap { path =>
val uri = Utils.resolveURI(path)
if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
- }.orElse(Some(new URI(APP_JAR)))
+ }.orElse(Some(new URI(APP_JAR_NAME)))
}
- private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = {
- secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_))
+ private def getSecondaryJarUris(secondaryJars: Option[Seq[String]]): Seq[URI] = {
+ secondaryJars.getOrElse(Nil).map(new URI(_))
}
/**
@@ -1345,8 +1319,8 @@ object Client extends Logging {
* If either config is not available, the input path is returned.
*/
def getClusterPath(conf: SparkConf, path: String): String = {
- val localPath = conf.get("spark.yarn.config.gatewayPath", null)
- val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
+ val localPath = conf.get(GATEWAY_ROOT_PATH)
+ val clusterPath = conf.get(REPLACEMENT_ROOT_PATH)
if (localPath != null && clusterPath != null) {
path.replace(localPath, clusterPath)
} else {
@@ -1405,9 +1379,9 @@ object Client extends Logging {
*/
def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
if (isDriver) {
- conf.getBoolean("spark.driver.userClassPathFirst", false)
+ conf.get(DRIVER_USER_CLASS_PATH_FIRST)
} else {
- conf.getBoolean("spark.executor.userClassPathFirst", false)
+ conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a9f4374357..47b4cc3009 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,10 +21,15 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
+private[spark] class ClientArguments(
+ args: Array[String],
+ sparkConf: SparkConf) {
+
var addJars: String = null
var files: String = null
var archives: String = null
@@ -37,9 +42,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
- var amQueue = sparkConf.get("spark.yarn.queue", "default")
- var amMemory: Int = 512 // MB
- var amCores: Int = 1
+ var amQueue = sparkConf.get(QUEUE_NAME)
+ var amMemory: Int = _
+ var amCores: Int = _
var appName: String = "Spark"
var priority = 0
var principal: String = null
@@ -48,11 +53,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB
private var driverCores: Int = 1
- private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
- private val amMemKey = "spark.yarn.am.memory"
- private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
- private val driverCoresKey = "spark.driver.cores"
- private val amCoresKey = "spark.yarn.am.cores"
private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
parseArgs(args.toList)
@@ -60,33 +60,33 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
validateArgs()
// Additional memory to allocate to containers
- val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey
- val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
- math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+ val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+ val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
- val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
/** Load any default arguments provided through environment variables and Spark properties. */
private def loadEnvironmentArgs(): Unit = {
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
// while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
files = Option(files)
- .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
+ .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
.orNull
archives = Option(archives)
- .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
+ .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
// If dynamic allocation is enabled, start at the configured initial number of executors.
// Default to minExecutors if no initialExecutors is set.
numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
principal = Option(principal)
- .orElse(sparkConf.getOption("spark.yarn.principal"))
+ .orElse(sparkConf.get(PRINCIPAL))
.orNull
keytab = Option(keytab)
- .orElse(sparkConf.getOption("spark.yarn.keytab"))
+ .orElse(sparkConf.get(KEYTAB))
.orNull
}
@@ -103,13 +103,12 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
|${getUsageMessage()}
""".stripMargin)
}
- if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
- throw new SparkException("Executor cores must not be less than " +
- "spark.task.cpus.")
+ if (executorCores < sparkConf.get(CPUS_PER_TASK)) {
+ throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.")
}
// scalastyle:off println
if (isClusterMode) {
- for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
+ for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
@@ -117,17 +116,13 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
amMemory = driverMemory
amCores = driverCores
} else {
- for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+ for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in client mode.")
}
}
- sparkConf.getOption(amMemKey)
- .map(Utils.memoryStringToMb)
- .foreach { mem => amMemory = mem }
- sparkConf.getOption(amCoresKey)
- .map(_.toInt)
- .foreach { cores => amCores = cores }
+ amMemory = sparkConf.get(AM_MEMORY).toInt
+ amCores = sparkConf.get(AM_CORES)
}
// scalastyle:on println
}
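
The memory-overhead fallback above is unchanged in substance: an explicit spark.yarn.{driver,am,executor}.memoryOverhead value wins when set, otherwise the overhead is max(10% of the requested memory, 384 MB), now computed as a Long in MB. A minimal standalone sketch of that rule (object and method names below are illustrative, not part of the patch):

    object MemoryOverhead {
      val MEMORY_OVERHEAD_FACTOR = 0.10
      val MEMORY_OVERHEAD_MIN = 384L

      /** Explicit override wins; otherwise max(10% of the requested memory, 384 MB). */
      def overheadMb(requestedMb: Long, configured: Option[Long]): Long =
        configured.getOrElse(
          math.max((MEMORY_OVERHEAD_FACTOR * requestedMb).toLong, MEMORY_OVERHEAD_MIN))
    }

    // overheadMb(1024, None)       == 384   (10% of 1024 is below the 384 MB floor)
    // overheadMb(8192, None)       == 819
    // overheadMb(8192, Some(1024)) == 1024  (explicit setting wins)
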
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
index 6474acc3dc..1ae278d76f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorDelegationTokenUpdater.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class ExecutorDelegationTokenUpdater(
@@ -34,7 +35,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
@volatile private var lastCredentialsFileSuffix = 0
- private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+ private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val freshHadoopConf =
SparkHadoopUtil.get.getConfBypassingFSCache(
hadoopConf, new Path(credentialsFile).toUri.getScheme)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 21ac04dc76..9f91d182eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
-class ExecutorRunnable(
+private[yarn] class ExecutorRunnable(
container: Container,
conf: Configuration,
sparkConf: SparkConf,
@@ -104,7 +106,7 @@ class ExecutorRunnable(
// If external shuffle service is enabled, register with the Yarn shuffle service already
// started on the NodeManager and, if authentication is enabled, provide it with our secret
// key for fetching shuffle files later
- if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+ if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
@@ -148,13 +150,13 @@ class ExecutorRunnable(
javaOpts += "-Xmx" + executorMemoryString
// Set extra Java options for the executor, if defined
- sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
+ sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
- sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+ sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
}
@@ -286,8 +288,8 @@ class ExecutorRunnable(
private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
- val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
- Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
+ Client.populateClasspath(null, yarnConf, sparkConf, env, false,
+ sparkConf.get(EXECUTOR_CLASS_PATH))
sparkConf.getExecutorEnv.foreach { case (key, value) =>
// This assumes each executor environment variable set here is a path
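
The EXECUTOR_JAVA_OPTIONS and EXECUTOR_LIBRARY_PATH reads above now come back as Option[String] from the typed entries instead of sys.props lookups. A small standalone sketch of that Option-based pattern (values are hardcoded for illustration; the real code splits with Utils.splitCommandString and shell-escapes each token):

    import scala.collection.mutable.ListBuffer

    val javaOpts = ListBuffer("-Xmx2048m")
    // Stands in for sparkConf.get(EXECUTOR_JAVA_OPTIONS).
    val extraJavaOptions: Option[String] = Some("-XX:+UseG1GC -verbose:gc")
    extraJavaOptions.foreach { opts => javaOpts ++= opts.split("""\s+""") }
    // javaOpts: ListBuffer(-Xmx2048m, -XX:+UseG1GC, -verbose:gc)
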
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 2ec189de7c..8772e26f43 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config._
private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
@@ -84,9 +85,6 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
val yarnConf: Configuration,
val resource: Resource) {
- // Number of CPUs per task
- private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
-
/**
* Calculate each container's node locality and rack locality
* @param numContainer number of containers to calculate
@@ -159,7 +157,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
*/
private def numExecutorsPending(numTasksPending: Int): Int = {
val coresPerExecutor = resource.getVirtualCores
- (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+ (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor
}
/**
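
The numExecutorsPending change above just inlines the spark.task.cpus read; the arithmetic is the usual ceiling division: total cores of pending work divided by cores per executor, rounded up. Standalone sketch:

    // Executors needed to run numTasksPending tasks when each task uses cpusPerTask cores.
    def executorsNeeded(numTasksPending: Int, cpusPerTask: Int, coresPerExecutor: Int): Int =
      (numTasksPending * cpusPerTask + coresPerExecutor - 1) / coresPerExecutor

    // executorsNeeded(10, 2, 8) == 3   (20 cores of pending work, 8 cores per executor)
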
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 11426eb07c..a96cb4957b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,6 +34,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -107,21 +108,20 @@ private[yarn] class YarnAllocator(
// Executor memory in MB.
protected val executorMemory = args.executorMemory
// Additional memory overhead.
- protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
- "ContainerLauncher",
- sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
+ "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
- private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
+ private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
// ContainerRequest constructor that can take a node label expression. We grab it through
// reflection because it's only available in later versions of YARN.
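
As above, the resource passed to Resource.newInstance is the executor memory plus the overhead, so each YARN container is sized larger than the executor heap. A standalone illustration of that sizing (numbers are examples only):

    val executorMemoryMb  = 4096
    val overheadMb        = math.max((0.10 * executorMemoryMb).toInt, 384)  // 409 here
    val containerMemoryMb = executorMemoryMb + overheadMb                   // 4505 MB requested from YARN
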
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 98505b93dd..968f635276 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
@@ -117,7 +118,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
/** Returns the maximum number of attempts to register the AM. */
def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
- val sparkMaxAttempts = sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt)
+ val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
val yarnMaxAttempts = yarnConf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
val retval: Int = sparkMaxAttempts match {
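
The rest of the match on sparkMaxAttempts is not shown in this hunk; a hedged sketch of how the optional spark.yarn.maxAppAttempts is presumably reconciled with YARN's RM_AM_MAX_ATTEMPTS (assumed behaviour: Spark's value is honoured only up to the YARN RM limit):

    def effectiveMaxAttempts(sparkMax: Option[Int], yarnMax: Int): Int =
      sparkMax.map(math.min(_, yarnMax)).getOrElse(yarnMax)
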
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index aef78fdfd4..ed56d4bd44 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
@@ -97,10 +99,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
* Get the list of namenodes the user may access.
*/
def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
- sparkConf.get("spark.yarn.access.namenodes", "")
- .split(",")
- .map(_.trim())
- .filter(!_.isEmpty)
+ sparkConf.get(NAMENODES_TO_ACCESS)
.map(new Path(_))
.toSet
}
@@ -335,7 +334,7 @@ object YarnSparkHadoopUtil {
// the common cases. Memory overhead tends to grow with container size.
val MEMORY_OVERHEAD_FACTOR = 0.10
- val MEMORY_OVERHEAD_MIN = 384
+ val MEMORY_OVERHEAD_MIN = 384L
val ANY_HOST = "*"
@@ -509,10 +508,9 @@ object YarnSparkHadoopUtil {
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
- val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
- val initialNumExecutors =
- conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
- val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+ val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+ val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS)
+ val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must between min executor number" +
s"$minNumExecutors and max executor number $maxNumExecutors")
@@ -522,7 +520,7 @@ object YarnSparkHadoopUtil {
val targetNumExecutors =
sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
// System property can override environment variable.
- conf.getInt("spark.executor.instances", targetNumExecutors)
+ conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors)
}
}
}
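
getInitialTargetExecutorNumber above resolves the starting executor count in two regimes: with dynamic allocation the configured initial value is used after a bounds check against min/max, and without it spark.executor.instances overrides SPARK_EXECUTOR_INSTANCES, which overrides the caller's default. A standalone sketch of that resolution order (parameter names are illustrative, not Spark API):

    def initialExecutors(
        dynamicAllocation: Boolean,
        minExec: Int,
        initialExec: Int,
        maxExec: Int,
        envInstances: Option[Int],
        confInstances: Option[Int],
        default: Int): Int = {
      if (dynamicAllocation) {
        require(initialExec >= minExec && initialExec <= maxExec,
          s"initial executor number $initialExec must be between $minExec and $maxExec")
        initialExec
      } else {
        // spark.executor.instances overrides the env var, which overrides the default.
        confInstances.getOrElse(envInstances.getOrElse(default))
      }
    }
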
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
new file mode 100644
index 0000000000..06c1be9bf0
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+package object config {
+
+ /* Common app configuration. */
+
+ private[spark] val APPLICATION_TAGS = ConfigBuilder("spark.yarn.tags")
+ .doc("Comma-separated list of strings to pass through as YARN application tags appearing " +
+ "in YARN Application Reports, which can be used for filtering when querying YARN.")
+ .stringConf
+ .toSequence
+ .optional
+
+ private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+ ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
+ .doc("Interval after which AM failures will be considered independent and " +
+ "not accumulate towards the attempt count.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .optional
+
+ private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
+ .doc("Maximum number of AM attempts before failing the app.")
+ .intConf
+ .optional
+
+ private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first")
+ .doc("Whether to place user jars in front of Spark's classpath.")
+ .booleanConf
+ .withDefault(false)
+
+ private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath")
+ .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " +
+ "with the corresponding path in cluster machines.")
+ .stringConf
+ .withDefault(null)
+
+ private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath")
+ .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " +
+ "in the YARN cluster.")
+ .stringConf
+ .withDefault(null)
+
+ private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue")
+ .stringConf
+ .withDefault("default")
+
+ private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address")
+ .stringConf
+ .optional
+
+ /* File distribution. */
+
+ private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
+ .doc("Location of the Spark jar to use.")
+ .stringConf
+ .optional
+
+ private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
+ .stringConf
+ .optional
+
+ private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files")
+ .stringConf
+ .optional
+
+ private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files")
+ .doc("Whether to preserve temporary files created by the job in HDFS.")
+ .booleanConf
+ .withDefault(false)
+
+ private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication")
+ .doc("Replication factor for files uploaded by Spark to HDFS.")
+ .intConf
+ .optional
+
+ /* Cluster-mode launcher configuration. */
+
+ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
+ .doc("In cluster mode, whether to wait for the application to finishe before exiting the " +
+ "launcher process.")
+ .booleanConf
+ .withDefault(true)
+
+ private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
+ .doc("Interval between reports of the current app status in cluster mode.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .withDefaultString("1s")
+
+ /* Shared Client-mode AM / Driver configuration. */
+
+ private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .withDefaultString("100s")
+
+ private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression")
+ .doc("Node label expression for the AM.")
+ .stringConf
+ .optional
+
+ private[spark] val CONTAINER_LAUNCH_MAX_THREADS =
+ ConfigBuilder("spark.yarn.containerLauncherMaxThreads")
+ .intConf
+ .withDefault(25)
+
+ private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures")
+ .intConf
+ .optional
+
+ private[spark] val MAX_REPORTER_THREAD_FAILURES =
+ ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures")
+ .intConf
+ .withDefault(5)
+
+ private[spark] val RM_HEARTBEAT_INTERVAL =
+ ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .withDefaultString("3s")
+
+ private[spark] val INITIAL_HEARTBEAT_INTERVAL =
+ ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .withDefaultString("200ms")
+
+ private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
+ .doc("A comma-separated list of class names of services to add to the scheduler.")
+ .stringConf
+ .toSequence
+ .withDefault(Nil)
+
+ /* Client-mode AM configuration. */
+
+ private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores")
+ .intConf
+ .withDefault(1)
+
+ private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions")
+ .doc("Extra Java options for the client-mode AM.")
+ .stringConf
+ .optional
+
+ private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath")
+ .doc("Extra native library path for the client-mode AM.")
+ .stringConf
+ .optional
+
+ private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead")
+ .bytesConf(ByteUnit.MiB)
+ .optional
+
+ private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory")
+ .bytesConf(ByteUnit.MiB)
+ .withDefaultString("512m")
+
+ /* Driver configuration. */
+
+ private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
+ .intConf
+ .optional
+
+ private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead")
+ .bytesConf(ByteUnit.MiB)
+ .optional
+
+ /* Executor configuration. */
+
+ private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
+ .bytesConf(ByteUnit.MiB)
+ .optional
+
+ private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
+ ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
+ .doc("Node label expression for executors.")
+ .stringConf
+ .optional
+
+ /* Security configuration. */
+
+ private[spark] val CREDENTIAL_FILE_MAX_COUNT =
+ ConfigBuilder("spark.yarn.credentials.file.retention.count")
+ .intConf
+ .withDefault(5)
+
+ private[spark] val CREDENTIALS_FILE_MAX_RETENTION =
+ ConfigBuilder("spark.yarn.credentials.file.retention.days")
+ .intConf
+ .withDefault(5)
+
+ private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
+ .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
+ "fs.defaultFS does not need to be listed here.")
+ .stringConf
+ .toSequence
+ .withDefault(Nil)
+
+ private[spark] val TOKEN_RENEWAL_INTERVAL = ConfigBuilder("spark.yarn.token.renewal.interval")
+ .internal
+ .timeConf(TimeUnit.MILLISECONDS)
+ .optional
+
+ /* Private configs. */
+
+ private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
+ .internal
+ .stringConf
+ .withDefault(null)
+
+ // Internal config to propagate the location of the user's jar to the driver/executors
+ private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar")
+ .internal
+ .stringConf
+ .optional
+
+ // Internal config to propagate the locations of any extra jars to add to the classpath
+ // of the executors
+ private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars")
+ .internal
+ .stringConf
+ .toSequence
+ .optional
+
+}
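
The new config.scala collects every YARN setting as a typed ConfigEntry built through the ConfigBuilder DSL: .intConf/.booleanConf/.stringConf/.timeConf/.bytesConf pick the type, .toSequence turns a comma-separated value into a Seq, .withDefault/.withDefaultString bake in a default, and .optional makes reads return an Option. A hedged sketch of defining and reading entries this way, assuming the code sits somewhere the private[spark] ConfigBuilder and the new SparkConf.get(entry) accessor are visible (the keys below are made up for illustration):

    import java.util.concurrent.TimeUnit

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical entries, mirroring the patterns used in config.scala above.
    val EXAMPLE_HEARTBEAT = ConfigBuilder("spark.yarn.example.heartbeat")
      .doc("Illustrative time entry; accepts values such as 3s or 200ms.")
      .timeConf(TimeUnit.MILLISECONDS)
      .withDefaultString("3s")

    val EXAMPLE_LABEL = ConfigBuilder("spark.yarn.example.label")
      .stringConf
      .optional

    val conf = new SparkConf()
    val heartbeatMs: Long = conf.get(EXAMPLE_HEARTBEAT)   // 3000 when unset (default "3s")
    val label: Option[String] = conf.get(EXAMPLE_LABEL)   // None when unset
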
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
index c064521845..c4757e335b 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.Utils
/**
@@ -103,20 +104,15 @@ private[spark] class SchedulerExtensionServices extends SchedulerExtensionServic
val attemptId = binding.attemptId
logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
- serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
- services = serviceOption
- .map { s =>
- s.split(",").map(_.trim()).filter(!_.isEmpty)
- .map { sClass =>
- val instance = Utils.classForName(sClass)
- .newInstance()
- .asInstanceOf[SchedulerExtensionService]
- // bind this service
- instance.start(binding)
- logInfo(s"Service $sClass started")
- instance
- }.toList
- }.getOrElse(Nil)
+ services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
+ val instance = Utils.classForName(sClass)
+ .newInstance()
+ .asInstanceOf[SchedulerExtensionService]
+ // bind this service
+ instance.start(binding)
+ logInfo(s"Service $sClass started")
+ instance
+ }.toList
}
/**
@@ -144,11 +140,3 @@ private[spark] class SchedulerExtensionServices extends SchedulerExtensionServic
| services=$services,
| started=$started)""".stripMargin
}
-
-private[spark] object SchedulerExtensionServices {
-
- /**
- * A list of comma separated services to instantiate in the scheduler
- */
- val SPARK_YARN_SERVICES = "spark.yarn.services"
-}
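
The rewritten start-up loop above drops the manual string splitting: SCHEDULER_SERVICES is already a Seq[String], so each class name is loaded reflectively, instantiated, and started. A simplified standalone sketch of that pattern (uses Class.forName directly rather than Spark's Utils.classForName):

    trait ExtService { def start(): Unit }

    def startServices(classNames: Seq[String]): Seq[ExtService] =
      classNames.map { name =>
        val instance = Class.forName(name).newInstance().asInstanceOf[ExtService]
        instance.start()  // bind/start the service before returning it
        instance
      }
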
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 19065373c6..b57c179d89 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -41,6 +41,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.util.{ResetSystemProperties, Utils}
class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
@@ -103,8 +104,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
test("Local jar URIs") {
val conf = new Configuration()
- val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
- .set("spark.yarn.user.classpath.first", "true")
+ val sparkConf = new SparkConf()
+ .set(SPARK_JAR, SPARK)
+ .set(USER_CLASS_PATH_FIRST, true)
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
@@ -129,13 +131,13 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
cp should contain(pwdVar)
cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
- cp should not contain (Client.SPARK_JAR)
- cp should not contain (Client.APP_JAR)
+ cp should not contain (Client.SPARK_JAR_NAME)
+ cp should not contain (Client.APP_JAR_NAME)
}
test("Jar path propagation through SparkConf") {
val conf = new Configuration()
- val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+ val sparkConf = new SparkConf().set(SPARK_JAR, SPARK)
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
val client = spy(new Client(args, conf, sparkConf))
@@ -145,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val tempDir = Utils.createTempDir()
try {
client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
- sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
+ sparkConf.get(APP_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
// staging dir.
@@ -160,7 +162,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
})
.mkString(",")
- sparkConf.getOption(Client.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
+ sparkConf.get(SECONDARY_JARS) should be (Some(expected.split(",").toSeq))
} finally {
Utils.deleteRecursively(tempDir)
}
@@ -169,9 +171,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
test("Cluster path translation") {
val conf = new Configuration()
val sparkConf = new SparkConf()
- .set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
- .set("spark.yarn.config.gatewayPath", "/localPath")
- .set("spark.yarn.config.replacementPath", "/remotePath")
+ .set(SPARK_JAR.key, "local:/localPath/spark.jar")
+ .set(GATEWAY_ROOT_PATH, "/localPath")
+ .set(REPLACEMENT_ROOT_PATH, "/remotePath")
Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
@@ -191,8 +193,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
// Spaces between non-comma strings should be preserved as single tags. Empty strings may or
// may not be removed depending on the version of Hadoop being used.
val sparkConf = new SparkConf()
- .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
- .set("spark.yarn.maxAppAttempts", "42")
+ .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
+ .set(MAX_APP_ATTEMPTS, 42)
val args = new ClientArguments(Array(
"--name", "foo-test-app",
"--queue", "staging-queue"), sparkConf)
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
index b4d1b0a3d2..338fbe2ef4 100644
--- a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
/**
* Test the integration with [[SchedulerExtensionServices]]
@@ -36,8 +37,7 @@ class ExtensionServiceIntegrationSuite extends SparkFunSuite
*/
before {
val sparkConf = new SparkConf()
- sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
- classOf[SimpleExtensionService].getName())
+ sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
sc = new SparkContext(sparkConf)
}