author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-17 19:02:07 -0700
committer  Andrew Or <andrew@databricks.com>       2015-04-17 19:02:07 -0700
commit     1991337336596f94698e79c2366f065c374128ab (patch)
tree       9ddb184bf0adb8acf0ae063007bab1052093675f /core
parent     6fbeb82e13db7117d8f216e6148632490a4bc5be (diff)
[SPARK-5933] [core] Move config deprecation warnings to SparkConf.
I didn't find many deprecated configs after a grep-based search, but the ones I
could find were moved to the centralized location in SparkConf. While there, I
deprecated a couple more history server configs that mentioned time units.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5562 from vanzin/SPARK-5933 and squashes the following commits:

dcb617e7 [Marcelo Vanzin] [SPARK-5933] [core] Move config deprecation warnings to SparkConf.
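For reference, a minimal sketch (not part of the patch) of how the centralized handling behaves after this change, assuming a Spark 1.4-era SparkConf on the classpath; the object name and the value given to spark.cache.class are illustrative only. Setting a deprecated key now surfaces the warning from SparkConf itself rather than from SparkEnv at startup, and an old alternate key such as spark.yarn.applicationMaster.waitTries is translated (10 seconds per try) when the new key is read, as the test diff below also demonstrates.

import org.apache.spark.SparkConf

object DeprecatedConfigSketch {
  def main(args: Array[String]): Unit = {
    // Don't load defaults from system properties; keeps the sketch self-contained.
    val conf = new SparkConf(false)

    // Deprecated key: the warning is now logged by SparkConf itself
    // (SparkEnv no longer checks for spark.cache.class at startup).
    conf.set("spark.cache.class", "some.CacheImpl")  // illustrative value

    // Alternate key with a translation: the old "tries" value is converted to a
    // duration at 10 seconds per try, so 42 tries reads back as 420 seconds.
    conf.set("spark.yarn.applicationMaster.waitTries", "42")
    println(conf.getTimeAsSeconds("spark.yarn.am.waitTime"))  // 420
  }
}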
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/org/apache/spark/SparkConf.scala                          17
-rw-r--r--   core/src/main/scala/org/apache/spark/SparkEnv.scala                           10
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala   15
-rw-r--r--   core/src/test/scala/org/apache/spark/SparkConfSuite.scala                      3
4 files changed, 22 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b0186e9a00..e3a649d755 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -403,6 +403,9 @@ private[spark] object SparkConf extends Logging {
*/
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
+ DeprecatedConfig("spark.cache.class", "0.8",
+ "The spark.cache.class property is no longer being used! Specify storage levels using " +
+ "the RDD.persist() method instead."),
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."))
Map(configs.map { cfg => (cfg.key -> cfg) }:_*)
@@ -420,7 +423,15 @@ private[spark] object SparkConf extends Logging {
"spark.history.fs.update.interval" -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
- AlternateConfig("spark.history.updateInterval", "1.3"))
+ AlternateConfig("spark.history.updateInterval", "1.3")),
+ "spark.history.fs.cleaner.interval" -> Seq(
+ AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
+ "spark.history.fs.cleaner.maxAge" -> Seq(
+ AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
+ "spark.yarn.am.waitTime" -> Seq(
+ AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
+ // Translate old value to a duration, with 10s wait time per try.
+ translation = s => s"${s.toLong * 10}s"))
)
/**
@@ -470,7 +481,7 @@ private[spark] object SparkConf extends Logging {
configsWithAlternatives.get(key).flatMap { alts =>
alts.collectFirst { case alt if conf.contains(alt.key) =>
val value = conf.get(alt.key)
- alt.translation.map(_(value)).getOrElse(value)
+ if (alt.translation != null) alt.translation(value) else value
}
}
}
@@ -514,6 +525,6 @@ private[spark] object SparkConf extends Logging {
private case class AlternateConfig(
key: String,
version: String,
- translation: Option[String => String] = None)
+ translation: String => String = null)
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 0171488e09..959aefabd8 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -103,7 +103,7 @@ class SparkEnv (
// actorSystem.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
-
+
// If we only stop sc, but the driver process still run as a services then we need to delete
// the tmp dir, if not, it will create too many tmp dirs.
// We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
@@ -375,12 +375,6 @@ object SparkEnv extends Logging {
"."
}
- // Warn about deprecated spark.cache.class property
- if (conf.contains("spark.cache.class")) {
- logWarning("The spark.cache.class property is no longer being used! Specify storage " +
- "levels using the RDD.persist() method instead.")
- }
-
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
}
@@ -406,7 +400,7 @@ object SparkEnv extends Logging {
shuffleMemoryManager,
outputCommitCoordinator,
conf)
-
+
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 985545742d..47bdd7749e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -52,8 +52,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
// Interval between each cleaner checks for event logs to delete
- private val CLEAN_INTERVAL_MS = conf.getLong("spark.history.fs.cleaner.interval.seconds",
- DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
+ private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
private val logDir = conf.getOption("spark.history.fs.logDirectory")
.map { d => Utils.resolveURI(d).toString }
@@ -130,8 +129,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
- pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
- TimeUnit.MILLISECONDS)
+ pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
}
}
@@ -270,8 +268,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
- val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
- DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+ val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
val now = System.currentTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
@@ -417,12 +414,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
- // One day
- val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
-
- // One week
- val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
}
private class FsApplicationHistoryInfo(
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 7d87ba5fd2..8e6c200c4b 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -217,6 +217,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
val count = conf.getAll.filter { case (k, v) => k.startsWith("spark.history.") }.size
assert(count === 4)
+
+ conf.set("spark.yarn.applicationMaster.waitTries", "42")
+ assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}
}
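As a usage note (not part of the patch), the history server cleaner settings touched above now accept duration strings with units. A minimal sketch, assuming the FsHistoryProvider defaults shown in the diff ("1d" and "7d"); the object name and the configured values are illustrative only.

import org.apache.spark.SparkConf

object HistoryCleanerConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.history.fs.cleaner.enabled", "true")
      .set("spark.history.fs.cleaner.interval", "12h")  // replaces spark.history.fs.cleaner.interval.seconds
      .set("spark.history.fs.cleaner.maxAge", "14d")    // replaces spark.history.fs.cleaner.maxAge.seconds

    // FsHistoryProvider reads these with getTimeAsSeconds, defaulting to "1d" and "7d".
    val cleanIntervalS = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
    val maxAgeS = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d")
    println(s"clean every ${cleanIntervalS}s, keep event logs for ${maxAgeS}s")
  }
}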