Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 44
5 files changed, 65 insertions, 21 deletions
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4af8..6b3049b28c 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
- .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.schedulerBacklogTimeout",
+ s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
- sustainedSchedulerBacklogTimeout.toString)
- .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+ s"${sustainedSchedulerBacklogTimeout.toString}s")
+ .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
val sc = new SparkContext(conf)
contexts += sc
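
The suffixed values written above are read back through the typed getters introduced alongside this change. A minimal sketch of the round trip, assuming the SparkConf.getTimeAsSeconds getter added in this patch (the value here is a hypothetical stand-in for the suite's test parameter):

    import org.apache.spark.SparkConf

    val schedulerBacklogTimeout = 5L  // hypothetical test parameter
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", s"${schedulerBacklogTimeout}s")

    // getTimeAsSeconds understands the "5s" suffix; a bare "5" would also be
    // accepted and interpreted as seconds by this getter.
    assert(conf.getTimeAsSeconds("spark.dynamicAllocation.schedulerBacklogTimeout") ==
      schedulerBacklogTimeout)
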
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 716f875d30..02424c59d6 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite {
test("sendMessageReliably timeout") {
val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
- val ackTimeout = 30
- clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+ val ackTimeoutS = 30
+ clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
@@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite {
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep 90 sec (> 30s ack timeout) to simulate server slowdown or hang-up
- Thread.sleep(ackTimeout * 3 * 1000)
+ Thread.sleep(ackTimeoutS * 3 * 1000)
None
})
@@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite {
// Otherwise a TimeoutException is thrown from Await.result.
// We expect that a TimeoutException is not thrown.
intercept[IOException] {
- Await.result(future, (ackTimeout * 2) second)
+ Await.result(future, (ackTimeoutS * 2) second)
}
manager.stop()
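
In isolation, the failure mode this test pins down is: the send future fails with an IOException once the 30s ack wait elapses, and Await is given a wider window so that it is the IOException, not Await's own TimeoutException, that surfaces. A minimal sketch of that idiom inside a FunSuite, with the pre-failed promise standing in for the timed-out sendMessageReliably call:

    import java.io.IOException
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    val ackTimeoutS = 30
    // Stand-in for sendMessageReliably: a future already failed the way the
    // ConnectionManager fails it when no ack arrives within the timeout.
    val promise = Promise[Unit]()
    promise.failure(new IOException(s"no ack within ${ackTimeoutS}s"))

    // The Await window (60s) is wider than the ack timeout (30s), so any
    // exception seen here comes from the failed send, not from Await itself.
    intercept[IOException] {
      Await.result(promise.future, (ackTimeoutS * 2).seconds)
    }
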
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 716d12c076..6198cea46d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
import java.util.Random
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +26,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
+ val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
val MAX_TASK_FAILURES = 4
override def beforeEach() {
@@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
@@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2, exec2 again, at NODE_LOCAL level: we should get the noPref task
// after failing to find a NODE_LOCAL task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
}
@@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
@@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT * 2)
+ clock.advance(LOCALITY_WAIT_MS * 2)
// tasks 1 and 2 would be scheduled as non-local tasks
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// Offer host3
// No task is scheduled if we restrict locality to RACK_LOCAL
assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
@@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
manager.speculatableTasks += 1
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// schedule the noPref task
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
@@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
assert(manager.resourceOffer("execA", "host1", ANY) !== None)
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
manager.executorLost("execA", "host1")
manager.executorLost("execB.2", "host2")
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
// Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
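
The rename from LOCALITY_WAIT to LOCALITY_WAIT_MS tracks the unit contract: getTimeAsMs normalizes any suffix ("3s", "3000ms", or a bare "3000") to milliseconds, which is the unit ManualClock.advance expects. A minimal sketch, assuming the getTimeAsMs getter from this change:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.ManualClock

    val conf = new SparkConf().set("spark.locality.wait", "3s")
    val localityWaitMs = conf.getTimeAsMs("spark.locality.wait", "3s")
    assert(localityWaitMs == 3000L)

    // ManualClock works in milliseconds, so the normalized value can be used
    // directly to step the clock past one locality level.
    val clock = new ManualClock()
    clock.advance(localityWaitMs)
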
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b4de90b65d..ffa5162a31 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
conf.set("spark.storage.unrollMemoryThreshold", "512")
// to make a replication attempt to an inactive store fail fast
- conf.set("spark.core.connection.ack.wait.timeout", "1")
+ conf.set("spark.core.connection.ack.wait.timeout", "1s")
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 449fb87f11..fb97e650ff 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
+import java.util.concurrent.TimeUnit
import java.util.Locale
import com.google.common.base.Charsets.UTF_8
@@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
+
+ test("timeConversion") {
+ // Test -1
+ assert(Utils.timeStringAsSeconds("-1") === -1)
+
+ // Test zero
+ assert(Utils.timeStringAsSeconds("0") === 0)
+
+ assert(Utils.timeStringAsSeconds("1") === 1)
+ assert(Utils.timeStringAsSeconds("1s") === 1)
+ assert(Utils.timeStringAsSeconds("1000ms") === 1)
+ assert(Utils.timeStringAsSeconds("1000000us") === 1)
+ assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
+
+ assert(Utils.timeStringAsMs("1") === 1)
+ assert(Utils.timeStringAsMs("1ms") === 1)
+ assert(Utils.timeStringAsMs("1000us") === 1)
+ assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1))
+ assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
+ assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600s")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600ds")
+ }
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("600s This breaks")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This 123s breaks")
+ }
+ }
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
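
The contract pinned down by the timeConversion test above (an optional unit suffix, bare numbers falling back to the caller's unit, and NumberFormatException on anything with stray text) can be sketched independently of Spark's real parser, which lives behind Utils and differs in detail; a minimal illustration only:

    import java.util.concurrent.TimeUnit

    def timeStringAs(str: String, defaultUnit: TimeUnit): Long = {
      val m = "(-?[0-9]+)([a-z]+)?".r.pattern.matcher(str.trim.toLowerCase)
      if (!m.matches()) {
        throw new NumberFormatException(s"Failed to parse time string: $str")
      }
      val suffixes = Map(
        "us" -> TimeUnit.MICROSECONDS, "ms" -> TimeUnit.MILLISECONDS,
        "s" -> TimeUnit.SECONDS, "m" -> TimeUnit.MINUTES,
        "min" -> TimeUnit.MINUTES, "h" -> TimeUnit.HOURS, "d" -> TimeUnit.DAYS)
      val unit = Option(m.group(2)) match {
        case Some(sfx) => suffixes.getOrElse(sfx,
          throw new NumberFormatException(s"Invalid suffix: $sfx"))
        case None => defaultUnit  // bare number: the caller's unit applies
      }
      defaultUnit.convert(m.group(1).toLong, unit)
    }

    def timeStringAsSeconds(str: String): Long = timeStringAs(str, TimeUnit.SECONDS)
    def timeStringAsMs(str: String): Long = timeStringAs(str, TimeUnit.MILLISECONDS)

    assert(timeStringAsSeconds("1000ms") == 1)
    assert(timeStringAsMs("1min") == TimeUnit.MINUTES.toMillis(1))
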