aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 14:17:16 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 14:17:16 -0800
commit577c8cc8340abbdbbbd141597b1c7b8ff19b20be (patch)
treece2a4415179d64eec6fabf801f1788856576c751 /streaming
parent3579647cdc8ace0170566783faaf7102ef1f2052 (diff)
downloadspark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.tar.gz
spark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.tar.bz2
spark-577c8cc8340abbdbbbd141597b1c7b8ff19b20be.zip
Removed unncessary options from WindowedDStream.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala8
1 files changed, 3 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ca4edae38a..89c43ff935 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag](
throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
- val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
-
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
@@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
- val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
- logInfo("Using partition aware union")
+ val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+ logDebug("Using partition aware union for windowing at " + validTime)
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
- logInfo("Using normal union")
+ logDebug("Using normal union for windowing at " + validTime)
new UnionRDD(ssc.sc,rddsInWindow)
}
Some(windowRDD)