author     Ilya Ganelin <ilya.ganelin@capitalone.com>   2015-04-13 16:28:07 -0700
committer  Andrew Or <andrew@databricks.com>            2015-04-13 16:28:07 -0700
commit     c4ab255e94366ba9b9023d5431f9d2412e0d6dc7 (patch)
tree       cade698e2139a54ab81957383c3ef2b5c8e8e9f2 /streaming
parent     c5602bdc310cc8f82dc304500bebe40217cba785 (diff)
[SPARK-5931][CORE] Use consistent naming for time properties
I've added new utility methods to convert times specified as e.g. 120s, 240ms, or 360us into a consistent internal representation, and I've updated usages of these properties throughout the code to be consistent. I believe I've captured all usages of time-based properties in the code. I've also updated variable names in a number of places to reflect their units for clarity, and updated documentation where appropriate.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and added matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed-up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes
bf779b0 [Ilya Ganelin] Special handling of overlapping suffixes for java
dd0a680 [Ilya Ganelin] Updated scala code to call into java
b2fc965 [Ilya Ganelin] Replaced get or default since it's not present in this version of java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to US from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add day, minutes, and hours
647b5ac [Ilya Ganelin] Updated time conversion to use map iterator instead of if fall-through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host receiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion.
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Updated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] Updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] Updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time
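For orientation, the parsing this patch introduces boils down to matching a unit suffix and converting through java.util.concurrent.TimeUnit. The sketch below is illustrative only: the object and method names are made up here, and the real implementation in Utils.scala / JavaUtils.java uses regexes (commit 6f651a8) rather than this suffix scan.

import java.util.concurrent.TimeUnit

object TimeStringSketch {
  // Longer suffixes ("us", "ms") are tried before "s", and "ms" before "m",
  // mirroring the overlapping-suffix handling called out in the commit log.
  private val suffixes = Seq(
    "us" -> TimeUnit.MICROSECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s"  -> TimeUnit.SECONDS,
    "m"  -> TimeUnit.MINUTES,
    "h"  -> TimeUnit.HOURS,
    "d"  -> TimeUnit.DAYS)

  // Parses strings such as "120s", "240ms", "360us"; a bare number falls back to defaultUnit.
  def parse(str: String, defaultUnit: TimeUnit, targetUnit: TimeUnit): Long = {
    val trimmed = str.trim.toLowerCase
    suffixes.find { case (suffix, _) => trimmed.endsWith(suffix) } match {
      case Some((suffix, unit)) =>
        targetUnit.convert(trimmed.stripSuffix(suffix).trim.toLong, unit)
      case None =>
        targetUnit.convert(trimmed.toLong, defaultUnit)
    }
  }
}

For example, parse("120s", TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS) yields 120000, while parse("-1", TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS) passes -1 through unchanged, matching the -1 handling noted in commit 8927e66.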
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala  |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala   | 12
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala            | 14
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala    | 22
4 files changed, 28 insertions, 28 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 42514d8b47..f4963a78e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
@@ -79,9 +79,9 @@ private[streaming] class BlockGenerator(
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
private val clock = new SystemClock()
- private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+ private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
private val blockIntervalTimer =
- new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+ new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@@ -132,7 +132,7 @@ private[streaming] class BlockGenerator(
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(receiverId, time - blockInterval)
+ val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
val newBlock = new Block(blockId, newBlockBuffer)
listener.onGenerateBlock(blockId)
blocksForPushing.put(newBlock) // put is blocking when queue is full
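The practical effect of the change above is that spark.streaming.blockInterval now accepts any supported unit suffix rather than a bare millisecond count. A REPL-style usage sketch, assuming a SparkConf carrying the getTimeAsMs helper this patch introduces:

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.streaming.blockInterval", "200ms")
conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")  // 200

conf.set("spark.streaming.blockInterval", "1s")
conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")  // 1000

// When the property is unset, the default string is parsed the same way:
new SparkConf().getTimeAsMs("spark.streaming.blockInterval", "200ms")  // 200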
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4946806d2e..58e56638a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -104,17 +104,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
- val stopTimeout = conf.getLong(
- "spark.streaming.gracefulStopTimeout",
- 10 * ssc.graph.batchDuration.milliseconds
- )
+ val stopTimeoutMs = conf.getTimeAsMs(
+ "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
val pollTime = 100
// To prevent graceful stop to get stuck permanently
def hasTimedOut: Boolean = {
- val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
+ val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
if (timedOut) {
- logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+ logWarning("Timed out while stopping the job generator (timeout = " + stopTimeoutMs + ")")
}
timedOut
}
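The hasTimedOut check above is one half of a poll-until-drained loop; the rest of the stop logic sits outside this hunk. As a self-contained sketch of the pattern, with a hypothetical pollUntil helper that is not part of this patch:

def pollUntil(done: () => Boolean, stopTimeoutMs: Long, pollTimeMs: Long = 100): Boolean = {
  val timeWhenStopStarted = System.currentTimeMillis()
  def hasTimedOut: Boolean = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
  while (!done() && !hasTimedOut) {
    Thread.sleep(pollTimeMs)  // same 100 ms cadence as the pollTime val above
  }
  done()  // false means we gave up after stopTimeoutMs
}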
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 10c35cba8d..91261a9db7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -131,11 +131,11 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 200
- val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockIntervalMs = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms")
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
val expectedBlocks = 5
- val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val waitTime = expectedBlocks * blockIntervalMs + (blockIntervalMs / 2)
val generatedData = new ArrayBuffer[Int]
// Generate blocks
@@ -157,15 +157,15 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 100
+ val blockIntervalMs = 100
val maxRate = 100
- val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+ val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
val expectedBlocks = 20
- val waitTime = expectedBlocks * blockInterval
+ val waitTime = expectedBlocks * blockIntervalMs
val expectedMessages = maxRate * waitTime / 1000
- val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+ val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
val generatedData = new ArrayBuffer[Int]
// Generate blocks
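Plugging the values from the throttling test into its own formulas shows what the assertions expect; this is straight evaluation of the expressions above, not new logic:

val blockIntervalMs = 100                                        // one block every 100 ms
val maxRate = 100                                                // records per second
val expectedBlocks = 20
val waitTime = expectedBlocks * blockIntervalMs                  // 2000 ms of generation
val expectedMessages = maxRate * waitTime / 1000                 // 200 records in total
val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000  // 10 records per block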
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d1bbf39dc7..58353a5f97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -73,9 +73,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("from existing SparkContext") {
@@ -85,24 +85,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register()
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
- assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
+ assert(
+ Utils.timeStringAsSeconds(cp.sparkConfPairs
+ .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
+ assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("start and stop state check") {
@@ -176,7 +178,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("spark.cleaner.ttl", "3600")
+ conf.set("spark.cleaner.ttl", "3600s")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")
@@ -207,7 +209,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop slow receiver gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("spark.streaming.gracefulStopTimeout", "20000")
+ conf.set("spark.streaming.gracefulStopTimeout", "20000s")
sc = new SparkContext(conf)
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
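Because the checkpoint round-trips conf values as raw strings, the tests above parse them back with Utils.timeStringAsSeconds and SparkConf.getTimeAsSeconds. A sketch of the conversions they rely on; the expected values are inferred from the suffix rules described in the commit message, not taken from test output:

import org.apache.spark.util.Utils

Utils.timeStringAsSeconds("10s")      // 10
Utils.timeStringAsSeconds("10000ms")  // 10
Utils.timeStringAsSeconds("1m")       // 60; "m" is the minute suffix added by this patch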