aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-11-06 14:51:53 +0000
committerSean Owen <sowen@cloudera.com>2015-11-06 14:51:53 +0000
commitcf69ce136590fea51843bc54f44f0f45c7d0ac36 (patch)
treec1e7582c2234da70e9e4e05abdbca7a8dc690d5e /streaming
parent253e87e8ab8717ffef40a6d0d376b1add155ef90 (diff)
downloadspark-cf69ce136590fea51843bc54f44f0f45c7d0ac36.tar.gz
spark-cf69ce136590fea51843bc54f44f0f45c7d0ac36.tar.bz2
spark-cf69ce136590fea51843bc54f44f0f45c7d0ac36.zip
[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used
Just ignored `InputDStream`s that have null `rememberDuration` in `DStreamGraph.getMaxInputStreamRememberDuration`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #9476 from zsxwing/SPARK-11511.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala16
2 files changed, 18 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 1b0b7890b3..7829f5e887 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
* safe remember duration which can be used to perform cleanup operations.
*/
def getMaxInputStreamRememberDuration(): Duration = {
- inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
+ // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
+ inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
}
@throws(classOf[IOException])
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index c7a877142b..860fac29c0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
"Please don't use queueStream when checkpointing is enabled."))
}
+ test("Creating an InputDStream but not using it should not crash") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val input1 = addInputStream(ssc)
+ val input2 = addInputStream(ssc)
+ val output = new TestOutputStream(input2)
+ output.register()
+ val batchCount = new BatchCounter(ssc)
+ ssc.start()
+ // Just wait for completing 2 batches to make sure it triggers
+ // `DStream.getMaxInputStreamRememberDuration`
+ batchCount.waitUntilBatchesCompleted(2, 10000)
+ // Throw the exception if crash
+ ssc.awaitTerminationOrTimeout(1)
+ ssc.stop()
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)