aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 8d73593ab6..92b51ce392 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.util.SystemClock
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
@@ -80,6 +80,8 @@ private[streaming] class BlockGenerator(
private val clock = new SystemClock()
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+ require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)