diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 14:17:16 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 14:17:16 -0800 |
commit | 577c8cc8340abbdbbbd141597b1c7b8ff19b20be (patch) | |
tree | ce2a4415179d64eec6fabf801f1788856576c751 | |
parent | 3579647cdc8ace0170566783faaf7102ef1f2052 (diff) | |
download | spark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.tar.gz spark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.tar.bz2 spark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.zip |
Removed unncessary options from WindowedDStream.
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 8 |
1 files changed, 3 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index ca4edae38a..89c43ff935 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag]( throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") - val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean - parent.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration @@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) - val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { - logInfo("Using partition aware union") + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + logDebug("Using partition aware union for windowing at " + validTime) new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) } else { - logInfo("Using normal union") + logDebug("Using normal union for windowing at " + validTime) new UnionRDD(ssc.sc,rddsInWindow) } Some(windowRDD) |