diff options
-rw-r--r-- | docs/configuration.md | 7 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 2 |
2 files changed, 8 insertions, 1 deletions
diff --git a/docs/configuration.md b/docs/configuration.md index 04eb6daaa5..55f1962b18 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -253,6 +253,13 @@ Apart from these, the following properties are also available, and may be useful applications). Note that any RDD that persists in memory for more than this duration will be cleared as well. </td> </tr> +<tr> + <td>spark.streaming.blockInterval</td> + <td>200</td> + <td> + Duration (milliseconds) of how long to batch new objects coming from network receivers. + </td> +</tr> </table> diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 7385474963..26805e9621 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log case class Block(id: String, iterator: Iterator[T], metadata: Any = null) val clock = new SystemClock() - val blockInterval = 200L + val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) val blockStorageLevel = storageLevel val blocksForPushing = new ArrayBlockingQueue[Block](1000) |