aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-04-16 11:57:05 -0600
committerseanm <sean.mcnamara@webtrends.com>2013-04-16 11:57:05 -0600
commitab0f834dbb509d323577572691293b74368a9d86 (patch)
tree6c00e870d03b1f108f9c76b92b91619626957785
parent8ac9efba5a435443be9abf8ebbe867806d42c9db (diff)
downloadspark-ab0f834dbb509d323577572691293b74368a9d86.tar.gz
spark-ab0f834dbb509d323577572691293b74368a9d86.tar.bz2
spark-ab0f834dbb509d323577572691293b74368a9d86.zip
adding spark.streaming.blockInterval property
-rw-r--r--docs/configuration.md7
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala2
2 files changed, 8 insertions, 1 deletions
diff --git a/docs/configuration.md b/docs/configuration.md
index 04eb6daaa5..55f1962b18 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -253,6 +253,13 @@ Apart from these, the following properties are also available, and may be useful
applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
</td>
</tr>
+<tr>
+ <td>spark.streaming.blockInterval</td>
+ <td>200</td>
+ <td>
+ Duration (milliseconds) of how long to batch new objects coming from network receivers.
+ </td>
+</tr>
</table>
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..26805e9621 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = 200L
+ val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)