path: root/streaming/src/main/scala
author    jerryshao <sshao@hortonworks.com>    2015-10-16 11:53:47 +0100
committer Sean Owen <sowen@cloudera.com>       2015-10-16 11:53:47 +0100
commit    43f5d1f326d7a2a4a78fe94853d0d05237568203 (patch)
tree      ee8b15f4e95d4ae7b15e751bb3af60a958cfe134 /streaming/src/main/scala
parent    eb0b4d6e2ddfb765f082d0d88472626336ad2609 (diff)
[SPARK-11060] [STREAMING] Fix some potential NPE in DStream transformation
This patch fixes:
1. Guard against NPEs in `TransformedDStream` when a parent DStream returns None instead of an empty RDD.
2. Verify input streams which may potentially return None.
3. Add a unit test to verify the behavior when an input stream returns None.

cc tdas, please help to review, thanks a lot :).

Author: jerryshao <sshao@hortonworks.com>

Closes #9070 from jerryshao/SPARK-11060.
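For context, a minimal sketch (not part of the commit) of the kind of input stream this patch guards against: a DStream whose compute() returns None instead of Some(emptyRDD). The class name below is hypothetical and used only to illustrate the failure mode.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Time, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream

    // Hypothetical input stream that returns no RDD for a batch.
    class NoneReturningInputDStream(ssc_ : StreamingContext)
      extends InputDStream[Int](ssc_) {
      override def start(): Unit = {}
      override def stop(): Unit = {}
      // Returning None here used to reach TransformedDStream as a null parent RDD
      // (via .orNull), which then surfaced as an NPE in the user's transform function.
      override def compute(validTime: Time): Option[RDD[Int]] = None
    }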
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala | 6
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala    | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala   | 7
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala         | 11
4 files changed, 17 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index f396c34758..4eb92dd8b1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -17,9 +17,10 @@
package org.apache.spark.streaming.dstream
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, StreamingContext}
-import scala.reflect.ClassTag
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
@@ -27,6 +28,9 @@ import scala.reflect.ClassTag
class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
extends InputDStream[T](ssc_) {
+ require(rdd != null,
+ "parameter rdd null is illegal, which will lead to NPE in the following transformation")
+
override def start() {}
override def stop() {}
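A hedged usage sketch of the new precondition, assuming an existing StreamingContext named ssc (the names here are illustrative, not part of the commit):

    val ints = new ConstantInputDStream(ssc, ssc.sparkContext.parallelize(Seq(1, 2, 3)))  // fine
    // Passing a null RDD now fails fast at construction time with an
    // IllegalArgumentException from require(...), instead of an NPE much later
    // inside a downstream transformation:
    // val broken = new ConstantInputDStream[Int](ssc, null)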
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a2685046e0..cd07364637 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -62,7 +62,7 @@ class QueueInputDStream[T: ClassTag](
} else if (defaultRDD != null) {
Some(defaultRDD)
} else {
- None
+ Some(ssc.sparkContext.emptyRDD)
}
}
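To illustrate the effect of this change, a minimal sketch assuming a StreamingContext ssc; the variable names are illustrative:

    import scala.collection.mutable
    import org.apache.spark.rdd.RDD

    val queue = mutable.Queue[RDD[Int]]()                // nothing queued, no default RDD
    val stream = ssc.queueStream(queue, oneAtATime = true)
    // Batches that find the queue empty previously produced None; they now produce
    // an empty RDD, so downstream operators never see a null parent RDD.
    stream.foreachRDD(rdd => println(s"empty batch: ${rdd.isEmpty()}"))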
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index ab01f47d5c..5eabdf63dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.SparkException
-import org.apache.spark.rdd.{PairRDDFunctions, RDD}
+import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
private[streaming]
@@ -39,7 +39,10 @@ class TransformedDStream[U: ClassTag] (
override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
- val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+ val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
+ // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
+ throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
+ }
val transformedRDD = transformFunc(parentRDDs, validTime)
if (transformedRDD == null) {
throw new SparkException("Transform function must not return null. " +
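A sketch of the observable difference, assuming noneStream is a DStream whose compute() returns None (for example, the hypothetical NoneReturningInputDStream above):

    val doubled = noneStream.transform { rdd =>
      rdd.map(_ * 2)  // before this patch, rdd could be null here and the map would NPE
    }
    // After this patch, batch generation instead fails with a descriptive
    // SparkException("Couldn't generate RDD from parent at time ..."),
    // thrown before the user's transform function is ever invoked.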
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 9405dbaa12..d73ffdfd84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -17,13 +17,14 @@
package org.apache.spark.streaming.dstream
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkException
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
private[streaming]
class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
@@ -41,8 +42,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach {
case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
- + validTime)
+ case None => throw new SparkException("Could not generate RDD from a parent for unifying at" +
+ s" time $validTime")
}
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))