diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 49f84310bc..464ac15ab6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -41,6 +41,8 @@ class WindowedDStream[T: ClassManifest]( throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean + parent.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration @@ -54,7 +56,7 @@ class WindowedDStream[T: ClassManifest]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) - val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { logInfo("Using partition aware union") new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) } else { |