author    Marcelo Vanzin <vanzin@cloudera.com>    2015-04-17 11:06:01 +0100
committer Sean Owen <sowen@cloudera.com>          2015-04-17 11:06:01 +0100
commit    4527761bcd6501c362baf2780905a0018b9a74ba (patch)
tree      7513658d9858e7a4a63a93b6e9d75b207a0d6fc3 /core
parent    f7a25644ed5b3b49fe7f33743bec3d95cdf7913e (diff)
[SPARK-6046] [core] Reorganize deprecated config support in SparkConf.
This change tries to follow the chosen way for handling deprecated configs in SparkConf: all values (old and new) are kept in the conf object, and newer names take precedence over older ones when retrieving the value. Warnings are logged when config options are set, which generally happens on the driver node (where the logs are most visible).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5514 from vanzin/SPARK-6046 and squashes the following commits:

9371529 [Marcelo Vanzin] Avoid math.
6cf3f11 [Marcelo Vanzin] Review feedback.
2445d48 [Marcelo Vanzin] Fix (and cleanup) update interval initialization.
b6824be [Marcelo Vanzin] Clean up the other deprecated config use also.
ab20351 [Marcelo Vanzin] Update FsHistoryProvider to only retrieve new config key.
2c93209 [Marcelo Vanzin] [SPARK-6046] [core] Reorganize deprecated config support in SparkConf.
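For illustration, a minimal driver-side sketch of the behavior this patch introduces, assuming the diff below is applied (it mirrors the "deprecated configs" test added to SparkConfSuite):

    import org.apache.spark.SparkConf

    // Both the old and the new key are kept in the conf; the new name wins on read.
    val conf = new SparkConf()
    conf.set("spark.history.updateInterval", "1")               // deprecated key; a warning is logged
    assert(conf.get("spark.history.fs.update.interval") == "1") // resolved through the alternate chain

    conf.set("spark.history.fs.update.interval", "4")           // newer name takes precedence
    assert(conf.get("spark.history.fs.update.interval") == "4")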
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                          174
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala     9
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                     5
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala                       22
4 files changed, 120 insertions, 90 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 390e631647..b0186e9a00 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -68,6 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
+ logDeprecationWarning(key)
settings.put(key, value)
this
}
@@ -134,13 +135,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Set multiple parameters together */
def setAll(settings: Traversable[(String, String)]): SparkConf = {
- this.settings.putAll(settings.toMap.asJava)
+ settings.foreach { case (k, v) => set(k, v) }
this
}
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
- settings.putIfAbsent(key, value)
+ if (settings.putIfAbsent(key, value) == null) {
+ logDeprecationWarning(key)
+ }
this
}
@@ -174,8 +177,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}
- /**
- * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+ /**
+ * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
*/
@@ -183,36 +186,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.timeStringAsSeconds(get(key))
}
- /**
- * Get a time parameter as seconds, falling back to a default if not set. If no
+ /**
+ * Get a time parameter as seconds, falling back to a default if not set. If no
* suffix is provided then seconds are assumed.
- *
*/
def getTimeAsSeconds(key: String, defaultValue: String): Long = {
Utils.timeStringAsSeconds(get(key, defaultValue))
}
- /**
- * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
- * suffix is provided then milliseconds are assumed.
+ /**
+ * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then milliseconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsMs(key: String): Long = {
Utils.timeStringAsMs(get(key))
}
- /**
- * Get a time parameter as milliseconds, falling back to a default if not set. If no
- * suffix is provided then milliseconds are assumed.
+ /**
+ * Get a time parameter as milliseconds, falling back to a default if not set. If no
+ * suffix is provided then milliseconds are assumed.
*/
def getTimeAsMs(key: String, defaultValue: String): Long = {
Utils.timeStringAsMs(get(key, defaultValue))
}
-
+
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
- Option(settings.get(key))
+ Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
}
/** Get all parameters as a list of pairs */
@@ -379,13 +381,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}
-
- // Warn against the use of deprecated configs
- deprecatedConfigs.values.foreach { dc =>
- if (contains(dc.oldName)) {
- dc.warn()
- }
- }
}
/**
@@ -400,19 +395,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private[spark] object SparkConf extends Logging {
+ /**
+ * Maps deprecated config keys to information about the deprecation.
+ *
+ * The extra information is logged as a warning when the config is present in the user's
+ * configuration.
+ */
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
- DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
- "1.3"),
- DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
- "Use spark.{driver,executor}.userClassPathFirst instead."),
- DeprecatedConfig("spark.history.fs.updateInterval",
- "spark.history.fs.update.interval.seconds",
- "1.3", "Use spark.history.fs.update.interval.seconds instead"),
- DeprecatedConfig("spark.history.updateInterval",
- "spark.history.fs.update.interval.seconds",
- "1.3", "Use spark.history.fs.update.interval.seconds instead"))
- configs.map { x => (x.oldName, x) }.toMap
+ DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
+ "Please use spark.{driver,executor}.userClassPathFirst instead."))
+ Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
+ }
+
+ /**
+ * Maps a current config key to alternate keys that were used in previous versions of Spark.
+ *
+ * The alternates are used in the order defined in this map. If deprecated configs are
+ * present in the user's configuration, a warning is logged.
+ */
+ private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
+ "spark.executor.userClassPathFirst" -> Seq(
+ AlternateConfig("spark.files.userClassPathFirst", "1.3")),
+ "spark.history.fs.update.interval" -> Seq(
+ AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
+ AlternateConfig("spark.history.fs.updateInterval", "1.3"),
+ AlternateConfig("spark.history.updateInterval", "1.3"))
+ )
+
+ /**
+ * A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
+ * config keys.
+ *
+ * Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
+ */
+ private val allAlternatives: Map[String, (String, AlternateConfig)] = {
+ configsWithAlternatives.keys.flatMap { key =>
+ configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
+ }.toMap
}
/**
@@ -443,61 +463,57 @@ private[spark] object SparkConf extends Logging {
}
/**
- * Translate the configuration key if it is deprecated and has a replacement, otherwise just
- * returns the provided key.
- *
- * @param userKey Configuration key from the user / caller.
- * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
- * only once for each key.
+ * Looks for available deprecated keys for the given config option, and returns the first
+ * value available.
*/
- private def translateConfKey(userKey: String, warn: Boolean = false): String = {
- deprecatedConfigs.get(userKey)
- .map { deprecatedKey =>
- if (warn) {
- deprecatedKey.warn()
- }
- deprecatedKey.newName.getOrElse(userKey)
- }.getOrElse(userKey)
+ def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
+ configsWithAlternatives.get(key).flatMap { alts =>
+ alts.collectFirst { case alt if conf.contains(alt.key) =>
+ val value = conf.get(alt.key)
+ alt.translation.map(_(value)).getOrElse(value)
+ }
+ }
}
/**
- * Holds information about keys that have been deprecated or renamed.
+ * Logs a warning message if the given config key is deprecated.
+ */
+ def logDeprecationWarning(key: String): Unit = {
+ deprecatedConfigs.get(key).foreach { cfg =>
+ logWarning(
+ s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+ s"may be removed in the future. ${cfg.deprecationMessage}")
+ }
+
+ allAlternatives.get(key).foreach { case (newKey, cfg) =>
+ logWarning(
+ s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
+ s"and may be removed in the future. Please use the new key '$newKey' instead.")
+ }
+ }
+
+ /**
+ * Holds information about keys that have been deprecated and do not have a replacement.
*
- * @param oldName Old configuration key.
- * @param newName New configuration key, or `null` if key has no replacement, in which case the
- * deprecated key will be used (but the warning message will still be printed).
+ * @param key The deprecated key.
* @param version Version of Spark where key was deprecated.
- * @param deprecationMessage Message to include in the deprecation warning; mandatory when
- * `newName` is not provided.
+ * @param deprecationMessage Message to include in the deprecation warning.
*/
private case class DeprecatedConfig(
- oldName: String,
- _newName: String,
+ key: String,
version: String,
- deprecationMessage: String = null) {
-
- private val warned = new AtomicBoolean(false)
- val newName = Option(_newName)
+ deprecationMessage: String)
- if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
- throw new IllegalArgumentException("Need new config name or deprecation message.")
- }
-
- def warn(): Unit = {
- if (warned.compareAndSet(false, true)) {
- if (newName != null) {
- val message = Option(deprecationMessage).getOrElse(
- s"Please use the alternative '$newName' instead.")
- logWarning(
- s"The configuration option '$oldName' has been replaced as of Spark $version and " +
- s"may be removed in the future. $message")
- } else {
- logWarning(
- s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
- s"may be removed in the future. $deprecationMessage")
- }
- }
- }
+ /**
+ * Information about an alternate configuration key that has been deprecated.
+ *
+ * @param key The deprecated config key.
+ * @param version The Spark version in which the key was deprecated.
+ * @param translation A translation function for converting old config values into new ones.
+ */
+ private case class AlternateConfig(
+ key: String,
+ version: String,
+ translation: Option[String => String] = None)
- }
}
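As a reading aid, here is a self-contained, REPL-style model of the alternate-key lookup the patch adds to the SparkConf companion object. The names mirror the diff above, but this is an illustrative sketch over a plain Map, not the private Spark API:

    // Illustrative stand-in for the private AlternateConfig case class.
    case class AlternateConfig(key: String, version: String,
        translation: Option[String => String] = None)

    val configsWithAlternatives = Map(
      "spark.history.fs.update.interval" -> Seq(
        AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
        AlternateConfig("spark.history.fs.updateInterval", "1.3"),
        AlternateConfig("spark.history.updateInterval", "1.3")))

    // Resolve a key against the user's raw settings, falling back to deprecated alternates in order.
    def resolve(settings: Map[String, String], key: String): Option[String] =
      settings.get(key).orElse {
        configsWithAlternatives.get(key).flatMap { alts =>
          alts.collectFirst { case alt if settings.contains(alt.key) =>
            val value = settings(alt.key)
            alt.translation.map(_(value)).getOrElse(value)
          }
        }
      }

    // Only the oldest key is set, so its value is returned through the alternate chain.
    assert(resolve(Map("spark.history.updateInterval" -> "1"),
                   "spark.history.fs.update.interval") == Some("1"))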
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9d40d8c8fd..985545742d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -49,11 +49,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val NOT_STARTED = "<Not Started>"
// Interval between each check for event log updates
- private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
- .orElse(conf.getOption("spark.history.fs.updateInterval"))
- .orElse(conf.getOption("spark.history.updateInterval"))
- .map(_.toInt)
- .getOrElse(10) * 1000
+ private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
// Interval between each cleaner checks for event logs to delete
private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
@@ -130,8 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
- pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
- TimeUnit.MILLISECONDS)
+ pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
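The update interval is now read through the time-string getters documented in the SparkConf diff above; a rough usage sketch, assuming the patch is applied (the interval value here is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    conf.set("spark.history.fs.update.interval", "30s")
    conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")  // 30: suffix parsed as seconds
    conf.getTimeAsMs("spark.history.fs.update.interval", "10s")       // 30000
    new SparkConf().getTimeAsSeconds("spark.history.fs.update.interval", "10s")  // 10: default used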
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 1b5fdeba28..327d155b38 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,10 +89,7 @@ private[spark] class Executor(
ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
// Whether to load classes in user jars before those in Spark jars
- private val userClassPathFirst: Boolean = {
- conf.getBoolean("spark.executor.userClassPathFirst",
- conf.getBoolean("spark.files.userClassPathFirst", false))
- }
+ private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e08210ae60..7d87ba5fd2 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -197,6 +197,28 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
serializer.newInstance().serialize(new StringBuffer())
}
+ test("deprecated configs") {
+ val conf = new SparkConf()
+ val newName = "spark.history.fs.update.interval"
+
+ assert(!conf.contains(newName))
+
+ conf.set("spark.history.updateInterval", "1")
+ assert(conf.get(newName) === "1")
+
+ conf.set("spark.history.fs.updateInterval", "2")
+ assert(conf.get(newName) === "2")
+
+ conf.set("spark.history.fs.update.interval.seconds", "3")
+ assert(conf.get(newName) === "3")
+
+ conf.set(newName, "4")
+ assert(conf.get(newName) === "4")
+
+ val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
+ assert(count === 4)
+ }
+
}
class Class1 {}