 core/src/main/scala/org/apache/spark/SparkConf.scala                        | 174 +-
 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala |   9 +-
 core/src/main/scala/org/apache/spark/executor/Executor.scala                |   5 +-
 core/src/test/scala/org/apache/spark/SparkConfSuite.scala                   |  22 +-
 docs/monitoring.md                                                          |   6 +-
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala               |   3 +-
 6 files changed, 124 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 390e631647..b0186e9a00 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -68,6 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
+ logDeprecationWarning(key)
settings.put(key, value)
this
}
@@ -134,13 +135,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set multiple parameters together */
def setAll(settings: Traversable[(String, String)]): SparkConf = {
- this.settings.putAll(settings.toMap.asJava)
+ settings.foreach { case (k, v) => set(k, v) }
this
}
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- settings.putIfAbsent(key, value)
+ if (settings.putIfAbsent(key, value) == null) {
+ logDeprecationWarning(key)
+ }
this
}
@@ -174,8 +177,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}
- /**
- * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+ /**
+ * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
*/
@@ -183,36 +186,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.timeStringAsSeconds(get(key))
}
- /**
- * Get a time parameter as seconds, falling back to a default if not set. If no
+ /**
+ * Get a time parameter as seconds, falling back to a default if not set. If no
* suffix is provided then seconds are assumed.
- *
*/
def getTimeAsSeconds(key: String, defaultValue: String): Long = {
Utils.timeStringAsSeconds(get(key, defaultValue))
}
- /**
- * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
- * suffix is provided then milliseconds are assumed.
+ /**
+ * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then milliseconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsMs(key: String): Long = {
Utils.timeStringAsMs(get(key))
}
- /**
- * Get a time parameter as milliseconds, falling back to a default if not set. If no
- * suffix is provided then milliseconds are assumed.
+ /**
+ * Get a time parameter as milliseconds, falling back to a default if not set. If no
+ * suffix is provided then milliseconds are assumed.
*/
def getTimeAsMs(key: String, defaultValue: String): Long = {
Utils.timeStringAsMs(get(key, defaultValue))
}
-
+
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- Option(settings.get(key))
+ Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
}
/** Get all parameters as a list of pairs */
@@ -379,13 +381,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}
-
- // Warn against the use of deprecated configs
- deprecatedConfigs.values.foreach { dc =>
- if (contains(dc.oldName)) {
- dc.warn()
- }
- }
}
/**
@@ -400,19 +395,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private[spark] object SparkConf extends Logging {
+ /**
+ * Maps deprecated config keys to information about the deprecation.
+ *
+ * The extra information is logged as a warning when the config is present in the user's
+ * configuration.
+ */
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
- DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
- "1.3"),
- DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
- "Use spark.{driver,executor}.userClassPathFirst instead."),
- DeprecatedConfig("spark.history.fs.updateInterval",
- "spark.history.fs.update.interval.seconds",
- "1.3", "Use spark.history.fs.update.interval.seconds instead"),
- DeprecatedConfig("spark.history.updateInterval",
- "spark.history.fs.update.interval.seconds",
- "1.3", "Use spark.history.fs.update.interval.seconds instead"))
- configs.map { x => (x.oldName, x) }.toMap
+ DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
+ "Please use spark.{driver,executor}.userClassPathFirst instead."))
+ Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
+ }
+
+ /**
+ * Maps a current config key to alternate keys that were used in previous versions of Spark.
+ *
+ * The alternates are used in the order defined in this map. If deprecated configs are
+ * present in the user's configuration, a warning is logged.
+ */
+ private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
+ "spark.executor.userClassPathFirst" -> Seq(
+ AlternateConfig("spark.files.userClassPathFirst", "1.3")),
+ "spark.history.fs.update.interval" -> Seq(
+ AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
+ AlternateConfig("spark.history.fs.updateInterval", "1.3"),
+ AlternateConfig("spark.history.updateInterval", "1.3"))
+ )
+
+ /**
+ * A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
+ * config keys.
+ *
+ * Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
+ */
+ private val allAlternatives: Map[String, (String, AlternateConfig)] = {
+ configsWithAlternatives.keys.flatMap { key =>
+ configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
+ }.toMap
}
/**
@@ -443,61 +463,57 @@ private[spark] object SparkConf extends Logging {
}
/**
- * Translate the configuration key if it is deprecated and has a replacement, otherwise just
- * returns the provided key.
- *
- * @param userKey Configuration key from the user / caller.
- * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
- * only once for each key.
+ * Looks for available deprecated keys for the given config option, and returns the first
+ * value available.
*/
- private def translateConfKey(userKey: String, warn: Boolean = false): String = {
- deprecatedConfigs.get(userKey)
- .map { deprecatedKey =>
- if (warn) {
- deprecatedKey.warn()
- }
- deprecatedKey.newName.getOrElse(userKey)
- }.getOrElse(userKey)
+ def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+ configsWithAlternatives.get(key).flatMap { alts =>
+ alts.collectFirst { case alt if conf.contains(alt.key) =>
+ val value = conf.get(alt.key)
+ alt.translation.map(_(value)).getOrElse(value)
+ }
+ }
}
/**
- * Holds information about keys that have been deprecated or renamed.
+ * Logs a warning message if the given config key is deprecated.
+ */
+ def logDeprecationWarning(key: String): Unit = {
+ deprecatedConfigs.get(key).foreach { cfg =>
+ logWarning(
+ s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+ s"may be removed in the future. ${cfg.deprecationMessage}")
+ }
+
+ allAlternatives.get(key).foreach { case (newKey, cfg) =>
+ logWarning(
+ s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+ s"and may be removed in the future. Please use the new key '$newKey' instead.")
+ }
+ }
+
+ /**
+ * Holds information about keys that have been deprecated and do not have a replacement.
*
- * @param oldName Old configuration key.
- * @param newName New configuration key, or `null` if key has no replacement, in which case the
- * deprecated key will be used (but the warning message will still be printed).
+ * @param key The deprecated key.
* @param version Version of Spark where key was deprecated.
- * @param deprecationMessage Message to include in the deprecation warning; mandatory when
- * `newName` is not provided.
+ * @param deprecationMessage Message to include in the deprecation warning.
*/
private case class DeprecatedConfig(
- oldName: String,
- _newName: String,
+ key: String,
version: String,
- deprecationMessage: String = null) {
-
- private val warned = new AtomicBoolean(false)
- val newName = Option(_newName)
+ deprecationMessage: String)
- if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
- throw new IllegalArgumentException("Need new config name or deprecation message.")
- }
-
- def warn(): Unit = {
- if (warned.compareAndSet(false, true)) {
- if (newName != null) {
- val message = Option(deprecationMessage).getOrElse(
- s"Please use the alternative '$newName' instead.")
- logWarning(
- s"The configuration option '$oldName' has been replaced as of Spark $version and " +
- s"may be removed in the future. $message")
- } else {
- logWarning(
- s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
- s"may be removed in the future. $deprecationMessage")
- }
- }
- }
+ /**
+ * Information about an alternate configuration key that has been deprecated.
+ *
+ * @param key The deprecated config key.
+ * @param version The Spark version in which the key was deprecated.
+ * @param translation A translation function for converting old config values into new ones.
+ */
+ private case class AlternateConfig(
+ key: String,
+ version: String,
+ translation: Option[String => String] = None)
- }
}
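
(Illustrative sketch, not part of the patch.) After this change, writes through set() warn about deprecated keys, and reads through getOption() fall back to the alternates declared in configsWithAlternatives. Roughly:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)

    // Setting a deprecated key logs a warning via logDeprecationWarning(key)...
    conf.set("spark.history.updateInterval", "10")

    // ...and a read of the new key finds the old value through
    // getOption -> getDeprecatedConfig.
    assert(conf.get("spark.history.fs.update.interval") == "10")
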
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9d40d8c8fd..985545742d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -49,11 +49,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val NOT_STARTED = "<Not Started>"
// Interval between each check for event log updates
- private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
- .orElse(conf.getOption("spark.history.fs.updateInterval"))
- .orElse(conf.getOption("spark.history.updateInterval"))
- .map(_.toInt)
- .getOrElse(10) * 1000
+ private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
// Interval between each cleaner checks for event logs to delete
private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
@@ -130,8 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
- pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
- TimeUnit.MILLISECONDS)
+ pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
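
(Illustrative sketch, not part of the patch.) The history server now parses the interval with getTimeAsSeconds, so values may carry a time suffix; Utils.timeStringAsSeconds assumes plain seconds when no suffix is given:

    val conf = new org.apache.spark.SparkConf(loadDefaults = false)
    conf.set("spark.history.fs.update.interval", "1min")
    // Parses suffixed values; the "10s" default applies when the key is unset.
    val updateIntervalS = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
    assert(updateIntervalS == 60L)
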
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 1b5fdeba28..327d155b38 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,10 +89,7 @@ private[spark] class Executor(
ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
// Whether to load classes in user jars before those in Spark jars
- private val userClassPathFirst: Boolean = {
- conf.getBoolean("spark.executor.userClassPathFirst",
- conf.getBoolean("spark.files.userClassPathFirst", false))
- }
+ private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
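
(Illustrative sketch, not part of the patch.) The nested getBoolean fallback can be dropped here because the alternate key is now resolved inside SparkConf itself:

    val conf = new org.apache.spark.SparkConf(loadDefaults = false)
    conf.set("spark.files.userClassPathFirst", "true")  // deprecated key
    // getBoolean -> getOption -> getDeprecatedConfig picks up the old key.
    assert(conf.getBoolean("spark.executor.userClassPathFirst", defaultValue = false))
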
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60..7d87ba5fd2 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,28 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated configs") {
+ val conf = new SparkConf()
+ val newName = "spark.history.fs.update.interval"
+
+ assert(!conf.contains(newName))
+
+ conf.set("spark.history.updateInterval", "1")
+ assert(conf.get(newName) === "1")
+
+ conf.set("spark.history.fs.updateInterval", "2")
+ assert(conf.get(newName) === "2")
+
+ conf.set("spark.history.fs.update.interval.seconds", "3")
+ assert(conf.get(newName) === "3")
+
+ conf.set(newName, "4")
+ assert(conf.get(newName) === "4")
+
+ val count = conf.getAll.count { case (k, _) => k.startsWith("spark.history.") }
+ assert(count === 4)
+ }
+
}
class Class1 {}
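
(Illustrative sketch, not part of the patch.) AlternateConfig.translation is unused in this patch — every alternate above leaves it as None — but it is meant for cases where the old value needs rewriting, not just renaming. A hypothetical entry, assuming an old key counted plain seconds while the new key expects a suffixed duration:

    AlternateConfig("spark.history.fs.update.interval.seconds", "1.4",
      translation = Some(seconds => s"${seconds}s"))
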
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 6816671ffb..2a13022459 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -86,10 +86,10 @@ follows:
</td>
</tr>
<tr>
- <td>spark.history.fs.update.interval.seconds</td>
- <td>10</td>
+ <td>spark.history.fs.update.interval</td>
+ <td>10s</td>
<td>
- The period, in seconds, at which information displayed by this history server is updated.
+ The period at which information displayed by this history server is updated.
Each update checks for any changes made to the event logs in persisted storage.
</td>
</tr>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1091ff54b0..52e4dee46c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1052,8 +1052,7 @@ object Client extends Logging {
if (isDriver) {
conf.getBoolean("spark.driver.userClassPathFirst", false)
} else {
- conf.getBoolean("spark.executor.userClassPathFirst",
- conf.getBoolean("spark.files.userClassPathFirst", false))
+ conf.getBoolean("spark.executor.userClassPathFirst", false)
}
}
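
(Illustrative sketch, not part of the patch.) Unlike the keys above, spark.yarn.user.classpath.first sits in deprecatedConfigs rather than configsWithAlternatives: it keeps working and only triggers a warning, with no automatic translation to the new keys:

    val conf = new org.apache.spark.SparkConf(loadDefaults = false)
    // Logs: "The configuration key 'spark.yarn.user.classpath.first' has been
    // deprecated as of Spark 1.3 ... Please use
    // spark.{driver,executor}.userClassPathFirst instead."
    conf.set("spark.yarn.user.classpath.first", "true")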