aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala8
1 files changed, 3 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ca4edae38a..89c43ff935 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag](
throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
- val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
-
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
@@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
- val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
- logInfo("Using partition aware union")
+ val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+ logDebug("Using partition aware union for windowing at " + validTime)
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
- logInfo("Using normal union")
+ logDebug("Using normal union for windowing at " + validTime)
new UnionRDD(ssc.sc,rddsInWindow)
}
Some(windowRDD)