author    Ilya Ganelin <ilya.ganelin@capitalone.com>  2015-04-13 16:28:07 -0700
committer Andrew Or <andrew@databricks.com>  2015-04-13 16:28:07 -0700
commit    c4ab255e94366ba9b9023d5431f9d2412e0d6dc7 (patch)
tree      cade698e2139a54ab81957383c3ef2b5c8e8e9f2 /core
parent    c5602bdc310cc8f82dc304500bebe40217cba785 (diff)
[SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to convert times specified as e.g. 120s, 240ms, or 360us to a consistent internal representation, and updated usages of these constants throughout the code to be consistent. I believe I've captured all usages of time-based properties throughout the code. I've also updated variable names in a number of places to reflect their units for clarity, and updated documentation where appropriate.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and added matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes
bf779b0 [Ilya Ganelin] Special handling of overlapping suffixes for Java
dd0a680 [Ilya Ganelin] Updated Scala code to call into Java
b2fc965 [Ilya Ganelin] Replaced get or default since it's not present in this version of Java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to Scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to us from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add days, minutes, and hours
647b5ac [Ilya Ganelin] Updated time conversion to use map iterator instead of if fall-through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host receiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion.
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Updated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] Updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] Updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
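For readers skimming the patch, a minimal sketch of the resulting user-facing behavior through SparkConf (based on the description above and the UtilsSuite tests at the bottom of this diff; the property values here are purely illustrative, not part of the patch):

import org.apache.spark.SparkConf

// Minimal sketch of the new time-string handling; not code from this commit.
object TimePropertiesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.network.timeout", "2m")   // suffixed value: two minutes
      .set("spark.locality.wait", "3000")   // unsuffixed value

    // A suffixed value is converted regardless of the target unit.
    assert(conf.getTimeAsSeconds("spark.network.timeout") == 120L)
    assert(conf.getTimeAsMs("spark.network.timeout") == 120000L)

    // With no suffix, the number is read in the method's own unit.
    assert(conf.getTimeAsMs("spark.locality.wait") == 3000L)       // milliseconds
    assert(conf.getTimeAsSeconds("spark.locality.wait") == 3000L)  // seconds

    // Unset keys fall back to the (string) default.
    assert(conf.getTimeAsSeconds("spark.dynamicAllocation.executorIdleTimeout", "600s") == 600L)
  }
}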
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala                     | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala                             | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                     | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                             |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala                 |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala                   | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala                      | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala                          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                    | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala                |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala            |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala                 | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala          |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                               | 44
16 files changed, 186 insertions(+), 80 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9385f557c4..4e7bf51fc0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
Integer.MAX_VALUE)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
- private val schedulerBacklogTimeout = conf.getLong(
- "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
+ private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
- // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
- private val sustainedSchedulerBacklogTimeout = conf.getLong(
- "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+ // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
+ private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
// How long an executor must be idle for before it is removed (seconds)
- private val executorIdleTimeout = conf.getLong(
- "spark.dynamicAllocation.executorIdleTimeout", 600)
+ private val executorIdleTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.executorIdleTimeout", "600s")
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
- if (schedulerBacklogTimeout <= 0) {
+ if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
- if (sustainedSchedulerBacklogTimeout <= 0) {
+ if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
- if (executorIdleTimeout <= 0) {
+ if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
- s"expire in $sustainedSchedulerBacklogTimeout seconds)")
- addTime += sustainedSchedulerBacklogTimeout * 1000
+ s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+ addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
} else {
0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
+ s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
- s"are building up (to expire in $schedulerBacklogTimeout seconds)")
- addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+ s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
+ addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
}
}
@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
- removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+ s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5871b8c869..e3bd16f1cb 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
- private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
- getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))
-
+ private val slaveTimeoutMs =
+ sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
+ private val executorTimeoutMs =
+ sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000
+
// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
- private val checkTimeoutIntervalMs =
- sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
- getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
+ private val timeoutIntervalMs =
+ sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+ private val checkTimeoutIntervalMs =
+ sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
private var timeoutCheckingTask: ScheduledFuture[_] = null
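The hunk above (and the similar ones in ConnectionManager and AkkaUtils below) reads the legacy milliseconds-based key and feeds it, with an "ms" suffix, as the string default for the newer seconds-based key, so the newer key wins whenever both are set. A small sketch of how that precedence plays out; this is an illustration of the pattern, not code from the patch:

import org.apache.spark.SparkConf

// Sketch of the fallback chain used in HeartbeatReceiver above.
object TimeoutPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    def executorTimeoutMs(conf: SparkConf): Long = {
      val slaveTimeoutMs =
        conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
      conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000
    }

    // Neither key set: the 120s default applies.
    assert(executorTimeoutMs(new SparkConf(loadDefaults = false)) == 120000L)

    // Only the legacy key set: it is honored (unsuffixed, so read as ms).
    val legacyOnly = new SparkConf(loadDefaults = false)
      .set("spark.storage.blockManagerSlaveTimeoutMs", "90000")
    assert(executorTimeoutMs(legacyOnly) == 90000L)

    // Both set: the newer spark.network.timeout takes precedence.
    val both = legacyOnly.clone().set("spark.network.timeout", "300s")
    assert(executorTimeoutMs(both) == 300000L)
  }
}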
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0c123c96b8..390e631647 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}
+ /**
+ * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then seconds are assumed.
+ * @throws NoSuchElementException
+ */
+ def getTimeAsSeconds(key: String): Long = {
+ Utils.timeStringAsSeconds(get(key))
+ }
+
+ /**
+ * Get a time parameter as seconds, falling back to a default if not set. If no
+ * suffix is provided then seconds are assumed.
+ *
+ */
+ def getTimeAsSeconds(key: String, defaultValue: String): Long = {
+ Utils.timeStringAsSeconds(get(key, defaultValue))
+ }
+
+ /**
+ * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then milliseconds are assumed.
+ * @throws NoSuchElementException
+ */
+ def getTimeAsMs(key: String): Long = {
+ Utils.timeStringAsMs(get(key))
+ }
+
+ /**
+ * Get a time parameter as milliseconds, falling back to a default if not set. If no
+ * suffix is provided then milliseconds are assumed.
+ */
+ def getTimeAsMs(key: String, defaultValue: String): Long = {
+ Utils.timeStringAsMs(get(key, defaultValue))
+ }
+
+
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
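The remaining hunks in this commit apply the same migration pattern at individual call sites; a condensed before/after sketch using the spark.dynamicAllocation.schedulerBacklogTimeout property from the first hunk (illustrative only, not code from the patch):

import org.apache.spark.SparkConf

// Before/after view of the call-site migration applied throughout this commit.
object MigrationPatternSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)

    // Before: a bare Long, with the unit implied only by convention.
    val oldStyle: Long =
      conf.getLong("spark.dynamicAllocation.schedulerBacklogTimeout", 5)

    // After: a time-aware getter with a suffixed string default, and a
    // variable name that carries the unit (the "S" suffix used in the diff).
    val schedulerBacklogTimeoutS: Long =
      conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout", "5s")

    assert(oldStyle == schedulerBacklogTimeoutS)  // both default to 5 seconds

    // Users can now write suffixed values; an unsuffixed "5" still means 5 seconds here.
    conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1min")
    assert(conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout", "5s") == 60L)
  }
}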
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 14f99a464b..516f619529 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -436,14 +436,14 @@ private[spark] class Executor(
* This thread stops running when the executor is stopped.
*/
private def startDriverHeartbeater(): Unit = {
- val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+ val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val thread = new Thread() {
override def run() {
// Sleep a random interval so the heartbeats don't end up in sync
- Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+ Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
while (!isStopped) {
reportHeartBeat()
- Thread.sleep(interval)
+ Thread.sleep(intervalMs)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 741fe3e1ea..8e3c30fc3d 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
private val ackTimeout =
- conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
+ conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
+ conf.get("spark.network.timeout", "120s"))
// Get the thread counts from the Spark Configuration.
//
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 076b36e86c..2362cc7240 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
+ val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
+ val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
@@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
- sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
- SPECULATION_INTERVAL milliseconds) {
+ sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
+ SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
@@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
this.cancel()
}
}
- }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d509881c74..7dc325283d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -848,15 +848,18 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = conf.get("spark.locality.wait", "3000")
- level match {
- case TaskLocality.PROCESS_LOCAL =>
- conf.get("spark.locality.wait.process", defaultWait).toLong
- case TaskLocality.NODE_LOCAL =>
- conf.get("spark.locality.wait.node", defaultWait).toLong
- case TaskLocality.RACK_LOCAL =>
- conf.get("spark.locality.wait.rack", defaultWait).toLong
- case _ => 0L
+ val defaultWait = conf.get("spark.locality.wait", "3s")
+ val localityWaitKey = level match {
+ case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
+ case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
+ case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
+ case _ => null
+ }
+
+ if (localityWaitKey != null) {
+ conf.getTimeAsMs(localityWaitKey, defaultWait)
+ } else {
+ 0L
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4c49da87af..63987dfb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
- val maxRegisteredWaitingTime =
- conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
+ val maxRegisteredWaitingTimeMs =
+ conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()
private val executorDataMap = new HashMap[String, ExecutorData]
@@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
+ val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
- }, 0, reviveInterval, TimeUnit.MILLISECONDS)
+ }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
override def receive: PartialFunction[Any, Unit] = {
@@ -301,9 +302,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
- if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+ if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
- s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
+ s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
return true
}
false
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 6c2c526130..8e8cc7cc63 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Try
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120))
+ val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
+ conf.get("spark.network.timeout", "120s"))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
@@ -78,8 +78,8 @@ private[spark] object AkkaUtils extends Logging {
val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
- val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
- val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
+ val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
+ val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
val secretKey = securityManager.getSecretKey()
val isAuthOn = securityManager.isAuthenticationEnabled()
@@ -102,14 +102,14 @@ private[spark] object AkkaUtils extends Logging {
|akka.jvm-exit-on-fatal-error = off
|akka.remote.require-cookie = "$requireCookie"
|akka.remote.secure-cookie = "$secureCookie"
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.tcp-nodelay = on
- |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+ |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 375ed430bd..2bbfc988a9 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration {
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf): Int = {
- conf.getInt("spark.cleaner.ttl", -1)
+ conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
}
def getDelaySeconds(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a541d660cd..1029b0f9fc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
-import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
import scala.collection.JavaConversions._
@@ -47,6 +47,7 @@ import tachyon.client.{TachyonFS, TachyonFile}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -612,9 +613,10 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)
- val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
- uc.setConnectTimeout(timeout)
- uc.setReadTimeout(timeout)
+ val timeoutMs =
+ conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
+ uc.setConnectTimeout(timeoutMs)
+ uc.setReadTimeout(timeoutMs)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, targetFile, fileOverwrite)
@@ -1019,6 +1021,22 @@ private[spark] object Utils extends Logging {
}
/**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in ms.
+ */
+ def timeStringAsMs(str: String): Long = {
+ JavaUtils.timeStringAsMs(str)
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in seconds.
+ */
+ def timeStringAsSeconds(str: String): Long = {
+ JavaUtils.timeStringAsSec(str)
+ }
+
+ /**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
def memoryStringToMb(str: String): Int = {
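The Scala helpers above simply delegate to JavaUtils. For a feel for the underlying technique (the commit message mentions a regex-based parseTimeString plus TimeUnit.convert), here is a self-contained sketch in the same spirit; the suffix table and error message wording are assumptions for illustration, not the actual JavaUtils code:

import java.util.concurrent.TimeUnit

// Illustrative sketch of suffix-based time-string parsing (not JavaUtils itself):
// a regex separates the number from an optional unit suffix, and
// TimeUnit.convert normalizes the value to the requested unit.
object TimeStringSketch {
  private val timeSuffixes = Map(
    "us"  -> TimeUnit.MICROSECONDS,
    "ms"  -> TimeUnit.MILLISECONDS,
    "s"   -> TimeUnit.SECONDS,
    "m"   -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h"   -> TimeUnit.HOURS,
    "d"   -> TimeUnit.DAYS)

  /** Parse `str` into `unit`, treating an unsuffixed number as already being in `unit`. */
  def parse(str: String, unit: TimeUnit): Long = {
    val m = "(-?[0-9]+)([a-z]+)?".r.pattern.matcher(str.trim.toLowerCase)
    if (!m.matches()) {
      throw new NumberFormatException(s"Failed to parse time string: $str")
    }
    val value = m.group(1).toLong
    val sourceUnit = Option(m.group(2)).map { suffix =>
      timeSuffixes.getOrElse(suffix,
        throw new NumberFormatException(s"Invalid suffix: $suffix"))
    }.getOrElse(unit)
    unit.convert(value, sourceUnit)
  }

  def main(args: Array[String]): Unit = {
    assert(parse("1000ms", TimeUnit.SECONDS) == 1L)
    assert(parse("1min", TimeUnit.MILLISECONDS) == 60000L)
    assert(parse("-1", TimeUnit.SECONDS) == -1L)  // -1 passes through unchanged
  }
}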
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4af8..6b3049b28c 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
- .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.schedulerBacklogTimeout",
+ s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
- sustainedSchedulerBacklogTimeout.toString)
- .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+ s"${sustainedSchedulerBacklogTimeout.toString}s")
+ .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
val sc = new SparkContext(conf)
contexts += sc
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 716f875d30..02424c59d6 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite {
test("sendMessageReliably timeout") {
val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
- val ackTimeout = 30
- clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+ val ackTimeoutS = 30
+ clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
@@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite {
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep 60 sec > ack timeout for simulating server slow down or hang up
- Thread.sleep(ackTimeout * 3 * 1000)
+ Thread.sleep(ackTimeoutS * 3 * 1000)
None
})
@@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite {
// Otherwise TimeoutException is thrown from Await.result.
// We expect TimeoutException is not thrown.
intercept[IOException] {
- Await.result(future, (ackTimeout * 2) second)
+ Await.result(future, (ackTimeoutS * 2) second)
}
manager.stop()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 716d12c076..6198cea46d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
import java.util.Random
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +26,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
+ val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
val MAX_TASK_FAILURES = 4
override def beforeEach() {
@@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
@@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
// after failing to find a node_Local task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
}
@@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
@@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT * 2)
+ clock.advance(LOCALITY_WAIT_MS * 2)
// task 1 and 2 would be scheduled as nonLocal task
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// Offer host3
// No task is scheduled if we restrict locality to RACK_LOCAL
assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
@@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
manager.speculatableTasks += 1
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// schedule the nonPref task
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
@@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
assert(manager.resourceOffer("execA", "host1", ANY) !== None)
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
manager.executorLost("execA", "host1")
manager.executorLost("execB.2", "host2")
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
// Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b4de90b65d..ffa5162a31 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
conf.set("spark.storage.unrollMemoryThreshold", "512")
// to make a replication attempt to inactive store fail fast
- conf.set("spark.core.connection.ack.wait.timeout", "1")
+ conf.set("spark.core.connection.ack.wait.timeout", "1s")
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 449fb87f11..fb97e650ff 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
+import java.util.concurrent.TimeUnit
import java.util.Locale
import com.google.common.base.Charsets.UTF_8
@@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
+
+ test("timeConversion") {
+ // Test -1
+ assert(Utils.timeStringAsSeconds("-1") === -1)
+
+ // Test zero
+ assert(Utils.timeStringAsSeconds("0") === 0)
+
+ assert(Utils.timeStringAsSeconds("1") === 1)
+ assert(Utils.timeStringAsSeconds("1s") === 1)
+ assert(Utils.timeStringAsSeconds("1000ms") === 1)
+ assert(Utils.timeStringAsSeconds("1000000us") === 1)
+ assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
+
+ assert(Utils.timeStringAsMs("1") === 1)
+ assert(Utils.timeStringAsMs("1ms") === 1)
+ assert(Utils.timeStringAsMs("1000us") === 1)
+ assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1))
+ assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
+ assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600s")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600ds")
+ }
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("600s This breaks")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This 123s breaks")
+ }
+ }
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")