diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-06-16 08:16:09 +0200 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-06-16 08:16:09 +0200 |
commit | ccf010f27bc62f7e7f409c6eef7488ab476de609 (patch) | |
tree | 7a5ae2fafab4a83860e649eca7bc5c6a2ee2c195 | |
parent | bc76a0f7506c9796209a96b027a236270c23bbf6 (diff) | |
download | spark-ccf010f27bc62f7e7f409c6eef7488ab476de609.tar.gz spark-ccf010f27bc62f7e7f409c6eef7488ab476de609.tar.bz2 spark-ccf010f27bc62f7e7f409c6eef7488ab476de609.zip |
[SPARK-8367] [STREAMING] Add a limit for 'spark.streaming.blockInterval` since a data loss bug.
Bug had reported in the jira [SPARK-8367](https://issues.apache.org/jira/browse/SPARK-8367)
The relution is limitting the configuration `spark.streaming.blockInterval` to a positive number.
Author: huangzhaowei <carlmartinmax@gmail.com>
Author: huangzhaowei <SaintBacchus@users.noreply.github.com>
Closes #6818 from SaintBacchus/SPARK-8367 and squashes the following commits:
c9d1927 [huangzhaowei] Update BlockGenerator.scala
bd3f71a [huangzhaowei] Use requre instead of if
3d17796 [huangzhaowei] [SPARK_8367][Streaming]Add a limit for 'spark.streaming.blockInterval' since a data loss bug.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 8d73593ab6..92b51ce392 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.util.RecurringTimer -import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.util.SystemClock /** Listener object for BlockGenerator events */ private[streaming] trait BlockGeneratorListener { @@ -80,6 +80,8 @@ private[streaming] class BlockGenerator( private val clock = new SystemClock() private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") + require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value") + private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10) |