aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala12
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala13
2 files changed, 23 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5d46ca0715..ab01f47d5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -17,9 +17,11 @@
package org.apache.spark.streaming.dstream
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkException
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming.{Duration, Time}
-import scala.reflect.ClassTag
private[streaming]
class TransformedDStream[U: ClassTag] (
@@ -38,6 +40,12 @@ class TransformedDStream[U: ClassTag] (
override def compute(validTime: Time): Option[RDD[U]] = {
val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
- Some(transformFunc(parentRDDs, validTime))
+ val transformedRDD = transformFunc(parentRDDs, validTime)
+ if (transformedRDD == null) {
+ throw new SparkException("Transform function must not return null. " +
+ "Return SparkContext.emptyRDD() instead to represent no element " +
+ "as the result of transformation.")
+ }
+ Some(transformedRDD)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 255376807c..9988f410f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -211,6 +211,19 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("transform with NULL") {
+ val input = Seq(1 to 4)
+ intercept[SparkException] {
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]),
+ Seq(Seq()),
+ 1,
+ false
+ )
+ }
+ }
+
test("transformWith") {
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )