aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-11-21 11:38:56 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-11-21 11:38:56 -0800
commit03ef6e889929befa968d35fa3757c687edc3a38b (patch)
treea88766540122f945f7590a7dff9cd184bc426fcc /streaming
parentfd031679df59b83ae0a735ea77c49623f6e257c4 (diff)
downloadspark-03ef6e889929befa968d35fa3757c687edc3a38b.tar.gz
spark-03ef6e889929befa968d35fa3757c687edc3a38b.tar.bz2
spark-03ef6e889929befa968d35fa3757c687edc3a38b.zip
Added flag in window operation to use partition awaare union.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 49f84310bc..464ac15ab6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -41,6 +41,8 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean
+
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
@@ -54,7 +56,7 @@ class WindowedDStream[T: ClassManifest](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
- val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+ val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
logInfo("Using partition aware union")
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {