about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author    Tathagata Das <tathagata.das1565@gmail.com>  2013-06-24 21:46:33 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-06-24 21:46:33 -0700
commit   1249e9153b444172eb6a68d44efa39d7bb3b6910 (patch)
tree     bee647ebfe5bd646325363e249c45de3f062771d
parent   cfcda95f86c6eec4a39e5ad182e068722be66fe7 (diff)
parent   ab0f834dbb509d323577572691293b74368a9d86 (diff)
download spark-1249e9153b444172eb6a68d44efa39d7bb3b6910.tar.gz
spark-1249e9153b444172eb6a68d44efa39d7bb3b6910.tar.bz2
spark-1249e9153b444172eb6a68d44efa39d7bb3b6910.zip
Merge pull request #572 from Reinvigorate/sm-block-interval
Adding spark.streaming.blockInterval property
-rw-r--r--  docs/configuration.md                                                    | 7
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 2
2 files changed, 8 insertions, 1 deletion
diff --git a/docs/configuration.md b/docs/configuration.md
index 04eb6daaa5..55f1962b18 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -253,6 +253,13 @@ Apart from these, the following properties are also available, and may be useful
applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
</td>
</tr>
+<tr>
+ <td>spark.streaming.blockInterval</td>
+ <td>200</td>
+ <td>
+ Duration (milliseconds) of how long to batch new objects coming from network receivers.
+ </td>
+</tr>
</table>
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..26805e9621 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = 200L
+ val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)