aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala12
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala14
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala22
4 files changed, 28 insertions, 28 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 42514d8b47..f4963a78e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
@@ -79,9 +79,9 @@ private[streaming] class BlockGenerator(
private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
private val clock = new SystemClock()
- private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+ private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
private val blockIntervalTimer =
- new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+ new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@@ -132,7 +132,7 @@ private[streaming] class BlockGenerator(
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
- val blockId = StreamBlockId(receiverId, time - blockInterval)
+ val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
val newBlock = new Block(blockId, newBlockBuffer)
listener.onGenerateBlock(blockId)
blocksForPushing.put(newBlock) // put is blocking when queue is full
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4946806d2e..58e56638a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -104,17 +104,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.currentTimeMillis()
- val stopTimeout = conf.getLong(
- "spark.streaming.gracefulStopTimeout",
- 10 * ssc.graph.batchDuration.milliseconds
- )
+ val stopTimeoutMs = conf.getTimeAsMs(
+ "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
val pollTime = 100
// To prevent graceful stop to get stuck permanently
def hasTimedOut: Boolean = {
- val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
+ val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
if (timedOut) {
- logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+ logWarning("Timed out while stopping the job generator (timeout = " + stopTimeoutMs + ")")
}
timedOut
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 10c35cba8d..91261a9db7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -131,11 +131,11 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 200
- val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockIntervalMs = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms")
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
val expectedBlocks = 5
- val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val waitTime = expectedBlocks * blockIntervalMs + (blockIntervalMs / 2)
val generatedData = new ArrayBuffer[Int]
// Generate blocks
@@ -157,15 +157,15 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
- val blockInterval = 100
+ val blockIntervalMs = 100
val maxRate = 100
- val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+ val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
val expectedBlocks = 20
- val waitTime = expectedBlocks * blockInterval
+ val waitTime = expectedBlocks * blockIntervalMs
val expectedMessages = maxRate * waitTime / 1000
- val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+ val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
val generatedData = new ArrayBuffer[Int]
// Generate blocks
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d1bbf39dc7..58353a5f97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -73,9 +73,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from conf with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("from existing SparkContext") {
@@ -85,24 +85,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("from existing SparkContext with settings") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
ssc = new StreamingContext(myConf, batchDuration)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("from checkpoint") {
val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
- myConf.set("spark.cleaner.ttl", "10")
+ myConf.set("spark.cleaner.ttl", "10s")
val ssc1 = new StreamingContext(myConf, batchDuration)
addInputStream(ssc1).register()
ssc1.start()
val cp = new Checkpoint(ssc1, Time(1000))
- assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
+ assert(
+ Utils.timeStringAsSeconds(cp.sparkConfPairs
+ .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
ssc1.stop()
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
- assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
+ assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
ssc = new StreamingContext(null, newCp, null)
- assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+ assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
}
test("start and stop state check") {
@@ -176,7 +178,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("spark.cleaner.ttl", "3600")
+ conf.set("spark.cleaner.ttl", "3600s")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")
@@ -207,7 +209,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
test("stop slow receiver gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
- conf.set("spark.streaming.gracefulStopTimeout", "20000")
+ conf.set("spark.streaming.gracefulStopTimeout", "20000s")
sc = new SparkContext(conf)
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))