path: root/core/src/test
author     Ilya Ganelin <ilya.ganelin@capitalone.com>  2015-04-13 16:28:07 -0700
committer  Andrew Or <andrew@databricks.com>           2015-04-13 16:28:07 -0700
commit     c4ab255e94366ba9b9023d5431f9d2412e0d6dc7 (patch)
tree       cade698e2139a54ab81957383c3ef2b5c8e8e9f2 /core/src/test
parent     c5602bdc310cc8f82dc304500bebe40217cba785 (diff)
[SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to convert times specified as e.g. 120s, 240ms, or 360us into a consistent internal representation, and I've updated usages of these constants throughout the code to be consistent. I believe I've captured all usages of time-based properties in the code. I've also updated variable names in a number of places to reflect their units for clarity, and updated documentation where appropriate.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and added matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed-up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes
bf779b0 [Ilya Ganelin] Special handling of overlapping suffixes for Java
dd0a680 [Ilya Ganelin] Updated Scala code to call into Java
b2fc965 [Ilya Ganelin] Replaced get or default since it's not present in this version of Java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to the Scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to us from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add days, minutes, and hours
647b5ac [Ilya Ganelin] Updated time conversion to use a map iterator instead of if fall-through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host receiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with a note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Updated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] Updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] Updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
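For readers skimming the diffs below, here is a minimal sketch of the parsing behavior this commit describes and the new UtilsSuite test exercises. It is not the implementation this patch adds to Utils/JavaUtils; the regex, the suffix table, and the object name TimeStringSketch are assumptions inferred from the units (us, ms, s, m/min, h, d) and edge cases (-1, 0, invalid suffixes) that the commit message and tests cover.

import java.util.concurrent.TimeUnit

// Hedged sketch only: Spark's real implementation lives in Utils/JavaUtils
// and is not reproduced here. The regex and suffix table are assumptions
// based on the commit message and the UtilsSuite cases below.
object TimeStringSketch {
  private val timePattern = """(-?[0-9]+)([a-z]+)?""".r

  private val suffixes = Map(
    "us"  -> TimeUnit.MICROSECONDS,
    "ms"  -> TimeUnit.MILLISECONDS,
    "s"   -> TimeUnit.SECONDS,
    "m"   -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h"   -> TimeUnit.HOURS,
    "d"   -> TimeUnit.DAYS)

  // Parse "120s", "240ms", "1min", ...; a bare number (including -1) is
  // interpreted directly in the caller's unit, matching the tests below.
  private def parse(str: String, unit: TimeUnit): Long = str.trim.toLowerCase match {
    case timePattern(value, null) => value.toLong
    case timePattern(value, suffix) if suffixes.contains(suffix) =>
      unit.convert(value.toLong, suffixes(suffix))
    case _ => throw new NumberFormatException(s"Invalid time string: $str")
  }

  def timeStringAsSeconds(str: String): Long = parse(str, TimeUnit.SECONDS)
  def timeStringAsMs(str: String): Long = parse(str, TimeUnit.MILLISECONDS)
}

Under this sketch, timeStringAsSeconds("1000ms") yields 1, timeStringAsMs("1min") yields 60000, and a string with surrounding garbage such as "600s This breaks" fails the whole-string regex match and raises NumberFormatException, mirroring the assertions added to UtilsSuite below.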
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala     |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala      | 25
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                    | 44
5 files changed, 65 insertions(+), 21 deletions(-)
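Before the per-file diffs, a hedged usage note: the SparkConf getter getTimeAsMs is visible in the TaskSetManagerSuite change below, so callers can read a time property in a fixed unit regardless of the suffix the user wrote. The exact semantics of the default argument are inferred from that diff, not from API docs.

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.locality.wait", "3s")  // could equally be written "3000ms"

// The default string ("3s" here) is parsed the same way as a set value.
val localityWaitMs: Long = conf.getTimeAsMs("spark.locality.wait", "3s")  // 3000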
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4af8..6b3049b28c 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
- .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+ .set("spark.dynamicAllocation.schedulerBacklogTimeout",
+ s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
- sustainedSchedulerBacklogTimeout.toString)
- .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+ s"${sustainedSchedulerBacklogTimeout.toString}s")
+ .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
val sc = new SparkContext(conf)
contexts += sc
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 716f875d30..02424c59d6 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite {
test("sendMessageReliably timeout") {
val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
- val ackTimeout = 30
- clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+ val ackTimeoutS = 30
+ clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
@@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite {
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep for 3x the ack timeout (90 sec) to simulate the server slowing down or hanging up
- Thread.sleep(ackTimeout * 3 * 1000)
+ Thread.sleep(ackTimeoutS * 3 * 1000)
None
})
@@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite {
// Otherwise TimeoutException is thrown from Await.result.
// We expect that TimeoutException is not thrown.
intercept[IOException] {
- Await.result(future, (ackTimeout * 2) second)
+ Await.result(future, (ackTimeoutS * 2) second)
}
manager.stop()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 716d12c076..6198cea46d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
import java.util.Random
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +26,7 @@ import org.scalatest.FunSuite
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
private val conf = new SparkConf
- val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
+ val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
val MAX_TASK_FAILURES = 4
override def beforeEach() {
@@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
@@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2, exec3 again, at NODE_LOCAL level: we should get the noPref task
// after failing to find a NODE_LOCAL task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
}
@@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
@@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
- clock.advance(LOCALITY_WAIT * 2)
+ clock.advance(LOCALITY_WAIT_MS * 2)
// tasks 1 and 2 would be scheduled as non-local tasks
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
// Set allowed locality to ANY
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// Offer host3
// No task is scheduled if we restrict locality to RACK_LOCAL
assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
@@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
manager.speculatableTasks += 1
- clock.advance(LOCALITY_WAIT)
+ clock.advance(LOCALITY_WAIT_MS)
// schedule the nonPref task
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
- clock.advance(LOCALITY_WAIT * 3)
+ clock.advance(LOCALITY_WAIT_MS * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
@@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
assert(manager.resourceOffer("execA", "host1", ANY) !== None)
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
manager.executorLost("execA", "host1")
manager.executorLost("execB.2", "host2")
- clock.advance(LOCALITY_WAIT * 4)
+ clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
// Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b4de90b65d..ffa5162a31 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
conf.set("spark.storage.unrollMemoryThreshold", "512")
// to make a replication attempt to an inactive store fail fast
- conf.set("spark.core.connection.ack.wait.timeout", "1")
+ conf.set("spark.core.connection.ack.wait.timeout", "1s")
// to make cached peers refresh frequently
conf.set("spark.storage.cachedPeersTtl", "10")
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 449fb87f11..fb97e650ff 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
+import java.util.concurrent.TimeUnit
import java.util.Locale
import com.google.common.base.Charsets.UTF_8
@@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite with ResetSystemProperties {
+
+ test("timeConversion") {
+ // Test -1
+ assert(Utils.timeStringAsSeconds("-1") === -1)
+
+ // Test zero
+ assert(Utils.timeStringAsSeconds("0") === 0)
+
+ assert(Utils.timeStringAsSeconds("1") === 1)
+ assert(Utils.timeStringAsSeconds("1s") === 1)
+ assert(Utils.timeStringAsSeconds("1000ms") === 1)
+ assert(Utils.timeStringAsSeconds("1000000us") === 1)
+ assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
+ assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
+
+ assert(Utils.timeStringAsMs("1") === 1)
+ assert(Utils.timeStringAsMs("1ms") === 1)
+ assert(Utils.timeStringAsMs("1000us") === 1)
+ assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1))
+ assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
+ assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
+ assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
+
+ // Test invalid strings
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600s")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This breaks 600ds")
+ }
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("600s This breaks")
+ }
+
+ intercept[NumberFormatException] {
+ Utils.timeStringAsMs("This 123s breaks")
+ }
+ }
+
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")