aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorYadong Qi <qiyadong2010@gmail.com>2014-11-19 15:53:06 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-19 15:53:19 -0800
commita250ca369208b23503d7fff1cf9ee52e2e1ba3e2 (patch)
tree9e0d67b117ab337b7985ce9a4dc670e3b3ccdc03 /streaming
parentc4abb2eb4f6a2875bbe22b12c246d8ae1773ece2 (diff)
downloadspark-a250ca369208b23503d7fff1cf9ee52e2e1ba3e2.tar.gz
spark-a250ca369208b23503d7fff1cf9ee52e2e1ba3e2.tar.bz2
spark-a250ca369208b23503d7fff1cf9ee52e2e1ba3e2.zip
[SPARK-4294][Streaming] UnionDStream stream should express the requirements in the same way as TransformedDStream
In class TransformedDStream: ```scala require(parents.length > 0, "List of DStreams to transform is empty") require(parents.map(.ssc).distinct.size == 1, "Some of the DStreams have different contexts") require(parents.map(.slideDuration).distinct.size == 1, "Some of the DStreams have different slide durations") ``` In class UnionDStream: ```scala if (parents.length == 0) { throw new IllegalArgumentException("Empty array of parents") } if (parents.map(.ssc).distinct.size > 1) { throw new IllegalArgumentException("Array of parents have different StreamingContexts") } if (parents.map(.slideDuration).distinct.size > 1) { throw new IllegalArgumentException("Array of parents have different slide times") } ``` The function is the same, but the realization is not. I think they shoule be the same. Author: Yadong Qi <qiyadong2010@gmail.com> Closes #3152 from watermen/bug-fix1 and squashes the following commits: ed66db6 [Yadong Qi] Change transform to union b6b3b8b [Yadong Qi] The same function should have the same realization. (cherry picked from commit c3002c4a61c4fc5b966aa384c41c3cba33de0aa6) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala15
1 files changed, 4 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 57429a1532..abbc40befa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -28,17 +28,10 @@ private[streaming]
class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideDuration).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
+ require(parents.length > 0, "List of DStreams to union is empty")
+ require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
+ require(parents.map(_.slideDuration).distinct.size == 1,
+ "Some of the DStreams have different slide durations")
override def dependencies = parents.toList