From ab0f834dbb509d323577572691293b74368a9d86 Mon Sep 17 00:00:00 2001 From: seanm Date: Tue, 16 Apr 2013 11:57:05 -0600 Subject: adding spark.streaming.blockInterval property --- docs/configuration.md | 7 +++++++ .../main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 04eb6daaa5..55f1962b18 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -253,6 +253,13 @@ Apart from these, the following properties are also available, and may be useful applications). Note that any RDD that persists in memory for more than this duration will be cleared as well. + + spark.streaming.blockInterval + 200 + + Duration (milliseconds) of how long to batch new objects coming from network receivers. + + diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 7385474963..26805e9621 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log case class Block(id: String, iterator: Iterator[T], metadata: Any = null) val clock = new SystemClock() - val blockInterval = 200L + val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) val blockStorageLevel = storageLevel val blocksForPushing = new ArrayBlockingQueue[Block](1000) -- cgit v1.2.3