Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala |  8 ++++----
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala  | 12 +++++-------
 streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala           | 14 +++++++-------
 streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala   | 22 ++++++++++++----------
 4 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 42514d8b47..f4963a78e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
@@ -79,9 +79,9 @@ private[streaming] class BlockGenerator(
   private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
 
   private val clock = new SystemClock()
-  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
   private val blockIntervalTimer =
-    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
   private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
   private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
   private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@@ -132,7 +132,7 @@ private[streaming] class BlockGenerator(
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
-        val blockId = StreamBlockId(receiverId, time - blockInterval)
+        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4946806d2e..58e56638a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
@@ -104,17 +104,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
       val timeWhenStopStarted = System.currentTimeMillis()
-      val stopTimeout = conf.getLong(
-        "spark.streaming.gracefulStopTimeout",
-        10 * ssc.graph.batchDuration.milliseconds
-      )
+      val stopTimeoutMs = conf.getTimeAsMs(
+        "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
       val pollTime = 100
 
       // To prevent graceful stop to get stuck permanently
       def hasTimedOut: Boolean = {
-        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
+        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
         if (timedOut) {
-          logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+          logWarning("Timed out while stopping the job generator (timeout = " + stopTimeoutMs + ")")
         }
         timedOut
       }
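Context for the two hunks above: the old conf.getLong read a raw number of milliseconds, while the new SparkConf.getTimeAsMs accepts a duration string with a unit suffix. Below is a minimal, standalone sketch of the parsing behavior these changes rely on, assuming the getTimeAsMs semantics introduced alongside this series of changes; only the key name from the diff is reused.

    import org.apache.spark.SparkConf

    // Sketch of SparkConf.getTimeAsMs, the getter the diff switches to.
    object TimeConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)

        // Key unset: the suffixed default string is parsed into milliseconds.
        assert(conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") == 200L)

        // Unit suffixes (ms, s, m, h, d) are converted to the requested unit.
        conf.set("spark.streaming.blockInterval", "1s")
        assert(conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") == 1000L)

        // A bare number is read in the requested unit, so a legacy
        // millisecond value like "200" keeps its old meaning here.
        conf.set("spark.streaming.blockInterval", "200")
        assert(conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") == 200L)
      }
    }

This is also why the defaults are given as strings ("200ms") rather than numbers: the default goes through the same duration parser as a user-supplied value.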
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 10c35cba8d..91261a9db7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -131,11 +131,11 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
   test("block generator") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 200
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+    val blockIntervalMs = 200
+    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms")
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
     val expectedBlocks = 5
-    val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+    val waitTime = expectedBlocks * blockIntervalMs + (blockIntervalMs / 2)
     val generatedData = new ArrayBuffer[Int]
 
     // Generate blocks
@@ -157,15 +157,15 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
   test("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 100
+    val blockIntervalMs = 100
     val maxRate = 100
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
       set("spark.streaming.receiver.maxRate", maxRate.toString)
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
     val expectedBlocks = 20
-    val waitTime = expectedBlocks * blockInterval
+    val waitTime = expectedBlocks * blockIntervalMs
     val expectedMessages = maxRate * waitTime / 1000
-    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
     val generatedData = new ArrayBuffer[Int]
 
     // Generate blocks
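For readers checking the throttling test, its expectations follow directly from the renamed *Ms values; this spells out the arithmetic with the numbers taken from the hunk above:

    // Expectation arithmetic from the "block generator throttling" test.
    object ThrottlingMath extends App {
      val blockIntervalMs = 100                                        // "100ms" block interval
      val maxRate = 100                                                // max records per second
      val expectedBlocks = 20
      val waitTime = expectedBlocks * blockIntervalMs                  // 2000 ms of generation
      val expectedMessages = maxRate * waitTime / 1000                 // 200 records in total
      val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000  // 10 records per block
      println((waitTime, expectedMessages, expectedMessagesPerBlock))  // (2000,200,10)
    }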
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d1bbf39dc7..58353a5f97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -73,9 +73,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("from existing SparkContext") {
@@ -85,24 +85,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register()
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
-    assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
+    assert(
+      Utils.timeStringAsSeconds(cp.sparkConfPairs
+        .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("start and stop state check") {
@@ -176,7 +178,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("spark.cleaner.ttl", "3600")
+    conf.set("spark.cleaner.ttl", "3600s")
     sc = new SparkContext(conf)
     for (i <- 1 to 4) {
       logInfo("==================================\n\n\n")
@@ -207,7 +209,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop slow receiver gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("spark.streaming.gracefulStopTimeout", "20000")
+    conf.set("spark.streaming.gracefulStopTimeout", "20000s")
     sc = new SparkContext(conf)
     logInfo("==================================\n\n\n")
     ssc = new StreamingContext(sc, Milliseconds(100))
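The spark.cleaner.ttl assertions above move from getInt to getTimeAsSeconds, with the old integer default mirrored as a string. A minimal sketch of the behavior the updated tests assume; it sticks to the public SparkConf API, since Utils.timeStringAsSeconds (used directly in the "from checkpoint" test) is a spark-private helper behind these getters, and the unset key name below is hypothetical:

    import org.apache.spark.SparkConf

    // Sketch of SparkConf.getTimeAsSeconds as used by the updated assertions.
    object CleanerTtlSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false).set("spark.cleaner.ttl", "10s")

        // "10s" parses to 10 whole seconds, matching the === 10 assertions.
        assert(conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") == 10L)

        // For a missing key the default string is parsed the same way; a bare
        // number is read in the requested unit, so "-1" yields -1 seconds.
        // ("spark.cleaner.unsetKey" is a made-up key for illustration.)
        assert(conf.getTimeAsSeconds("spark.cleaner.unsetKey", "-1") == -1L)

        // Larger units convert down: "1h" is 3600 seconds, equivalent to the
        // "3600s" value set in the "stop gracefully" test.
        conf.set("spark.cleaner.ttl", "1h")
        assert(conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") == 3600L)
      }
    }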