Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 26
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 44
16 files changed, 186 insertions(+), 80 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9385f557c4..4e7bf51fc0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
Integer.MAX_VALUE)
// How long there must be backlogged tasks for before an addition is triggered (seconds)
- private val schedulerBacklogTimeout = conf.getLong(
- "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
+ private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
- // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
- private val sustainedSchedulerBacklogTimeout = conf.getLong(
- "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+ // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
+ private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
// How long an executor must be idle for before it is removed (seconds)
- private val executorIdleTimeout = conf.getLong(
- "spark.dynamicAllocation.executorIdleTimeout", 600)
+ private val executorIdleTimeoutS = conf.getTimeAsSeconds(
+ "spark.dynamicAllocation.executorIdleTimeout", "600s")
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
- if (schedulerBacklogTimeout <= 0) {
+ if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
- if (sustainedSchedulerBacklogTimeout <= 0) {
+ if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
- if (executorIdleTimeout <= 0) {
+ if (executorIdleTimeoutS <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
- s"expire in $sustainedSchedulerBacklogTimeout seconds)")
- addTime += sustainedSchedulerBacklogTimeout * 1000
+ s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+ addTime += sustainedSchedulerBacklogTimeoutS * 1000
delta
} else {
0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
+ s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
- s"are building up (to expire in $schedulerBacklogTimeout seconds)")
- addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+ s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
+ addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
}
}
@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
if (executorIds.contains(executorId)) {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
- removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+ s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
}
} else {
logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5871b8c869..e3bd16f1cb 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
- private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
- getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))
-
+ private val slaveTimeoutMs =
+ sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
+ private val executorTimeoutMs =
+ sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000
+
// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
- private val checkTimeoutIntervalMs =
- sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
- getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
+ private val timeoutIntervalMs =
+ sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+ private val checkTimeoutIntervalMs =
+ sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
private var timeoutCheckingTask: ScheduledFuture[_] = null
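A small sketch of the fallback chain introduced above (hypothetical values): the legacy millisecond-based key is read first, and its value, re-expressed as a millisecond string, becomes the default for the newer seconds-based key.
val conf = new SparkConf().set("spark.storage.blockManagerSlaveTimeoutMs", "60s")
val slaveTimeoutMs = conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s") // 60000
// With spark.network.timeout unset, the legacy value (in ms) serves as its default.
val executorTimeoutMs = conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 // 60000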
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0c123c96b8..390e631647 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}
+ /**
+ * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then seconds are assumed.
+ * @throws NoSuchElementException
+ */
+ def getTimeAsSeconds(key: String): Long = {
+ Utils.timeStringAsSeconds(get(key))
+ }
+
+ /**
+ * Get a time parameter as seconds, falling back to a default if not set. If no
+ * suffix is provided then seconds are assumed.
+ */
+ def getTimeAsSeconds(key: String, defaultValue: String): Long = {
+ Utils.timeStringAsSeconds(get(key, defaultValue))
+ }
+
+ /**
+ * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
+ * suffix is provided then milliseconds are assumed.
+ * @throws NoSuchElementException
+ */
+ def getTimeAsMs(key: String): Long = {
+ Utils.timeStringAsMs(get(key))
+ }
+
+ /**
+ * Get a time parameter as milliseconds, falling back to a default if not set. If no
+ * suffix is provided then milliseconds are assumed.
+ */
+ def getTimeAsMs(key: String, defaultValue: String): Long = {
+ Utils.timeStringAsMs(get(key, defaultValue))
+ }
+
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
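For illustration, a brief sketch of the new getters added above (values are hypothetical):
val conf = new SparkConf()
  .set("spark.network.timeout", "2min")
  .set("spark.executor.heartbeatInterval", "5000ms")
conf.getTimeAsSeconds("spark.network.timeout")        // 120
conf.getTimeAsMs("spark.executor.heartbeatInterval")  // 5000
conf.getTimeAsSeconds("spark.akka.timeout", "120s")   // unset, so the default applies: 120
// conf.getTimeAsMs("spark.some.unset.key")           // would throw NoSuchElementException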
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 14f99a464b..516f619529 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -436,14 +436,14 @@ private[spark] class Executor(
* This thread stops running when the executor is stopped.
*/
private def startDriverHeartbeater(): Unit = {
- val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+ val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val thread = new Thread() {
override def run() {
// Sleep a random interval so the heartbeats don't end up in sync
- Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+ Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
while (!isStopped) {
reportHeartBeat()
- Thread.sleep(interval)
+ Thread.sleep(intervalMs)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 741fe3e1ea..8e3c30fc3d 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
private val ackTimeout =
- conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
+ conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
+ conf.get("spark.network.timeout", "120s"))
// Get the thread counts from the Spark Configuration.
//
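A sketch of the two-level fallback used for the ack timeout above (hypothetical values): if spark.core.connection.ack.wait.timeout is unset, the string resolved for spark.network.timeout (itself defaulting to "120s") is used as its default.
val conf = new SparkConf().set("spark.network.timeout", "90s")
val ackTimeoutS = conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
  conf.get("spark.network.timeout", "120s"))  // 90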
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 076b36e86c..2362cc7240 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
+ val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
+ val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
@@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
- sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
- SPECULATION_INTERVAL milliseconds) {
+ sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
+ SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
}
}
@@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
this.cancel()
}
}
- }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d509881c74..7dc325283d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -848,15 +848,18 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = conf.get("spark.locality.wait", "3000")
- level match {
- case TaskLocality.PROCESS_LOCAL =>
- conf.get("spark.locality.wait.process", defaultWait).toLong
- case TaskLocality.NODE_LOCAL =>
- conf.get("spark.locality.wait.node", defaultWait).toLong
- case TaskLocality.RACK_LOCAL =>
- conf.get("spark.locality.wait.rack", defaultWait).toLong
- case _ => 0L
+ val defaultWait = conf.get("spark.locality.wait", "3s")
+ val localityWaitKey = level match {
+ case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
+ case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
+ case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
+ case _ => null
+ }
+
+ if (localityWaitKey != null) {
+ conf.getTimeAsMs(localityWaitKey, defaultWait)
+ } else {
+ 0L
}
}
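A sketch of the per-level locality wait lookup after this change (hypothetical values): each level's key is read as a time string, falling back to the global spark.locality.wait.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // global default, now a time string
  .set("spark.locality.wait.node", "500ms")  // per-level override with its own unit
conf.getTimeAsMs("spark.locality.wait.node", conf.get("spark.locality.wait", "3s")) // 500
conf.getTimeAsMs("spark.locality.wait.rack", conf.get("spark.locality.wait", "3s")) // 3000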
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4c49da87af..63987dfb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
- val maxRegisteredWaitingTime =
- conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
+ val maxRegisteredWaitingTimeMs =
+ conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()
private val executorDataMap = new HashMap[String, ExecutorData]
@@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def onStart() {
// Periodically revive offers to allow delay scheduling to work
- val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
+ val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+
reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
- }, 0, reviveInterval, TimeUnit.MILLISECONDS)
+ }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
override def receive: PartialFunction[Any, Unit] = {
@@ -301,9 +302,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
return true
}
- if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+ if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
- s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
+ s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
return true
}
false
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 6c2c526130..8e8cc7cc63 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Try
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging {
val akkaThreads = conf.getInt("spark.akka.threads", 4)
val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120))
+ val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
+ conf.get("spark.network.timeout", "120s"))
val akkaFrameSize = maxFrameSizeBytes(conf)
val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
@@ -78,8 +78,8 @@ private[spark] object AkkaUtils extends Logging {
val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
- val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
- val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
+ val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
+ val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
val secretKey = securityManager.getSecretKey()
val isAuthOn = securityManager.isAuthenticationEnabled()
@@ -102,14 +102,14 @@ private[spark] object AkkaUtils extends Logging {
|akka.jvm-exit-on-fatal-error = off
|akka.remote.require-cookie = "$requireCookie"
|akka.remote.secure-cookie = "$secureCookie"
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|akka.remote.netty.tcp.tcp-nodelay = on
- |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+ |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
|akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
|akka.remote.netty.tcp.execution-pool-size = $akkaThreads
|akka.actor.default-dispatcher.throughput = $akkaBatchSize
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 375ed430bd..2bbfc988a9 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration {
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf): Int = {
- conf.getInt("spark.cleaner.ttl", -1)
+ conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
}
def getDelaySeconds(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a541d660cd..1029b0f9fc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
-import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection
import scala.collection.JavaConversions._
@@ -47,6 +47,7 @@ import tachyon.client.{TachyonFS, TachyonFile}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -612,9 +613,10 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)
- val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
- uc.setConnectTimeout(timeout)
- uc.setReadTimeout(timeout)
+ val timeoutMs =
+ conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
+ uc.setConnectTimeout(timeoutMs)
+ uc.setReadTimeout(timeoutMs)
uc.connect()
val in = uc.getInputStream()
downloadFile(url, in, targetFile, fileOverwrite)
@@ -1019,6 +1021,22 @@ private[spark] object Utils extends Logging {
}
/**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in ms.
+ */
+ def timeStringAsMs(str: String): Long = {
+ JavaUtils.timeStringAsMs(str)
+ }
+
+ /**
+ * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+ * no suffix is provided, the passed number is assumed to be in seconds.
+ */
+ def timeStringAsSeconds(str: String): Long = {
+ JavaUtils.timeStringAsSec(str)
+ }
+
+ /**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
def memoryStringToMb(str: String): Int = {
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4af8..6b3049b28c 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
- .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.schedulerBacklogTimeout",
+ s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
- sustainedSchedulerBacklogTimeout.toString)
- .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+ s"${sustainedSchedulerBacklogTimeout.toString}s")
+ .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
val sc = new SparkContext(conf)
contexts += sc
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 716f875d30..02424c59d6 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite {
test("sendMessageReliably timeout") {
val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
- val ackTimeout = 30
- clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+ val ackTimeoutS = 30
+ clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
@@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite {
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep 60 sec > ack timeout for simulating server slow down or hang up
- Thread.sleep(ackTimeout * 3 * 1000)
+ Thread.sleep(ackTimeoutS * 3 * 1000)
None
})
@@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite {
// Otherwise TimeoutException is thrown from Await.result.
// We expect TimeoutException is not thrown.
intercept[IOException] {
- Await.result(future, (ackTimeout * 2) second)
+ Await.result(future, (ackTimeoutS * 2) second)
}
manager.stop()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 716d12c076..6198cea46d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
import java.util.Random
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +26,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
+ val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
val MAX_TASK_FAILURES = 4
override def beforeEach() {
@@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
@@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
// after failing to find a node_Local task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
}
@@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
@@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT * 2)
+ clock.advance(LOCALITY_WAIT_MS * 2)
// task 1 and 2 would be scheduled as nonLocal task
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// Offer host3
// No task is scheduled if we restrict locality to RACK_LOCAL
assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
@@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
manager.speculatableTasks += 1
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// schedule the nonPref task
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
@@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
assert(manager.resourceOffer("execA", "host1", ANY) !== None)
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
manager.executorLost("execA", "host1")
manager.executorLost("execB.2", "host2")
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
// Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b4de90b65d..ffa5162a31 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
conf.set("spark.storage.unrollMemoryThreshold", "512")
// to make a replication attempt to inactive store fail fast
- conf.set("spark.core.connection.ack.wait.timeout", "1")
+ conf.set("spark.core.connection.ack.wait.timeout", "1s")
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 449fb87f11..fb97e650ff 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
+import java.util.concurrent.TimeUnit
import java.util.Locale
import com.google.common.base.Charsets.UTF_8
@@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
+
+ test("timeConversion") {
+ // Test -1
+ assert(Utils.timeStringAsSeconds("-1") === -1)
+
+ // Test zero
+ assert(Utils.timeStringAsSeconds("0") === 0)
+
+ assert(Utils.timeStringAsSeconds("1") === 1)
+ assert(Utils.timeStringAsSeconds("1s") === 1)
+ assert(Utils.timeStringAsSeconds("1000ms") === 1)
+ assert(Utils.timeStringAsSeconds("1000000us") === 1)
+ assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
+
+ assert(Utils.timeStringAsMs("1") === 1)
+ assert(Utils.timeStringAsMs("1ms") === 1)
+ assert(Utils.timeStringAsMs("1000us") === 1)
+ assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1))
+ assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
+ assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600s")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600ds")
+ }
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("600s This breaks")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This 123s breaks")
+ }
+ }
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")